Exchanged C written connection pool class by native python class.

This commit is contained in:
Georg Richter
2021-01-18 06:11:03 +01:00
parent 2ce8f2cf2f
commit aa65cc853a
9 changed files with 313 additions and 714 deletions

View File

@ -1,89 +0,0 @@
/************************************************************************************
Copyright (C) 2019 Georg Richter and MariaDB Corporation AB
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Library General Public License for more details.
You should have received a copy of the GNU Library General Public
License along with this library; if not see <http://www.gnu.org/licenses>
or write to the Free Software Foundation, Inc.,
51 Franklin St., Fifth Floor, Boston, MA 02110, USA
*************************************************************************************/
PyDoc_STRVAR(
pool_pool_name__doc__,
"(read only)\n\n"
"Returns the name of the connection pool"
);
PyDoc_STRVAR(
connection_pool__doc__,
"Class defining a pool of database connections"
);
PyDoc_STRVAR(
pool_get_connection__doc__,
"get_connection()\n"
"--\n"
"\n"
"Returns a connection from the connection pool or raises a PoolError if\n"
"a connection is not available"
);
PyDoc_STRVAR(
pool_close__doc__,
"close()\n"
"--\n"
"\n"
"Closes connection pool and all connections."
);
PyDoc_STRVAR(
pool_add_connection__doc__,
"add_connection(connection)\n"
"--\n"
"\n"
"Parameter:\n"
"connection: mariadb connection object\n\n"
"Adds a connection to the connection pool. In case the pool doesn't\n"
"have a free slot or is not configured a PoolError exception will be raised."
);
PyDoc_STRVAR(
pool_set_config__doc__,
"set_config(configuration)\n"
"--\n"
"\n"
"Parameter:\n"
"configuration: dictionary\n\n"
"Sets the connection configuration for the connection pool.\n"
"For valid connection arguments check the mariadb.connect() method.\n\n"
"Note: This method doesn't create connections in the pool.\n"
"To fill the pool one has to use add_connection() ḿethod."
);
PyDoc_STRVAR(
pool_pool_size__doc__,
"(read only)\n\n"
"Returns the size of the connection pool"
);
PyDoc_STRVAR(
pool_max_size__doc__,
"(read only)\n\n"
"Returns the maximum allowed size of the connection pool"
);
PyDoc_STRVAR(
pool_pool_reset_connection__doc__,
"(read/write)\n\n"
"If set to true, the connection will be reset on both client and server side\n"
"after .close() method was called"
);

View File

@ -151,8 +151,6 @@ typedef struct st_parser {
MYSQL *mysql;
} MrdbParser;
struct mrdb_pool;
/* PEP-249: Connection object */
typedef struct {
PyObject_HEAD
@ -170,7 +168,6 @@ typedef struct {
int port;
const char *charset;
const char *collation;
struct mrdb_pool *pool;
uint8_t inuse;
uint8_t status;
struct timespec last_used;
@ -183,6 +180,7 @@ typedef struct {
PyObject *server_version_info;
} MrdbConnection;
/*
typedef struct mrdb_pool{
PyObject_HEAD
pthread_mutex_t lock;
@ -196,7 +194,7 @@ typedef struct mrdb_pool{
MrdbConnection **connection;
uint32_t connection_cnt;
uint16_t max_size;
} MrdbPool;
} MrdbPool; */
typedef struct {
enum enum_field_types type;
@ -347,13 +345,6 @@ MrdbConnection_connect( PyObject *self,PyObject *args, PyObject *kwargs);
void
MrdbConnection_SetAttributes(MrdbConnection *self);
/* Pooling */
PyObject *
MrdbPool_add(PyObject *self, PyObject *args, PyObject *kwargs);
PyObject *
MrdbPool_getconnection(MrdbPool *self);
/* TPC methods */
PyObject *
MrdbConnection_xid(MrdbConnection *self, PyObject *args);
@ -470,7 +461,7 @@ if ((obj)->thread_state)\
"Invalid cursor or not connected");\
}
#define pooling_keywords "pool_name", "pool_size", "reset_session", "idle_timeout", "acquire_timeout"
// #define pooling_keywords "pool_name", "pool_size", "reset_session", "idle_timeout", "acquire_timeout"
#define connection_keywords "dsn", "host", "user", "password", "database", "port", "socket",\
"connect_timeout", "read_timeout", "write_timeout",\
"local_infile", "compress", "init_command",\

View File

@ -9,7 +9,6 @@ Minimum supported Python version is 3.6
from ._mariadb import (
Binary,
ConnectionPool,
DataError,
DatabaseError,
Error,
@ -21,20 +20,28 @@ from ._mariadb import (
PoolError,
ProgrammingError,
Warning,
_CONNECTION_POOLS,
__version__,
__version_info__,
connect,
mariadbapi_version,
)
from .field import fieldinfo
_POOLS= _CONNECTION_POOLS= {}
from mariadb.dbapi20 import *
from mariadb.connectionpool import *
from mariadb.pooling import *
'''
test attribute
'''
test=1
def connect(*args, **kwargs):
from mariadb.connections import Connection
if kwargs and "pool_name" in kwargs:
if not kwargs["pool_name"] in mariadb._CONNECTION_POOLS:
pool= mariadb.ConnectionPool(**kwargs)
else:
pool= mariadb._CONNECTION_POOLS[kwargs["pool_name"]]
c= pool.get_connection()
return c
return Connection(*args, **kwargs)
Connection= connect

222
mariadb/connectionpool.py Normal file
View File

@ -0,0 +1,222 @@
import mariadb, sys
import _thread, time
MAX_POOL_SIZE = 64
POOL_IDLE_TIMEOUT = 1800
class ConnectionPool(object):
"""
Class defining a pool of database connections
MariaDB Connector/Python supports simple connection pooling.
A connection pool holds a number of open connections and handles
thread safety when providing connections to threads.
The size of a connection pool is configurable at creation time,
but cannot be changed afterwards. The maximum size of a connection
pool is limited to 64 connections.
"""
def __init__(self, *args, **kwargs):
"""
Creates a connetion pool class
:param str pool_name:
Name of connection pool
:param int pool_size:
Size of pool. If not specified default value of 5 will be used.
Maximum allowed number is 64.
:param bool pool_reset_connection:
Will reset the connection before returning it to the pool.
Default value is True.
"""
self._connections = []
self._pool_args = {}
self._conn_args = {}
self._lock_pool = _thread.RLock()
key_words= ["pool_name", "pool_size", "pool_reset_connection"]
# check if pool_name was provided
if kwargs and "pool_name" in kwargs:
# check if pool_name already exists
if kwargs["pool_name"] in mariadb._CONNECTION_POOLS:
raise mariadb.ProgrammingError("Pool '%s' already exists" \
% kwargs["pool_name"])
else:
raise mariadb.ProgrammingError("No pool name specified")
# save pool keyword arguments
self._pool_args["name"]= kwargs.get("pool_name")
self._pool_args["size"]= kwargs.get("pool_size", 5);
self._pool_args["reset_connection"]= \
kwargs.get("pool_reset_connection", True)
# validate pool size (must be in range between 1 and MAX_POOL_SIZE)
if not (0 < self._pool_args["size"] <= MAX_POOL_SIZE):
raise mariadb.ProgrammError("Pool size must be in range of "\
"1 and %s" % MAX_POOL_SIZE)
# store pool and connection arguments
self._conn_args= kwargs.copy()
for key in key_words:
if key in self._conn_args:
del self._conn_args[key]
if len(self._conn_args) > 0:
with self._lock_pool:
# fill connection pool
for i in range(0, self._pool_args["size"]):
try:
connection= mariadb.Connection(**self._conn_args)
except:
# if an error occured, close all connections and raise exception
for j in range(0, len(self._connections)):
try:
self._connections[j].close()
except:
# connect failed, so we are not interested in errors
# from close() method
pass
del self._connections[j]
raise
self.add_connection(connection)
# store connection pool in _CONNECTION_POOLS
mariadb._CONNECTION_POOLS[self._pool_args["name"]]= self
def add_connection(self, connection= None):
"""
Adds a connection object to the connection pool.
In case that the pool doesnt have a free slot or is not configured
a PoolError exception will be raised.
"""
if not self._conn_args:
raise mariadb.PoolError("Couldn't get configuration for pool %s" % \
self._pool_args["name"])
if connection is not None and \
not isinstance(connection, mariadb.connections.Connection):
raise TypeError("Passed parameter is not a connection object")
if connection == None and len(self._conn_args) == 0:
raise mariadb.PoolError("Can't get configuration for pool %s" % \
self._pool_args["name"])
if len(self._connections) >= self._pool_args["size"]:
raise mariadb.PoolError("Can't add connection to pool %s: "
"No free slot available (%s)." % (self._pool_args["name"], len(self._connections)))
with self._lock_pool:
if connection is None:
connection= mariadb.Connection(**self._conn_args)
connection._Connection__pool= self
connection._Connection__in_use= 0
connection._Connection__last_used= time.monotonic()
self._connections.append(connection)
def get_connection(self):
"""
Returns a connection from the connection pool or raises a PoolError
exception if a connection is not available.
"""
now= time.monotonic()
conn= None
timediff= 0
with self._lock_pool:
for i in range(0, len(self._connections)):
if not self._connections[i]._Connection__in_use:
try:
self._connections[i].ping()
except:
continue
t = now - self._connections[i]._Connection__last_used
if t > timediff:
conn= self._connections[i]
timediff = t
if conn:
conn._Connection__in_use= 1
return conn
def _close_connection(self, connection):
"""
Returns connection to the pool. Internally used
by connection object.
"""
with self._lock_pool:
if self.pool_args["reset_connection"]:
connection.reset()
connection._Connection__in_use= 0
connection._Connection__last_used= time.monotonic()
def set_config(self, **kwargs):
"""
:param dict configuration
configuration settings for pooled connection
Sets the connection configuration for the connection pool.
For valid connection arguments check the mariadb.connect() method.
Note: This method doesn't create connections in the pool.
To fill the pool one has to use add_connection() ḿethod.
"""
self._conn_args= kwargs;
def close(self):
"""Closes connection pool and all connections."""
try:
for c in self._connections:
c._Connection__pool= None
c.close()
finally:
self._connections= None
del mariadb._CONNECTION_POOLS[self._pool_args["name"]]
@property
def pool_name(self):
"""Returns the name of the connection pool."""
return self._pool_args["name"]
@property
def pool_size(self):
"""Returns the size of the connection pool."""
return self._pool_args["size"]
@property
def max_size(self):
"Returns the maximum size for connection pools."""
return MAX_POOL_SIZE
@property
def connection_count(self):
"Returns the number of connections in connection pool."""
return self._connections
@property
def pool_reset_connection(self):
"""
If set to true, the connection will be reset on both client and server side\n
after .close() method was called
"""
return self._pool_args["reset_connection"]
@pool_reset_connection.setter
def pool_reset_connection(self, reset):
self._pool_args["reset_connection"]= reset

68
mariadb/connections.py Normal file
View File

@ -0,0 +1,68 @@
#
# Copyright (C) 2020-2021 Georg Richter and MariaDB Corporation AB
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Library General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Library General Public License for more details.
# You should have received a copy of the GNU Library General Public
# License along with this library; if not see <http://www.gnu.org/licenses>
# or write to the Free Software Foundation, Inc.,
# 51 Franklin St., Fifth Floor, Boston, MA 02110, USA
#
import mariadb
import socket
import time
_DEFAULT_CHARSET = "utf8mb4"
_DEFAULT_COLLATION = "utf8mb4_general_ci"
class Connection(mariadb._mariadb.connection):
"""MariaDB connection class"""
def __init__(self, *args, **kwargs):
self._socket= None
self.__in_use= 0
self.__pool = None
self.__last_used = 0
super().__init__(*args, **kwargs)
def cursor(self, *args, **kwargs):
return mariadb._mariadb.connection.cursor(self, *args, **kwargs)
def close(self):
if self._Connection__pool:
self._Connection__pool._close_connection(self)
else:
super().close()
@property
def character_set(self):
"""Client character set."""
return _DEFAULT_CHARSET
@property
def collation(self):
"""Client character set collation"""
return _DEFAULT_COLLATION
@property
def socket(self):
"""Returns the socket used for database connection"""
fno= self.get_socket()
if not self._socket:
self._socket= socket.socket(fileno=fno)
# in case of a possible reconnect, file descriptor has changed
elif fno != self._socket.fileno():
self._socket= socket.socket(fileno=fno)
return self._socket

View File

@ -26,9 +26,9 @@
extern int codecs_datetime_init(void);
PyObject *cnx_pool= NULL;
PyObject *decimal_module= NULL,
*decimal_type= NULL,
*socket_module= NULL,
*indicator_module= NULL;
extern uint16_t max_pool_size;
@ -53,9 +53,6 @@ Mariadb_Methods[] =
{"connect", (PyCFunction)MrdbConnection_connect,
METH_VARARGS | METH_KEYWORDS,
module_connect__doc__},
{"ConnectionPool", (PyCFunction)MrdbPool_add,
METH_VARARGS | METH_KEYWORDS,
"todo!!"},
/* Todo: add methods for api functions which don't require
a connection */
{NULL} /* always last */
@ -129,14 +126,13 @@ PyMODINIT_FUNC PyInit__mariadb(void)
goto error;
}
Py_TYPE(&MrdbCursor_Type) = &PyType_Type;
if (PyType_Ready(&MrdbCursor_Type) == -1)
if (!(socket_module= PyImport_ImportModule("socket")))
{
goto error;
}
Py_TYPE(&MrdbPool_Type) = &PyType_Type;
if (PyType_Ready(&MrdbPool_Type) == -1)
Py_TYPE(&MrdbCursor_Type) = &PyType_Type;
if (PyType_Ready(&MrdbCursor_Type) == -1)
{
goto error;
}
@ -210,11 +206,6 @@ PyMODINIT_FUNC PyInit__mariadb(void)
Py_INCREF(&MrdbConnection_Type);
PyModule_AddObject(module, "connection", (PyObject *)&MrdbConnection_Type);
cnx_pool= PyDict_New();
Py_INCREF(&MrdbPool_Type);
PyModule_AddObject(module, "ConnectionPool", (PyObject *)&MrdbPool_Type);
PyModule_AddObject(module, "_CONNECTION_POOLS", cnx_pool);
return module;
error:
PyErr_SetString(PyExc_ImportError, "Mariadb module initialization failed");

View File

@ -41,8 +41,6 @@ const char *mariadb_default_collation= "utf8mb4_general_ci";
void
MrdbConnection_dealloc(MrdbConnection *self);
extern PyObject *cnx_pool;
static PyObject
*MrdbConnection_cursor(MrdbConnection *self, PyObject *args, PyObject *kwargs);
@ -353,16 +351,6 @@ MrdbConnection_Initialize(MrdbConnection *self,
return -1;
}
/* do we need pooling? */
if (pool_name)
{
/* check if pool exists */
if (PyDict_Contains(cnx_pool, PyUnicode_FromString(pool_name)))
{
/* get connection from pool */
}
}
if (!(self->mysql= mysql_init(NULL)))
{
mariadb_throw_exception(self->mysql, Mariadb_OperationalError, 1,
@ -573,22 +561,6 @@ MrdbConnection_connect(
PyObject *kwargs)
{
MrdbConnection *c;
PyObject *pn= NULL,
*pool= NULL;
/* if pool name exists, we need to return a connection from pool */
if (kwargs && (pn= PyDict_GetItemString(kwargs, "pool_name")))
{
if ((pool = PyDict_GetItem(cnx_pool, pn)))
{
return MrdbPool_getconnection((MrdbPool *)pool);
}
if ((pool = MrdbPool_add(self, args, kwargs)))
{
return MrdbPool_getconnection((MrdbPool *)pool);
}
}
if (!(c= (MrdbConnection *)PyType_GenericAlloc(&MrdbConnection_Type, 1)))
return NULL;
@ -626,24 +598,6 @@ PyObject *MrdbConnection_close(MrdbConnection *self)
Py_XDECREF(self->converter);
self->converter= NULL;
if (self->pool)
{
int rc= 0;
pthread_mutex_lock(&self->pool->lock);
if (self->pool->reset_session)
{
rc= mysql_reset_connection(self->mysql);
}
if (!rc)
{
self->inuse= 0;
clock_gettime(CLOCK_MONOTONIC_RAW, &self->last_used);
}
pthread_mutex_unlock(&self->pool->lock);
Py_INCREF(Py_None);
return Py_None;
}
Py_BEGIN_ALLOW_THREADS
mysql_close(self->mysql);
Py_END_ALLOW_THREADS

View File

@ -1,544 +0,0 @@
/******************************************************************************
Copyright (C) 2018 Georg Richter and MariaDB Corporation AB
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Library General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Library General Public License for more details.
You should have received a copy of the GNU Library General Public
License along with this library; if not see <http://www.gnu.org/licenses>
or write to the Free Software Foundation, Inc.,
51 Franklin St., Fifth Floor, Boston, MA 02110, USA
*****************************************************************************/
#include <mariadb_python.h>
#include <docs/pool.h>
static void
MrdbPool_dealloc(MrdbPool *self);
static PyObject
*MrdbPool_addconnection(MrdbPool *self, PyObject *args);
static PyObject *
MrdbPool_setconfig(MrdbPool *self, PyObject *args, PyObject *kwargs);
static PyObject *
MrdbPool_poolsize(MrdbPool *self);
static PyObject *
MrdbPool_poolname(MrdbPool *self);
static PyObject *
MrdbPool_close(MrdbPool *self);
static int
MrdbPool_set_resetconnection(MrdbPool *self, PyObject *arg);
static PyObject *
MrdbPool_get_resetconnection(MrdbPool *self);
extern PyObject *cnx_pool;
uint16_t max_pool_size= MAX_POOL_SIZE;
static PyGetSetDef
MrdbPool_sets[]=
{
{"pool_name", (getter)MrdbPool_poolname, NULL,
pool_pool_name__doc__},
{"pool_size", (getter)MrdbPool_poolsize, NULL,
pool_pool_size__doc__},
{"pool_reset_connection", (getter)MrdbPool_get_resetconnection,
(setter)MrdbPool_set_resetconnection,
pool_pool_reset_connection__doc__},
{NULL}
};
static PyMethodDef
MrdbPool_Methods[] =
{
{"add_connection", (PyCFunction)MrdbPool_addconnection,
METH_VARARGS,
pool_add_connection__doc__},
{"get_connection", (PyCFunction)MrdbPool_getconnection,
METH_NOARGS,
pool_get_connection__doc__},
{"set_config", (PyCFunction)MrdbPool_setconfig,
METH_VARARGS | METH_KEYWORDS,
pool_set_config__doc__ },
{"close", (PyCFunction)MrdbPool_close,
METH_NOARGS,
pool_close__doc__},
{NULL} /* always last */
};
static struct PyMemberDef
MrdbPool_Members[] =
{
{"max_size",
T_SHORT,
offsetof(MrdbPool, max_size),
READONLY,
pool_max_size__doc__},
{NULL}
};
/* Pool initialization
Keywords:
pool_name name of the pool
pool_size number of max. connections
reset_session reset session after returning to the pool
idle_timeout
acquire_timeout
*/
static int
MrdbPool_initialize(MrdbPool *self, PyObject *args, PyObject *kwargs)
{
char *key_words[]= {"pool_name", "pool_size", "pool_reset_connection", NULL};
PyObject *pool_kwargs= NULL;
PyObject *conn_kwargs= NULL;
char *pool_name= NULL;
Py_ssize_t pool_name_length= 0;
uint32_t pool_size= 5;
uint8_t reset_session= 1;
uint32_t idle_timeout= 1800;
uint32_t i;
PyObject *pn;
PyObject *key, *value;
Py_ssize_t pos = 0;
if (!self)
{
return -1;
}
/* check if pool already exists */
if (kwargs && (pn= PyDict_GetItemString(kwargs, "pool_name")))
{
if (PyDict_Contains(cnx_pool, pn))
{
mariadb_throw_exception(NULL, Mariadb_ProgrammingError, 0,
"Pool '%s' already exists", PyUnicode_AsUTF8(pn));
return -1;
}
}
else {
mariadb_throw_exception(NULL, Mariadb_ProgrammingError, 0,
"No pool name specified");
return -1;
}
while(PyDict_Next(kwargs, &pos, &key, &value))
{
const char *utf8key= PyUnicode_AsUTF8(key);
if (!strncmp(utf8key, "pool", 4))
{
if (!pool_kwargs)
{
pool_kwargs= PyDict_New();
}
PyDict_SetItemString(pool_kwargs, utf8key, value);
}
else {
if (!conn_kwargs)
{
conn_kwargs= PyDict_New();
}
PyDict_SetItemString(conn_kwargs, utf8key, value);
}
}
if (!PyArg_ParseTupleAndKeywords(args, pool_kwargs,
"|s#ib:ConnectionPool", key_words, &pool_name,
&pool_name_length, &pool_size, &reset_session))
{
return -1;
}
if (pool_size > max_pool_size)
{
mariadb_throw_exception(NULL, Mariadb_ProgrammingError, 0,
"pool_size exceeds maximum of %d", MAX_POOL_SIZE);
goto error;
}
pthread_mutex_init(&self->lock, NULL);
self->pool_name= strdup(pool_name);
self->pool_name_length= pool_name_length;
self->pool_size= pool_size;
self->max_size= max_pool_size;
self->reset_session= reset_session;
self->idle_timeout= idle_timeout;
if (!(self->connection= (MrdbConnection **)PyMem_RawCalloc(self->pool_size, sizeof(PyObject *))))
{
mariadb_throw_exception(NULL, PyExc_MemoryError, 0, "can't allocate %ld bytes",
(unsigned long)self->pool_size * sizeof(PyObject*));
goto error;
}
if ((self->configuration= conn_kwargs))
{
for (i=0; i < pool_size; i++)
{
if (!(self->connection[i]=
(MrdbConnection *)MrdbConnection_connect(NULL, args, self->configuration)))
{
goto error;
}
clock_gettime(CLOCK_MONOTONIC_RAW, &self->connection[i]->last_used);
Py_INCREF(self->connection[i]);
self->connection[i]->pool= self;
}
self->connection_cnt= self->pool_size;
}
PyDict_SetItemString(cnx_pool, self->pool_name, (PyObject *)self);
Py_INCREF(self);
return 0;
error:
if (self->connection)
{
for (i=0; i < self->pool_size; i++)
{
if (self->connection[i])
{
self->connection[i]->pool= 0;
MrdbConnection_close(self->connection[i]);
self->connection[i]= NULL;
}
}
}
self->pool_size= 0;
MARIADB_FREE_MEM(self->connection);
return -1;
}
static int
MrdbPool_traverse(MrdbPool *self,
visitproc visit,
void *arg)
{
return 0;
}
PyTypeObject
MrdbPool_Type =
{
PyVarObject_HEAD_INIT(NULL, 0)
"mariadb.ConnectionPool",
sizeof(MrdbPool),
0,
(destructor)MrdbPool_dealloc, /* tp_dealloc */
0, /*tp_print*/
0, /* tp_getattr */
0, /* tp_setattr */
0, /* PyAsyncMethods * */
0, /* tp_repr */
/* Method suites for standard classes */
0, /* (PyNumberMethods *) tp_as_number */
0, /* (PySequenceMethods *) tp_as_sequence */
0, /* (PyMappingMethods *) tp_as_mapping */
/* More standard operations (here for binary compatibility) */
0, /* (hashfunc) tp_hash */
0, /* (ternaryfunc) tp_call */
0, /* (reprfunc) tp_str */
0, /* tp_getattro */
0, /* tp_setattro */
/* Functions to access object as input/output buffer */
0, /* (PyBufferProcs *) tp_as_buffer */
/* (tp_flags) Flags to define presence of optional/expanded features */
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE |
Py_TPFLAGS_HAVE_FINALIZE,
connection_pool__doc__, /* tp_doc Documentation string */
/* call function for all accessible objects */
(traverseproc)MrdbPool_traverse,/* tp_traverse */
/* delete references to contained objects */
0, /* tp_clear */
/* rich comparisons */
0, /* (richcmpfunc) tp_richcompare */
/* weak reference enabler */
0, /* (long) tp_weaklistoffset */
/* Iterators */
0, //(getiterfunc)MrdbCursor_iter,
0, //(iternextfunc)MrdbCursor_iternext,
/* Attribute descriptor and subclassing stuff */
(struct PyMethodDef *)MrdbPool_Methods, /* tp_methods */
(struct PyMemberDef *)MrdbPool_Members, /* tp_members */
MrdbPool_sets,
0, /* (struct _typeobject *) tp_base; */
0, /* (PyObject *) tp_dict */
0, /* (descrgetfunc) tp_descr_get */
0, /* (descrsetfunc) tp_descr_set */
0, /* (long) tp_dictoffset */
(initproc)MrdbPool_initialize, /* tp_init */
PyType_GenericAlloc, //NULL, /* tp_alloc */
PyType_GenericNew, //NULL, /* tp_new */
0, /* tp_free Low-level free-memory routine */
};
void
MrdbPool_dealloc(MrdbPool *self)
{
PyObject_GC_UnTrack(self);
MrdbPool_close(self);
Py_TYPE(self)->tp_free((PyObject*)self);
}
/* }}} */
PyObject *
MrdbPool_add(PyObject *self,
PyObject *args,
PyObject *kwargs)
{
MrdbPool *c;
if (!(c= (MrdbPool *)PyType_GenericAlloc(&MrdbPool_Type, 1)))
{
return NULL;
}
if (MrdbPool_initialize(c, args, kwargs))
{
Py_DECREF(c);
return NULL;
}
return (PyObject *) c;
}
PyObject *
MrdbPool_getconnection(MrdbPool *self)
{
uint32_t i;
MrdbConnection *conn= NULL;
uint64_t tdiff= 0;
struct timespec now;
clock_gettime(CLOCK_MONOTONIC_RAW, &now);
pthread_mutex_lock(&self->lock);
for (i=0; i < self->pool_size; i++)
{
if (self->connection[i] && !self->connection[i]->inuse)
{
if (!mysql_ping(self->connection[i]->mysql))
{
uint64_t t= TIMEDIFF(now, self->connection[i]->last_used);
if (t >= tdiff)
{
conn= self->connection[i];
tdiff= t;
}
}
else {
self->connection[i]->pool= NULL;
MrdbConnection_close(self->connection[i]);
self->connection[i]= NULL;
}
}
}
if (conn)
{
conn->inuse= 1;
}
pthread_mutex_unlock(&self->lock);
if (conn)
{
return (PyObject *)conn;
}
mariadb_throw_exception(NULL, Mariadb_PoolError, 0,
"No more connections from pool '%s' available",
self->pool_name);
return NULL;
}
static PyObject
*MrdbPool_setconfig(MrdbPool *self, PyObject *args, PyObject *kwargs)
{
PyObject *key, *value;
Py_ssize_t pos = 0;
if (!kwargs || Py_TYPE(kwargs) != &PyDict_Type)
{
PyErr_SetString(PyExc_TypeError, "Argument must be dictionary");
return NULL;
}
while(PyDict_Next(kwargs, &pos, &key, &value))
{
const char *utf8key= PyUnicode_AsUTF8(key);
uint8_t i=0, found=0;
while (dsn_keys[i])
{
if (!strcmp(utf8key, dsn_keys[i]))
{
found= 1;
break;
}
i++;
}
if (!found)
{
mariadb_throw_exception(NULL, Mariadb_PoolError, 0,
"Invalid DSN parameter '%s'",
utf8key);
return NULL;
}
}
self->configuration= kwargs;
Py_INCREF(self->configuration);
Py_RETURN_NONE;
}
static PyObject *
MrdbPool_addconnection(MrdbPool *self, PyObject *args)
{
uint32_t i;
MrdbConnection *conn= NULL;
if (!self->configuration)
{
mariadb_throw_exception(NULL, Mariadb_PoolError, 0,
"Couldn't get configuration for pool '%s'.",
self->pool_name);
return NULL;
}
if (!PyArg_ParseTuple(args, "|O!", &MrdbConnection_Type, &conn))
{
return NULL;
}
if (conn && conn->pool)
{
mariadb_throw_exception(NULL, Mariadb_PoolError, 0,
"Connection is already in connection pool '%s'.",
self->pool_name);
return NULL;
}
pthread_mutex_lock(&self->lock);
for (i=0; i < self->pool_size; i++)
{
if (!self->connection[i])
{
if (!conn &&
!(conn = (MrdbConnection *)MrdbConnection_connect(NULL, args,
self->configuration)))
{
pthread_mutex_unlock(&self->lock);
return NULL;
}
Py_INCREF(conn);
self->connection[i]= conn;
self->connection[i]->inuse= 0;
clock_gettime(CLOCK_MONOTONIC_RAW, &self->connection[i]->last_used);
conn->pool= self;
pthread_mutex_unlock(&self->lock);
Py_RETURN_NONE;
}
}
pthread_mutex_unlock(&self->lock);
mariadb_throw_exception(NULL, Mariadb_PoolError, 0,
"Couldn't add connection to pool '%s' (no free slot available).",
self->pool_name);
return NULL;
}
static PyObject *
MrdbPool_poolname(MrdbPool *self)
{
return PyUnicode_FromString(self->pool_name);
}
static PyObject
*MrdbPool_poolsize(MrdbPool *self)
{
return PyLong_FromUnsignedLongLong(self->pool_size);
}
static PyObject *
MrdbPool_get_resetconnection(MrdbPool *self)
{
if (self->reset_session)
{
Py_RETURN_TRUE;
}
Py_RETURN_FALSE;
}
static int
MrdbPool_set_resetconnection(MrdbPool *self, PyObject *arg)
{
if (!arg || Py_TYPE(arg) != &PyBool_Type)
{
PyErr_SetString(PyExc_TypeError, "Argument must be boolean");
return -1;
}
self->reset_session= PyObject_IsTrue(arg);
return 0;
}
static PyObject *
MrdbPool_close(MrdbPool *self)
{
uint32_t i;
pthread_mutex_lock(&self->lock);
if (self->connection)
{
for (i=0; i < self->pool_size; i++)
{
if (self->connection[i])
{
self->connection[i]->pool= NULL;
MrdbConnection_close(self->connection[i]);
self->connection[i]= NULL;
}
}
MARIADB_FREE_MEM(self->connection);
self->connection= NULL;
}
self->pool_size= 0;
if (self->pool_name)
{
if (PyDict_Contains(cnx_pool, PyUnicode_FromString(self->pool_name)))
{
PyDict_DelItemString(cnx_pool, self->pool_name);
}
MARIADB_FREE_MEM(self->pool_name);
self->pool_name= 0;
}
pthread_mutex_unlock(&self->lock);
pthread_mutex_destroy(&self->lock);
Py_INCREF(Py_None);
return Py_None;
}

View File

@ -79,8 +79,7 @@ setup(name='mariadb',
ext_modules=[Extension('mariadb._mariadb', ['mariadb/mariadb.c', 'mariadb/mariadb_connection.c',
'mariadb/mariadb_exception.c', 'mariadb/mariadb_cursor.c',
'mariadb/mariadb_codecs.c',
'mariadb/mariadb_parser.c',
'mariadb/mariadb_pooling.c'],
'mariadb/mariadb_parser.c'],
define_macros= define_macros,
include_dirs=cfg.includes,
library_dirs=cfg.lib_dirs,
@ -89,6 +88,6 @@ setup(name='mariadb',
extra_link_args = cfg.extra_link_args,
extra_objects= cfg.extra_objects
)],
py_modules=['mariadb.__init__', 'mariadb.constants.CLIENT', 'mariadb.constants.CURSOR', 'mariadb.field', 'mariadb.dbapi20',
py_modules=['mariadb.__init__', 'mariadb.constants.CLIENT', 'mariadb.constants.CURSOR', 'mariadb.field', 'mariadb.dbapi20', 'mariadb.connections', 'mariadb.connectionpool',
'mariadb.constants.FIELD_TYPE', 'mariadb.constants.FIELD_FLAG', 'mariadb.constants.INDICATOR'],
)