Anope IRC Services  Version 2.0
m_redis.cpp
Go to the documentation of this file.
1 /*
2  *
3  * (C) 2003-2014 Anope Team
4  * Contact us at team@anope.org
5  *
6  * Please read COPYING and README for further details.
7  *
8  *
9  */
10 
11 #include "module.h"
12 #include "modules/redis.h"
13 
14 using namespace Redis;
15 
16 class MyRedisService;
17 
19 {
20  size_t ParseReply(Reply &r, const char *buf, size_t l);
21  public:
23  std::deque<Interface *> interfaces;
24  std::map<Anope::string, Interface *> subinterfaces;
25 
26  RedisSocket(MyRedisService *pro, bool v6) : Socket(-1, v6), provider(pro) { }
27 
28  ~RedisSocket();
29 
30  void OnConnect() anope_override;
31  void OnError(const Anope::string &error) anope_override;
32 
33  bool Read(const char *buffer, size_t l) anope_override;
34 };
35 
36 class Transaction : public Interface
37 {
38  public:
39  std::deque<Interface *> interfaces;
40 
41  Transaction(Module *creator) : Interface(creator) { }
42 
44  {
45  for (unsigned i = 0; i < interfaces.size(); ++i)
46  {
47  Interface *inter = interfaces[i];
48 
49  if (!inter)
50  continue;
51 
52  inter->OnError("Interface going away");
53  }
54  }
55 
56  void OnResult(const Reply &r) anope_override
57  {
58  /* This is a multi bulk reply of the results of the queued commands
59  * in this transaction
60  */
61 
62  Log(LOG_DEBUG_2) << "redis: transaction complete with " << r.multi_bulk.size() << " results";
63 
64  for (unsigned i = 0; i < r.multi_bulk.size(); ++i)
65  {
66  const Reply *reply = r.multi_bulk[i];
67 
68  if (interfaces.empty())
69  break;
70 
71  Interface *inter = interfaces.front();
72  interfaces.pop_front();
73 
74  if (inter)
75  inter->OnResult(*reply);
76  }
77  }
78 };
79 
80 class MyRedisService : public Provider
81 {
82  public:
84  int port;
85  unsigned db;
86 
87  RedisSocket *sock, *sub;
88 
91 
92  MyRedisService(Module *c, const Anope::string &n, const Anope::string &h, int p, unsigned d) : Provider(c, n), host(h), port(p), db(d), sock(NULL), sub(NULL),
93  ti(c), in_transaction(false)
94  {
95  sock = new RedisSocket(this, host.find(':') != Anope::string::npos);
96  sock->Connect(host, port);
97 
98  sub = new RedisSocket(this, host.find(':') != Anope::string::npos);
99  sub->Connect(host, port);
100  }
101 
103  {
104  if (sock)
105  {
106  sock->flags[SF_DEAD] = true;
107  sock->provider = NULL;
108  }
109 
110  if (sub)
111  {
112  sub->flags[SF_DEAD] = true;
113  sub->provider = NULL;
114  }
115  }
116 
117  private:
118  inline void Pack(std::vector<char> &buffer, const char *buf, size_t sz = 0)
119  {
120  if (!sz)
121  sz = strlen(buf);
122 
123  size_t old_size = buffer.size();
124  buffer.resize(old_size + sz);
125  std::copy(buf, buf + sz, buffer.begin() + old_size);
126  }
127 
128  void Send(RedisSocket *s, Interface *i, const std::vector<std::pair<const char *, size_t> > &args)
129  {
130  std::vector<char> buffer;
131 
132  Pack(buffer, "*");
133  Pack(buffer, stringify(args.size()).c_str());
134  Pack(buffer, "\r\n");
135 
136  for (unsigned j = 0; j < args.size(); ++j)
137  {
138  const std::pair<const char *, size_t> &pair = args[j];
139 
140  Pack(buffer, "$");
141  Pack(buffer, stringify(pair.second).c_str());
142  Pack(buffer, "\r\n");
143 
144  Pack(buffer, pair.first, pair.second);
145  Pack(buffer, "\r\n");
146  }
147 
148  if (buffer.empty())
149  return;
150 
151  s->Write(&buffer[0], buffer.size());
152  if (in_transaction)
153  {
154  ti.interfaces.push_back(i);
155  s->interfaces.push_back(NULL); // For the +Queued response
156  }
157  else
158  s->interfaces.push_back(i);
159  }
160 
161  public:
162  void SendCommand(RedisSocket *s, Interface *i, const std::vector<Anope::string> &cmds)
163  {
164  std::vector<std::pair<const char *, size_t> > args;
165  for (unsigned j = 0; j < cmds.size(); ++j)
166  args.push_back(std::make_pair(cmds[j].c_str(), cmds[j].length()));
167  this->Send(s, i, args);
168  }
169 
171  {
172  std::vector<Anope::string> args;
173  spacesepstream(str).GetTokens(args);
174  this->SendCommand(s, i, args);
175  }
176 
177  void Send(Interface *i, const std::vector<std::pair<const char *, size_t> > &args)
178  {
179  if (!sock)
180  {
181  sock = new RedisSocket(this, host.find(':') != Anope::string::npos);
182  sock->Connect(host, port);
183  }
184 
185  this->Send(sock, i, args);
186  }
187 
188  void SendCommand(Interface *i, const std::vector<Anope::string> &cmds) anope_override
189  {
190  std::vector<std::pair<const char *, size_t> > args;
191  for (unsigned j = 0; j < cmds.size(); ++j)
192  args.push_back(std::make_pair(cmds[j].c_str(), cmds[j].length()));
193  this->Send(i, args);
194  }
195 
197  {
198  std::vector<Anope::string> args;
199  spacesepstream(str).GetTokens(args);
200  this->SendCommand(i, args);
201  }
202 
203  public:
205  {
206  this->sock->ProcessWrite();
207  this->sock->SetBlocking(true);
208  this->sock->ProcessRead();
209  this->sock->SetBlocking(false);
210  return !this->sock->interfaces.empty();
211  }
212 
214  {
215  if (sub == NULL)
216  {
217  sub = new RedisSocket(this, host.find(':') != Anope::string::npos);
218  sub->Connect(host, port);
219  }
220 
221  std::vector<Anope::string> args;
222  args.push_back("PSUBSCRIBE");
223  args.push_back(pattern);
224  this->SendCommand(sub, NULL, args);
225 
226  sub->subinterfaces[pattern] = i;
227  }
228 
230  {
231  if (sub)
232  sub->subinterfaces.erase(pattern);
233  }
234 
236  {
237  if (in_transaction)
238  throw CoreException();
239 
240  this->SendCommand(NULL, "MULTI");
241  in_transaction = true;
242  }
243 
245  {
246  /* The result of the transaction comes back to the reply of EXEC as a multi bulk.
247  * The reply to the individual commands that make up the transaction when executed
248  * is a simple +QUEUED
249  */
250  in_transaction = false;
251  this->SendCommand(&this->ti, "EXEC");
252  }
253 };
254 
256 {
257  if (provider)
258  {
259  if (provider->sock == this)
260  provider->sock = NULL;
261  else if (provider->sub == this)
262  provider->sub = NULL;
263  }
264 
265  for (unsigned i = 0; i < interfaces.size(); ++i)
266  {
267  Interface *inter = interfaces[i];
268 
269  if (!inter)
270  continue;
271 
272  inter->OnError("Interface going away");
273  }
274 }
275 
277 {
278  Log() << "redis: Successfully connected to " << provider->name << (this == this->provider->sub ? " (sub)" : "");
279 
280  this->provider->SendCommand(NULL, "CLIENT SETNAME Anope");
281  this->provider->SendCommand(NULL, "SELECT " + stringify(provider->db));
282 
283  if (this != this->provider->sub)
284  {
285  this->provider->SendCommand(this, NULL, "CONFIG SET notify-keyspace-events KA");
286  }
287 }
288 
290 {
291  Log() << "redis: Error on " << provider->name << (this == this->provider->sub ? " (sub)" : "") << ": " << error;
292 }
293 
294 size_t RedisSocket::ParseReply(Reply &r, const char *buffer, size_t l)
295 {
296  size_t used = 0;
297 
298  if (!l)
299  return used;
300 
301  if (r.type == Reply::MULTI_BULK)
302  goto multi_bulk_cont;
303 
304  switch (*buffer)
305  {
306  case '+':
307  {
308  Anope::string reason(buffer, 1, l - 1);
309  size_t nl = reason.find("\r\n");
310  Log(LOG_DEBUG_2) << "redis: status ok: " << reason.substr(0, nl);
311  if (nl != Anope::string::npos)
312  {
313  r.type = Reply::OK;
314  used = 1 + nl + 2;
315  }
316  break;
317  }
318  case '-':
319  {
320  Anope::string reason(buffer, 1, l - 1);
321  size_t nl = reason.find("\r\n");
322  Log(LOG_DEBUG) << "redis: status error: " << reason.substr(0, nl);
323  if (nl != Anope::string::npos)
324  {
325  r.type = Reply::NOT_OK;
326  used = 1 + nl + 2;
327  }
328  break;
329  }
330  case ':':
331  {
332  Anope::string ibuf(buffer, 1, l - 1);
333  size_t nl = ibuf.find("\r\n");
334  if (nl != Anope::string::npos)
335  {
336  try
337  {
338  r.i = convertTo<int64_t>(ibuf.substr(0, nl));
339  }
340  catch (const ConvertException &) { }
341 
342  r.type = Reply::INT;
343  used = 1 + nl + 2;
344  }
345  break;
346  }
347  case '$':
348  {
349  Anope::string reply(buffer + 1, l - 1);
350  /* This assumes one bulk can always fit in our recv buffer */
351  size_t nl = reply.find("\r\n");
352  if (nl != Anope::string::npos)
353  {
354  int len;
355  try
356  {
357  len = convertTo<int>(reply.substr(0, nl));
358  if (len >= 0)
359  {
360  if (1 + nl + 2 + len + 2 <= l)
361  {
362  used = 1 + nl + 2 + len + 2;
363  r.bulk = reply.substr(nl + 2, len);
364  r.type = Reply::BULK;
365  }
366  }
367  else
368  {
369  used = 1 + nl + 2 + 2;
370  r.type = Reply::BULK;
371  }
372  }
373  catch (const ConvertException &) { }
374  }
375  break;
376  }
377  multi_bulk_cont:
378  case '*':
379  {
380  if (r.type != Reply::MULTI_BULK)
381  {
382  Anope::string reply(buffer + 1, l - 1);
383  size_t nl = reply.find("\r\n");
384  if (nl != Anope::string::npos)
385  {
387  try
388  {
389  r.multi_bulk_size = convertTo<int>(reply.substr(0, nl));
390  }
391  catch (const ConvertException &) { }
392 
393  used = 1 + nl + 2;
394  }
395  else
396  break;
397  }
398  else if (r.multi_bulk_size >= 0 && r.multi_bulk.size() == static_cast<unsigned>(r.multi_bulk_size))
399  {
400  /* This multi bulk is already complete, so check the sub bulks */
401  for (unsigned i = 0; i < r.multi_bulk.size(); ++i)
402  if (r.multi_bulk[i]->type == Reply::MULTI_BULK)
403  ParseReply(*r.multi_bulk[i], buffer + used, l - used);
404  break;
405  }
406 
407  for (int i = r.multi_bulk.size(); i < r.multi_bulk_size; ++i)
408  {
409  Reply *reply = new Reply();
410  size_t u = ParseReply(*reply, buffer + used, l - used);
411  if (!u)
412  {
413  Log(LOG_DEBUG) << "redis: ran out of data to parse";
414  delete reply;
415  break;
416  }
417  r.multi_bulk.push_back(reply);
418  used += u;
419  }
420  break;
421  }
422  default:
423  Log(LOG_DEBUG) << "redis: unknown reply " << *buffer;
424  }
425 
426  return used;
427 }
428 
429 bool RedisSocket::Read(const char *buffer, size_t l)
430 {
431  static std::vector<char> save;
432  std::vector<char> copy;
433 
434  if (!save.empty())
435  {
436  std::copy(buffer, buffer + l, std::back_inserter(save));
437 
438  copy = save;
439 
440  buffer = &copy[0];
441  l = copy.size();
442  }
443 
444  while (l)
445  {
446  static Reply r;
447 
448  size_t used = this->ParseReply(r, buffer, l);
449  if (!used)
450  {
451  Log(LOG_DEBUG) << "redis: used == 0 ?";
452  r.Clear();
453  break;
454  }
455  else if (used > l)
456  {
457  Log(LOG_DEBUG) << "redis: used > l ?";
458  r.Clear();
459  break;
460  }
461 
462  /* Full result is not here yet */
463  if (r.type == Reply::MULTI_BULK && static_cast<unsigned>(r.multi_bulk_size) != r.multi_bulk.size())
464  {
465  buffer += used;
466  l -= used;
467  break;
468  }
469 
470  if (this == provider->sub)
471  {
472  if (r.multi_bulk.size() == 4)
473  {
474  /* pmessage
475  * pattern subscribed to
476  * __keyevent@0__:set
477  * key
478  */
479  std::map<Anope::string, Interface *>::iterator it = this->subinterfaces.find(r.multi_bulk[1]->bulk);
480  if (it != this->subinterfaces.end())
481  it->second->OnResult(r);
482  }
483  }
484  else
485  {
486  if (this->interfaces.empty())
487  {
488  Log(LOG_DEBUG) << "redis: no interfaces?";
489  }
490  else
491  {
492  Interface *i = this->interfaces.front();
493  this->interfaces.pop_front();
494 
495  if (i)
496  {
497  if (r.type != Reply::NOT_OK)
498  i->OnResult(r);
499  else
500  i->OnError(r.bulk);
501  }
502  }
503  }
504 
505  buffer += used;
506  l -= used;
507 
508  r.Clear();
509  }
510 
511  if (l)
512  {
513  save.resize(l);
514  std::copy(buffer, buffer + l, save.begin());
515  }
516  else
517  std::vector<char>().swap(save);
518 
519  return true;
520 }
521 
522 
523 class ModuleRedis : public Module
524 {
525  std::map<Anope::string, MyRedisService *> services;
526 
527  public:
528  ModuleRedis(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, EXTRA | VENDOR)
529  {
530  }
531 
533  {
534  for (std::map<Anope::string, MyRedisService *>::iterator it = services.begin(); it != services.end(); ++it)
535  {
536  MyRedisService *p = it->second;
537 
538  delete p->sock;
539  p->sock = NULL;
540  delete p->sub;
541  p->sub = NULL;
542 
543  delete p;
544  }
545  }
546 
548  {
549  Configuration::Block *block = conf->GetModule(this);
550  std::vector<Anope::string> new_services;
551 
552  for (int i = 0; i < block->CountBlock("redis"); ++i)
553  {
554  Configuration::Block *redis = block->GetBlock("redis", i);
555 
556  const Anope::string &n = redis->Get<const Anope::string>("name"),
557  &ip = redis->Get<const Anope::string>("ip");
558  int port = redis->Get<int>("port");
559  unsigned db = redis->Get<unsigned>("db");
560 
561  delete services[n];
562  services[n] = new MyRedisService(this, n, ip, port, db);
563  new_services.push_back(n);
564  }
565 
566  for (std::map<Anope::string, MyRedisService *>::iterator it = services.begin(); it != services.end();)
567  {
568  Provider *p = it->second;
569  ++it;
570 
571  if (std::find(new_services.begin(), new_services.end(), p->name) == new_services.end())
572  delete it->second;
573  }
574  }
575 
577  {
578  for (std::map<Anope::string, MyRedisService *>::iterator it = services.begin(); it != services.end(); ++it)
579  {
580  MyRedisService *p = it->second;
581 
582  if (p->sock)
583  for (unsigned i = p->sock->interfaces.size(); i > 0; --i)
584  {
585  Interface *inter = p->sock->interfaces[i - 1];
586 
587  if (inter && inter->owner == m)
588  {
589  inter->OnError(m->name + " being unloaded");
590  p->sock->interfaces.erase(p->sock->interfaces.begin() + i - 1);
591  }
592  }
593 
594  if (p->sub)
595  for (unsigned i = p->sub->interfaces.size(); i > 0; --i)
596  {
597  Interface *inter = p->sub->interfaces[i - 1];
598 
599  if (inter && inter->owner == m)
600  {
601  inter->OnError(m->name + " being unloaded");
602  p->sub->interfaces.erase(p->sub->interfaces.begin() + i - 1);
603  }
604  }
605 
606  for (unsigned i = p->ti.interfaces.size(); i > 0; --i)
607  {
608  Interface *inter = p->ti.interfaces[i - 1];
609 
610  if (inter && inter->owner == m)
611  {
612  inter->OnError(m->name + " being unloaded");
613  p->ti.interfaces.erase(p->ti.interfaces.begin() + i - 1);
614  }
615  }
616  }
617  }
618 };
619 
void OnResult(const Reply &r) anope_override
Definition: m_redis.cpp:56
void SendCommand(Interface *i, const std::vector< Anope::string > &cmds) anope_override
Definition: m_redis.cpp:188
RedisSocket * sub
Definition: m_redis.cpp:87
int CountBlock(const Anope::string &name)
Definition: config.cpp:35
std::bitset< SF_SIZE > flags
Definition: sockets.h:203
virtual void OnError(const Anope::string &error)
Definition: redis.h:54
virtual void Write(const char *buffer, size_t l)
std::deque< Interface * > interfaces
Definition: m_redis.cpp:23
void OnError(const Anope::string &error) anope_override
Definition: m_redis.cpp:289
void SendCommand(RedisSocket *s, Interface *i, const std::vector< Anope::string > &cmds)
Definition: m_redis.cpp:162
void Send(Interface *i, const std::vector< std::pair< const char *, size_t > > &args)
Definition: m_redis.cpp:177
~Transaction()
Definition: m_redis.cpp:43
void OnModuleUnload(User *, Module *m) anope_override
Definition: m_redis.cpp:576
void GetTokens(T &token)
Definition: anope.h:587
void Send(RedisSocket *s, Interface *i, const std::vector< std::pair< const char *, size_t > > &args)
Definition: m_redis.cpp:128
size_t ParseReply(Reply &r, const char *buf, size_t l)
Definition: m_redis.cpp:294
Definition: users.h:34
enum Redis::Reply::Type type
void Clear()
Definition: redis.h:28
Transaction(Module *creator)
Definition: m_redis.cpp:41
std::deque< Interface * > interfaces
Definition: m_redis.cpp:39
bool SetBlocking(bool state)
Definition: sockets.cpp:484
Anope::string host
Definition: m_redis.cpp:83
std::deque< Reply * > multi_bulk
Definition: redis.h:42
MyRedisService * provider
Definition: m_redis.cpp:22
void Pack(std::vector< char > &buffer, const char *buf, size_t sz=0)
Definition: m_redis.cpp:118
string substr(size_type pos=0, size_type n=npos) const
Definition: anope.h:277
bool ProcessRead() anope_override
void Subscribe(Interface *i, const Anope::string &pattern) anope_override
Definition: m_redis.cpp:213
void Connect(const Anope::string &TargetHost, int Port)
bool ProcessWrite() anope_override
void SendCommand(RedisSocket *s, Interface *i, const Anope::string &str)
Definition: m_redis.cpp:170
Block * GetBlock(const Anope::string &name, int num=0)
Definition: config.cpp:43
virtual void OnResult(const Reply &r)=0
Definition: redis.h:10
static const size_type npos
Definition: anope.h:44
MyRedisService(Module *c, const Anope::string &n, const Anope::string &h, int p, unsigned d)
Definition: m_redis.cpp:92
void Unsubscribe(const Anope::string &pattern) anope_override
Definition: m_redis.cpp:229
int multi_bulk_size
Definition: redis.h:41
bool Read(const char *buffer, size_t l) anope_override
Definition: m_redis.cpp:429
CoreExport bool Send(User *from, NickCore *to, BotInfo *service, const Anope::string &subject, const Anope::string &message)
Definition: mail.cpp:54
std::map< Anope::string, Interface * > subinterfaces
Definition: m_redis.cpp:24
void SendCommand(Interface *i, const Anope::string &str) anope_override
Definition: m_redis.cpp:196
#define anope_override
Definition: services.h:56
int64_t i
Definition: redis.h:39
Transaction ti
Definition: m_redis.cpp:89
#define MODULE_INIT(x)
Definition: modules.h:45
Anope::string stringify(const T &x)
Definition: anope.h:710
bool BlockAndProcess() anope_override
Definition: m_redis.cpp:204
Anope::string name
Definition: service.h:88
Module * owner
Definition: redis.h:48
ModuleRedis(const Anope::string &modname, const Anope::string &creator)
Definition: m_redis.cpp:528
void OnReload(Configuration::Conf *conf) anope_override
Definition: m_redis.cpp:547
const char * c_str() const
Definition: anope.h:117
Definition: logger.h:53
T Get(const Anope::string &tag)
Definition: config.h:44
RedisSocket * sock
Definition: m_redis.cpp:87
void CommitTransaction() anope_override
Definition: m_redis.cpp:244
void OnConnect() anope_override
Definition: m_redis.cpp:276
size_type find(const string &_str, size_type pos=0) const
Definition: anope.h:192
RedisSocket(MyRedisService *pro, bool v6)
Definition: m_redis.cpp:26
unsigned db
Definition: m_redis.cpp:85
void StartTransaction() anope_override
Definition: m_redis.cpp:235
Definition: anope.h:20
bool in_transaction
Definition: m_redis.cpp:90
Anope::string bulk
Definition: redis.h:40
Definition: modules.h:163
std::map< Anope::string, MyRedisService * > services
Definition: m_redis.cpp:525