mirror of
https://github.com/mariadb-corporation/mariadb-connector-python.git
synced 2025-08-07 11:39:43 +00:00

Replaced collections.named_tuple by C-Pythons PyStruct_Sequence. All conversion are done now before fetching a row: converting to other result types than tuple (named tuple or dict) now have less overhead and are significantly faster.
841 lines
31 KiB
Python
841 lines
31 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
''' Python DB API 2.0 driver compliance unit test suite.
|
|
|
|
This software is Public Domain and may be used without restrictions.
|
|
|
|
"Now we have booze and barflies entering the discussion, plus rumours of
|
|
DBAs on drugs... and I won't tell you what flashes through my mind each
|
|
time I read the subject line with 'Anal Compliance' in it. All around
|
|
this is turning out to be a thoroughly unwholesome unit test."
|
|
|
|
-- Ian Bicking
|
|
'''
|
|
|
|
__rcs_id__ = '$Id$'
|
|
__version__ = '$Revision$'[11:-2]
|
|
__author__ = 'Stuart Bishop <zen@shangri-la.dropbear.id.au>'
|
|
|
|
import time
|
|
import unittest
|
|
|
|
import mariadb
|
|
import datetime
|
|
|
|
# $Log$
|
|
# Revision 1.1.2.1 2006/02/25 03:44:32 adustman
|
|
# Generic DB-API unit test module
|
|
#
|
|
# Revision 1.10 2003/10/09 03:14:14 zenzen
|
|
# Add test for DB API 2.0 optional extension, where database exceptions
|
|
# are exposed as attributes on the Connection object.
|
|
#
|
|
# Revision 1.9 2003/08/13 01:16:36 zenzen
|
|
# Minor tweak from Stefan Fleiter
|
|
#
|
|
# Revision 1.8 2003/04/10 00:13:25 zenzen
|
|
# Changes, as per suggestions by M.-A. Lemburg
|
|
# - Add a table prefix, to ensure namespace collisions can always be avoided
|
|
#
|
|
# Revision 1.7 2003/02/26 23:33:37 zenzen
|
|
# Break out DDL into helper functions, as per request by David Rushby
|
|
#
|
|
# Revision 1.6 2003/02/21 03:04:33 zenzen
|
|
# Stuff from Henrik Ekelund:
|
|
# added test_None
|
|
# added test_nextset & hooks
|
|
#
|
|
# Revision 1.5 2003/02/17 22:08:43 zenzen
|
|
# Implement suggestions and code from Henrik Eklund - test that
|
|
# cursor.arraysize
|
|
# defaults to 1 & generic cursor.callproc test added
|
|
#
|
|
# Revision 1.4 2003/02/15 00:16:33 zenzen
|
|
# Changes, as per suggestions and bug reports by M.-A. Lemburg,
|
|
# Matthew T. Kromer, Federico Di Gregorio and Daniel Dittmar
|
|
# - Class renamed
|
|
# - Now a subclass of TestCase, to avoid requiring the driver stub
|
|
# to use multiple inheritance
|
|
# - Reversed the polarity of buggy test in test_description
|
|
# - Test exception hierarchy correctly
|
|
# - self.populate is now self._populate(), so if a driver stub
|
|
# overrides self.ddl1 this change propagates
|
|
# - VARCHAR columns now have a width, which will hopefully make the
|
|
# DDL even more portable (this will be reversed if it causes more problems)
|
|
# - cursor.rowcount being checked after various execute and fetchXXX methods
|
|
# - Check for fetchall and fetchmany returning empty lists after results
|
|
# are exhausted (already checking for empty lists if select retrieved
|
|
# nothing
|
|
# - Fix bugs in test_setoutputsize_basic and test_setinputsizes
|
|
#
|
|
from test.conf_test import conf
|
|
|
|
from test.base_test import is_maxscale
|
|
|
|
|
|
class DatabaseAPI20Test(unittest.TestCase):
|
|
''' Test a database self.driver for DB API 2.0 compatibility.
|
|
This implementation tests Gadfly, but the TestCase
|
|
is structured so that other self.drivers can subclass this
|
|
test case to ensure compliance with the DB-API. It is
|
|
expected that this TestCase may be expanded in the future
|
|
if ambiguities or edge conditions are discovered.
|
|
|
|
The 'Optional Extensions' are not yet being tested.
|
|
|
|
self.drivers should subclass this test, overriding setUp, tearDown,
|
|
self.driver, connect_args and connect_kw_args. Class specification
|
|
should be as follows:
|
|
|
|
import dbapi20
|
|
class mytest(dbapi20.DatabaseAPI20Test):
|
|
[...]
|
|
|
|
Don't 'import DatabaseAPI20Test from dbapi20', or you will
|
|
confuse the unit tester - just 'import dbapi20'.
|
|
'''
|
|
|
|
# The self.driver module. This should be the module where the 'connect'
|
|
# method is to be found
|
|
driver = mariadb
|
|
connect_args = () # List of arguments to pass to connect
|
|
connect_kw_args = conf() # Keyword arguments for connect
|
|
table_prefix = 'dbapi20test_' # If you need to specify a prefix for tables
|
|
|
|
ddl1 = 'create table %sbooze (name varchar(20))' % table_prefix
|
|
ddl2 = 'create table %sbarflys (name varchar(20))' % table_prefix
|
|
xddl1 = 'drop table %sbooze' % table_prefix
|
|
xddl2 = 'drop table %sbarflys' % table_prefix
|
|
|
|
# Name of stored procedure to convert string->lowercase
|
|
lowerfunc = 'lower'
|
|
|
|
# Some drivers may need to override these helpers, for example adding
|
|
# a 'commit' after the execute.
|
|
def executeDDL1(self, cursor):
|
|
cursor.execute(self.ddl1)
|
|
|
|
def executeDDL2(self, cursor):
|
|
cursor.execute(self.ddl2)
|
|
|
|
def setUp(self):
|
|
''' self.drivers should override this method to perform required setup
|
|
if any is necessary, such as creating the database.
|
|
'''
|
|
pass
|
|
|
|
def tearDown(self):
|
|
''' self.drivers should override this method to perform required
|
|
cleanup if any is necessary, such as deleting the test database.
|
|
The default drops the tables that may be created.
|
|
'''
|
|
con = self._connect()
|
|
try:
|
|
cur = con.cursor()
|
|
for ddl in (self.xddl1, self.xddl2):
|
|
try:
|
|
cur.execute(ddl)
|
|
con.commit()
|
|
except self.driver.Error:
|
|
# Assume table didn't exist. Other tests will check if
|
|
# execute is busted.
|
|
pass
|
|
finally:
|
|
cur.close()
|
|
con.close()
|
|
|
|
def _connect(self):
|
|
try:
|
|
return self.driver.connect(**self.connect_kw_args)
|
|
except AttributeError:
|
|
self.fail("No connect method found in self.driver module")
|
|
|
|
def test_connect(self):
|
|
con = self._connect()
|
|
con.close()
|
|
|
|
def test_apilevel(self):
|
|
try:
|
|
# Must exist
|
|
apilevel = self.driver.apilevel
|
|
# Must equal 2.0
|
|
self.assertEqual(apilevel, '2.0')
|
|
except AttributeError:
|
|
self.fail("Driver doesn't define apilevel")
|
|
|
|
def test_threadsafety(self):
|
|
try:
|
|
# Must exist
|
|
threadsafety = self.driver.threadsafety
|
|
# Must be a valid value
|
|
self.assertTrue(threadsafety in (0, 1, 2, 3))
|
|
except AttributeError:
|
|
self.fail("Driver doesn't define threadsafety")
|
|
|
|
def test_paramstyle(self):
|
|
try:
|
|
# Must exist
|
|
paramstyle = self.driver.paramstyle
|
|
# Must be a valid value
|
|
self.assertTrue(paramstyle in (
|
|
'qmark', 'numeric', 'named', 'format', 'pyformat'
|
|
))
|
|
except AttributeError:
|
|
self.fail("Driver doesn't define paramstyle")
|
|
|
|
def test_Exceptions(self):
|
|
# Make sure required exceptions exist, and are in the
|
|
# defined hierarchy.
|
|
self.assertTrue(issubclass(self.driver.Warning, Exception))
|
|
self.assertTrue(issubclass(self.driver.Error, Exception))
|
|
self.assertTrue(
|
|
issubclass(self.driver.InterfaceError, self.driver.Error)
|
|
)
|
|
self.assertTrue(
|
|
issubclass(self.driver.DatabaseError, self.driver.Error)
|
|
)
|
|
self.assertTrue(
|
|
issubclass(self.driver.OperationalError, self.driver.Error)
|
|
)
|
|
self.assertTrue(
|
|
issubclass(self.driver.IntegrityError, self.driver.Error)
|
|
)
|
|
self.assertTrue(
|
|
issubclass(self.driver.InternalError, self.driver.Error)
|
|
)
|
|
self.assertTrue(
|
|
issubclass(self.driver.ProgrammingError, self.driver.Error)
|
|
)
|
|
self.assertTrue(
|
|
issubclass(self.driver.NotSupportedError, self.driver.Error)
|
|
)
|
|
|
|
def test_ExceptionsAsConnectionAttributes(self):
|
|
# OPTIONAL EXTENSION
|
|
# Test for the optional DB API 2.0 extension, where the exceptions
|
|
# are exposed as attributes on the Connection object
|
|
# I figure this optional extension will be implemented by any
|
|
# driver author who is using this test suite, so it is enabled
|
|
# by default.
|
|
con = self._connect()
|
|
drv = self.driver
|
|
self.assertTrue(con.Warning is drv.Warning)
|
|
self.assertTrue(con.Error is drv.Error)
|
|
self.assertTrue(con.InterfaceError is drv.InterfaceError)
|
|
self.assertTrue(con.DatabaseError is drv.DatabaseError)
|
|
self.assertTrue(con.OperationalError is drv.OperationalError)
|
|
self.assertTrue(con.IntegrityError is drv.IntegrityError)
|
|
self.assertTrue(con.InternalError is drv.InternalError)
|
|
self.assertTrue(con.ProgrammingError is drv.ProgrammingError)
|
|
self.assertTrue(con.NotSupportedError is drv.NotSupportedError)
|
|
|
|
def test_commit(self):
|
|
con = self._connect()
|
|
try:
|
|
# Commit must work, even if it doesn't do anything
|
|
con.commit()
|
|
finally:
|
|
con.close()
|
|
|
|
def test_rollback(self):
|
|
con = self._connect()
|
|
# If rollback is defined, it should either work or throw
|
|
# the documented exception
|
|
if hasattr(con, 'rollback'):
|
|
try:
|
|
con.rollback()
|
|
except self.driver.NotSupportedError:
|
|
pass
|
|
|
|
def test_cursor(self):
|
|
con = self._connect()
|
|
try:
|
|
cur = con.cursor()
|
|
finally:
|
|
cur.close()
|
|
con.close()
|
|
|
|
def test_cursor_isolation(self):
|
|
con = self._connect()
|
|
try:
|
|
# Make sure cursors created from the same connection have
|
|
# the documented transaction isolation level
|
|
cur1 = con.cursor()
|
|
cur2 = con.cursor()
|
|
self.executeDDL1(cur1)
|
|
cur1.execute("insert into %sbooze values ('Victoria Bitter')" % (
|
|
self.table_prefix
|
|
))
|
|
cur2.execute("select name from %sbooze" % self.table_prefix)
|
|
booze = cur2.fetchall()
|
|
self.assertEqual(len(booze), 1)
|
|
self.assertEqual(len(booze[0]), 1)
|
|
self.assertEqual(booze[0][0], 'Victoria Bitter')
|
|
finally:
|
|
cur1.close()
|
|
cur2.close()
|
|
con.close()
|
|
|
|
def test_description(self):
|
|
con = self._connect()
|
|
try:
|
|
cur = con.cursor()
|
|
self.executeDDL1(cur)
|
|
self.assertEqual(cur.description, None,
|
|
'cursor.description should be none after '
|
|
'executing a statement that can return no '
|
|
'rows (such as DDL)'
|
|
)
|
|
cur.execute('select name from %sbooze' % self.table_prefix)
|
|
self.assertEqual(len(cur.description), 1,
|
|
'cursor.description describes too many columns'
|
|
)
|
|
self.assertEqual(len(cur.description[0]), 11,
|
|
'cursor.description[x] tuples must have '
|
|
'11 elements'
|
|
)
|
|
self.assertEqual(cur.description[0][0].lower(), 'name',
|
|
'cursor.description[x][0] must return column '
|
|
'name'
|
|
)
|
|
|
|
# table name
|
|
self.assertEqual(cur.description[0][8].lower(),
|
|
'dbapi20test_booze',
|
|
'cursor.description[x][0] must return column '
|
|
'name'
|
|
)
|
|
# original column name
|
|
self.assertEqual(cur.description[0][9].lower(), 'name',
|
|
'cursor.description[x][0] must return column '
|
|
'name'
|
|
)
|
|
# original table name
|
|
self.assertEqual(cur.description[0][10].lower(),
|
|
'dbapi20test_booze',
|
|
'cursor.description[x][0] must return i'
|
|
'column name'
|
|
)
|
|
self.assertEqual(cur.description[0][1], self.driver.STRING,
|
|
'cursor.description[x][1] must return column '
|
|
'type. Got %r' % cur.description[0][1]
|
|
)
|
|
|
|
# Make sure self.description gets reset
|
|
self.executeDDL2(cur)
|
|
self.assertEqual(cur.description, None,
|
|
'cursor.description not being set to None when '
|
|
'executing no-result statements (eg. DDL)'
|
|
)
|
|
finally:
|
|
cur.close()
|
|
con.close()
|
|
|
|
def test_rowcount(self):
|
|
con = self._connect()
|
|
try:
|
|
cur = con.cursor(buffered=True)
|
|
self.executeDDL1(cur)
|
|
self.assertEqual(cur.rowcount, 0,
|
|
'cursor.rowcount should be 0 after executing '
|
|
'no-result statements'
|
|
)
|
|
cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
|
|
self.table_prefix
|
|
))
|
|
self.assertEqual(cur.rowcount, 1)
|
|
cur.execute("select name from %sbooze" % self.table_prefix)
|
|
self.assertEqual(cur.rowcount, 1)
|
|
self.executeDDL2(cur)
|
|
self.assertEqual(cur.rowcount, 0,
|
|
'cursor.rowcount not being reset to 0 after '
|
|
'executing no-result statements'
|
|
)
|
|
finally:
|
|
cur.close()
|
|
con.close()
|
|
|
|
lower_func = 'lower'
|
|
|
|
def test_close(self):
|
|
con = self._connect()
|
|
try:
|
|
cur = con.cursor()
|
|
finally:
|
|
con.close()
|
|
|
|
# cursor.execute should raise an Error if called after connection
|
|
# closed
|
|
self.assertRaises(self.driver.Error, self.executeDDL1, cur)
|
|
|
|
# connection.commit should raise an Error if called after connection'
|
|
# closed.'
|
|
self.assertRaises(self.driver.Error, con.commit)
|
|
|
|
# connection.close should raise an Error if called more than once
|
|
self.assertRaises(self.driver.Error, con.close)
|
|
|
|
def test_execute(self):
|
|
con = self._connect()
|
|
try:
|
|
cur = con.cursor()
|
|
self._paraminsert(cur)
|
|
finally:
|
|
con.close()
|
|
|
|
def _paraminsert(self, cur):
|
|
self.executeDDL1(cur)
|
|
cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
|
|
self.table_prefix
|
|
))
|
|
self.assertTrue(cur.rowcount in (-1, 1))
|
|
|
|
if self.driver.paramstyle == 'qmark':
|
|
cur.execute(
|
|
'insert into %sbooze values (?)' % self.table_prefix,
|
|
("Cooper's",)
|
|
)
|
|
elif self.driver.paramstyle == 'numeric':
|
|
cur.execute(
|
|
'insert into %sbooze values (:1)' % self.table_prefix,
|
|
("Cooper's",)
|
|
)
|
|
elif self.driver.paramstyle == 'named':
|
|
cur.execute(
|
|
'insert into %sbooze values (:beer)' % self.table_prefix,
|
|
{'beer': "Cooper's"}
|
|
)
|
|
elif self.driver.paramstyle == 'format':
|
|
cur.execute(
|
|
'insert into %sbooze values (%%s)' % self.table_prefix,
|
|
("Cooper's",)
|
|
)
|
|
elif self.driver.paramstyle == 'pyformat':
|
|
cur.execute(
|
|
'insert into %sbooze values (%%(beer)s)' % self.table_prefix,
|
|
{'beer': "Cooper's"}
|
|
)
|
|
else:
|
|
self.fail('Invalid paramstyle')
|
|
self.assertTrue(cur.rowcount in (-1, 1))
|
|
|
|
cur.execute('select name from %sbooze' % self.table_prefix)
|
|
res = cur.fetchall()
|
|
self.assertEqual(len(res), 2, 'cursor.fetchall returned too few rows')
|
|
beers = [res[0][0], res[1][0]]
|
|
beers.sort()
|
|
self.assertEqual(beers[0], "Cooper's",
|
|
'cursor.fetchall retrieved incorrect data, or data '
|
|
'inserted incorrectly'
|
|
)
|
|
self.assertEqual(beers[1], "Victoria Bitter",
|
|
'cursor.fetchall retrieved incorrect data, '
|
|
'or data inserted incorrectly'
|
|
)
|
|
|
|
def test_executemany(self):
|
|
if is_maxscale():
|
|
self.skipTest("MAXSCALE doesn't support BULK yet")
|
|
con = self._connect()
|
|
try:
|
|
cur = con.cursor()
|
|
self.executeDDL1(cur)
|
|
largs = [("Cooper's",), ("Boag's",)]
|
|
margs = [{'beer': "Cooper's"}, {'beer': "Boag's"}]
|
|
if self.driver.paramstyle == 'qmark':
|
|
cur.executemany(
|
|
'insert into %sbooze values (?)' % self.table_prefix,
|
|
largs
|
|
)
|
|
elif self.driver.paramstyle == 'numeric':
|
|
cur.executemany(
|
|
'insert into %sbooze values (:1)' % self.table_prefix,
|
|
largs
|
|
)
|
|
elif self.driver.paramstyle == 'named':
|
|
cur.executemany(
|
|
'insert into %sbooze values (:beer)' % self.table_prefix,
|
|
margs
|
|
)
|
|
elif self.driver.paramstyle == 'format':
|
|
cur.executemany(
|
|
'insert into %sbooze values (%%s)' % self.table_prefix,
|
|
largs
|
|
)
|
|
elif self.driver.paramstyle == 'pyformat':
|
|
cur.executemany(
|
|
'insert into %sbooze values (%%(beer)s)' % (
|
|
self.table_prefix
|
|
),
|
|
margs
|
|
)
|
|
else:
|
|
self.fail('Unknown paramstyle')
|
|
self.assertTrue(cur.rowcount in (-1, 2),
|
|
'insert using cursor.executemany set '
|
|
'cursor.rowcount to '
|
|
'incorrect value %r' % cur.rowcount
|
|
)
|
|
cur.execute('select name from %sbooze' % self.table_prefix)
|
|
res = cur.fetchall()
|
|
self.assertEqual(len(res), 2,
|
|
'cursor.fetchall retrieved incorrect number '
|
|
'of rows'
|
|
)
|
|
beers = [res[0][0], res[1][0]]
|
|
beers.sort()
|
|
self.assertEqual(beers[0], "Boag's", 'incorrect data retrieved')
|
|
self.assertEqual(beers[1], "Cooper's", 'incorrect data retrieved')
|
|
finally:
|
|
con.close()
|
|
|
|
def test_fetchone(self):
|
|
con = self._connect()
|
|
try:
|
|
cur = con.cursor(buffered=True)
|
|
|
|
# cursor.fetchone should raise an Error if called before
|
|
# executing a select-type query
|
|
self.assertRaises(self.driver.Error, cur.fetchone)
|
|
|
|
# cursor.fetchone should raise an Error if called after
|
|
# executing a query that cannot return rows
|
|
self.executeDDL1(cur)
|
|
self.assertRaises(self.driver.Error, cur.fetchone)
|
|
|
|
cur.execute('select name from %sbooze' % self.table_prefix)
|
|
self.assertEqual(cur.fetchone(), None,
|
|
'cursor.fetchone should return None if a query '
|
|
'retrieves no rows'
|
|
)
|
|
self.assertTrue(cur.rowcount in (-1, 0))
|
|
|
|
# cursor.fetchone should raise an Error if called after
|
|
# executing a query that cannot return rows
|
|
cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
|
|
self.table_prefix
|
|
))
|
|
self.assertRaises(self.driver.Error, cur.fetchone)
|
|
|
|
cur.execute('select name from %sbooze' % self.table_prefix)
|
|
r = cur.fetchone()
|
|
self.assertEqual(len(r), 1,
|
|
'cursor.fetchone should have retrieved a single '
|
|
' row'
|
|
)
|
|
self.assertEqual(r[0], 'Victoria Bitter',
|
|
'cursor.fetchone retrieved incorrect data'
|
|
)
|
|
self.assertEqual(cur.fetchone(), None,
|
|
'cursor.fetchone should return None '
|
|
'if no more rows available'
|
|
)
|
|
self.assertTrue(cur.rowcount in (-1, 1))
|
|
finally:
|
|
con.close()
|
|
|
|
samples = [
|
|
'Carlton Cold',
|
|
'Carlton Draft',
|
|
'Mountain Goat',
|
|
'Redback',
|
|
'Victoria Bitter',
|
|
'XXXX'
|
|
]
|
|
|
|
def _populate(self):
|
|
''' Return a list of sql commands to setup the DB for the fetch
|
|
tests.
|
|
'''
|
|
populate = [
|
|
"insert into %sbooze values ('%s')" % (self.table_prefix, s)
|
|
for s in self.samples
|
|
]
|
|
return populate
|
|
|
|
def test_fetchmany(self):
|
|
con = self._connect()
|
|
try:
|
|
cur = con.cursor()
|
|
|
|
# cursor.fetchmany should raise an Error if called without
|
|
# issuing a query
|
|
self.assertRaises(self.driver.Error, cur.fetchmany, 4)
|
|
|
|
self.executeDDL1(cur)
|
|
for sql in self._populate():
|
|
cur.execute(sql)
|
|
|
|
cur.execute('select name from %sbooze' % self.table_prefix)
|
|
r = cur.fetchmany()
|
|
self.assertEqual(len(r), 1,
|
|
'cursor.fetchmany retrieved incorrect number of '
|
|
'rows, default of arraysize is one.'
|
|
)
|
|
cur.arraysize = 10
|
|
r = cur.fetchmany(3) # Should get 3 rows
|
|
self.assertEqual(len(r), 3,
|
|
'cursor.fetchmany retrieved incorrect number '
|
|
'of rows'
|
|
)
|
|
r = cur.fetchmany(4) # Should get 2 more
|
|
self.assertEqual(len(r), 2,
|
|
'cursor.fetchmany retrieved incorrect number '
|
|
'of rows'
|
|
)
|
|
r = cur.fetchmany(4) # Should be an empty sequence
|
|
self.assertEqual(len(r), 0,
|
|
'cursor.fetchmany should return an empty '
|
|
'sequence after '
|
|
'results are exhausted'
|
|
)
|
|
self.assertTrue(cur.rowcount in (-1, 6))
|
|
|
|
# Same as above, using cursor.arraysize
|
|
cur.arraysize = 4
|
|
cur.execute('select name from %sbooze' % self.table_prefix)
|
|
r = cur.fetchmany() # Should get 4 rows
|
|
self.assertEqual(len(r), 4,
|
|
'cursor.arraysize not being honoured by fetchmany'
|
|
)
|
|
r = cur.fetchmany() # Should get 2 more
|
|
self.assertEqual(len(r), 2)
|
|
r = cur.fetchmany() # Should be an empty sequence
|
|
self.assertEqual(len(r), 0)
|
|
self.assertTrue(cur.rowcount in (-1, 6))
|
|
|
|
cur.arraysize = 6
|
|
cur.execute('select name from %sbooze' % self.table_prefix)
|
|
rows = cur.fetchmany() # Should get all rows
|
|
self.assertTrue(cur.rowcount in (-1, 6))
|
|
self.assertEqual(len(rows), 6)
|
|
self.assertEqual(len(rows), 6)
|
|
rows = [r[0] for r in rows]
|
|
rows.sort()
|
|
|
|
# Make sure we get the right data back out
|
|
for i in range(0, 6):
|
|
self.assertEqual(rows[i], self.samples[i],
|
|
'incorrect data retrieved by cursor.fetchmany'
|
|
)
|
|
|
|
rows = cur.fetchmany() # Should return an empty list
|
|
self.assertEqual(len(rows), 0,
|
|
'cursor.fetchmany should return an empty '
|
|
'sequence if called after the whole result '
|
|
'set has been fetched'
|
|
)
|
|
self.assertTrue(cur.rowcount in (-1, 6))
|
|
|
|
self.executeDDL2(cur)
|
|
cur.execute('select name from %sbarflys' % self.table_prefix)
|
|
r = cur.fetchmany() # Should get empty sequence
|
|
self.assertEqual(len(r), 0, "cursor.fetchmany should return an "
|
|
"empty sequence if query retrieved "
|
|
"no rows"
|
|
)
|
|
self.assertTrue(cur.rowcount in (-1, 0))
|
|
|
|
finally:
|
|
con.close()
|
|
|
|
def test_fetchall(self):
|
|
con = self._connect()
|
|
try:
|
|
cur = con.cursor()
|
|
# cursor.fetchall should raise an Error if called
|
|
# without executing a query that may return rows (such
|
|
# as a select)
|
|
self.assertRaises(self.driver.Error, cur.fetchall)
|
|
|
|
self.executeDDL1(cur)
|
|
for sql in self._populate():
|
|
cur.execute(sql)
|
|
|
|
# cursor.fetchall should raise an Error if called
|
|
# after executing a a statement that cannot return rows
|
|
self.assertRaises(self.driver.Error, cur.fetchall)
|
|
|
|
cur.execute('select name from %sbooze' % self.table_prefix)
|
|
rows = cur.fetchall()
|
|
self.assertTrue(cur.rowcount in (-1, len(self.samples)))
|
|
self.assertEqual(len(rows), len(self.samples),
|
|
'cursor.fetchall did not retrieve all rows'
|
|
)
|
|
rows = [r[0] for r in rows]
|
|
rows.sort()
|
|
for i in range(0, len(self.samples)):
|
|
self.assertEqual(rows[i], self.samples[i],
|
|
'cursor.fetchall retrieved incorrect rows'
|
|
)
|
|
rows = cur.fetchall()
|
|
self.assertEqual(
|
|
len(rows), 0,
|
|
'cursor.fetchall should return an empty list if called '
|
|
'after the whole result set has been fetched'
|
|
)
|
|
self.assertTrue(cur.rowcount in (-1, len(self.samples)))
|
|
|
|
self.executeDDL2(cur)
|
|
cur.execute('select name from %sbarflys' % self.table_prefix)
|
|
rows = cur.fetchall()
|
|
self.assertTrue(cur.rowcount in (-1, 0))
|
|
self.assertEqual(len(rows), 0,
|
|
'cursor.fetchall should return an empty list if '
|
|
'a select query returns no rows'
|
|
)
|
|
|
|
finally:
|
|
con.close()
|
|
|
|
def test_mixedfetch(self):
|
|
con = self._connect()
|
|
try:
|
|
cur = con.cursor()
|
|
self.executeDDL1(cur)
|
|
for sql in self._populate():
|
|
cur.execute(sql)
|
|
|
|
cur.execute('select name from %sbooze' % self.table_prefix)
|
|
rows1 = cur.fetchone()
|
|
rows23 = cur.fetchmany(2)
|
|
rows4 = cur.fetchone()
|
|
rows56 = cur.fetchall()
|
|
self.assertTrue(cur.rowcount in (-1, 6))
|
|
self.assertEqual(len(rows23), 2,
|
|
'fetchmany returned incorrect number of rows'
|
|
)
|
|
self.assertEqual(len(rows56), 2,
|
|
'fetchall returned incorrect number of rows'
|
|
)
|
|
|
|
rows = [rows1[0]]
|
|
rows.extend([rows23[0][0], rows23[1][0]])
|
|
rows.append(rows4[0])
|
|
rows.extend([rows56[0][0], rows56[1][0]])
|
|
rows.sort()
|
|
for i in range(0, len(self.samples)):
|
|
self.assertEqual(rows[i], self.samples[i],
|
|
'incorrect data retrieved or inserted'
|
|
)
|
|
finally:
|
|
con.close()
|
|
|
|
def help_nextset_setUp(self, cur):
|
|
''' Should create a procedure called deleteme
|
|
that returns two result sets, first the
|
|
number of rows in booze then "name from booze"
|
|
'''
|
|
raise NotImplementedError('Helper not implemented')
|
|
# sql="""
|
|
# create procedure deleteme as
|
|
# begin
|
|
# select count(*) from booze
|
|
# select name from booze
|
|
# end
|
|
# """
|
|
# cur.execute(sql)
|
|
|
|
def help_nextset_tearDown(self, cur):
|
|
'If cleaning up is needed after nextSetTest'
|
|
raise NotImplementedError('Helper not implemented')
|
|
# cur.execute("drop procedure deleteme")
|
|
|
|
def test_arraysize(self):
|
|
# Not much here - rest of the tests for this are in test_fetchmany
|
|
con = self._connect()
|
|
try:
|
|
cur = con.cursor()
|
|
self.assertTrue(hasattr(cur, 'arraysize'),
|
|
'cursor.arraysize must be defined'
|
|
)
|
|
finally:
|
|
con.close()
|
|
|
|
def test_setinputsizes(self):
|
|
con = self._connect()
|
|
try:
|
|
cur = con.cursor()
|
|
cur.setinputsizes((25,))
|
|
self._paraminsert(cur) # Make sure cursor still works
|
|
finally:
|
|
con.close()
|
|
|
|
def test_None(self):
|
|
con = self._connect()
|
|
try:
|
|
cur = con.cursor()
|
|
self.executeDDL1(cur)
|
|
cur.execute('insert into %sbooze values (NULL)'
|
|
% self.table_prefix)
|
|
cur.execute('select name from %sbooze' % self.table_prefix)
|
|
r = cur.fetchall()
|
|
self.assertEqual(len(r), 1)
|
|
self.assertEqual(len(r[0]), 1)
|
|
self.assertEqual(r[0][0], None, 'NULL value not returned as None')
|
|
finally:
|
|
con.close()
|
|
|
|
def test_Date(self):
|
|
d1 = self.driver.Date(2002, 12, 25)
|
|
self.assertTrue(isinstance(d1, datetime.date))
|
|
d2 = self.driver.DateFromTicks(time.mktime((2002, 12, 25,
|
|
0, 0, 0, 0, 0, 0)))
|
|
# Can we assume this? API doesn't specify, but it seems implied
|
|
self.assertEqual(str(d1), str(d2))
|
|
|
|
def test_Time(self):
|
|
t1 = self.driver.Time(13, 45, 30)
|
|
self.assertTrue(isinstance(t1, datetime.time))
|
|
t2 = self.driver.TimeFromTicks(time.mktime((2001, 1, 1, 13, 45,
|
|
30, 0, 0, 0)))
|
|
# Can we assume this? API doesn't specify, but it seems implied
|
|
self.assertTrue(isinstance(t2, datetime.time))
|
|
self.assertEqual(str(t1), str(t2))
|
|
|
|
def test_Timestamp(self):
|
|
t1 = self.driver.Timestamp(2002, 12, 25, 13, 45, 30)
|
|
self.assertTrue(isinstance(t1, datetime.datetime))
|
|
t2 = self.driver.TimestampFromTicks(
|
|
time.mktime((2002, 12, 25, 13, 45, 30, 0, 0, 0))
|
|
)
|
|
self.assertTrue(isinstance(t2, datetime.datetime))
|
|
# Can we assume this? API doesn't specify, but it seems implied
|
|
# self.assertEqual(str(t1),str(t2))
|
|
|
|
def test_Binary(self):
|
|
b = self.driver.Binary(b'Something')
|
|
self.assertTrue(isinstance(b, bytes))
|
|
b = self.driver.Binary(b'')
|
|
self.assertTrue(isinstance(b, bytes))
|
|
|
|
def test_STRING(self):
|
|
self.assertTrue(hasattr(self.driver, 'STRING'),
|
|
'module.STRING must be defined'
|
|
)
|
|
|
|
def test_BINARY(self):
|
|
self.assertTrue(hasattr(self.driver, 'BINARY'),
|
|
'module.BINARY must be defined.'
|
|
)
|
|
|
|
def test_NUMBER(self):
|
|
self.assertTrue(hasattr(self.driver, 'NUMBER'),
|
|
'module.NUMBER must be defined.'
|
|
)
|
|
|
|
def test_DATETIME(self):
|
|
self.assertTrue(hasattr(self.driver, 'DATETIME'),
|
|
'module.DATETIME must be defined.'
|
|
)
|
|
|
|
def test_ROWID(self):
|
|
self.assertTrue(hasattr(self.driver, 'ROWID'),
|
|
'module.ROWID must be defined.'
|
|
)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|