Initial connection pool implementation

This commit is contained in:
Georg Richter
2019-11-24 12:45:32 +01:00
parent 0b66a9f287
commit 628fc39cd0
11 changed files with 790 additions and 31 deletions

View File

@ -284,3 +284,8 @@ PyDoc_STRVAR(
"Returns the alphanumeric version of connected database. Tthe numeric version\n" "Returns the alphanumeric version of connected database. Tthe numeric version\n"
"can be obtained with server_version property." "can be obtained with server_version property."
); );
PyDoc_STRVAR(
connection_exception_OperationalError__doc__,
"(read only)\n\n"
);

77
include/docs/pool.h Normal file
View File

@ -0,0 +1,77 @@
/************************************************************************************
Copyright (C) 2019 Georg Richter and MariaDB Corporation AB
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Library General Public License for more details.
You should have received a copy of the GNU Library General Public
License along with this library; if not see <http://www.gnu.org/licenses>
or write to the Free Software Foundation, Inc.,
51 Franklin St., Fifth Floor, Boston, MA 02110, USA
*************************************************************************************/
PyDoc_STRVAR(
pool_pool_name__doc__,
"(read only)\n\n"
"Returns the name of the connection pool"
);
PyDoc_STRVAR(
connection_pool__doc__,
"Class defining a pool of database connections"
);
PyDoc_STRVAR(
pool_get_connection__doc__,
"get_connection()\n"
"--\n"
"\n"
"Returns a connection from the connection pool or raises a PoolError if\n"
"a connection is not available"
);
PyDoc_STRVAR(
pool_add_connection__doc__,
"add_connection(connection)\n"
"--\n"
"\n"
"Parameter:\n"
"connection: mariadb connection object\n\n"
"Adds a connection to the connection pool. In case the pool doesn't\n"
"have a free slot or is not configured a PoolError exception will be raised."
);
PyDoc_STRVAR(
pool_set_config__doc__,
"set_config(configuration)\n"
"--\n"
"\n"
"Parameter:\n"
"configuration: dictionary\n\n"
"Sets the connection configuration for the connection pool."
);
PyDoc_STRVAR(
pool_pool_size__doc__,
"(read only)\n\n"
"Returns the size of the connection pool"
);
PyDoc_STRVAR(
pool_max_size__doc__,
"(read only)\n\n"
"Returns the maximum allowed size of the connection pool"
);
PyDoc_STRVAR(
pool_pool_reset_connection__doc__,
"(read/write)\n\n"
"If set to true, the connection will be resetted on both client and server side\n"
"after .close() method was called"
);

View File

@ -40,6 +40,7 @@
#endif /* _WIN32 */ #endif /* _WIN32 */
#define MAX_TPC_XID_SIZE 65 #define MAX_TPC_XID_SIZE 65
#define POOL_DEFAULT_SIZE 5
/* Magic constant for checking dynamic columns */ /* Magic constant for checking dynamic columns */
#define PYTHON_DYNCOL_VALUE 0xA378BD8E #define PYTHON_DYNCOL_VALUE 0xA378BD8E
@ -97,6 +98,8 @@ typedef struct st_parser {
MrdbString *keys; MrdbString *keys;
} MrdbParser; } MrdbParser;
struct mrdb_pool;
/* PEP-249: Connection object */ /* PEP-249: Connection object */
typedef struct { typedef struct {
PyObject_HEAD PyObject_HEAD
@ -114,20 +117,24 @@ typedef struct {
int port; int port;
PyObject *charset; PyObject *charset;
PyObject *collation; PyObject *collation;
char *poolname; struct mrdb_pool *pool;
uint8_t inuse;
uint8_t status; uint8_t status;
} MrdbConnection; } MrdbConnection;
typedef struct { typedef struct mrdb_pool{
PyObject_HEAD
char *pool_name; char *pool_name;
size_t pool_name_length;
uint32_t pool_size; uint32_t pool_size;
char *host; uint8_t reset_session;
char *user; uint32_t idle_timeout;
char *passwd; uint32_t acquire_timeout;
char *database; PyObject *configuration;
uint32_t port; MrdbConnection **connection;
MrdbConnection *connection; uint32_t connection_cnt;
} MrdbConnectionPool; uint16_t max_size;
} MrdbPool;
typedef struct { typedef struct {
enum enum_field_types type; enum enum_field_types type;
@ -218,6 +225,7 @@ PyObject *Mariadb_InterfaceError;
PyObject *Mariadb_Error; PyObject *Mariadb_Error;
PyObject *Mariadb_DatabaseError; PyObject *Mariadb_DatabaseError;
PyObject *Mariadb_DataError; PyObject *Mariadb_DataError;
PyObject *Mariadb_PoolError;
PyObject *Mariadb_OperationalError; PyObject *Mariadb_OperationalError;
PyObject *Mariadb_IntegrityError; PyObject *Mariadb_IntegrityError;
PyObject *Mariadb_InternalError; PyObject *Mariadb_InternalError;
@ -228,6 +236,7 @@ PyObject *Mariadb_Warning;
PyObject *Mrdb_Pickle; PyObject *Mrdb_Pickle;
/* Object types */ /* Object types */
PyTypeObject MrdbPool_Type;
PyTypeObject Mariadb_Fieldinfo_Type; PyTypeObject Mariadb_Fieldinfo_Type;
PyTypeObject MrdbIndicator_Type; PyTypeObject MrdbIndicator_Type;
PyTypeObject MrdbConnection_Type; PyTypeObject MrdbConnection_Type;
@ -263,6 +272,10 @@ PyObject *MrdbConnection_close(MrdbConnection *self);
PyObject *MrdbConnection_connect( PyObject *self,PyObject *args, PyObject *kwargs); PyObject *MrdbConnection_connect( PyObject *self,PyObject *args, PyObject *kwargs);
void MrdbConnection_SetAttributes(MrdbConnection *self); void MrdbConnection_SetAttributes(MrdbConnection *self);
/* Pooling */
PyObject *MrdbPool_add(PyObject *self, PyObject *args, PyObject *kwargs);
PyObject *MrdbPool_getconnection(MrdbPool *self);
/* TPC methods */ /* TPC methods */
PyObject *MrdbConnection_xid(MrdbConnection *self, PyObject *args); PyObject *MrdbConnection_xid(MrdbConnection *self, PyObject *args);
PyObject *MrdbConnection_tpc_begin(MrdbConnection *self, PyObject *args); PyObject *MrdbConnection_tpc_begin(MrdbConnection *self, PyObject *args);
@ -291,6 +304,7 @@ uint8_t MrdbParser_parse(MrdbParser *p, uint8_t is_batch, char *errmsg, size_t e
#define MARIADB_PY_PARAMSTYLE "qmark" #define MARIADB_PY_PARAMSTYLE "qmark"
#define MARIADB_PY_THREADSAFETY 1 #define MARIADB_PY_THREADSAFETY 1
#define MAX_POOL_SIZE 64
/* Helper macros */ /* Helper macros */
@ -301,8 +315,8 @@ uint8_t MrdbParser_parse(MrdbParser *p, uint8_t is_batch, char *errmsg, size_t e
(mysql_get_server_version((mysql)) >= (version)) (mysql_get_server_version((mysql)) >= (version))
#define MARIADB_CHECK_CONNECTION(connection, ret)\ #define MARIADB_CHECK_CONNECTION(connection, ret)\
if (!connection || !connection->mysql) {\ if (!(connection) || !(connection)->mysql) {\
mariadb_throw_exception(connection->mysql, Mariadb_Error, 0,\ mariadb_throw_exception((connection)->mysql, Mariadb_Error, 0,\
"Invalid connection or not connected");\ "Invalid connection or not connected");\
return (ret);\ return (ret);\
} }
@ -328,6 +342,16 @@ if (!cursor->stmt || !cursor->stmt->mysql || cursor->is_closed)\
"Invalid cursor or not connected");\ "Invalid cursor or not connected");\
} }
#define pooling_keywords "pool_name", "pool_size", "reset_session", "idle_timeout", "acquire_timeout"
#define connection_keywords "dsn", "host", "user", "password", "database", "port", "socket",\
"connect_timeout", "read_timeout", "write_timeout",\
"local_infile", "compress", "init_command",\
"default_file", "default_group",\
"ssl_key", "ssl_ca", "ssl_cert", "ssl_crl",\
"ssl_cipher", "ssl_capath", "ssl_crlpath",\
"ssl_verify_cert", "ssl",\
"client_flags", "charset"
/* MariaDB protocol macros */ /* MariaDB protocol macros */
#define int1store(T,A) *((int8_t*) (T)) = (A) #define int1store(T,A) *((int8_t*) (T)) = (A)
#define uint1korr(A) (*(((uint8_t*)(A)))) #define uint1korr(A) (*(((uint8_t*)(A))))

View File

@ -5,10 +5,11 @@ import sys
class MariaDBConfiguration(): class MariaDBConfiguration():
lib_dirs = "" lib_dirs = []
libs = "" libs = []
version = "" version = []
includes = "" includes = []
extra_objects = []
def mariadb_config(config, option): def mariadb_config(config, option):
@ -34,12 +35,18 @@ def dequote(s):
def get_config(): def get_config():
required_version = "3.1.0" required_version = "3.1.0"
no_env = 0 no_env = 0
static = 0
try: try:
config_prg = os.environ["MARIADB_CONFIG"] config_prg = os.environ["MARIADB_CONFIG"]
except KeyError: except KeyError:
config_prg = 'mariadb_config' config_prg = 'mariadb_config'
try:
static = os.environ["MARIADB_STATIC"]
except KeyError:
static = 0
cc_version = mariadb_config(config_prg, "cc_version") cc_version = mariadb_config(config_prg, "cc_version")
if cc_version[0] < required_version: if cc_version[0] < required_version:
print ('MariaDB Connector/Python requires MariaDB Connector/C >= %s, found version %s' % ( print ('MariaDB Connector/Python requires MariaDB Connector/C >= %s, found version %s' % (
@ -50,9 +57,13 @@ def get_config():
libs = mariadb_config(config_prg, "libs") libs = mariadb_config(config_prg, "libs")
cfg.lib_dirs = [dequote(i[2:]) for i in libs if i.startswith("-L")] cfg.lib_dirs = [dequote(i[2:]) for i in libs if i.startswith("-L")]
cfg.libs = [dequote(i[2:]) for i in libs if i.startswith("-l")] cfg.libs = [dequote(i[2:]) for i in libs if i.startswith("-l")]
includes = mariadb_config(config_prg, "include") includes = mariadb_config(config_prg, "include")
mariadb_includes = [dequote(i[2:]) for i in includes if i.startswith("-I")] mariadb_includes = [dequote(i[2:]) for i in includes if i.startswith("-I")]
mariadb_includes.extend(["./include"]) mariadb_includes.extend(["./include"])
if static:
cfg.extra_objects = ['{}/lib{}.a'.format(cfg.lib_dirs[0], l) for l in ["mariadbclient"]]
cfg.libs = []
cfg.includes = mariadb_includes cfg.includes = mariadb_includes
return cfg return cfg

View File

@ -15,6 +15,10 @@ class MariaDBConfiguration():
def get_config(): def get_config():
required_version = "3.1.0" required_version = "3.1.0"
try:
static= os.environ["MARIADB_STATIC"]
except KeyErrror:
try: try:
config_prg = os.environ["MARIADB_CC_INSTALL_DIR"] config_prg = os.environ["MARIADB_CC_INSTALL_DIR"]
cc_version = ["", ""] cc_version = ["", ""]
@ -49,5 +53,9 @@ def get_config():
cfg.version = cc_version[0] cfg.version = cc_version[0]
cfg.includes = [".\\include", cc_instdir[0] + "\\include", cc_instdir[0] + "\\include\\mysql"] cfg.includes = [".\\include", cc_instdir[0] + "\\include", cc_instdir[0] + "\\include\\mysql"]
cfg.lib_dirs = [cc_instdir[0] + "\\lib"] cfg.lib_dirs = [cc_instdir[0] + "\\lib"]
cfg.libs = ["mariadbclient", "ws2_32", "advapi32", "kernel32", "shlwapi", "crypt32"] cfg.libs = ["ws2_32", "advapi32", "kernel32", "shlwapi", "crypt32"]
if static:
cfg.libs.append("libmariadb")
else:
cfg.libs.append("mariadbclient")
return cfg return cfg

View File

@ -11,6 +11,8 @@ if os.name == "nt":
cfg = get_config() cfg = get_config()
print(cfg.extra_objects)
setup(name='mariadb', setup(name='mariadb',
version='0.9.1', version='0.9.1',
classifiers = [ classifiers = [

View File

@ -24,6 +24,8 @@
#include <datetime.h> #include <datetime.h>
PyObject *Mrdb_Pickle= NULL; PyObject *Mrdb_Pickle= NULL;
PyObject *cnx_pool= NULL;
extern uint16_t max_pool_size;
int Mariadb_traverse(PyObject *self, int Mariadb_traverse(PyObject *self,
visitproc visit, visitproc visit,
@ -56,6 +58,9 @@ static PyMethodDef Mariadb_Methods[] =
{"connect", (PyCFunction)MrdbConnection_connect, {"connect", (PyCFunction)MrdbConnection_connect,
METH_VARARGS | METH_KEYWORDS, METH_VARARGS | METH_KEYWORDS,
module_connect__doc__}, module_connect__doc__},
{"ConnectionPool", (PyCFunction)MrdbPool_add,
METH_VARARGS | METH_KEYWORDS,
"todo!!"},
/* PEP-249 DB-API */ /* PEP-249 DB-API */
{"DateFromTicks", (PyCFunction)Mariadb_date_from_ticks, {"DateFromTicks", (PyCFunction)Mariadb_date_from_ticks,
METH_VARARGS, METH_VARARGS,
@ -110,6 +115,7 @@ static char exception_warning_doc[]= "Exception raised for important warnings li
static char exception_database_doc[]= "Exception raised for errors that are related to the database"; static char exception_database_doc[]= "Exception raised for errors that are related to the database";
static char exception_data_doc[] = "Exception raised for errors that are due to problems with the processed data " static char exception_data_doc[] = "Exception raised for errors that are due to problems with the processed data "
"like division by zero, numeric value out of range, etc."; "like division by zero, numeric value out of range, etc.";
static char exception_pool_doc[] = "Exeception rasied for errors related to ConnectionPool class.";
static char exception_operational_doc[] = "Exception raised for errors that are related to the database's operation " static char exception_operational_doc[] = "Exception raised for errors that are related to the database's operation "
"and not necessarily under the control of the programmer."; "and not necessarily under the control of the programmer.";
static char exception_integrity_doc[]= "Exception raised when the relational integrity of the database is affected, " static char exception_integrity_doc[]= "Exception raised when the relational integrity of the database is affected, "
@ -153,6 +159,10 @@ PyMODINIT_FUNC PyInit_mariadb(void)
if (PyType_Ready(&MrdbCursor_Type) == -1) if (PyType_Ready(&MrdbCursor_Type) == -1)
goto error; goto error;
Py_TYPE(&MrdbPool_Type) = &PyType_Type;
if (PyType_Ready(&MrdbPool_Type) == -1)
goto error;
Py_TYPE(&MrdbIndicator_Type) = &PyType_Type; Py_TYPE(&MrdbIndicator_Type) = &PyType_Type;
if (PyType_Ready(&MrdbIndicator_Type) == -1) if (PyType_Ready(&MrdbIndicator_Type) == -1)
goto error; goto error;
@ -203,13 +213,16 @@ PyMODINIT_FUNC PyInit_mariadb(void)
"mariadb.DatabaseError", exception_database_doc, "DatabaseError"); "mariadb.DatabaseError", exception_database_doc, "DatabaseError");
mariadb_add_exception(module, &Mariadb_DataError, mariadb_add_exception(module, &Mariadb_DataError,
"mariadb.DatabaseError.DataError", exception_data_doc, "DataError"); "mariadb.DatabaseError.DataError", exception_data_doc, "DataError");
mariadb_add_exception(module, &Mariadb_PoolError,
"mariadb.PoolError", exception_pool_doc, "PoolError");
// PyModule_AddObject(module, "DatabaseError", Mariadb_DatabaseError);
Py_INCREF(&MrdbConnection_Type); Py_INCREF(&MrdbConnection_Type);
PyModule_AddObject(module, "connection", (PyObject *)&MrdbConnection_Type); PyModule_AddObject(module, "connection", (PyObject *)&MrdbConnection_Type);
cnx_pool= PyDict_New();
Py_INCREF(&MrdbPool_Type);
PyModule_AddObject(module, "ConnectionPool", (PyObject *)&MrdbPool_Type);
PyModule_AddObject(module, "_CONNECTION_POOLS", cnx_pool);
PyModule_AddObject(module, "indicator_null", MrdbIndicator_Object(STMT_INDICATOR_NULL)); PyModule_AddObject(module, "indicator_null", MrdbIndicator_Object(STMT_INDICATOR_NULL));
PyModule_AddObject(module, "indicator_default", MrdbIndicator_Object(STMT_INDICATOR_DEFAULT)); PyModule_AddObject(module, "indicator_default", MrdbIndicator_Object(STMT_INDICATOR_DEFAULT));

View File

@ -22,10 +22,15 @@
void MrdbConnection_dealloc(MrdbConnection *self); void MrdbConnection_dealloc(MrdbConnection *self);
extern PyObject *cnx_pool;
static PyObject *MrdbConnection_cursor(MrdbConnection *self, static PyObject *MrdbConnection_cursor(MrdbConnection *self,
PyObject *args, PyObject *args,
PyObject *kwargs); PyObject *kwargs);
/* todo: write more documentation, this is just a placeholder */
static PyObject *MrdbConnection_exception(PyObject *self, void *closure);
#define GETTER_EXCEPTION(name, exception, doc)\
{ name,MrdbConnection_exception, NULL, doc, &exception }
static PyObject *MrdbConnection_getid(MrdbConnection *self, void *closure); static PyObject *MrdbConnection_getid(MrdbConnection *self, void *closure);
static PyObject *MrdbConnection_getuser(MrdbConnection *self, void *closure); static PyObject *MrdbConnection_getuser(MrdbConnection *self, void *closure);
@ -62,6 +67,15 @@ static PyGetSetDef MrdbConnection_sets[]=
connection_server_version__doc__, NULL}, connection_server_version__doc__, NULL},
{"server_info", (getter)MrdbConnection_server_info, NULL, {"server_info", (getter)MrdbConnection_server_info, NULL,
connection_server_info__doc__, NULL}, connection_server_info__doc__, NULL},
GETTER_EXCEPTION("Error", Mariadb_Error, ""),
GETTER_EXCEPTION("Warning", Mariadb_Warning, ""),
GETTER_EXCEPTION("InterfaceError", Mariadb_InterfaceError, ""),
GETTER_EXCEPTION("ProgrammingError", Mariadb_ProgrammingError, ""),
GETTER_EXCEPTION("IntegrityError", Mariadb_IntegrityError, ""),
GETTER_EXCEPTION("DatabaseError", Mariadb_DatabaseError, ""),
GETTER_EXCEPTION("NotSupportedError", Mariadb_NotSupportedError, ""),
GETTER_EXCEPTION("InternalError", Mariadb_InternalError, ""),
GETTER_EXCEPTION("OperationalError", Mariadb_OperationalError, ""),
{NULL} {NULL}
}; };
@ -211,10 +225,10 @@ void MrdbConnection_SetAttributes(MrdbConnection *self)
self->collation= PyUnicode_FromString(cinfo.name); self->collation= PyUnicode_FromString(cinfo.name);
} }
static int static int
MrdbConnection_Initialize(MrdbConnection *self, MrdbConnection_Initialize(MrdbConnection *self,
PyObject *args, PyObject *args,
PyObject *dsnargs) PyObject *dsnargs)
{ {
int rc; int rc;
/* Todo: we need to support all dsn parameters, the current /* Todo: we need to support all dsn parameters, the current
@ -241,9 +255,10 @@ MrdbConnection_Initialize(MrdbConnection *self,
"ssl_key", "ssl_ca", "ssl_cert", "ssl_crl", "ssl_key", "ssl_ca", "ssl_cert", "ssl_crl",
"ssl_cipher", "ssl_capath", "ssl_crlpath", "ssl_cipher", "ssl_capath", "ssl_crlpath",
"ssl_verify_cert", "ssl", "ssl_verify_cert", "ssl",
"client_flags", "charset", "pool_name", "pool_size", NULL "client_flags", "charset", "pool_name", "pool_size", NULL
}; };
if (!PyArg_ParseTupleAndKeywords(args, dsnargs, if (!PyArg_ParseTupleAndKeywords(args, dsnargs,
"|sssssisiiipissssssssssipissi:connect", "|sssssisiiipissssssssssipissi:connect",
dsn_keys, dsn_keys,
@ -254,7 +269,7 @@ MrdbConnection_Initialize(MrdbConnection *self,
&ssl_key, &ssl_ca, &ssl_cert, &ssl_crl, &ssl_key, &ssl_ca, &ssl_cert, &ssl_crl,
&ssl_cipher, &ssl_capath, &ssl_crlpath, &ssl_cipher, &ssl_capath, &ssl_crlpath,
&ssl_verify_cert, &ssl_enforce, &ssl_verify_cert, &ssl_enforce,
&client_flags, &charset)) &client_flags, &charset, &pool_name, &pool_size))
return -1; return -1;
if (dsn) if (dsn)
@ -264,6 +279,16 @@ MrdbConnection_Initialize(MrdbConnection *self,
return -1; return -1;
} }
/* do we need pooling? */
if (pool_name)
{
/* check if pool exists */
if (PyDict_Contains(cnx_pool, PyUnicode_FromString(pool_name)))
{
/* get connection from pool */
}
}
if (!(self->mysql= mysql_init(NULL))) if (!(self->mysql= mysql_init(NULL)))
{ mariadb_throw_exception(self->mysql, Mariadb_OperationalError, 0, { mariadb_throw_exception(self->mysql, Mariadb_OperationalError, 0,
"Can't allocate memory for connection"); "Can't allocate memory for connection");
@ -401,13 +426,29 @@ PyTypeObject MrdbConnection_Type = {
0, /* (PyObject *) tp_defined */ 0, /* (PyObject *) tp_defined */
}; };
PyObject * PyObject *
MrdbConnection_connect( MrdbConnection_connect(
PyObject *self, PyObject *self,
PyObject *args, PyObject *args,
PyObject *kwargs) PyObject *kwargs)
{ {
MrdbConnection *c; MrdbConnection *c;
PyObject *pn= NULL,
*pool= NULL;
/* if pool name exists, we need to return a connection from pool */
if ((pn= PyDict_GetItemString(kwargs, "pool_name")))
{
if ((pool = PyDict_GetItem(cnx_pool, pn)))
{
return MrdbPool_getconnection((MrdbPool *)pool);
}
if ((pool = MrdbPool_add(self, args, kwargs)))
{
return MrdbPool_getconnection((MrdbPool *)pool);
}
}
if (!(c= (MrdbConnection *)PyType_GenericAlloc(&MrdbConnection_Type, 1))) if (!(c= (MrdbConnection *)PyType_GenericAlloc(&MrdbConnection_Type, 1)))
return NULL; return NULL;
@ -421,8 +462,7 @@ MrdbConnection_connect(
} }
/* destructor of MariaDB Connection object */ /* destructor of MariaDB Connection object */
void void MrdbConnection_dealloc(MrdbConnection *self)
MrdbConnection_dealloc(MrdbConnection *self)
{ {
if (self) if (self)
{ {
@ -441,10 +481,18 @@ PyObject *MrdbConnection_close(MrdbConnection *self)
MARIADB_CHECK_CONNECTION(self, NULL); MARIADB_CHECK_CONNECTION(self, NULL);
/* Todo: check if all the cursor stuff is deleted (when using prepared /* Todo: check if all the cursor stuff is deleted (when using prepared
statemnts this should be handled in mysql_close) */ statemnts this should be handled in mysql_close) */
if (self->pool)
{
if (!mysql_reset_connection(self->mysql))
self->inuse= 0;
return Py_None;
}
Py_BEGIN_ALLOW_THREADS Py_BEGIN_ALLOW_THREADS
mysql_close(self->mysql); mysql_close(self->mysql);
Py_END_ALLOW_THREADS Py_END_ALLOW_THREADS
self->mysql= NULL; self->mysql= NULL;
Py_INCREF(Py_None); Py_INCREF(Py_None);
return Py_None; return Py_None;
} }
@ -461,6 +509,15 @@ static PyObject *MrdbConnection_cursor(MrdbConnection *self,
return cursor; return cursor;
} }
static PyObject *
MrdbConnection_exception(PyObject *self, void *closure)
{
PyObject *exception = *(PyObject **)closure;
Py_INCREF(exception);
return exception;
}
/* {{{ MrdbConnection_commit */ /* {{{ MrdbConnection_commit */
PyObject *MrdbConnection_commit(MrdbConnection *self) PyObject *MrdbConnection_commit(MrdbConnection *self)
{ {

View File

@ -208,6 +208,14 @@ static int MrdbCursor_initialize(MrdbCursor *self, PyObject *args,
"O!|bkkbb", key_words, &MrdbConnection_Type, &connection, "O!|bkkbb", key_words, &MrdbConnection_Type, &connection,
&is_named_tuple, &prefetch_rows, &cursor_type, &is_buffered, &is_named_tuple, &prefetch_rows, &cursor_type, &is_buffered,
&is_prepared)) &is_prepared))
return -1;
if (!((MrdbConnection *)connection)->mysql)
{
mariadb_throw_exception(NULL, Mariadb_ProgrammingError, 0,
"Connection isn't valid anymore");
return -1;
}
if (cursor_type != CURSOR_TYPE_READ_ONLY && if (cursor_type != CURSOR_TYPE_READ_ONLY &&
cursor_type != CURSOR_TYPE_NO_CURSOR) cursor_type != CURSOR_TYPE_NO_CURSOR)

420
src/mariadb_pooling.c Normal file
View File

@ -0,0 +1,420 @@
/************************************************************************************
Copyright (C) 2018 Georg Richter and MariaDB Corporation AB
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Library General Public License for more details.
You should have received a copy of the GNU Library General Public
License along with this library; if not see <http://www.gnu.org/licenses>
or write to the Free Software Foundation, Inc.,
51 Franklin St., Fifth Floor, Boston, MA 02110, USA
*************************************************************************************/
#include <mariadb_python.h>
#include <docs/pool.h>
static void MrdbPool_dealloc(MrdbPool *self);
static PyObject *MrdbPool_addconnection(MrdbPool *self, PyObject *args);
static PyObject *MrdbPool_setconfig(MrdbPool *self, PyObject *args, PyObject *kwargs);
static PyObject *MrdbPool_poolsize(MrdbPool *self);
static PyObject *MrdbPool_poolname(MrdbPool *self);
static int MrdbPool_set_resetconnection(MrdbPool *self, PyObject *arg);
static PyObject * MrdbPool_get_resetconnection(MrdbPool *self);
extern PyObject *cnx_pool;
uint16_t max_pool_size= MAX_POOL_SIZE;
static PyGetSetDef MrdbPool_sets[]=
{
{"pool_name", (getter)MrdbPool_poolname, NULL,
pool_pool_name__doc__},
{"pool_size", (getter)MrdbPool_poolsize, NULL,
pool_pool_size__doc__},
{"pool_reset_connection", (getter)MrdbPool_get_resetconnection,
(setter)MrdbPool_set_resetconnection,
pool_pool_reset_connection__doc__},
{NULL}
};
static PyMethodDef MrdbPool_Methods[] =
{
{"add_connection", (PyCFunction)MrdbPool_addconnection,
METH_VARARGS,
pool_add_connection__doc__},
{"get_connection", (PyCFunction)MrdbPool_getconnection,
METH_NOARGS,
pool_get_connection__doc__},
{"set_config", (PyCFunction)MrdbPool_setconfig,
METH_VARARGS | METH_KEYWORDS,
pool_set_config__doc__ },
{NULL} /* always last */
};
static struct PyMemberDef MrdbPool_Members[] =
{
{"max_size",
T_SHORT,
offsetof(MrdbPool, max_size),
READONLY,
pool_max_size__doc__},
{NULL}
};
/* {{{ MrdbPool_initialize
Pool initialization
Keywords:
pool_name name of the pool
pool_size number of max. connections
reset_session reset session after returning to the pool
idle_timeout
acquire_timeout
*/
static int MrdbPool_initialize(MrdbPool *self, PyObject *args,
PyObject *kwargs)
{
char *key_words[]= {"pool_name", "pool_size", "pool_reset_connection", NULL};
PyObject *pool_kwargs= NULL;
PyObject *conn_kwargs= NULL;
char *pool_name= NULL;
size_t pool_name_length= 0;
uint32_t pool_size= 5;
uint8_t reset_session= 1;
uint32_t idle_timeout= 1800;
uint32_t acquire_timeout= 10000;
uint32_t i;
PyObject *pn;
PyObject *key, *value;
Py_ssize_t pos = 0;
if (!self)
return -1;
/* check if pool already exists */
if ((pn= PyDict_GetItemString(kwargs, "pool_name")))
{
if (PyDict_Contains(cnx_pool, pn))
{
mariadb_throw_exception(NULL, Mariadb_ProgrammingError, 0,
"Pool '%s' already exists", PyUnicode_AsUTF8(pn));
return -1;
}
}
while(PyDict_Next(kwargs, &pos, &key, &value))
{
const char *utf8key= PyUnicode_AsUTF8(key);
if (!strncmp(utf8key, "pool", 4))
{
if (!pool_kwargs)
pool_kwargs= PyDict_New();
PyDict_SetItemString(pool_kwargs, utf8key, value);
} else {
if (!conn_kwargs)
conn_kwargs= PyDict_New();
PyDict_SetItemString(conn_kwargs, utf8key, value);
}
}
if (!PyArg_ParseTupleAndKeywords(args, pool_kwargs,
"|s#ib:ConnectionPool", key_words, &pool_name, &pool_name_length,
&pool_size, &reset_session))
return -1;
if (pool_size > max_pool_size)
{
mariadb_throw_exception(NULL, Mariadb_ProgrammingError, 0,
"pool_size exceeds maximum of %d", MAX_POOL_SIZE);
goto error;
}
self->pool_name= strdup(pool_name);
self->pool_name_length= pool_name_length;
self->pool_size= pool_size;
self->max_size= max_pool_size;
self->reset_session= reset_session;
self->idle_timeout= idle_timeout;
if (!(self->connection= (MrdbConnection **)PyMem_RawCalloc(self->pool_size, sizeof(PyObject *))))
{
mariadb_throw_exception(NULL, PyExc_MemoryError, 0, "can't allocate %ld bytes",
(unsigned long)self->pool_size * sizeof(PyObject*));
goto error;
}
if ((self->configuration= conn_kwargs))
{
for (i=0; i < pool_size; i++)
{
if (!(self->connection[i]=
(MrdbConnection *)MrdbConnection_connect(NULL, args, self->configuration)))
goto error;
Py_INCREF(self->connection[i]);
self->connection[i]->pool= self;
}
self->connection_cnt= self->pool_size;
}
PyDict_SetItemString(cnx_pool, self->pool_name, (PyObject *)self);
Py_DECREF(self);
return 0;
error:
if (self->connection)
{
for (i=0; i < self->pool_size; i++)
{
if (self->connection[i])
{
self->connection[i]->pool= 0;
MrdbConnection_close(self->connection[i]);
}
}
}
MARIADB_FREE_MEM(self->connection);
return -1;
}
/* }}} */
static int MrdbPool_traverse(
MrdbPool *self,
visitproc visit,
void *arg)
{
return 0;
}
PyTypeObject MrdbPool_Type =
{
PyVarObject_HEAD_INIT(NULL, 0)
"mariadb.ConnectionPool",
sizeof(MrdbPool),
0,
(destructor)MrdbPool_dealloc, /* tp_dealloc */
0, /*tp_print*/
0, /* tp_getattr */
0, /* tp_setattr */
0, /* PyAsyncMethods * */
0, /* tp_repr */
/* Method suites for standard classes */
0, /* (PyNumberMethods *) tp_as_number */
0, /* (PySequenceMethods *) tp_as_sequence */
0, /* (PyMappingMethods *) tp_as_mapping */
/* More standard operations (here for binary compatibility) */
0, /* (hashfunc) tp_hash */
0, /* (ternaryfunc) tp_call */
0, /* (reprfunc) tp_str */
0, /* tp_getattro */
0, /* tp_setattro */
/* Functions to access object as input/output buffer */
0, /* (PyBufferProcs *) tp_as_buffer */
/* (tp_flags) Flags to define presence of optional/expanded features */
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE |
Py_TPFLAGS_HAVE_FINALIZE,
connection_pool__doc__, /* tp_doc Documentation string */
/* call function for all accessible objects */
(traverseproc)MrdbPool_traverse,/* tp_traverse */
/* delete references to contained objects */
0, /* tp_clear */
/* rich comparisons */
0, /* (richcmpfunc) tp_richcompare */
/* weak reference enabler */
0, /* (long) tp_weaklistoffset */
/* Iterators */
0, //(getiterfunc)MrdbCursor_iter,
0, //(iternextfunc)MrdbCursor_iternext,
/* Attribute descriptor and subclassing stuff */
(struct PyMethodDef *)MrdbPool_Methods, /* tp_methods */
(struct PyMemberDef *)MrdbPool_Members, /* tp_members */
MrdbPool_sets,
0, /* (struct _typeobject *) tp_base; */
0, /* (PyObject *) tp_dict */
0, /* (descrgetfunc) tp_descr_get */
0, /* (descrsetfunc) tp_descr_set */
0, /* (long) tp_dictoffset */
(initproc)MrdbPool_initialize, /* tp_init */
PyType_GenericAlloc, //NULL, /* tp_alloc */
PyType_GenericNew, //NULL, /* tp_new */
0, /* tp_free Low-level free-memory routine */
};
/*{{{ MrDBPool_dealloc */
void MrdbPool_dealloc(MrdbPool *self)
{
uint32_t i;
if (self->pool_name)
{
if (PyDict_Contains(cnx_pool, PyUnicode_FromString(self->pool_name)))
{
PyDict_DelItemString(cnx_pool, self->pool_name);
}
MARIADB_FREE_MEM(self->pool_name);
self->pool_name= NULL;
}
for (i=0; i < self->pool_size; i++)
{
if (self->connection[i])
{
self->connection[i]->pool= NULL;
MrdbConnection_close(self->connection[i]);
self->connection[i]= NULL;
}
}
self->pool_size= 0;
MARIADB_FREE_MEM(self->connection);
self->connection= NULL;
Py_TYPE(self)->tp_free((PyObject*)self);
}
/* }}} */
PyObject *
MrdbPool_add(
PyObject *self,
PyObject *args,
PyObject *kwargs)
{
MrdbPool *c;
if (!(c= (MrdbPool *)PyType_GenericAlloc(&MrdbPool_Type, 1)))
return NULL;
if (MrdbPool_initialize(c, args, kwargs))
{
Py_DECREF(c);
return NULL;
}
return (PyObject *) c;
}
PyObject *MrdbPool_getconnection(MrdbPool *self)
{
uint32_t i;
for (i=0; i < self->pool_size; i++)
{
if (!self->connection[i]->inuse)
{
if (self->connection[i] && !mysql_ping(self->connection[i]->mysql))
{
self->connection[i]->inuse= 1;
return (PyObject *)self->connection[i];
} else {
self->connection[i]->pool= NULL;
MrdbConnection_close(self->connection[i]);
self->connection[i]= NULL;
}
}
}
mariadb_throw_exception(NULL, Mariadb_PoolError, 0,
"No more connections from pool '%s' available",
self->pool_name);
return NULL;
}
static PyObject *MrdbPool_setconfig(MrdbPool *self, PyObject *args, PyObject *kwargs)
{
/* PyObject *conf= NULL;
if (!PyArg_ParseTuple(args, "O!", &PyDict_Type, &conf))
return NULL;
*/
self->configuration= kwargs;
Py_RETURN_NONE;
}
static PyObject * MrdbPool_addconnection(MrdbPool *self, PyObject *args)
{
uint32_t i;
MrdbConnection *conn= NULL;
if (!self->configuration)
{
mariadb_throw_exception(NULL, Mariadb_PoolError, 0,
"Couldn't get configuration for pool '%s'.",
self->pool_name);
return NULL;
}
if (!PyArg_ParseTuple(args, "|O!", &MrdbConnection_Type, &conn))
return NULL;
if (conn && conn->pool)
{
mariadb_throw_exception(NULL, Mariadb_PoolError, 0,
"Connection is already in connection pool '%s'.",
self->pool_name);
return NULL;
}
for (i=0; i < self->pool_size; i++)
{
if (!self->connection[i])
{
if (!conn &&
(!(conn = (MrdbConnection *)MrdbConnection_connect(NULL, args, self->configuration))))
return NULL;
self->connection[i]= conn;
self->connection[i]->inuse= 0;
conn->pool= self;
Py_RETURN_NONE;
}
}
mariadb_throw_exception(NULL, Mariadb_PoolError, 0,
"Couldn't add connection to pool '%s' (no free slot available).",
self->pool_name);
return NULL;
}
static PyObject *MrdbPool_poolname(MrdbPool *self)
{
return PyUnicode_FromString(self->pool_name);
}
static PyObject *MrdbPool_poolsize(MrdbPool *self)
{
return PyLong_FromUnsignedLongLong(self->pool_size);
}
static PyObject *MrdbPool_get_resetconnection(MrdbPool *self)
{
if (self->reset_session)
Py_RETURN_TRUE;
Py_RETURN_FALSE;
}
/* {{{ MrdbCursor_setbuffered */
static int MrdbPool_set_resetconnection(MrdbPool *self, PyObject *arg)
{
if (!arg || Py_TYPE(arg) != &PyBool_Type)
{
PyErr_SetString(PyExc_TypeError, "Argument must be boolean");
return -1;
}
self->reset_session= PyObject_IsTrue(arg);
return 0;
}

View File

@ -0,0 +1,134 @@
#!/usr/bin/env python -O
# -*- coding: utf-8 -*-
import collections
import datetime
import unittest
import mariadb
from test.base_test import create_connection, conf
class TestPooling(unittest.TestCase):
def setUp(self):
self.connection = create_connection()
self.connection.autocommit = False
def tearDown(self):
del self.connection
def test_connection_pools(self):
pool= mariadb.ConnectionPool(pool_name="test")
self.assertEqual(mariadb._CONNECTION_POOLS["test"], pool)
del pool
self.assertEqual(mariadb._CONNECTION_POOLS, {})
def test_connection_pool_conf(self):
pool= mariadb.ConnectionPool(pool_name="test")
default_conf= conf()
conn= create_connection()
try:
pool.add_connection(conn)
except mariadb.PoolError:
pass
pool.set_config(**default_conf)
pool.add_connection(conn)
c= pool.get_connection()
self.assertEqual(c, conn)
del pool
def test_connection_pool_maxconn(self):
default_conf= conf()
pool= mariadb.ConnectionPool(pool_name="test1", **default_conf)
connections= []
for i in range(1, 6):
connections.append(pool.get_connection())
try:
x= pool.get_connection()
except mariadb.PoolError:
pass
for c in connections:
c.close()
x= pool.get_connection()
del pool
def test_connection_pool_add(self):
default_conf= conf()
pool= mariadb.ConnectionPool(pool_name="test1")
pool.set_config(**default_conf)
for i in range(1,6):
pool.add_connection()
try:
pool.add_connection()
except mariadb.PoolError:
pass
del pool
def test__CONNECTION_POOLS(self):
default_conf= conf()
pool= mariadb.ConnectionPool(pool_name="test", **default_conf)
conn= mariadb.connect(pool_name="test")
cursor= conn.cursor()
cursor.execute("SELECT 1")
row= cursor.fetchone()
self.assertEqual(row[0], 1)
del cursor
del pool
def test_create_pool_from_conn(self):
default_conf= conf()
key= "t1"
conn= mariadb.connect(pool_name=key, **default_conf)
cursor=conn.cursor()
del mariadb._CONNECTION_POOLS["t1"]
self.assertEqual(mariadb._CONNECTION_POOLS, {})
try:
cursor.execute("SELECT 1")
except mariadb.ProgrammingError:
pass
def test_pool_getter(self):
default_conf= conf()
conn= mariadb.connect(pool_name="getter_test", pool_size=4, **default_conf)
p= mariadb._CONNECTION_POOLS["getter_test"];
self.assertEqual(p.pool_name, "getter_test")
self.assertEqual(p.pool_size, 4)
self.assertEqual(p.pool_reset_connection, True)
self.assertEqual(p.max_size, 64)
del mariadb._CONNECTION_POOLS["getter_test"]
def test_pool_connection_reset(self):
default_conf= conf()
conn= mariadb.connect(pool_name="reset_test", pool_size=1, **default_conf)
cursor= conn.cursor()
cursor.execute("SELECT 1")
conn.close()
conn= mariadb.connect(pool_name="reset_test")
cursor= conn.cursor()
cursor.execute("SELECT 2")
row= cursor.fetchone()
self.assertEqual(row[0], 2)
del mariadb._CONNECTION_POOLS["reset_test"]
def test_pool_add(self):
default_conf= conf()
pool= mariadb.ConnectionPool(pool_name="test")
pool1= mariadb.ConnectionPool(pool_name="test1")
pool.set_config(**default_conf)
pool1.set_config(**default_conf)
conn= create_connection()
pool.add_connection(conn)
try:
pool.add_connection(conn)
except mariadb.PoolError as e:
pass
try:
pool1.add_connection(conn)
except mariadb.PoolError as e:
pass
del pool, pool1
if __name__ == '__main__':
unittest.main()