Implement authenticated encryption in community auth

This creates a community auth version 3 (previous one being 2, and 1 is
long gone) trhat uses AES_SIV as the encryption method instead of
regular AES_CBC, and validates the digests on all accounts.

As this gets deployed on servers incrementall, the version has to be
specified in the database record for the site. We could have the site
indicate this itself, but doing it this way seems safer as it will then
just break for any app that accidentally reverts the plugin.

Reviewed by Jacob Champion
This commit is contained in:
Magnus Hagander
2025-04-01 13:41:08 +02:00
parent 7a42e2a5f5
commit de76f82f62
8 changed files with 147 additions and 74 deletions

View File

@ -62,26 +62,26 @@ The flow of an authentication in the 2.0 system is fairly simple:
#. This dictionary of information is then URL-encoded.
#. The resulting URL-encoded string is padded with spaces to an even
16 bytes, and is then AES encrypted with a shared key. This key
is stored in the main website system and indexed by the site id,
and it is stored in the settings of the community website somewhere.
Since this key is what protects the authentication, it should be
treated as very valuable.
#. The resulting encrypted string and the IV used for the encryption are
base64-encoded (in URL mode, meaning it uses - and _ instead of + and /.
16 bytes, and is then AES-SIV encrypted with a shared key and a 16
byte nonce. This key is stored in the main website system and
indexed by the site id, and it is stored in the settings of the
community website somewhere. Since this key is what protects the
authentication, it should be treated as very valuable.
#. The resulting encrypted string, the nonce used for the encryption
and the tag from the digest are base64-encoded (in URL mode,
meaning it uses - and _ instead of + and /.
#. The main website looks up the redirection URL registered for this site
(again indexed by the site id), and constructs an URL of the format
<redirection_url>?i=<iv>&d=<encrypted data>
<redirection_url>?n=<nonce>&d=<encrypted data>&t=<tag>
#. The user browser is redirected to this URL.
#. The community website detects that this is a redirected authentication
response, and starts processing it specifically.
#. Using the shared key, the data is decrypted (while first being base64
decoded, of course)
decoded, of course). Since authenticated encryption using AES-SIV
is used, this step will fail if there has been any tampering with the
data.
#. The resulting string is urldecoded - and if any errors occur in the
decoding, the authentication will fail. This step is guaranteed to fail
if the encryption key is mismatching between the community site and
the main website, since it is going to end up with something that is
definitely not an url-decodeable string.
decoding, the authentication will fail.
#. The community site will look up an existing user record under this
username, or create a new one if one does not exist already (assuming
the site keeps local track of users at all - if it just deals with
@ -94,10 +94,6 @@ The flow of an authentication in the 2.0 system is fairly simple:
#. If the *d* key is present in the data structure handed over, the
community site implements a site-specific action based on this data,
such as redirecting the user to the original location.
#. *DEPRECATED* If the *su* key is present in the data structure handed over, the
community site redirects to this location. If it's not present, then
the community site will redirect so some default location on the
site.
Logging out
-----------

View File

@ -27,8 +27,8 @@ class CommunityAuthSiteAdminForm(forms.ModelForm):
except Exception:
raise forms.ValidationError("Crypto key must be base64 encoded")
if (len(x) != 16 and len(x) != 24 and len(x) != 32):
raise forms.ValidationError("Crypto key must be 16, 24 or 32 bytes before being base64-encoded")
if (len(x) != 16 and len(x) != 24 and len(x) != 32 and len(x) != 64):
raise forms.ValidationError("Crypto key must be 16, 24, 32 or 64 bytes before being base64-encoded")
return self.cleaned_data['cryptkey']
def clean(self):

View File

@ -0,0 +1,18 @@
# Generated by Django 4.2.11 on 2025-04-01 10:45
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('account', '0009_cauth_unique_names'),
]
operations = [
migrations.AddField(
model_name='communityauthsite',
name='version',
field=models.IntegerField(choices=[(2, 2), (3, 3)], default=2),
),
]

View File

@ -18,6 +18,7 @@ class CommunityAuthSite(models.Model):
apiurl = models.URLField(max_length=200, null=False, blank=True)
cryptkey = models.CharField(max_length=100, null=False, blank=False,
help_text="Use tools/communityauth/generate_cryptkey.py to create a key")
version = models.IntegerField(choices=((2, 2), (3, 3)), default=2)
comment = models.TextField(null=False, blank=True)
org = models.ForeignKey(CommunityAuthOrg, null=False, blank=False, on_delete=models.CASCADE)
cooloff_hours = models.PositiveIntegerField(null=False, blank=False, default=0,

View File

@ -722,17 +722,33 @@ def communityauth(request, siteid):
# the first block more random..
s = "t=%s&%s" % (int(time.time()), urllib.parse.urlencode(info))
# Encrypt it with the shared key (and IV!)
r = Random.new()
iv = r.read(16) # Always 16 bytes for AES
encryptor = AES.new(base64.b64decode(site.cryptkey), AES.MODE_CBC, iv)
cipher = encryptor.encrypt(s.encode('ascii') + b' ' * (16 - (len(s) % 16))) # Pad to even 16 bytes
if site.version == 3:
# v3 = authenticated encryption
r = Random.new()
nonce = r.read(16)
encryptor = AES.new(base64.b64decode(site.cryptkey), AES.MODE_SIV, nonce=nonce)
cipher, tag = encryptor.encrypt_and_digest(s.encode('ascii'))
redirparams = {
'd': base64.b64encode(cipher, b"-_").decode('ascii'),
'n': base64.b64encode(nonce, b"-_").decode('ascii'),
't': base64.b64encode(tag, b"-_").decode('ascii'),
}
else:
# v2 = plain AES
# Encrypt it with the shared key (and IV!)
r = Random.new()
iv = r.read(16) # Always 16 bytes for AES
encryptor = AES.new(base64.b64decode(site.cryptkey), AES.MODE_CBC, iv)
cipher = encryptor.encrypt(s.encode('ascii') + b' ' * (16 - (len(s) % 16))) # Pad to even 16 bytes
redirparams = {
'i': base64.b64encode(iv, b"-_").decode('ascii'),
'd': base64.b64encode(cipher, b"-_").decode('ascii'),
}
# Generate redirect
return HttpResponseRedirect("%s?i=%s&d=%s" % (
return HttpResponseRedirect("%s?%s" % (
site.redirecturl,
base64.b64encode(iv, b"-_").decode('ascii'),
base64.b64encode(cipher, b"-_").decode('ascii'),
urllib.parse.urlencode(redirparams),
))
@ -769,18 +785,30 @@ def communityauth_consent(request, siteid):
})
def _encrypt_site_response(site, s):
# Encrypt it with the shared key (and IV!)
r = Random.new()
iv = r.read(16) # Always 16 bytes for AES
encryptor = AES.new(base64.b64decode(site.cryptkey), AES.MODE_CBC, iv)
cipher = encryptor.encrypt(s.encode('ascii') + b' ' * (16 - (len(s) % 16))) # Pad to even 16 bytes
def _encrypt_site_response(site, s, version):
if version == 3:
# Use authenticated encryption
r = Random.new()
nonce = r.read(16)
encryptor = AES.new(base64.b64decode(site.cryptkey), AES.MODE_SIV, nonce=nonce)
cipher, tag = encryptor.encrypt_and_digest(s.encode('ascii'))
# Base64-encode the response, just to be consistent
return "%s&%s" % (
base64.b64encode(iv, b'-_').decode('ascii'),
base64.b64encode(cipher, b'-_').decode('ascii'),
)
return "&".join((
base64.b64encode(nonce, b'-_').decode('ascii'),
base64.b64encode(cipher, b'-_').decode('ascii'),
base64.b64encode(tag, b'-_').decode('ascii'),
))
else:
# Encrypt it with the shared key (and IVs)
r = Random.new()
iv = r.read(16) # Always 16 bytes for AES
encryptor = AES.new(base64.b64decode(site.cryptkey), AES.MODE_CBC, iv)
cipher = encryptor.encrypt(s.encode('ascii') + b' ' * (16 - (len(s) % 16))) # Pad to even 16 bytes
return "&".join((
base64.b64encode(iv, b'-_').decode('ascii'),
base64.b64encode(cipher, b'-_').decode('ascii'),
))
@queryparams('s', 'e', 'n', 'u')
@ -812,7 +840,7 @@ def communityauth_search(request, siteid):
'se': [a.email for a in u.secondaryemail_set.all()],
} for u in users])
return HttpResponse(_encrypt_site_response(site, j))
return HttpResponse(_encrypt_site_response(site, j, site.version))
def communityauth_getkeys(request, siteid, since=None):
@ -828,7 +856,7 @@ def communityauth_getkeys(request, siteid, since=None):
j = json.dumps([{'u': k.user.username, 's': k.sshkey.replace("\r", "\n")} for k in keys])
return HttpResponse(_encrypt_site_response(site, j))
return HttpResponse(_encrypt_site_response(site, j, site.version))
@csrf_exempt

View File

@ -9,11 +9,11 @@ from Cryptodome import Random
import base64
if __name__ == "__main__":
print("The next row contains a 32-byte (256-bit) symmetric crypto key.")
print("The next row contains a 64-byte (512-bit) symmetric crypto key.")
print("This key should be used to integrate a community auth site.")
print("Note that each site should have it's own key!!")
print("")
r = Random.new()
key = r.read(32)
key = r.read(64)
print(base64.b64encode(key).decode('ascii'))

View File

@ -41,7 +41,7 @@ import hmac
from urllib.parse import urlencode, parse_qs
import requests
from Cryptodome.Cipher import AES
from Cryptodome.Hash import SHA
from Cryptodome.Hash import SHA256
from Cryptodome import Random
import time
@ -75,15 +75,19 @@ def login(request):
s = "t=%s&%s" % (int(time.time()), urlencode({'r': request.GET['next']}))
# Now encrypt it
r = Random.new()
iv = r.read(16)
encryptor = AES.new(SHA.new(settings.SECRET_KEY.encode('ascii')).digest()[:16], AES.MODE_CBC, iv)
cipher = encryptor.encrypt(s.encode('ascii') + b' ' * (16 - (len(s) % 16))) # pad to 16 bytes
nonce = r.read(16)
encryptor = AES.new(
SHA256.new(settings.SECRET_KEY.encode('ascii')).digest()[:32], AES.MODE_SIV, nonce=nonce
)
cipher, tag = encryptor.encrypt_and_digest(s.encode('ascii'))
return HttpResponseRedirect("%s?d=%s$%s" % (
settings.PGAUTH_REDIRECT,
base64.b64encode(iv, b"-_").decode('utf8'),
base64.b64encode(cipher, b"-_").decode('utf8'),
))
return HttpResponseRedirect("%s?%s" % (settings.PGAUTH_REDIRECT, urlencode({
'd': '$'.join((
base64.b64encode(nonce, b"-_").decode('utf8'),
base64.b64encode(cipher, b"-_").decode('utf8'),
base64.b64encode(tag, b"-_").decode('utf8'),
)),
})))
else:
return HttpResponseRedirect(settings.PGAUTH_REDIRECT)
@ -103,17 +107,24 @@ def auth_receive(request):
# This was a logout request
return HttpResponseRedirect('/')
if 'i' not in request.GET:
return HttpResponse("Missing IV in url!", status=400)
if 'n' not in request.GET:
return HttpResponse("Missing nonce in url!", status=400)
if 'd' not in request.GET:
return HttpResponse("Missing data in url!", status=400)
if 't' not in request.GET:
return HttpResponse("Missing tag in url!", status=400)
# Set up an AES object and decrypt the data we received
try:
decryptor = AES.new(base64.b64decode(settings.PGAUTH_KEY),
AES.MODE_CBC,
base64.b64decode(str(request.GET['i']), "-_"))
s = decryptor.decrypt(base64.b64decode(str(request.GET['d']), "-_")).rstrip(b' ').decode('utf8')
decryptor = AES.new(
base64.b64decode(settings.PGAUTH_KEY),
AES.MODE_SIV,
nonce=base64.b64decode(str(request.GET['n']), "-_"),
)
s = decryptor.decrypt_and_verify(
base64.b64decode(str(request.GET['d']), "-_"),
base64.b64decode(str(request.GET['t']), "-_"),
).rstrip(b' ').decode('utf8')
except UnicodeDecodeError:
return HttpResponse("Badly encoded data found", 400)
except Exception:
@ -200,11 +211,16 @@ We apologize for the inconvenience.
# Finally, check of we have a data package that tells us where to
# redirect the user.
if 'd' in data:
(ivs, datas) = data['d'][0].split('$')
decryptor = AES.new(SHA.new(settings.SECRET_KEY.encode('ascii')).digest()[:16],
AES.MODE_CBC,
base64.b64decode(ivs, b"-_"))
s = decryptor.decrypt(base64.b64decode(datas, "-_")).rstrip(b' ').decode('utf8')
(nonces, datas, tags) = data['d'][0].split('$')
decryptor = AES.new(
SHA256.new(settings.SECRET_KEY.encode('ascii')).digest()[:32],
AES.MODE_SIV,
nonce=base64.b64decode(nonces, b"-_"),
)
s = decryptor.decrypt_and_verify(
base64.b64decode(datas, "-_"),
base64.b64decode(tags, "-_"),
).rstrip(b' ').decode('utf8')
try:
rdata = parse_qs(s, strict_parsing=True)
except ValueError:
@ -304,17 +320,24 @@ def user_search(searchterm=None, userid=None):
r = requests.get(
'{0}search/'.format(settings.PGAUTH_REDIRECT),
params=q,
timeout=10,
)
if r.status_code != 200:
return []
(ivs, datas) = r.text.encode('utf8').split(b'&')
(nonces, datas, tags) = r.text.encode('utf8').split(b'&')
# Decryption time
decryptor = AES.new(base64.b64decode(settings.PGAUTH_KEY),
AES.MODE_CBC,
base64.b64decode(ivs, "-_"))
s = decryptor.decrypt(base64.b64decode(datas, "-_")).rstrip(b' ').decode('utf8')
decryptor = AES.new(
base64.b64decode(settings.PGAUTH_KEY),
AES.MODE_SIV,
nonce=base64.b64decode(nonces, "-_")
)
s = decryptor.decrypt_and_verify(
base64.b64decode(datas, "-_"),
base64.b64decode(tags, "-_"),
).rstrip(b' ').decode('utf8')
j = json.loads(s)
return j

View File

@ -54,12 +54,19 @@ if __name__ == "__main__":
s = "t=%s&%s" % (int(time.time() + 300), urllib.parse.urlencode(info))
r = Random.new()
iv = r.read(16)
encryptor = AES.new(base64.b64decode(options.key), AES.MODE_CBC, iv)
cipher = encryptor.encrypt(s.encode('ascii') + b' ' * (16 - (len(s) % 16)))
nonce = r.read(16)
encryptor = AES.new(
base64.b64decode(options.key),
AES.MODE_SIV,
nonce=nonce,
)
cipher, tag = encryptor.encrypt_and_digest(s.encode('ascii'))
redirparams = {
'd': base64.b64encode(cipher, b"-_").decode('ascii'),
'n': base64.b64encode(nonce, b"-_").decode('ascii'),
't': base64.b64encode(tag, b"-_").decode('ascii'),
}
print("Paste the following after the receiving url:")
print("?i=%s&d=%s" % (
base64.b64encode(iv, b"-_").decode('ascii'),
base64.b64encode(cipher, b"-_").decode('ascii'),
))
print("?" + urllib.parse.urlencode(redirparams))