Files
openstreetmap-mod_tile-pyth…/renderd.py

559 lines
19 KiB
Python
Executable File

#!/usr/bin/python
#
# mod_tile rendering daemon example written in Python.
# The code is mostly a direct port of the C implementation.
# It needs more work to make it more Pythonic, split it
# into more appropriate classes, add documentation, fix bugs etc.
#
# This is currently experimental and not intended as a replacement
# of the C implementation! It should allow more people to produce
# custom variations of the rendering pipeline, e.g. such as compositing
# tiles from multiple layers.
#
# The code functions but I'm not yet convinced this is the correct
# approach to integrating Python with the render daemon. Two other
# options I'm considering:
#
# - Use the C renderd code with python binding allowing the replacement
# of just the core tile rendering code (this is the bit that people
# may want to tweak)
#
# - Split the functionality into a separate queue handler daemon and
# render daemon. This would remove a lot of the complexity around the
# request handling which most people probably won't want to touch.
# The queue handler might stay in C with a smaller python rendering daemon
import sys, os
import SocketServer
import struct
import thread
import threading
import socket
import ConfigParser
import mapnik
import time
import errno
from math import pi,cos,sin,log,exp,atan
# Highest zoom level accepted in a request (checked in ProtocolPacket.bad_request)
MAX_ZOOM = 18
# A meta-tile covers METATILE x METATILE tiles. Co-ordinates are masked with
# (METATILE - 1), which assumes this value is a power of two.
METATILE = 8
# Magic string written at the very start of every meta-tile file
META_MAGIC = "META"
class protocol:
    """ENUM values for the commandStatus field in a protocol packet."""
    # These numbers are packed into and unpacked from packets, so the
    # order below must not change: Ignore=0 ... NotDone=4.
    Ignore, Render, Dirty, Done, NotDone = range(5)
class ProtocolPacket:
    """State and validation shared by every version of the protocol packet."""

    def __init__(self, version, fields = ""):
        """Create an empty packet.

        version -- protocol version number of this packet
        fields  -- struct format string describing the packed layout
        """
        self.version = version
        self.fields = fields
        self.commandStatus = protocol.Ignore
        self.xmlname = ""
        # Requested tile co-ordinates
        self.x = self.y = self.z = 0
        # Co-ordinates of the meta-tile the request belongs to
        self.mx = self.my = 0

    def len(self):
        """Return the size in bytes of a packet packed with self.fields."""
        return struct.calcsize(self.fields)

    def bad_request(self):
        """Return True if z, x or y lies outside the valid range."""
        zoom = self.z
        if not (0 <= zoom <= MAX_ZOOM):
            return True
        # Tile indices at this zoom run from 0 to 2^zoom - 1
        limit = (1 << zoom) - 1
        for coord in (self.x, self.y):
            if not (0 <= coord <= limit):
                return True
        return False

    def meta_tuple(self):
        """Return the key identifying this request's meta-tile.

        The tuple is used to spot duplicate requests in the rendering queues.
        """
        return (self.xmlname, self.mx, self.my, self.z)
class ProtocolPacketV1(ProtocolPacket):
def __init__(self):
ProtocolPacket(1)
self.fields = "5i"
def receive(self, data, dest):
version, request, x, y, z = struct.unpack(self.fields, data)
if version != 1:
print "Received V1 packet with incorect version %d" % version
else:
#print "Got V1 request, command(%d), x(%d), y(%d), z(%d)" \
# % (request, x, y, z)
self.commandStatus = request
self.x = x
self.y = y
self.z = z
self.xmlname = "Default"
# Calculate Meta-tile value for this x/y
self.mx = x & (METATILE-1)
self.my = y & (METATILE-1)
self.dest = dest
def send(self, status):
x = self.x
y = self.y
z = self.z
data = struct.pack(self.fields, (1, status, x, y, z))
try:
self.dest.send(data)
except socket.error, e:
if e[0] == errno.EBADF:
print "Got EBADF in socket send"
else:
raise
class ProtocolPacketV2(ProtocolPacket):
def __init__(self):
ProtocolPacket(2)
self.fields = "5i41sxxx"
def receive(self, data, dest):
version, request, x, y, z, xmlname = struct.unpack(self.fields, data)
if version != 2:
print "Received V2 packet with incorect version %d" % version
else:
#print "Got V2 request, command(%d), xmlname(%s), x(%d), y(%d), z(%d)" \
# % (request, xmlname, x, y, z)
self.commandStatus = request
self.x = x
self.y = y
self.z = z
self.xmlname = xmlname.rstrip('\000') # Remove trailing NULs
# Calculate Meta-tile value for this x/y
self.mx = x & ~(METATILE-1)
self.my = y & ~(METATILE-1)
self.dest = dest
def send(self, status):
x = self.x
y = self.y
z = self.z
xmlname = self.xmlname
data = struct.pack(self.fields, 2, status, x, y, z, xmlname)
try:
self.dest.send(data)
except socket.error, e:
if e[0] == errno.EBADF:
print "Got EBADF in socket send"
else:
raise
DEG_TO_RAD = pi/180
RAD_TO_DEG = 180/pi

class SphericalProjection:
    """Converts between (lon, lat) in degrees and pixel co-ordinates.

    Per-zoom lookup tables are built once for zoom levels 0 .. levels-1.
    """

    def __init__(self,levels=18):
        # Ac: size of the whole map in pixels at each zoom (256, 512, ...)
        self.Ac = [256 << zoom for zoom in range(levels)]
        # Bc: pixels per degree of longitude
        self.Bc = [size / 360.0 for size in self.Ac]
        # Cc: pixels per radian
        self.Cc = [size / (2 * pi) for size in self.Ac]
        # zc: pixel position of the map centre
        self.zc = [(size // 2, size // 2) for size in self.Ac]

    def minmax(self, a,b,c):
        """Clamp a into the range [b, c]."""
        return min(max(a, b), c)

    def fromLLtoPixel(self,ll,zoom):
        """Convert ll = (lon, lat) in degrees to rounded (x, y) pixels at zoom."""
        centre_x, centre_y = self.zc[zoom]
        px = round(centre_x + ll[0] * self.Bc[zoom])
        # Clamp sin(lat) so the log below stays finite near the poles
        s = self.minmax(sin(DEG_TO_RAD * ll[1]),-0.9999,0.9999)
        py = round(centre_y + 0.5*log((1+s)/(1-s))*-self.Cc[zoom])
        return (px, py)

    def fromPixelToLL(self,px,zoom):
        """Convert px = (x, y) pixels at zoom to (lon, lat) in degrees."""
        centre = self.zc[zoom]
        lon = (px[0] - centre[0])/self.Bc[zoom]
        merc_y = (px[1] - centre[1])/-self.Cc[zoom]
        lat = RAD_TO_DEG * ( 2 * atan(exp(merc_y)) - 0.5 * pi)
        return (lon, lat)
class RenderThread:
def __init__(self, tile_path, styles, queue_handler):
self.tile_path = tile_path
self.queue_handler = queue_handler
self.maps = {}
METATILE = 8
RENDER_SIZE = 256 * (METATILE + 1)
for xmlname in styles:
#print "Creating Mapnik map object for %s with %s" % (xmlname, styles[xmlname])
m = mapnik.Map(RENDER_SIZE, RENDER_SIZE)
self.maps[xmlname] = m
mapnik.load_map(m, styles[xmlname])
# Projects between tile pixel co-ordinates and LatLong (EPSG:4326)
self.gprj = SphericalProjection(MAX_ZOOM)
# This is the Spherical mercator projection (EPSG:900913)
self.prj = mapnik.Projection("+proj=merc +a=6378137 +b=6378137 +lat_ts=0.0 +lon_0=0.0 +x_0=0.0 +y_0=0 +k=1.0 +units=m +nadgrids=@null +no_defs +over")
def render_meta(self, m, style, x, y, z, sz):
# Calculate pixel positions of bottom-left & top-right
p0 = (x * 256, (y + sz) * 256)
p1 = ((x + sz) * 256, y * 256)
# Convert to LatLong (EPSG:4326)
l0 = self.gprj.fromPixelToLL(p0, z);
l1 = self.gprj.fromPixelToLL(p1, z);
# Convert to mercator co-ords (EPSG:900913)
c0 = self.prj.forward(mapnik.Coord(l0[0],l0[1]))
c1 = self.prj.forward(mapnik.Coord(l1[0],l1[1]))
# Bounding box for the meta-tile
bbox = mapnik.Envelope(c0.x,c0.y, c1.x,c1.y)
# Expand tile to provide a gutter which avoids features getting lost at edge of metatile
scale = (sz+1.0)/sz
bbox.width(bbox.width() * scale)
bbox.height(bbox.height() * scale)
# Calculate meta tile size in pixels
render_size = 256 * (sz + 1)
#m.resize(render_size, render_size);
m.zoom_to_box(bbox)
# Render image
im = mapnik.Image(render_size, render_size)
mapnik.render(m, im)
tiles = {}
# Split image up into NxN grid of tile
for yy in range(0,sz):
for xx in range(0,sz):
# Position of tile, offset due to gutter
yoff = 128 + yy * 256
xoff = 128 + xx * 256
view = im.view(xoff, yoff, 256, 256)
tile = view.tostring('png256')
#print "Got view of z(%d) x(%d) y(%d), len(%d)" % (z, x+xx, y+yy, len(tile))
tiles[(xx, yy)] = tile
return tiles
def render_request(self, r):
# Calculate the meta tile size to use for this zoom level
size = min(METATILE, 1 << r.z)
xmlname = r.xmlname
x = r.mx
y = r.my
z = r.z
try:
m = self.maps[xmlname]
except KeyError:
print "No map for: '%s'" % xmlname
return False
tiles = self.render_meta(m, xmlname, x, y, z, size)
status = self.meta_save(xmlname, x, y, z, size, tiles)
if status == True:
print "Done xmlname(%s) z(%d) x(%d-%d) y(%d-%d)" % \
(xmlname, r.z, x, x+size-1, y, y+size-1)
else:
print "FAILED xmlname(%s) z(%d) x(%d-%d) y(%d-%d)" % \
(xmlname, z, x, x+size-1, y, y+size-1)
return status;
def xyz_to_meta(self, xmlname, x,y, z):
mask = METATILE -1
x &= ~mask
y &= ~mask
hashes = {}
for i in range(0,5):
hashes[i] = ((x & 0x0f) << 4) | (y & 0x0f)
x >>= 4
y >>= 4
meta = "%s/%s/%d/%u/%u/%u/%u/%u.meta" % (self.tile_path, xmlname, z, hashes[4], hashes[3], hashes[2], hashes[1], hashes[0])
return meta
def xyz_to_meta_offset(self, xmlname, x,y, z):
mask = METATILE -1
offset = (x & mask) * METATILE + (y & mask)
return offset
def meta_save(self, xmlname, x, y, z, size, tiles):
#print "Saving %d tiles" % (size * size)
meta_path = self.xyz_to_meta(xmlname, x, y, z)
d = os.path.dirname(meta_path)
if not os.path.exists(d):
try:
os.makedirs(d)
except OSError:
# Multiple threads can race when creating directories,
# ignore exception if the directory now exists
if not os.path.exists(d):
raise
tmp = "%s.tmp.%d" % (meta_path, thread.get_ident())
f = open(tmp, "w")
f.write(struct.pack("4s4i", META_MAGIC, METATILE * METATILE, x, y, z))
offset = len(META_MAGIC) + 4 * 4
# Need to pre-compensate the offsets for the size of the offset/size table we are about to write
offset += (2 * 4) * (METATILE * METATILE)
# Collect all the tile sizes
sizes = {}
offsets = {}
for xx in range(0, size):
for yy in range(0, size):
mt = self.xyz_to_meta_offset(xmlname, x+xx, y+yy, z)
sizes[mt] = len(tiles[(xx, yy)])
offsets[mt] = offset
offset += sizes[mt]
# Write out the offset/size table
for mt in range(0, METATILE * METATILE):
if mt in sizes:
f.write(struct.pack("2i", offsets[mt], sizes[mt]))
else:
f.write(struct.pack("2i", 0, 0))
# Write out the tiles
for xx in range(0, size):
for yy in range(0, size):
f.write(tiles[(xx, yy)])
f.close()
os.rename(tmp, meta_path)
print "Wrote: %s" % meta_path
return True
def loop(self):
while True:
#Fetch a meta-tile to render
r = self.queue_handler.fetch()
rendered = self.render_request(r)
# Retrieve all requests for this meta-tile
requests = self.queue_handler.pop_requests(r)
for request in requests:
if (request.commandStatus == protocol.Render):
if rendered == True:
request.send(protocol.Done)
else:
request.send(protocol.NotDone)
#time.sleep(1)
print "Dummy render thread, exiting. Path %s" % self.tile_path
def start_renderers(num_threads, tile_path, styles, queue_handler):
for i in range(num_threads):
renderer = RenderThread(tile_path, styles, queue_handler)
render_thread = threading.Thread(target=renderer.loop)
render_thread.setDaemon(True)
render_thread.start()
print "Started render thread %s" % render_thread.getName()
class RequestQueues:
def __init__(self, request_limit = 32, dirty_limit = 1000):
# Queues are used as follows:
# - Incoming render requests are initally put into the request queue
# If the request queue is full then the new request is demoted dirty queue
# - Incoming 'dirty' requests are put into the dirty queue or overflow from render queue
# - The render queue holds the requests which are in progress by the render threads
self.requests = {}
self.dirties = {}
self.rendering = {}
self.request_limit = request_limit
self.dirty_limit = dirty_limit
self.cond = threading.Condition()
def add(self, request):
self.cond.acquire()
try:
# FIXME: Add short-circuit for overload condition?
if request.meta_tuple() in self.rendering:
self.rendering[request.meta_tuple()].append(request)
return "rendering"
elif request.meta_tuple() in self.requests:
self.requests[request.meta_tuple()].append(request)
return "requested"
elif request.meta_tuple() in self.dirties:
self.dirties[request.meta_tuple()].append(request)
return "dirty"
elif request.commandStatus == protocol.Render and len(self.requests) < self.request_limit:
self.requests[request.meta_tuple()] = [request]
self.cond.notify()
return "requested"
elif len(self.dirties) < self.dirty_limit:
self.dirties[request.meta_tuple()] = [request]
self.cond.notify()
return "dirty"
else:
return "dropped"
finally:
self.cond.release()
def fetch(self):
self.cond.acquire()
try:
while len(self.requests) == 0 and len(self.dirties) == 0:
self.cond.wait()
# Pull request from one of the incoming queues
try:
item = self.requests.popitem()
except KeyError:
try:
item = self.dirties.popitem()
except KeyError:
print "Odd, queues empty"
return
# Push request list on to the list of items being rendered
k = item[0]
v = item[1] # This is a list of all requests for this meta-tile
self.rendering[k] = v
# Return the first request from the list
return v[0]
finally:
self.cond.release()
def pop_requests(self, request):
self.cond.acquire()
try:
return self.rendering.pop(request.meta_tuple())
except KeyError:
# It is not yet clear why this happens, there should always be
# an entry in the rendering queue for each active meta -tile request
print "WARNING: Failed to locate request in rendering list!"
return (request,)
finally:
self.cond.release()
class ThreadedUnixStreamHandler(SocketServer.BaseRequestHandler):
def rx_request(self, request):
if (request.commandStatus != protocol.Render) \
and (request.commandStatus != protocol.Dirty):
return
if request.bad_request():
if (request.commandStatus == protocol.Render):
request.send(protocol.NotDone)
return
cur_thread = threading.currentThread()
print "%s: xml(%s) z(%d) x(%d) y(%d)" % \
(cur_thread.getName(), request.xmlname, request.z, request.x, request.y)
status = self.server.queue_handler.add(request)
if status in ("rendering", "requested"):
# Request queued, response will be sent on completion
return
# The tile won't be rendered soon
if (request.commandStatus == protocol.Render):
request.send(protocol.NotDone)
def handle(self):
cur_thread = threading.currentThread()
print "%s: New connection" % cur_thread.getName()
req_v1 = ProtocolPacketV1()
req_v2 = ProtocolPacketV2()
max_len = max(req_v1.len(), req_v2.len())
while True:
try:
data = self.request.recv(max_len)
except socket.error, e:
if e[0] == errno.ECONNRESET:
print "Connection reset by peer"
break
else:
raise
if len(data) == req_v1.len():
req_v1.receive(data, self.request)
self.rx_request(req_v1)
if len(data) == req_v2.len():
req_v2.receive(data, self.request)
self.rx_request(req_v2)
elif len(data) == 0:
print "%s: Connection closed" % cur_thread.getName()
break
else:
print "Invalid request length %d" % len(data)
break
class ThreadedUnixStreamServer(SocketServer.ThreadingMixIn, SocketServer.UnixStreamServer):
    """Unix stream socket server which handles each connection in a thread."""

    def __init__(self, address, queue_handler, handler):
        """Bind to the socket path address and remember queue_handler,
        which the request handlers reach through self.server.
        """
        self.address = address
        self.queue_handler = queue_handler
        # Remove any existing file at the socket path before binding to it
        if os.path.exists(address):
            os.unlink(address)
        SocketServer.UnixStreamServer.__init__(self, address, handler)
        self.daemon_threads = True
def listener(address, queue_handler):
# Create the server
server = ThreadedUnixStreamServer(address, queue_handler, ThreadedUnixStreamHandler)
# The socket needs to be writeable by Apache
os.chmod(address, 0666)
# Loop forever servicing requests
server.serve_forever()
def display_config(config):
for xmlname in config.sections():
print "Layer name: %s" % xmlname
#for opt in config.options(xmlname):
# print "%s = %s" % (opt, config.get(xmlname, opt))
uri = config.get(xmlname, "uri")
xml = config.get(xmlname, "xml")
print " URI(%s) = XML(%s)" % (uri, xml)
def read_styles(config):
    """Return a dict mapping each section name in config to its "xml" option."""
    return dict((layer, config.get(layer, "xml")) for layer in config.sections())
if __name__ == "__main__":
    # The config file location can be overridden through the environment
    cfg_file = os.environ.get('RENDERD_CFG', "/etc/renderd.conf")
    # FIXME: Move more of these to config file?
    RENDER_SOCKET = "/tmp/osm-renderd"
    HASH_PATH = "/var/lib/mod_tile"
    NUM_THREADS = 4

    config = ConfigParser.ConfigParser()
    config.read(cfg_file)
    display_config(config)
    styles = read_styles(config)

    # Render threads take work from the queues, the listener fills them
    queue_handler = RequestQueues()
    start_renderers(NUM_THREADS, HASH_PATH, styles, queue_handler)
    listener(RENDER_SOCKET, queue_handler)