Anope IRC Services  Version 2.0
db_redis.cpp
Go to the documentation of this file.
1 /*
2  *
3  * (C) 2003-2014 Anope Team
4  * Contact us at team@anope.org
5  *
6  * Please read COPYING and README for further details.
7  *
8  *
9  */
10 
11 #include "module.h"
12 #include "modules/redis.h"
13 
14 using namespace Redis;
15 
17 static DatabaseRedis *me;
18 
19 class Data : public Serialize::Data
20 {
21  public:
22  std::map<Anope::string, std::stringstream *> data;
23 
25  {
26  for (std::map<Anope::string, std::stringstream *>::iterator it = data.begin(), it_end = data.end(); it != it_end; ++it)
27  delete it->second;
28  }
29 
30  std::iostream& operator[](const Anope::string &key) anope_override
31  {
32  std::stringstream* &stream = data[key];
33  if (!stream)
34  stream = new std::stringstream();
35  return *stream;
36  }
37 
38  std::set<Anope::string> KeySet() const anope_override
39  {
40  std::set<Anope::string> keys;
41  for (std::map<Anope::string, std::stringstream *>::const_iterator it = this->data.begin(), it_end = this->data.end(); it != it_end; ++it)
42  keys.insert(it->first);
43  return keys;
44  }
45 
46  size_t Hash() const anope_override
47  {
48  size_t hash = 0;
49  for (std::map<Anope::string, std::stringstream *>::const_iterator it = this->data.begin(), it_end = this->data.end(); it != it_end; ++it)
50  if (!it->second->str().empty())
51  hash ^= Anope::hash_cs()(it->second->str());
52  return hash;
53  }
54 };
55 
56 class TypeLoader : public Interface
57 {
59  public:
60  TypeLoader(Module *creator, const Anope::string &t) : Interface(creator), type(t) { }
61 
62  void OnResult(const Reply &r) anope_override;
63 };
64 
65 class ObjectLoader : public Interface
66 {
68  int64_t id;
69 
70  public:
71  ObjectLoader(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
72 
73  void OnResult(const Reply &r) anope_override;
74 };
75 
76 class IDInterface : public Interface
77 {
79  public:
80  IDInterface(Module *creator, Serializable *obj) : Interface(creator), o(obj) { }
81 
82  void OnResult(const Reply &r) anope_override;
83 };
84 
85 class Deleter : public Interface
86 {
88  int64_t id;
89  public:
90  Deleter(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
91 
92  void OnResult(const Reply &r) anope_override;
93 };
94 
95 class Updater : public Interface
96 {
98  int64_t id;
99  public:
100  Updater(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
101 
102  void OnResult(const Reply &r) anope_override;
103 };
104 
105 class ModifiedObject : public Interface
106 {
108  int64_t id;
109  public:
110  ModifiedObject(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
111 
112  void OnResult(const Reply &r) anope_override;
113 };
114 
116 {
117  public:
118  SubscriptionListener(Module *creator) : Interface(creator) { }
119 
120  void OnResult(const Reply &r) anope_override;
121 };
122 
123 class DatabaseRedis : public Module, public Pipe
124 {
126  std::set<Serializable *> updated_items;
127 
128  public:
130 
131  DatabaseRedis(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, DATABASE | VENDOR), sl(this)
132  {
133  me = this;
134 
135  }
136 
137  /* Insert or update an object */
139  {
141 
142  /* If there is no id yet for ths object, get one */
143  if (!obj->id)
144  redis->SendCommand(new IDInterface(this, obj), "INCR id:" + t->GetName());
145  else
146  {
147  Data data;
148  obj->Serialize(data);
149 
150  if (obj->IsCached(data))
151  return;
152 
153  obj->UpdateCache(data);
154 
155  std::vector<Anope::string> args;
156  args.push_back("HGETALL");
157  args.push_back("hash:" + t->GetName() + ":" + stringify(obj->id));
158 
159  /* Get object attrs to clear before updating */
160  redis->SendCommand(new Updater(this, t->GetName(), obj->id), args);
161  }
162  }
163 
165  {
166  for (std::set<Serializable *>::iterator it = this->updated_items.begin(), it_end = this->updated_items.end(); it != it_end; ++it)
167  {
168  Serializable *s = *it;
169 
170  this->InsertObject(s);
171  }
172 
173  this->updated_items.clear();
174  }
175 
177  {
178  Configuration::Block *block = conf->GetModule(this);
179  this->redis = ServiceReference<Provider>("Redis::Provider", block->Get<const Anope::string>("engine", "redis/main"));
180  }
181 
183  {
184  const std::vector<Anope::string> type_order = Serialize::Type::GetTypeOrder();
185  for (unsigned i = 0; i < type_order.size(); ++i)
186  {
187  Serialize::Type *sb = Serialize::Type::Find(type_order[i]);
188  this->OnSerializeTypeCreate(sb);
189  }
190 
191  while (redis->BlockAndProcess());
192 
193  redis->Subscribe(&this->sl, "__keyspace@*__:hash:*");
194 
195  return EVENT_STOP;
196  }
197 
199  {
200  if (!redis)
201  return;
202 
203  std::vector<Anope::string> args;
204  args.push_back("SMEMBERS");
205  args.push_back("ids:" + sb->GetName());
206 
207  redis->SendCommand(new TypeLoader(this, sb->GetName()), args);
208  }
209 
211  {
212  this->updated_items.insert(obj);
213  this->Notify();
214  }
215 
217  {
218  Serialize::Type *t = obj->GetSerializableType();
219 
220  std::vector<Anope::string> args;
221  args.push_back("HGETALL");
222  args.push_back("hash:" + t->GetName() + ":" + stringify(obj->id));
223 
224  /* Get all of the attributes for this object */
225  redis->SendCommand(new Deleter(this, t->GetName(), obj->id), args);
226 
227  this->updated_items.erase(obj);
228  t->objects.erase(obj->id);
229  this->Notify();
230  }
231 
233  {
234  this->updated_items.insert(obj);
235  this->Notify();
236  }
237 };
238 
240 {
241  if (r.type != Reply::MULTI_BULK || !me->redis)
242  {
243  delete this;
244  return;
245  }
246 
247  for (unsigned i = 0; i < r.multi_bulk.size(); ++i)
248  {
249  const Reply *reply = r.multi_bulk[i];
250 
251  if (reply->type != Reply::BULK)
252  continue;
253 
254  int64_t id;
255  try
256  {
257  id = convertTo<int64_t>(reply->bulk);
258  }
259  catch (const ConvertException &)
260  {
261  continue;
262  }
263 
264  std::vector<Anope::string> args;
265  args.push_back("HGETALL");
266  args.push_back("hash:" + this->type + ":" + stringify(id));
267 
268  me->redis->SendCommand(new ObjectLoader(me, this->type, id), args);
269  }
270 
271  delete this;
272 }
273 
275 {
276  Serialize::Type *st = Serialize::Type::Find(this->type);
277 
278  if (r.type != Reply::MULTI_BULK || r.multi_bulk.empty() || !me->redis || !st)
279  {
280  delete this;
281  return;
282  }
283 
284  Data data;
285 
286  for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
287  {
288  const Reply *key = r.multi_bulk[i],
289  *value = r.multi_bulk[i + 1];
290 
291  data[key->bulk] << value->bulk;
292  }
293 
294  Serializable* &obj = st->objects[this->id];
295  obj = st->Unserialize(obj, data);
296  if (obj)
297  {
298  obj->id = this->id;
299  obj->UpdateCache(data);
300  }
301 
302  delete this;
303 }
304 
306 {
307  if (!o || r.type != Reply::INT || !r.i)
308  {
309  delete this;
310  return;
311  }
312 
313  Serializable* &obj = o->GetSerializableType()->objects[r.i];
314  if (obj)
315  /* This shouldn't be possible */
316  obj->id = 0;
317 
318  o->id = r.i;
319  obj = o;
320 
321  /* Now that we have the id, insert this object for real */
322  anope_dynamic_static_cast<DatabaseRedis *>(this->owner)->InsertObject(o);
323 
324  delete this;
325 }
326 
327 void Deleter::OnResult(const Reply &r)
328 {
329  if (r.type != Reply::MULTI_BULK || !me->redis || r.multi_bulk.empty())
330  {
331  delete this;
332  return;
333  }
334 
335  /* Transaction start */
336  me->redis->StartTransaction();
337 
338  std::vector<Anope::string> args;
339  args.push_back("DEL");
340  args.push_back("hash:" + this->type + ":" + stringify(this->id));
341 
342  /* Delete hash object */
343  me->redis->SendCommand(NULL, args);
344 
345  args.clear();
346  args.push_back("SREM");
347  args.push_back("ids:" + this->type);
348  args.push_back(stringify(this->id));
349 
350  /* Delete id from ids set */
351  me->redis->SendCommand(NULL, args);
352 
353  for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
354  {
355  const Reply *key = r.multi_bulk[i],
356  *value = r.multi_bulk[i + 1];
357 
358  args.clear();
359  args.push_back("SREM");
360  args.push_back("value:" + this->type + ":" + key->bulk + ":" + value->bulk);
361  args.push_back(stringify(this->id));
362 
363  /* Delete value -> object id */
364  me->redis->SendCommand(NULL, args);
365  }
366 
367  /* Transaction end */
368  me->redis->CommitTransaction();
369 
370  delete this;
371 }
372 
373 void Updater::OnResult(const Reply &r)
374 {
375  Serialize::Type *st = Serialize::Type::Find(this->type);
376 
377  if (!st)
378  {
379  delete this;
380  return;
381  }
382 
383  Serializable *obj = st->objects[this->id];
384  if (!obj)
385  {
386  delete this;
387  return;
388  }
389 
390  Data data;
391  obj->Serialize(data);
392 
393  /* Transaction start */
394  me->redis->StartTransaction();
395 
396  for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
397  {
398  const Reply *key = r.multi_bulk[i],
399  *value = r.multi_bulk[i + 1];
400 
401  std::vector<Anope::string> args;
402  args.push_back("SREM");
403  args.push_back("value:" + this->type + ":" + key->bulk + ":" + value->bulk);
404  args.push_back(stringify(this->id));
405 
406  /* Delete value -> object id */
407  me->redis->SendCommand(NULL, args);
408  }
409 
410  /* Add object id to id set for this type */
411  std::vector<Anope::string> args;
412  args.push_back("SADD");
413  args.push_back("ids:" + this->type);
414  args.push_back(stringify(obj->id));
415  me->redis->SendCommand(NULL, args);
416 
417  args.clear();
418  args.push_back("HMSET");
419  args.push_back("hash:" + this->type + ":" + stringify(obj->id));
420 
421  typedef std::map<Anope::string, std::stringstream *> items;
422  for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
423  {
424  const Anope::string &key = it->first;
425  std::stringstream *value = it->second;
426 
427  args.push_back(key);
428  args.push_back(value->str());
429 
430  std::vector<Anope::string> args2;
431 
432  args2.push_back("SADD");
433  args2.push_back("value:" + this->type + ":" + key + ":" + value->str());
434  args2.push_back(stringify(obj->id));
435 
436  /* Add to value -> object id set */
437  me->redis->SendCommand(NULL, args2);
438  }
439 
440  ++obj->redis_ignore;
441 
442  /* Add object */
443  me->redis->SendCommand(NULL, args);
444 
445  /* Transaction end */
446  me->redis->CommitTransaction();
447 
448  delete this;
449 }
450 
452 {
453  /*
454  * [May 15 13:59:35.645839 2013] Debug: pmessage
455  * [May 15 13:59:35.645866 2013] Debug: __keyspace@*__:anope:hash:*
456  * [May 15 13:59:35.645880 2013] Debug: __keyspace@0__:anope:hash:type:id
457  * [May 15 13:59:35.645893 2013] Debug: hset
458  */
459  if (r.multi_bulk.size() != 4)
460  return;
461 
462  size_t sz = r.multi_bulk[2]->bulk.find(':');
463  if (sz == Anope::string::npos)
464  return;
465 
466  const Anope::string &key = r.multi_bulk[2]->bulk.substr(sz + 1),
467  &op = r.multi_bulk[3]->bulk;
468 
469  sz = key.rfind(':');
470  if (sz == Anope::string::npos)
471  return;
472 
473  const Anope::string &id = key.substr(sz + 1);
474 
475  size_t sz2 = key.rfind(':', sz - 1);
476  if (sz2 == Anope::string::npos)
477  return;
478  const Anope::string &type = key.substr(sz2 + 1, sz - sz2 - 1);
479 
480  Serialize::Type *s_type = Serialize::Type::Find(type);
481 
482  if (s_type == NULL)
483  return;
484 
485  uint64_t obj_id;
486  try
487  {
488  obj_id = convertTo<uint64_t>(id);
489  }
490  catch (const ConvertException &)
491  {
492  return;
493  }
494 
495  if (op == "hset" || op == "hdel")
496  {
497  Serializable *s = s_type->objects[obj_id];
498 
499  if (s && s->redis_ignore)
500  {
501  --s->redis_ignore;
502  Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type << ", but I am ignoring it";
503  }
504  else
505  {
506  Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type;
507 
508  std::vector<Anope::string> args;
509  args.push_back("HGETALL");
510  args.push_back("hash:" + type + ":" + id);
511 
512  me->redis->SendCommand(new ModifiedObject(me, type, obj_id), args);
513  }
514  }
515  else if (op == "del")
516  {
517  Serializable* &s = s_type->objects[obj_id];
518  if (s == NULL)
519  return;
520 
521  Log(LOG_DEBUG) << "redis: notify: deleting object id " << obj_id << " of type " << type;
522 
523  Data data;
524 
525  s->Serialize(data);
526 
527  /* Transaction start */
528  me->redis->StartTransaction();
529 
530  typedef std::map<Anope::string, std::stringstream *> items;
531  for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
532  {
533  const Anope::string &k = it->first;
534  std::stringstream *value = it->second;
535 
536  std::vector<Anope::string> args;
537  args.push_back("SREM");
538  args.push_back("value:" + type + ":" + k + ":" + value->str());
539  args.push_back(id);
540 
541  /* Delete value -> object id */
542  me->redis->SendCommand(NULL, args);
543  }
544 
545  std::vector<Anope::string> args;
546  args.push_back("SREM");
547  args.push_back("ids:" + type);
548  args.push_back(stringify(s->id));
549 
550  /* Delete object from id set */
551  me->redis->SendCommand(NULL, args);
552 
553  /* Transaction end */
554  me->redis->CommitTransaction();
555 
556  delete s;
557  s = NULL;
558  }
559 }
560 
562 {
563  Serialize::Type *st = Serialize::Type::Find(this->type);
564 
565  if (!st)
566  {
567  delete this;
568  return;
569  }
570 
571  Serializable* &obj = st->objects[this->id];
572 
573  /* Transaction start */
574  me->redis->StartTransaction();
575 
576  /* Erase old object values */
577  if (obj)
578  {
579  Data data;
580 
581  obj->Serialize(data);
582 
583  typedef std::map<Anope::string, std::stringstream *> items;
584  for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
585  {
586  const Anope::string &key = it->first;
587  std::stringstream *value = it->second;
588 
589  std::vector<Anope::string> args;
590  args.push_back("SREM");
591  args.push_back("value:" + st->GetName() + ":" + key + ":" + value->str());
592  args.push_back(stringify(this->id));
593 
594  /* Delete value -> object id */
595  me->redis->SendCommand(NULL, args);
596  }
597  }
598 
599  Data data;
600 
601  for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
602  {
603  const Reply *key = r.multi_bulk[i],
604  *value = r.multi_bulk[i + 1];
605 
606  data[key->bulk] << value->bulk;
607  }
608 
609  obj = st->Unserialize(obj, data);
610  if (obj)
611  {
612  obj->id = this->id;
613  obj->UpdateCache(data);
614 
615  /* Insert new object values */
616  typedef std::map<Anope::string, std::stringstream *> items;
617  for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
618  {
619  const Anope::string &key = it->first;
620  std::stringstream *value = it->second;
621 
622  std::vector<Anope::string> args;
623  args.push_back("SADD");
624  args.push_back("value:" + st->GetName() + ":" + key + ":" + value->str());
625  args.push_back(stringify(obj->id));
626 
627  /* Add to value -> object id set */
628  me->redis->SendCommand(NULL, args);
629  }
630 
631  std::vector<Anope::string> args;
632  args.push_back("SADD");
633  args.push_back("ids:" + st->GetName());
634  args.push_back(stringify(obj->id));
635 
636  /* Add to type -> id set */
637  me->redis->SendCommand(NULL, args);
638  }
639 
640  /* Transaction end */
641  me->redis->CommitTransaction();
642 
643  delete this;
644 }
645 
int64_t id
Definition: db_redis.cpp:98
void OnResult(const Reply &r) anope_override
Definition: db_redis.cpp:305
static Anope::map< ExtensibleItem< CSMiscData > * > items
Definition: cs_set_misc.cpp:18
Reference< Serializable > o
Definition: db_redis.cpp:78
IDInterface(Module *creator, Serializable *obj)
Definition: db_redis.cpp:80
Anope::string type
Definition: db_redis.cpp:97
std::string & str()
Definition: anope.h:119
size_t Hash() const anope_override
Definition: db_redis.cpp:46
void push_back(char c)
Definition: anope.h:142
void UpdateCache(Serialize::Data &)
Definition: serialize.cpp:97
std::iostream & operator[](const Anope::string &key) anope_override
Definition: db_redis.cpp:30
void OnResult(const Reply &r) anope_override
Definition: db_redis.cpp:373
void OnResult(const Reply &r) anope_override
Definition: db_redis.cpp:451
enum Redis::Reply::Type type
void OnNotify() anope_override
Definition: db_redis.cpp:164
std::map< Anope::string, std::stringstream * > data
Definition: db_redis.cpp:22
size_type rfind(const string &_str, size_type pos=npos) const
Definition: anope.h:197
Anope::string type
Definition: db_redis.cpp:87
Deleter(Module *creator, const Anope::string &t, int64_t i)
Definition: db_redis.cpp:90
uint64_t id
Definition: serialize.h:83
ModifiedObject(Module *creator, const Anope::string &t, int64_t i)
Definition: db_redis.cpp:110
std::deque< Reply * > multi_bulk
Definition: redis.h:42
ObjectLoader(Module *creator, const Anope::string &t, int64_t i)
Definition: db_redis.cpp:71
Definition: sockets.h:454
void OnSerializableDestruct(Serializable *obj) anope_override
Definition: db_redis.cpp:216
int64_t id
Definition: db_redis.cpp:68
string substr(size_type pos=0, size_type n=npos) const
Definition: anope.h:277
void OnSerializeTypeCreate(Serialize::Type *sb) anope_override
Definition: db_redis.cpp:198
SubscriptionListener(Module *creator)
Definition: db_redis.cpp:118
std::set< Anope::string > KeySet() const anope_override
Definition: db_redis.cpp:38
int64_t id
Definition: db_redis.cpp:88
TypeLoader(Module *creator, const Anope::string &t)
Definition: db_redis.cpp:60
DatabaseRedis(const Anope::string &modname, const Anope::string &creator)
Definition: db_redis.cpp:131
Definition: redis.h:10
static const size_type npos
Definition: anope.h:44
void OnSerializableUpdate(Serializable *obj) anope_override
Definition: db_redis.cpp:232
Anope::string type
Definition: db_redis.cpp:107
virtual void Serialize(Serialize::Data &data) const =0
#define anope_override
Definition: services.h:56
void InsertObject(Serializable *obj)
Definition: db_redis.cpp:138
int64_t i
Definition: redis.h:39
void OnResult(const Reply &r) anope_override
Definition: db_redis.cpp:274
ServiceReference< Provider > redis
Definition: db_redis.cpp:129
EventReturn
Definition: modules.h:129
#define MODULE_INIT(x)
Definition: modules.h:45
EventReturn OnLoadDatabase() anope_override
Definition: db_redis.cpp:182
void OnResult(const Reply &r) anope_override
Definition: db_redis.cpp:561
Anope::string stringify(const T &x)
Definition: anope.h:710
void OnSerializableConstruct(Serializable *obj) anope_override
Definition: db_redis.cpp:210
~Data()
Definition: db_redis.cpp:24
SubscriptionListener sl
Definition: db_redis.cpp:125
void OnReload(Configuration::Conf *conf) anope_override
Definition: db_redis.cpp:176
T anope_dynamic_static_cast(O ptr)
Definition: anope.h:774
std::set< Serializable * > updated_items
Definition: db_redis.cpp:126
bool IsCached(Serialize::Data &)
Definition: serialize.cpp:92
void OnResult(const Reply &r) anope_override
Definition: db_redis.cpp:327
Anope::string type
Definition: db_redis.cpp:67
Definition: logger.h:53
T Get(const Anope::string &tag)
Definition: config.h:44
void OnResult(const Reply &r) anope_override
Definition: db_redis.cpp:239
Anope::string type
Definition: db_redis.cpp:58
static DatabaseRedis * me
Definition: db_redis.cpp:16
Serialize::Type * GetSerializableType() const
Definition: serialize.h:101
Type(const Anope::string &n, unserialize_func f, Module *owner=NULL)
Anope::string bulk
Definition: redis.h:40
Updater(Module *creator, const Anope::string &t, int64_t i)
Definition: db_redis.cpp:100
unsigned short redis_ignore
Definition: serialize.h:86