From 035556051716a78688d043583a7beff9dfadd64c Mon Sep 17 00:00:00 2001 From: Magnus Hagander Date: Wed, 12 Aug 2020 17:17:32 +0200 Subject: [PATCH] Simplify transaction handling in the changetracker daemon Previously we used a combination of optimistic concurrency control (by DELETEing with both the id and the date included in the WHERE clause) and REPEATABLE READ transactions. This would create serialization conflicts when completely unnecessary. Since in this case it doesn't matter if we happen to push the same thing twice, switch completely to optimistic concurrency control. That gets rid of having to deal with serialization issues. --- tools/auth_changetrack/auth_changetrack.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tools/auth_changetrack/auth_changetrack.py b/tools/auth_changetrack/auth_changetrack.py index 916127d3..58be15f1 100755 --- a/tools/auth_changetrack/auth_changetrack.py +++ b/tools/auth_changetrack/auth_changetrack.py @@ -25,7 +25,6 @@ def process_queue(conn): }) if not curs.rowcount: # Nothing in the queue, so we're done here. - conn.rollback() return siteid, url, cryptkey, include_ssh = curs.fetchone() @@ -48,7 +47,6 @@ LIMIT 100""", if not rows: # This shouldn't happen logging.error("Re-querying for updates returned no rows! Aborting.") - conn.rollback() return # Build the update structure @@ -80,13 +78,14 @@ LIMIT 100""", }, timeout=10) except Exception as e: logging.error("Exception pushing changes to {}: {}".format(url, e)) - conn.rollback() site_stoplist.append(siteid) continue if r.status_code == 200: # Success! Whee! # This is a really silly way to do it, but meh. + # Also psycopg2 really doesn't like mixing transaction modes, but here we go.. + conn.autocommit = False curs.executemany("DELETE FROM account_communityauthchangelog WHERE site_id=%(siteid)s AND user_id=%(userid)s AND changedat=%(changedat)s", [ { 'siteid': siteid, @@ -96,10 +95,10 @@ LIMIT 100""", ) logging.info("Successfully pushed {} changes to {}".format(len(rows), url)) conn.commit() + conn.autocommit = True continue logging.error("Failed to push changes to {}: status {}, initial: {}".format(url, r.status_code, r.text[:100])) - conn.rollback() site_stoplist.append(siteid) @@ -112,10 +111,11 @@ if __name__ == "__main__": conn = psycopg2.connect(sys.argv[1]) curs = conn.cursor() - curs.execute("LISTEN communityauth_changetrack") - conn.commit() conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ) + conn.autocommit = True + + curs.execute("LISTEN communityauth_changetrack") while True: process_queue(conn)