mirror of
https://github.com/mariadb-corporation/mariadb-connector-python.git
synced 2025-08-15 23:42:40 +00:00
Improve cursor behavior on closed connections
- Added internal attribute cursor._thread_id to detect automatic reconnect. - If a reconnect occurred, only buffered resultsets using the text protocol can be accessed. (The same applies if a connection was closed). - If connection reconnected, _thread_id attribute will be updated when using execute() or executemany() method.
This commit is contained in:
@ -8,6 +8,7 @@ import decimal
|
||||
import json
|
||||
from decimal import Decimal
|
||||
import array
|
||||
import time
|
||||
|
||||
import mariadb
|
||||
from mariadb.constants import FIELD_TYPE, EXT_FIELD_TYPE, ERR, CURSOR, INDICATOR, CLIENT
|
||||
@ -41,6 +42,51 @@ class TestCursor(unittest.TestCase):
|
||||
cursor.close()
|
||||
del cursor
|
||||
|
||||
def test_cursor_reconnect(self):
|
||||
conn= create_connection({'reconnect' : True})
|
||||
self.assertEqual(conn.auto_reconnect, True)
|
||||
cursor= conn.cursor(binary=True)
|
||||
cursor.execute("SET session wait_timeout=3")
|
||||
|
||||
# binary protocol should fail
|
||||
cursor.execute("SELECT 1 UNION SELECT 2 UNION SELECT 3")
|
||||
time.sleep(5)
|
||||
try:
|
||||
cursor.fetchone()
|
||||
except mariadb.ProgrammingError:
|
||||
pass
|
||||
|
||||
cursor.close()
|
||||
|
||||
# Text protocol unbuffered should fail
|
||||
cursor= conn.cursor(binary=False, buffered=False)
|
||||
cursor.execute("SET session wait_timeout=3")
|
||||
|
||||
# text protocol unbuffered should fail
|
||||
cursor.execute("SELECT 1 UNION SELECT 2 UNION SELECT 3")
|
||||
time.sleep(5)
|
||||
try:
|
||||
cursor.fetchone()
|
||||
except mariadb.ProgrammingError:
|
||||
pass
|
||||
|
||||
# reeusing cursor should work
|
||||
cursor= conn.cursor(binary=False, buffered=True)
|
||||
cursor.execute("SET session wait_timeout=3")
|
||||
time.sleep(5)
|
||||
# reconnect
|
||||
cursor.execute("SELECT 1 UNION SELECT 2 UNION SELECT 3")
|
||||
self.assertNotEqual(cursor._thread_id, cursor.connection.thread_id)
|
||||
row= cursor.fetchone()
|
||||
self.assertEqual(row[0],1)
|
||||
# execute should update cursor._thread_id
|
||||
cursor.execute("SELECT 1 UNION SELECT 2 UNION SELECT 3")
|
||||
self.assertEqual(cursor._thread_id, cursor.connection.thread_id)
|
||||
|
||||
cursor.close()
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_conpy283(self):
|
||||
conn= create_connection()
|
||||
cursor= conn.cursor(dictionary=True)
|
||||
|
Reference in New Issue
Block a user