Anope IRC Services  Version 2.0
m_mysql.cpp
Go to the documentation of this file.
1 /* RequiredLibraries: mysqlclient */
2 /* RequiredWindowsLibraries: libmysql */
3 
4 #include "module.h"
5 #include "modules/sql.h"
6 #define NO_CLIENT_LONG_LONG
7 #ifdef WIN32
8 # include <mysql.h>
9 #else
10 # include <mysql/mysql.h>
11 #endif
12 
13 using namespace SQL;
14 
25 class MySQLService;
26 
30 {
31  /* The connection to the database */
33  /* The interface to use once we have the result to send the data back */
35  /* The actual query */
37 
38  QueryRequest(MySQLService *s, Interface *i, const Query &q) : service(s), sqlinterface(i), query(q) { }
39 };
40 
43 {
44  /* The interface to send the data back on */
46  /* The result */
48 
49  QueryResult(Interface *i, Result &r) : sqlinterface(i), result(r) { }
50 };
51 
54 class MySQLResult : public Result
55 {
56  MYSQL_RES *res;
57 
58  public:
59  MySQLResult(unsigned int i, const Query &q, const Anope::string &fq, MYSQL_RES *r) : Result(i, q, fq), res(r)
60  {
61  unsigned num_fields = res ? mysql_num_fields(res) : 0;
62 
63  /* It is not thread safe to log anything here using Log(this->owner) now :( */
64 
65  if (!num_fields)
66  return;
67 
68  for (MYSQL_ROW row; (row = mysql_fetch_row(res));)
69  {
70  MYSQL_FIELD *fields = mysql_fetch_fields(res);
71 
72  if (fields)
73  {
74  std::map<Anope::string, Anope::string> items;
75 
76  for (unsigned field_count = 0; field_count < num_fields; ++field_count)
77  {
78  Anope::string column = (fields[field_count].name ? fields[field_count].name : "");
79  Anope::string data = (row[field_count] ? row[field_count] : "");
80 
81  items[column] = data;
82  }
83 
84  this->entries.push_back(items);
85  }
86  }
87  }
88 
89  MySQLResult(const Query &q, const Anope::string &fq, const Anope::string &err) : Result(0, q, fq, err), res(NULL)
90  {
91  }
92 
94  {
95  if (this->res)
96  mysql_free_result(this->res);
97  }
98 };
99 
102 class MySQLService : public Provider
103 {
104  std::map<Anope::string, std::set<Anope::string> > active_schema;
105 
110  int port;
111 
112  MYSQL *sql;
113 
117  Anope::string Escape(const Anope::string &query);
118 
119  public:
120  /* Locked by the SQL thread when a query is pending on this database,
121  * prevents us from deleting a connection while a query is executing
122  * in the thread
123  */
125 
126  MySQLService(Module *o, const Anope::string &n, const Anope::string &d, const Anope::string &s, const Anope::string &u, const Anope::string &p, int po);
127 
128  ~MySQLService();
129 
130  void Run(Interface *i, const Query &query) anope_override;
131 
132  Result RunQuery(const Query &query) anope_override;
133 
134  std::vector<Query> CreateTable(const Anope::string &table, const Data &data) anope_override;
135 
136  Query BuildInsert(const Anope::string &table, unsigned int id, Data &data) anope_override;
137 
138  Query GetTables(const Anope::string &prefix) anope_override;
139 
140  void Connect();
141 
142  bool CheckConnection();
143 
144  Anope::string BuildQuery(const Query &q);
145 
146  Anope::string FromUnixtime(time_t);
147 };
148 
151 class DispatcherThread : public Thread, public Condition
152 {
153  public:
155 
156  void Run() anope_override;
157 };
158 
159 class ModuleSQL;
160 static ModuleSQL *me;
161 class ModuleSQL : public Module, public Pipe
162 {
163  /* SQL connections */
164  std::map<Anope::string, MySQLService *> MySQLServices;
165  public:
166  /* Pending query requests */
167  std::deque<QueryRequest> QueryRequests;
168  /* Pending finished requests with results */
169  std::deque<QueryResult> FinishedRequests;
170  /* The thread used to execute queries */
172 
173  ModuleSQL(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, EXTRA | VENDOR)
174  {
175  me = this;
176 
177 
178  DThread = new DispatcherThread();
179  DThread->Start();
180  }
181 
183  {
184  for (std::map<Anope::string, MySQLService *>::iterator it = this->MySQLServices.begin(); it != this->MySQLServices.end(); ++it)
185  delete it->second;
186  MySQLServices.clear();
187 
188  DThread->SetExitState();
189  DThread->Wakeup();
190  DThread->Join();
191  delete DThread;
192  }
193 
195  {
196  Configuration::Block *config = conf->GetModule(this);
197 
198  for (std::map<Anope::string, MySQLService *>::iterator it = this->MySQLServices.begin(); it != this->MySQLServices.end();)
199  {
200  const Anope::string &cname = it->first;
201  MySQLService *s = it->second;
202  int i;
203 
204  ++it;
205 
206  for (i = 0; i < config->CountBlock("mysql"); ++i)
207  if (config->GetBlock("mysql", i)->Get<const Anope::string>("name", "mysql/main") == cname)
208  break;
209 
210  if (i == config->CountBlock("mysql"))
211  {
212  Log(LOG_NORMAL, "mysql") << "MySQL: Removing server connection " << cname;
213 
214  delete s;
215  this->MySQLServices.erase(cname);
216  }
217  }
218 
219  for (int i = 0; i < config->CountBlock("mysql"); ++i)
220  {
221  Configuration::Block *block = config->GetBlock("mysql", i);
222  const Anope::string &connname = block->Get<const Anope::string>("name", "mysql/main");
223 
224  if (this->MySQLServices.find(connname) == this->MySQLServices.end())
225  {
226  const Anope::string &database = block->Get<const Anope::string>("database", "anope");
227  const Anope::string &server = block->Get<const Anope::string>("server", "127.0.0.1");
228  const Anope::string &user = block->Get<const Anope::string>("username", "anope");
229  const Anope::string &password = block->Get<const Anope::string>("password");
230  int port = block->Get<int>("port", "3306");
231 
232  try
233  {
234  MySQLService *ss = new MySQLService(this, connname, database, server, user, password, port);
235  this->MySQLServices.insert(std::make_pair(connname, ss));
236 
237  Log(LOG_NORMAL, "mysql") << "MySQL: Successfully connected to server " << connname << " (" << server << ")";
238  }
239  catch (const SQL::Exception &ex)
240  {
241  Log(LOG_NORMAL, "mysql") << "MySQL: " << ex.GetReason();
242  }
243  }
244  }
245  }
246 
247  void OnModuleUnload(User *, Module *m) anope_override
248  {
249  this->DThread->Lock();
250 
251  for (unsigned i = this->QueryRequests.size(); i > 0; --i)
252  {
253  QueryRequest &r = this->QueryRequests[i - 1];
254 
255  if (r.sqlinterface && r.sqlinterface->owner == m)
256  {
257  if (i == 1)
258  {
259  r.service->Lock.Lock();
260  r.service->Lock.Unlock();
261  }
262 
263  this->QueryRequests.erase(this->QueryRequests.begin() + i - 1);
264  }
265  }
266 
267  this->DThread->Unlock();
268 
269  this->OnNotify();
270  }
271 
273  {
274  this->DThread->Lock();
275  std::deque<QueryResult> finishedRequests = this->FinishedRequests;
276  this->FinishedRequests.clear();
277  this->DThread->Unlock();
278 
279  for (std::deque<QueryResult>::const_iterator it = finishedRequests.begin(), it_end = finishedRequests.end(); it != it_end; ++it)
280  {
281  const QueryResult &qr = *it;
282 
283  if (!qr.sqlinterface)
284  throw SQL::Exception("NULL qr.sqlinterface in MySQLPipe::OnNotify() ?");
285 
286  if (qr.result.GetError().empty())
287  qr.sqlinterface->OnResult(qr.result);
288  else
289  qr.sqlinterface->OnError(qr.result);
290  }
291  }
292 };
293 
294 MySQLService::MySQLService(Module *o, const Anope::string &n, const Anope::string &d, const Anope::string &s, const Anope::string &u, const Anope::string &p, int po)
295 : Provider(o, n), database(d), server(s), user(u), password(p), port(po), sql(NULL)
296 {
297  Connect();
298 }
299 
301 {
302  me->DThread->Lock();
303  this->Lock.Lock();
304  mysql_close(this->sql);
305  this->sql = NULL;
306 
307  for (unsigned i = me->QueryRequests.size(); i > 0; --i)
308  {
309  QueryRequest &r = me->QueryRequests[i - 1];
310 
311  if (r.service == this)
312  {
313  if (r.sqlinterface)
314  r.sqlinterface->OnError(Result(0, r.query, "SQL Interface is going away"));
315  me->QueryRequests.erase(me->QueryRequests.begin() + i - 1);
316  }
317  }
318  this->Lock.Unlock();
319  me->DThread->Unlock();
320 }
321 
322 void MySQLService::Run(Interface *i, const Query &query)
323 {
324  me->DThread->Lock();
325  me->QueryRequests.push_back(QueryRequest(this, i, query));
326  me->DThread->Unlock();
327  me->DThread->Wakeup();
328 }
329 
331 {
332  this->Lock.Lock();
333 
334  Anope::string real_query = this->BuildQuery(query);
335 
336  if (this->CheckConnection() && !mysql_real_query(this->sql, real_query.c_str(), real_query.length()))
337  {
338  MYSQL_RES *res = mysql_store_result(this->sql);
339  unsigned int id = mysql_insert_id(this->sql);
340 
341  /* because we enabled CLIENT_MULTI_RESULTS in our options
342  * a multiple statement or a procedure call can return
343  * multiple result sets.
344  * we must process them all before the next query.
345  */
346 
347  while (!mysql_next_result(this->sql))
348  mysql_free_result(mysql_store_result(this->sql));
349 
350  this->Lock.Unlock();
351  return MySQLResult(id, query, real_query, res);
352  }
353  else
354  {
355  Anope::string error = mysql_error(this->sql);
356  this->Lock.Unlock();
357  return MySQLResult(query, real_query, error);
358  }
359 }
360 
361 std::vector<Query> MySQLService::CreateTable(const Anope::string &table, const Data &data)
362 {
363  std::vector<Query> queries;
364  std::set<Anope::string> &known_cols = this->active_schema[table];
365 
366  if (known_cols.empty())
367  {
368  Log(LOG_DEBUG) << "m_mysql: Fetching columns for " << table;
369 
370  Result columns = this->RunQuery("SHOW COLUMNS FROM `" + table + "`");
371  for (int i = 0; i < columns.Rows(); ++i)
372  {
373  const Anope::string &column = columns.Get(i, "Field");
374 
375  Log(LOG_DEBUG) << "m_mysql: Column #" << i << " for " << table << ": " << column;
376  known_cols.insert(column);
377  }
378  }
379 
380  if (known_cols.empty())
381  {
382  Anope::string query_text = "CREATE TABLE `" + table + "` (`id` int(10) unsigned NOT NULL AUTO_INCREMENT,"
383  " `timestamp` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP";
384  for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
385  {
386  known_cols.insert(it->first);
387 
388  query_text += ", `" + it->first + "` ";
389  if (data.GetType(it->first) == Serialize::Data::DT_INT)
390  query_text += "int(11)";
391  else
392  query_text += "text";
393  }
394  query_text += ", PRIMARY KEY (`id`), KEY `timestamp_idx` (`timestamp`))";
395  queries.push_back(query_text);
396  }
397  else
398  for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
399  {
400  if (known_cols.count(it->first) > 0)
401  continue;
402 
403  known_cols.insert(it->first);
404 
405  Anope::string query_text = "ALTER TABLE `" + table + "` ADD `" + it->first + "` ";
406  if (data.GetType(it->first) == Serialize::Data::DT_INT)
407  query_text += "int(11)";
408  else
409  query_text += "text";
410 
411  queries.push_back(query_text);
412  }
413 
414  return queries;
415 }
416 
417 Query MySQLService::BuildInsert(const Anope::string &table, unsigned int id, Data &data)
418 {
419  /* Empty columns not present in the data set */
420  const std::set<Anope::string> &known_cols = this->active_schema[table];
421  for (std::set<Anope::string>::iterator it = known_cols.begin(), it_end = known_cols.end(); it != it_end; ++it)
422  if (*it != "id" && *it != "timestamp" && data.data.count(*it) == 0)
423  data[*it] << "";
424 
425  Anope::string query_text = "INSERT INTO `" + table + "` (`id`";
426  for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
427  query_text += ",`" + it->first + "`";
428  query_text += ") VALUES (" + stringify(id);
429  for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
430  query_text += ",@" + it->first + "@";
431  query_text += ") ON DUPLICATE KEY UPDATE ";
432  for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
433  query_text += "`" + it->first + "`=VALUES(`" + it->first + "`),";
434  query_text.erase(query_text.end() - 1);
435 
436  Query query(query_text);
437  for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
438  {
439  Anope::string buf;
440  *it->second >> buf;
441  query.SetValue(it->first, buf);
442  }
443 
444  return query;
445 }
446 
448 {
449  return Query("SHOW TABLES LIKE '" + prefix + "%';");
450 }
451 
453 {
454  this->sql = mysql_init(this->sql);
455 
456  const unsigned int timeout = 1;
457  mysql_options(this->sql, MYSQL_OPT_CONNECT_TIMEOUT, reinterpret_cast<const char *>(&timeout));
458 
459  bool connect = mysql_real_connect(this->sql, this->server.c_str(), this->user.c_str(), this->password.c_str(), this->database.c_str(), this->port, NULL, CLIENT_MULTI_RESULTS);
460 
461  if (!connect)
462  throw SQL::Exception("Unable to connect to MySQL service " + this->name + ": " + mysql_error(this->sql));
463 
464  Log(LOG_DEBUG) << "Successfully connected to MySQL service " << this->name << " at " << this->server << ":" << this->port;
465 }
466 
467 
469 {
470  if (!this->sql || mysql_ping(this->sql))
471  {
472  try
473  {
474  this->Connect();
475  }
476  catch (const SQL::Exception &)
477  {
478  return false;
479  }
480  }
481 
482  return true;
483 }
484 
486 {
487  std::vector<char> buffer(query.length() * 2 + 1);
488  mysql_real_escape_string(this->sql, &buffer[0], query.c_str(), query.length());
489  return &buffer[0];
490 }
491 
493 {
494  Anope::string real_query = q.query;
495 
496  for (std::map<Anope::string, QueryData>::const_iterator it = q.parameters.begin(), it_end = q.parameters.end(); it != it_end; ++it)
497  real_query = real_query.replace_all_cs("@" + it->first + "@", (it->second.escape ? ("'" + this->Escape(it->second.data) + "'") : it->second.data));
498 
499  return real_query;
500 }
501 
503 {
504  return "FROM_UNIXTIME(" + stringify(t) + ")";
505 }
506 
508 {
509  this->Lock();
510 
511  while (!this->GetExitState())
512  {
513  if (!me->QueryRequests.empty())
514  {
515  QueryRequest &r = me->QueryRequests.front();
516  this->Unlock();
517 
518  Result sresult = r.service->RunQuery(r.query);
519 
520  this->Lock();
521  if (!me->QueryRequests.empty() && me->QueryRequests.front().query == r.query)
522  {
523  if (r.sqlinterface)
524  me->FinishedRequests.push_back(QueryResult(r.sqlinterface, sresult));
525  me->QueryRequests.pop_front();
526  }
527  }
528  else
529  {
530  if (!me->FinishedRequests.empty())
531  me->Notify();
532  this->Wait();
533  }
534  }
535 
536  this->Unlock();
537 }
538 
540 
std::vector< Query > CreateTable(const Anope::string &table, const Data &data) anope_override
Definition: m_mysql.cpp:361
Module * owner
Definition: sql.h:186
void Join()
void Run() anope_override
Definition: m_mysql.cpp:507
static Anope::map< ExtensibleItem< CSMiscData > * > items
Definition: cs_set_misc.cpp:18
int CountBlock(const Anope::string &name)
Definition: config.cpp:35
Map data
Definition: sql.h:15
virtual void OnError(const Result &r)=0
Anope::string server
Definition: m_mysql.cpp:107
std::deque< QueryRequest > QueryRequests
Definition: m_mysql.cpp:167
Anope::string Escape(const Anope::string &src)
Definition: httpd.h:206
Anope::string FromUnixtime(time_t)
Definition: m_mysql.cpp:502
MYSQL * sql
Definition: m_mysql.cpp:112
Interface * sqlinterface
Definition: m_mysql.cpp:34
Anope::string query
Definition: sql.h:98
Anope::string Escape(const Anope::string &query)
Definition: m_mysql.cpp:485
void push_back(char c)
Definition: anope.h:142
bool CheckConnection()
Definition: m_mysql.cpp:468
bool GetExitState() const
std::map< Anope::string, QueryData > parameters
Definition: sql.h:99
Result RunQuery(const Query &query) anope_override
Definition: m_mysql.cpp:330
void OnNotify() anope_override
Definition: m_mysql.cpp:272
const Anope::string & GetError() const
Definition: sql.h:153
Definition: users.h:34
void Connect()
Definition: m_mysql.cpp:452
std::deque< QueryResult > FinishedRequests
Definition: m_mysql.cpp:169
void Unlock()
static ModuleSQL * me
Definition: m_mysql.cpp:159
void Start()
void SetValue(const Anope::string &key, const T &value, bool escape=true)
Definition: sql.h:121
Interface * sqlinterface
Definition: m_mysql.cpp:45
Query query
Definition: m_mysql.cpp:36
QueryResult(Interface *i, Result &r)
Definition: m_mysql.cpp:49
Definition: sockets.h:454
iterator erase(const iterator &i)
Definition: anope.h:155
Query BuildInsert(const Anope::string &table, unsigned int id, Data &data) anope_override
Definition: m_mysql.cpp:417
void SetExitState()
Anope::string database
Definition: m_mysql.cpp:106
void Wakeup()
DispatcherThread * DThread
Definition: m_mysql.cpp:171
size_type length() const
Definition: anope.h:131
Block * GetBlock(const Anope::string &name, int num=0)
Definition: config.cpp:43
Definition: sql.h:96
MySQLService * service
Definition: m_mysql.cpp:32
void Notify()
Definition: pipeengine.cpp:77
virtual void OnResult(const Result &r)=0
Mutex Lock
Definition: m_mysql.cpp:124
string replace_all_cs(const string &_orig, const string &_repl) const
Definition: anope.h:229
Query GetTables(const Anope::string &prefix) anope_override
Definition: m_mysql.cpp:447
~MySQLResult()
Definition: m_mysql.cpp:93
MySQLService(Module *o, const Anope::string &n, const Anope::string &d, const Anope::string &s, const Anope::string &u, const Anope::string &p, int po)
Definition: m_mysql.cpp:294
database_map database
Definition: cs_seen.cpp:24
#define anope_override
Definition: services.h:56
Definition: sql.h:11
bool empty() const
Definition: anope.h:126
Anope::string user
Definition: m_mysql.cpp:108
Result result
Definition: m_mysql.cpp:47
Type GetType(const Anope::string &key) const anope_override
Definition: sql.h:68
~ModuleSQL()
Definition: m_mysql.cpp:182
#define MODULE_INIT(x)
Definition: modules.h:45
static Timer * timeout
Definition: os_defcon.cpp:106
void Lock()
Definition: sql.h:8
Anope::string BuildQuery(const Query &q)
Definition: m_mysql.cpp:492
std::map< Anope::string, std::set< Anope::string > > active_schema
Definition: m_mysql.cpp:104
QueryRequest(MySQLService *s, Interface *i, const Query &q)
Definition: m_mysql.cpp:38
Anope::string stringify(const T &x)
Definition: anope.h:710
Anope::string name
Definition: service.h:88
iterator end()
Definition: anope.h:284
virtual const Anope::string & GetReason() const
Definition: anope.h:672
const char * c_str() const
Definition: anope.h:117
MySQLResult(const Query &q, const Anope::string &fq, const Anope::string &err)
Definition: m_mysql.cpp:89
Definition: logger.h:53
Anope::string password
Definition: m_mysql.cpp:109
T Get(const Anope::string &tag)
Definition: config.h:44
MYSQL_RES * res
Definition: m_mysql.cpp:56
std::map< Anope::string, MySQLService * > MySQLServices
Definition: m_mysql.cpp:164
int Rows() const
Definition: sql.h:155
MySQLResult(unsigned int i, const Query &q, const Anope::string &fq, MYSQL_RES *r)
Definition: m_mysql.cpp:59
ModuleSQL(const Anope::string &modname, const Anope::string &creator)
Definition: m_mysql.cpp:173
Definition: modules.h:163
void OnReload(Configuration::Conf *conf) anope_override
Definition: m_mysql.cpp:194
const Anope::string Get(size_t index, const Anope::string &col) const
Definition: sql.h:169
void Run(Interface *i, const Query &query) anope_override
Definition: m_mysql.cpp:322
void OnModuleUnload(User *, Module *m) anope_override
Definition: m_mysql.cpp:247