Files
postgres-web/tools/auth_changetrack/auth_changetrack.py
Magnus Hagander c1fb5de080 Implement synchronization for community authentication
This adds the concept of an apiurl to each site that uses community
authentication, that the main website server can make calls to and send
updates. This URL will receive POSTs from the main website when a user
account that has been used on this site gets updated, and can then
optionally update its local entries with it (the django plugin sample
is updated to handle this fully).

Updates are only sent for users that have a history of having logged
into the specific site -- this way we avoid broadcasting user
information to sites requiring specific consent that the user hasn't
given, and also decreases the amount of updates that have to be sent.

Updates are queued by the system in a table and using listen/notify a
daemon that's running picks up what needs to be updated and posts it to
the endpoints. If this daemon is not running, obviously nothing gets
sent.

Updates are tracked using triggers in the database which push
information into this queue.
2020-08-11 11:33:46 +02:00

128 lines
4.3 KiB
Python
Executable File

#!/usr/bin/env python3
#
# auth_changetrack.py - tracks changes to users and distributes them
#
import sys
import select
import requests
import json
import base64
import hmac
import logging
import psycopg2
import psycopg2.extensions
def process_queue(conn):
    """Drain the communityauth changelog queue, pushing updates site by site.

    Repeatedly picks the site owning the oldest queued change, bundles up to
    100 pending user updates for it, signs the JSON payload with the site's
    shared key, and POSTs it to the site's apiurl. On success the delivered
    queue rows are deleted and the transaction committed; on any failure the
    transaction is rolled back and the site is skipped for the rest of this
    run. Returns when the queue is empty (apart from skipped sites).
    """
    skipped_sites = []
    cursor = conn.cursor()

    while True:
        # Pick whichever site owns the oldest queued entry, excluding any
        # site that already failed during this invocation.
        cursor.execute("SELECT site_id, apiurl, cryptkey, push_ssh FROM (SELECT site_id FROM account_communityauthchangelog WHERE NOT site_id=ANY(%(stoplist)s) LIMIT 1) x INNER JOIN account_communityauthsite s ON s.id=x.site_id", {
            'stoplist': skipped_sites,
        })
        if not cursor.rowcount:
            # Queue is empty (or only skipped sites remain) -- done.
            conn.rollback()
            return

        siteid, url, cryptkey, include_ssh = cursor.fetchone()

        # Collect the queued changes for this site. Capped at 100 users so a
        # single payload never grows too large; leftovers are picked up on
        # the next pass through the outer loop.
        cursor.execute(
            """SELECT cl.user_id, changedat, username, first_name, last_name, u.email, sshkey, array_agg(se.email) FILTER (WHERE se.confirmed AND se.email IS NOT NULL)
FROM account_communityauthchangelog cl
INNER JOIN auth_user u ON u.id=cl.user_id
LEFT JOIN account_secondaryemail se ON se.user_id=cl.user_id
LEFT JOIN core_userprofile up ON up.user_id=cl.user_id
WHERE cl.site_id=%(siteid)s
GROUP BY cl.user_id, cl.changedat, u.id, up.user_id
LIMIT 100""",
            {
                'siteid': siteid,
            }
        )
        pending = cursor.fetchall()
        if not pending:
            # The first query said there was work, so an empty result here
            # means something is inconsistent -- bail out entirely.
            logging.error("Re-querying for updates returned no rows! Aborting.")
            conn.rollback()
            return

        def _user_entry(row):
            # Map one result row to the wire format for a single user.
            # SSH keys are only shared with sites flagged with push_ssh.
            entry = {
                'username': row[2],
                'firstname': row[3],
                'lastname': row[4],
                'email': row[5],
                'secondaryemails': row[7] or [],
            }
            if include_ssh:
                entry['sshkeys'] = row[6]
            return entry

        payload = json.dumps({
            'type': 'update',
            'users': [_user_entry(row) for row in pending],
        })

        # Transport is https so no encryption is needed, but the receiver
        # must be able to verify authenticity: HMAC-SHA512 with the site's
        # shared crypt key, sent base64-encoded in a header.
        signature = hmac.digest(
            base64.b64decode(cryptkey),
            msg=bytes(payload, 'utf-8'),
            digest='sha512',
        )

        try:
            resp = requests.post(url, data=payload, headers={
                'X-pgauth-sig': base64.b64encode(signature),
            }, timeout=10)
        except Exception as e:
            # Network-level failure: skip this site for the rest of the run
            # and leave its queue entries untouched.
            logging.error("Exception pushing changes to {}: {}".format(url, e))
            conn.rollback()
            skipped_sites.append(siteid)
            continue

        if resp.status_code != 200:
            logging.error("Failed to push changes to {}: status {}, initial: {}".format(url, resp.status_code, resp.text[:100]))
            conn.rollback()
            skipped_sites.append(siteid)
            continue

        # Delivered -- remove exactly the rows we just pushed. Row-by-row
        # deletes via executemany are not elegant, but the batch is small.
        cursor.executemany("DELETE FROM account_communityauthchangelog WHERE site_id=%(siteid)s AND user_id=%(userid)s AND changedat=%(changedat)s", [
            {
                'siteid': siteid,
                'userid': row[0],
                'changedat': row[1],
            } for row in pending]
        )
        logging.info("Successfully pushed {} changes to {}".format(len(pending), url))
        conn.commit()
if __name__ == "__main__":
    # Expect exactly one argument: the libpq DSN of the main website db.
    if len(sys.argv) != 2:
        print("Usage: auth_changetrack.py <dsn>")
        sys.exit(1)

    logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', level=logging.INFO)

    conn = psycopg2.connect(sys.argv[1])
    cursor = conn.cursor()
    # Subscribe to wakeup notifications fired by the changelog triggers.
    cursor.execute("LISTEN communityauth_changetrack")
    conn.commit()
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ)

    while True:
        process_queue(conn)

        # Sleep until a NOTIFY arrives on the connection, or at most five
        # minutes, then discard all queued notifications -- one pass over
        # the queue handles however many of them there were.
        select.select([conn], [], [], 5 * 60)
        conn.poll()
        while conn.notifies:
            conn.notifies.pop()