mirror of
https://github.com/postgres/pgweb.git
synced 2025-08-06 09:57:57 +00:00

Raising an exception triggers an email-to-admin-action, and the whole reason we have this NUL check is to *avoid* triggering those emails... Hopefully explicitly returning a 400 HttpResponse will maek them go away.
116 lines
4.5 KiB
Python
116 lines
4.5 KiB
Python
from django.conf import settings
|
|
from django.http import QueryDict, HttpResponse
|
|
|
|
from pgweb.util.templateloader import initialize_template_collection, get_all_templates
|
|
|
|
from collections import OrderedDict
|
|
import hashlib
|
|
|
|
# Use thread local storage to pass the username down.
|
|
# http://code.djangoproject.com/wiki/CookBookThreadlocalsAndUser
|
|
try:
|
|
from threading import local
|
|
except ImportError:
|
|
from django.utils._threading_local import local
|
|
|
|
_thread_locals = local()
|
|
|
|
|
|
def get_current_user():
|
|
return getattr(_thread_locals, 'user', None)
|
|
|
|
|
|
# General middleware for all middleware functionality specific to the pgweb
|
|
# project.
|
|
class PgMiddleware(object):
|
|
def __init__(self, get_response):
|
|
self.get_response = get_response
|
|
|
|
def __call__(self, request):
|
|
# Thread local store for username, see comment at the top of this file
|
|
_thread_locals.user = getattr(request, 'user', None)
|
|
initialize_template_collection()
|
|
|
|
# Call the view
|
|
response = self.get_response(request)
|
|
|
|
# Set xkey representing the templates that are in use so we can do efficient
|
|
# varnish purging on commits.
|
|
tlist = get_all_templates()
|
|
if 'base/esi.html' in tlist:
|
|
response['x-do-esi'] = "1"
|
|
tlist.remove('base/esi.html')
|
|
if tlist:
|
|
response['xkey'] = ' '.join(["pgwt_{0}".format(hashlib.md5(t.encode('ascii', errors='replace')).hexdigest()) for t in tlist] + [response.get('xkey', '')])
|
|
|
|
# Set security headers
|
|
sources = OrderedDict([
|
|
('default', ["'self'", ]),
|
|
('img', ['*', 'data:', ]),
|
|
('script', ["'unsafe-eval'", "'self'", "www.google-analytics.com", "ssl.google-analytics.com", "www.googletagmanager.com", "tagmanager.google.com", "data:"]),
|
|
('connect', ["'self'", "www.google-analytics.com", "ssl.google-analytics.com"]),
|
|
('media', ["'self'", ]),
|
|
('style', ["'self'", "fonts.googleapis.com", "tagmanager.google.com"]),
|
|
('font', ["'self'", "fonts.gstatic.com", "data:", ]),
|
|
])
|
|
if hasattr(response, 'x_allow_extra_sources'):
|
|
for k, v in list(response.x_allow_extra_sources.items()):
|
|
if k in sources:
|
|
sources[k].extend(v)
|
|
else:
|
|
sources[k] = v
|
|
|
|
security_policies = ["{0}-src {1}".format(k, " ".join(v)) for k, v in list(sources.items())]
|
|
|
|
if not getattr(response, 'x_allow_frames', False):
|
|
response['X-Frame-Options'] = 'DENY'
|
|
security_policies.append("frame-ancestors 'none'")
|
|
|
|
if hasattr(settings, 'SECURITY_POLICY_REPORT_URI'):
|
|
security_policies.append("report-uri " + settings.SECURITY_POLICY_REPORT_URI)
|
|
|
|
if security_policies:
|
|
if getattr(settings, 'SECURITY_POLICY_REPORT_ONLY', False):
|
|
response['Content-Security-Policy-Report-Only'] = " ; ".join(security_policies)
|
|
else:
|
|
response['Content-Security-Policy'] = " ; ".join(security_policies)
|
|
|
|
response['X-XSS-Protection'] = "1; mode=block"
|
|
return response
|
|
|
|
def process_view(self, request, view_func, view_args, view_kwargs):
|
|
# Filter out any query parameters that are not explicitly allowed. We do the same thing in Varnish,
|
|
# and it's better to also do it in django if they show up here, so issues because of it are caught
|
|
# in local testing where there is no Varnish.
|
|
if not request.GET:
|
|
# If there are no parameters, just skip this whole process
|
|
return None
|
|
|
|
if request.path.startswith('/admin/'):
|
|
# django-admin uses it a lot and it's not for us to change
|
|
return None
|
|
|
|
if settings.DEBUG_TOOLBAR and request.path.startswith('/__debug__/'):
|
|
# The debug toolbar also uses a lot
|
|
return None
|
|
|
|
allowed = getattr(view_func, 'queryparams', None)
|
|
|
|
if allowed:
|
|
# Filter the QueryDict for only the allowed parameters
|
|
result = request.GET.copy()
|
|
for k in request.GET.keys():
|
|
if k not in allowed:
|
|
del result[k]
|
|
if "\0" in request.GET[k]:
|
|
return HttpResponse(
|
|
"NUL escapes not allowed in query parameters",
|
|
content_type='text/plain',
|
|
status=400
|
|
)
|
|
result.mutable = False
|
|
request.GET = result
|
|
else:
|
|
request.GET = QueryDict()
|
|
return None
|