[mod_tile / renderd] Add two more priority levels into the queues of renderd

In addition to the request queue and the dirty queue, there are now also a requestPrio and requestBulk queue.
The rendering order now is first render requests from requestPrio, then from request followed by the
dirty queue and finally if no other requests are queued, render from the requestBulk queue.

RequestPrio, Request and RequestBulk all block, whereas Diry immediately returns with NotDoneYet.

This also changes mod_tile to submit requests that if not rendered in time would result in 404 errors as high priority.

prioBulk should be useful for things like rerendering all the outdated tiles in the background, but this patch
does not include those changes.
This commit is contained in:
Kai Krueger
2009-09-18 21:45:58 +00:00
parent 76aee081f9
commit 54c5306295
5 changed files with 83 additions and 50 deletions

106
daemon.c
View File

@ -33,9 +33,9 @@ static pthread_t *render_threads;
static pthread_t *slave_threads; static pthread_t *slave_threads;
static struct sigaction sigPipeAction; static struct sigaction sigPipeAction;
static struct item reqHead, dirtyHead, renderHead; static struct item reqHead, reqPrioHead, reqBulkHead, dirtyHead, renderHead;
static struct item_idx * item_hashidx; static struct item_idx * item_hashidx;
static int reqNum, dirtyNum; static int reqNum, reqPrioNum, reqBulkNum, dirtyNum;
static pthread_mutex_t qLock; static pthread_mutex_t qLock;
static pthread_cond_t qCond; static pthread_cond_t qCond;
@ -53,10 +53,14 @@ struct item *fetch_request(void)
pthread_mutex_lock(&qLock); pthread_mutex_lock(&qLock);
while (reqNum == 0 && dirtyNum == 0) { while ((reqNum == 0) && (dirtyNum == 0) && (reqPrioNum == 0) && (reqBulkNum == 0)) {
pthread_cond_wait(&qCond, &qLock); pthread_cond_wait(&qCond, &qLock);
} }
if (reqNum) { if (reqPrioNum) {
item = reqPrioHead.next;
reqPrioNum--;
stats.noReqPrioRender++;
} else if (reqNum) {
item = reqHead.next; item = reqHead.next;
reqNum--; reqNum--;
stats.noReqRender++; stats.noReqRender++;
@ -64,6 +68,10 @@ struct item *fetch_request(void)
item = dirtyHead.next; item = dirtyHead.next;
dirtyNum--; dirtyNum--;
stats.noDirtyRender++; stats.noDirtyRender++;
} else if (reqBulkNum) {
item = reqBulkHead.next;
reqNum--;
stats.noReqBulkRender++;
} }
if (item) { if (item) {
item->next->prev = item->prev; item->next->prev = item->prev;
@ -83,38 +91,33 @@ struct item *fetch_request(void)
void clear_requests(int fd) void clear_requests(int fd)
{ {
struct item *item, *dupes; struct item *item, *dupes, *queueHead;
/**Only need to look up on the shorter request and render queue /**Only need to look up on the shorter request and render queue
* so using the linear list shouldn't be a problem * so using the linear list shouldn't be a problem
*/ */
pthread_mutex_lock(&qLock); pthread_mutex_lock(&qLock);
item = reqHead.next; for (int i = 0; i < 4; i++) {
while (item != &reqHead) { switch (i) {
if (item->fd == fd) case 0: { queueHead = &reqHead; break;}
item->fd = FD_INVALID; case 1: { queueHead = &renderHead; break;}
case 2: { queueHead = &reqPrioHead; break;}
dupes = item->duplicates; case 3: { queueHead = &reqBulkHead; break;}
while (dupes) {
if (dupes->fd == fd)
dupes->fd = FD_INVALID;
dupes = dupes->duplicates;
} }
item = item->next;
}
item = renderHead.next; item = queueHead->next;
while (item != &renderHead) { while (item != queueHead) {
if (item->fd == fd) if (item->fd == fd)
item->fd = FD_INVALID; item->fd = FD_INVALID;
dupes = item->duplicates; dupes = item->duplicates;
while (dupes) { while (dupes) {
if (dupes->fd == fd) if (dupes->fd == fd)
dupes->fd = FD_INVALID; dupes->fd = FD_INVALID;
dupes = dupes->duplicates; dupes = dupes->duplicates;
}
item = item->next;
} }
item = item->next;
} }
pthread_mutex_unlock(&qLock); pthread_mutex_unlock(&qLock);
@ -227,6 +230,8 @@ static inline const char *cmdStr(enum protoCmd c)
switch (c) { switch (c) {
case cmdIgnore: return "Ignore"; case cmdIgnore: return "Ignore";
case cmdRender: return "Render"; case cmdRender: return "Render";
case cmdRenderPrio: return "RenderPrio";
case cmdRenderBulk: return "RenderBulk";
case cmdDirty: return "Dirty"; case cmdDirty: return "Dirty";
case cmdDone: return "Done"; case cmdDone: return "Done";
case cmdNotDone: return "NotDone"; case cmdNotDone: return "NotDone";
@ -248,7 +253,7 @@ void send_response(struct item *item, enum protoCmd rsp)
while (item) { while (item) {
struct item *prev = item; struct item *prev = item;
req = &item->req; req = &item->req;
if ((item->fd != FD_INVALID) && (req->cmd == cmdRender)) { if ((item->fd != FD_INVALID) && ((req->cmd == cmdRender) || (req->cmd == cmdRenderPrio) || (req->cmd == cmdRenderBulk))) {
req->cmd = rsp; req->cmd = rsp;
//fprintf(stderr, "Sending message %s to %d\n", cmdStr(rsp), item->fd); //fprintf(stderr, "Sending message %s to %d\n", cmdStr(rsp), item->fd);
ret = send(item->fd, req, sizeof(*req), 0); ret = send(item->fd, req, sizeof(*req), 0);
@ -270,12 +275,12 @@ enum protoCmd pending(struct item *test)
item = lookup_item_idx(test); item = lookup_item_idx(test);
if (item != NULL) { if (item != NULL) {
if ((item->inQueue == queueRender) || (item->inQueue == queueRequest)) { if ((item->inQueue == queueRender) || (item->inQueue == queueRequest) || (item->inQueue == queueRequestPrio)) {
test->duplicates = item->duplicates; test->duplicates = item->duplicates;
item->duplicates = test; item->duplicates = test;
test->inQueue = queueDuplicate; test->inQueue = queueDuplicate;
return cmdIgnore; return cmdIgnore;
} else if (item->inQueue == queueDirty) { } else if ((item->inQueue == queueDirty) || (item->inQueue == queueRequestBulk)){
return cmdNotDone; return cmdNotDone;
} }
} }
@ -305,7 +310,7 @@ enum protoCmd rx_request(const struct protocol *req, int fd)
syslog(LOG_DEBUG, "DEBUG: Got command %s fd(%d) xml(%s), z(%d), x(%d), y(%d)", syslog(LOG_DEBUG, "DEBUG: Got command %s fd(%d) xml(%s), z(%d), x(%d), y(%d)",
cmdStr(req->cmd), fd, req->xmlname, req->z, req->x, req->y); cmdStr(req->cmd), fd, req->xmlname, req->z, req->x, req->y);
if ((req->cmd != cmdRender) && (req->cmd != cmdDirty)) if ((req->cmd != cmdRender) && (req->cmd != cmdRenderPrio) && (req->cmd != cmdDirty) && (req->cmd != cmdRenderBulk))
return cmdIgnore; return cmdIgnore;
if (check_xyz(req->x, req->y, req->z)) if (check_xyz(req->x, req->y, req->z))
@ -319,7 +324,7 @@ enum protoCmd rx_request(const struct protocol *req, int fd)
item->req = *req; item->req = *req;
item->duplicates = NULL; item->duplicates = NULL;
item->fd = (req->cmd == cmdRender) ? fd : FD_INVALID; item->fd = (req->cmd == cmdDirty) ? FD_INVALID : fd;
#ifdef METATILE #ifdef METATILE
/* Round down request co-ordinates to the neareast N (should be a power of 2) /* Round down request co-ordinates to the neareast N (should be a power of 2)
@ -335,14 +340,6 @@ enum protoCmd rx_request(const struct protocol *req, int fd)
pthread_mutex_lock(&qLock); pthread_mutex_lock(&qLock);
if (dirtyNum == DIRTY_LIMIT) {
// The queue is severely backlogged. Drop request
stats.noReqDroped++;
pthread_mutex_unlock(&qLock);
free(item);
return cmdNotDone;
}
// Check for a matching request in the current rendering or dirty queues // Check for a matching request in the current rendering or dirty queues
pend = pending(item); pend = pending(item);
if (pend == cmdNotDone) { if (pend == cmdNotDone) {
@ -362,11 +359,25 @@ enum protoCmd rx_request(const struct protocol *req, int fd)
list = &reqHead; list = &reqHead;
item->inQueue = queueRequest; item->inQueue = queueRequest;
reqNum++; reqNum++;
} else if ((req->cmd == cmdRenderPrio) && (reqPrioNum < REQ_LIMIT)) {
list = &reqPrioHead;
item->inQueue = queueRequestPrio;
reqPrioNum++;
} else if ((req->cmd == cmdRenderBulk) && (reqBulkNum < REQ_LIMIT)) {
list = &reqBulkHead;
item->inQueue = queueRequestBulk;
reqBulkNum++;
} else if (dirtyNum < DIRTY_LIMIT) { } else if (dirtyNum < DIRTY_LIMIT) {
list = &dirtyHead; list = &dirtyHead;
item->inQueue = queueDirty; item->inQueue = queueDirty;
dirtyNum++; dirtyNum++;
item->fd = FD_INVALID; // No response after render item->fd = FD_INVALID; // No response after render
} else {
// The queue is severely backlogged. Drop request
stats.noReqDroped++;
pthread_mutex_unlock(&qLock);
free(item);
return cmdNotDone;
} }
if (list) { if (list) {
@ -478,6 +489,9 @@ void *stats_writeout_thread(void * arg) {
stats_struct lStats; stats_struct lStats;
int dirtQueueLength; int dirtQueueLength;
int reqQueueLength; int reqQueueLength;
int reqPrioQueueLength;
int reqBulkQueueLength;
int noFailedAttempts = 0; int noFailedAttempts = 0;
char tmpName[PATH_MAX]; char tmpName[PATH_MAX];
@ -489,6 +503,8 @@ void *stats_writeout_thread(void * arg) {
memcpy(&lStats, &stats, sizeof(stats_struct)); memcpy(&lStats, &stats, sizeof(stats_struct));
dirtQueueLength = dirtyNum; dirtQueueLength = dirtyNum;
reqQueueLength = reqNum; reqQueueLength = reqNum;
reqPrioQueueLength = reqPrioNum;
reqBulkQueueLength = reqBulkNum;
pthread_mutex_unlock(&qLock); pthread_mutex_unlock(&qLock);
FILE * statfile = fopen(tmpName, "w"); FILE * statfile = fopen(tmpName, "w");
@ -503,9 +519,13 @@ void *stats_writeout_thread(void * arg) {
} else { } else {
noFailedAttempts = 0; noFailedAttempts = 0;
fprintf(statfile, "ReqQueueLength: %i\n", reqQueueLength); fprintf(statfile, "ReqQueueLength: %i\n", reqQueueLength);
fprintf(statfile, "ReqPrioQueueLength: %i\n", reqPrioQueueLength);
fprintf(statfile, "ReqBulkQueueLength: %i\n", reqBulkQueueLength);
fprintf(statfile, "DirtQueueLength: %i\n", dirtQueueLength); fprintf(statfile, "DirtQueueLength: %i\n", dirtQueueLength);
fprintf(statfile, "DropedRequest: %li\n", lStats.noReqDroped); fprintf(statfile, "DropedRequest: %li\n", lStats.noReqDroped);
fprintf(statfile, "ReqRendered: %li\n", lStats.noReqRender); fprintf(statfile, "ReqRendered: %li\n", lStats.noReqRender);
fprintf(statfile, "ReqPrioRendered: %li\n", lStats.noReqPrioRender);
fprintf(statfile, "ReqBulkRendered: %li\n", lStats.noReqBulkRender);
fprintf(statfile, "DirtyRendered: %li\n", lStats.noDirtyRender); fprintf(statfile, "DirtyRendered: %li\n", lStats.noDirtyRender);
fclose(statfile); fclose(statfile);
if (rename(tmpName, config.stats_filename)) { if (rename(tmpName, config.stats_filename)) {
@ -846,6 +866,8 @@ int main(int argc, char **argv)
pthread_mutex_init(&qLock, NULL); pthread_mutex_init(&qLock, NULL);
pthread_cond_init(&qCond, NULL); pthread_cond_init(&qCond, NULL);
reqHead.next = reqHead.prev = &reqHead; reqHead.next = reqHead.prev = &reqHead;
reqPrioHead.next = reqPrioHead.prev = &reqPrioHead;
reqBulkHead.next = reqBulkHead.prev = &reqBulkHead;
dirtyHead.next = dirtyHead.prev = &dirtyHead; dirtyHead.next = dirtyHead.prev = &dirtyHead;
renderHead.next = renderHead.prev = &renderHead; renderHead.next = renderHead.prev = &renderHead;
hashidxSize = HASHIDX_SIZE; hashidxSize = HASHIDX_SIZE;
@ -855,6 +877,8 @@ int main(int argc, char **argv)
stats.noDirtyRender = 0; stats.noDirtyRender = 0;
stats.noReqDroped = 0; stats.noReqDroped = 0;
stats.noReqRender = 0; stats.noReqRender = 0;
stats.noReqPrioRender = 0;
stats.noReqBulkRender = 0;
xmlconfigitem maps[XMLCONFIGS_MAX]; xmlconfigitem maps[XMLCONFIGS_MAX];
bzero(maps, sizeof(xmlconfigitem) * XMLCONFIGS_MAX); bzero(maps, sizeof(xmlconfigitem) * XMLCONFIGS_MAX);
@ -963,7 +987,7 @@ int main(int argc, char **argv)
"mapnik:plugins_dir", (char *) MAPNIK_PLUGINS); "mapnik:plugins_dir", (char *) MAPNIK_PLUGINS);
config.mapnik_font_dir = iniparser_getstring(ini, config.mapnik_font_dir = iniparser_getstring(ini,
"mapnik:font_dir", (char *) FONT_DIR); "mapnik:font_dir", (char *) FONT_DIR);
config.mapnik_font_dir_recurse = iniparser_getboolean(ini, config.mapnik_font_dir_recurse = iniparser_getboolean(ini,
"mapnik:font_dir_recurse", FONT_RECURSE); "mapnik:font_dir_recurse", FONT_RECURSE);
} else { } else {
noSlaveRenders += config_slaves[render_sec].num_threads; noSlaveRenders += config_slaves[render_sec].num_threads;

View File

@ -32,6 +32,8 @@ typedef struct {
typedef struct { typedef struct {
long noDirtyRender; long noDirtyRender;
long noReqRender; long noReqRender;
long noReqPrioRender;
long noReqBulkRender;
long noReqDroped; long noReqDroped;
} stats_struct; } stats_struct;

View File

@ -10,7 +10,7 @@ extern "C" {
#define HTCP_EXPIRE_CACHE 1 #define HTCP_EXPIRE_CACHE 1
#define HTCP_EXPIRE_CACHE_PORT "4827" #define HTCP_EXPIRE_CACHE_PORT "4827"
enum queueEnum {queueRequest, queueDirty, queueRender, queueDuplicate}; enum queueEnum {queueRequest, queueRequestPrio, queueRequestBulk, queueDirty, queueRender, queueDuplicate};
struct item { struct item {
struct item *next; struct item *next;

View File

@ -158,7 +158,7 @@ int socket_init(request_rec *r)
return fd; return fd;
} }
int request_tile(request_rec *r, struct protocol *cmd, int dirtyOnly) int request_tile(request_rec *r, struct protocol *cmd, int renderImmediately)
{ {
int fd; int fd;
int ret = 0; int ret = 0;
@ -177,7 +177,11 @@ int request_tile(request_rec *r, struct protocol *cmd, int dirtyOnly)
// cmd has already been partial filled, fill in the rest // cmd has already been partial filled, fill in the rest
cmd->ver = PROTO_VER; cmd->ver = PROTO_VER;
cmd->cmd = dirtyOnly ? cmdDirty : cmdRender; switch (renderImmediately) {
case 0: { cmd->cmd = cmdDirty; break;}
case 1: { cmd->cmd = cmdRender; break;}
case 2: { cmd->cmd = cmdRenderPrio; break;}
}
ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r, "Requesting xml(%s) z(%d) x(%d) y(%d)", cmd->xmlname, cmd->z, cmd->x, cmd->y); ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r, "Requesting xml(%s) z(%d) x(%d) y(%d)", cmd->xmlname, cmd->z, cmd->x, cmd->y);
do { do {
@ -195,7 +199,7 @@ int request_tile(request_rec *r, struct protocol *cmd, int dirtyOnly)
return 0; return 0;
} while (retry--); } while (retry--);
if (!dirtyOnly) { if (renderImmediately) {
struct timeval tv = {scfg->request_timeout, 0 }; struct timeval tv = {scfg->request_timeout, 0 };
fd_set rx; fd_set rx;
int s; int s;
@ -509,7 +513,7 @@ static int tile_handler_dirty(request_rec *r)
if (cmd == NULL) if (cmd == NULL)
return DECLINED; return DECLINED;
request_tile(r, cmd, 1); request_tile(r, cmd, 0);
return error_message(r, "Tile submitted for rendering\n"); return error_message(r, "Tile submitted for rendering\n");
} }
@ -517,6 +521,7 @@ static int tile_storage_hook(request_rec *r)
{ {
// char abs_path[PATH_MAX]; // char abs_path[PATH_MAX];
int avg; int avg;
int renderPrio = 0;
enum tileState state; enum tileState state;
ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r, "tile_storage_hook: handler(%s), uri(%s), filename(%s), path_info(%s)", ap_log_rerror(APLOG_MARK, APLOG_INFO, 0, r, "tile_storage_hook: handler(%s), uri(%s), filename(%s), path_info(%s)",
@ -565,7 +570,7 @@ should already be done
case tileOld: case tileOld:
if (avg > scfg->max_load_old) { if (avg > scfg->max_load_old) {
// Too much load to render it now, mark dirty but return old tile // Too much load to render it now, mark dirty but return old tile
request_tile(r, cmd, 1); request_tile(r, cmd, 0);
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "Load larger max_load_old (%d). Mark dirty and deliver from cache.", scfg->max_load_old); ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "Load larger max_load_old (%d). Mark dirty and deliver from cache.", scfg->max_load_old);
if (!incFreshCounter(OLD, r)) { if (!incFreshCounter(OLD, r)) {
ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r,
@ -573,10 +578,11 @@ should already be done
} }
return OK; return OK;
} }
renderPrio = 1;
break; break;
case tileMissing: case tileMissing:
if (avg > scfg->max_load_missing) { if (avg > scfg->max_load_missing) {
request_tile(r, cmd, 1); request_tile(r, cmd, 0);
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "Load larger max_load_missing (%d). Return HTTP_NOT_FOUND.", scfg->max_load_missing); ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "Load larger max_load_missing (%d). Return HTTP_NOT_FOUND.", scfg->max_load_missing);
if (!incRespCounter(HTTP_NOT_FOUND, r)) { if (!incRespCounter(HTTP_NOT_FOUND, r)) {
ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r, ap_log_rerror(APLOG_MARK, APLOG_WARNING, 0, r,
@ -584,10 +590,11 @@ should already be done
} }
return HTTP_NOT_FOUND; return HTTP_NOT_FOUND;
} }
renderPrio = 2;
break; break;
} }
if (request_tile(r, cmd, 0)) { if (request_tile(r, cmd, renderPrio)) {
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "Update file info abs_path(%s)", r->filename); ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "Update file info abs_path(%s)", r->filename);
// Need to update fileinfo for new rendered tile // Need to update fileinfo for new rendered tile
apr_stat(&r->finfo, r->filename, APR_FINFO_MIN, r->pool); apr_stat(&r->finfo, r->filename, APR_FINFO_MIN, r->pool);

View File

@ -20,7 +20,7 @@ extern "C" {
#define RENDER_SOCKET "/tmp/osm-renderd" #define RENDER_SOCKET "/tmp/osm-renderd"
#define XMLCONFIG_MAX 41 #define XMLCONFIG_MAX 41
enum protoCmd { cmdIgnore, cmdRender, cmdDirty, cmdDone, cmdNotDone }; enum protoCmd { cmdIgnore, cmdRender, cmdDirty, cmdDone, cmdNotDone, cmdRenderPrio, cmdRenderBulk };
struct protocol { struct protocol {
int ver; int ver;