Simplify transaction handling in the changetracker daemon

Previously we used a combination of optimistic concurrency control
(by DELETEing with both the id and the date included in the WHERE
clause) and REPEATABLE READ transactions. This would create
serialization conflicts when completely unnecessary. Since in this case
it doesn't matter if we happen to push the same thing twice, switch
completely to optimistic concurrency control. That gets rid of having to
deal with serialization issues.
This commit is contained in:
Magnus Hagander
2020-08-12 17:17:32 +02:00
parent 1ffc1f3d6d
commit 0355560517

View File

@ -25,7 +25,6 @@ def process_queue(conn):
})
if not curs.rowcount:
# Nothing in the queue, so we're done here.
conn.rollback()
return
siteid, url, cryptkey, include_ssh = curs.fetchone()
@ -48,7 +47,6 @@ LIMIT 100""",
if not rows:
# This shouldn't happen
logging.error("Re-querying for updates returned no rows! Aborting.")
conn.rollback()
return
# Build the update structure
@ -80,13 +78,14 @@ LIMIT 100""",
}, timeout=10)
except Exception as e:
logging.error("Exception pushing changes to {}: {}".format(url, e))
conn.rollback()
site_stoplist.append(siteid)
continue
if r.status_code == 200:
# Success! Whee!
# This is a really silly way to do it, but meh.
# Also psycopg2 really doesn't like mixing transaction modes, but here we go..
conn.autocommit = False
curs.executemany("DELETE FROM account_communityauthchangelog WHERE site_id=%(siteid)s AND user_id=%(userid)s AND changedat=%(changedat)s", [
{
'siteid': siteid,
@ -96,10 +95,10 @@ LIMIT 100""",
)
logging.info("Successfully pushed {} changes to {}".format(len(rows), url))
conn.commit()
conn.autocommit = True
continue
logging.error("Failed to push changes to {}: status {}, initial: {}".format(url, r.status_code, r.text[:100]))
conn.rollback()
site_stoplist.append(siteid)
@ -112,10 +111,11 @@ if __name__ == "__main__":
conn = psycopg2.connect(sys.argv[1])
curs = conn.cursor()
curs.execute("LISTEN communityauth_changetrack")
conn.commit()
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ)
conn.autocommit = True
curs.execute("LISTEN communityauth_changetrack")
while True:
process_queue(conn)