Files
mod_tile/src/request_queue.c
David Hummel a14924a999 Logging Improvements
* Using [`GLib Logging Framework`](https://developer.gnome.org/programming-guidelines/stable/logging.html.en) for logging
  * Created new `g_logger` function for logging
    * Outputs to `stdout`/`stderr` only when running in `foreground`
      * `stderr` for `message`, `warning`, `critical` & `error` levels
      * `stdout` for `debug` & `info` levels
        * Use `G_MESSAGES_DEBUG=all` environment to enable `debug` to print
    * Otherwise, output will be to `syslog` or `systemd journal` (when appropriate)
* Standardized usage of `{LOG_PRIORITY}: ` prefix in log messages
  * Only when using `syslog`, otherwise `GLib Logging` will take care of it
* Changed `fprintf(stderr`, `printf` & `perror` calls to `g_logger` calls
  * You might want to check them out closely to make sure I chose the right levels
  * No changes to `logging/output` were made to "`foreground`" programs (I.E. `render_*`)
* Changed `0`,`1` to `no_argument`,`required_argument` in `getopt_long`'s `long_options`
  * Fixed `renderd`'s `foreground` opt (should be `no_argument` [0] rather than `reguired_argument` [1])
* Basic test for `mod_tile` module
* ~~Extended `renderd` log priority onto Mapnik's logger~~
2021-07-28 14:46:50 +00:00

500 lines
12 KiB
C

/*
* Copyright (c) 2007 - 2020 by mod_tile contributors (see AUTHORS file)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see http://www.gnu.org/licenses/.
*/
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include "render_config.h"
#include "request_queue.h"
#include "g_logger.h"
static int calcHashKey(struct request_queue *queue, struct item *item)
{
uint64_t xmlnameHash = 0;
uint64_t key;
for (int i = 0; (item->req.xmlname[i] != 0) && (i < sizeof(item->req.xmlname)); i++) {
xmlnameHash += item->req.xmlname[i];
}
key = ((uint64_t)(xmlnameHash & 0x1FF) << 52) + ((uint64_t)(item->req.z) << 48) + ((uint64_t)(item->mx & 0xFFFFFF) << 24) + (item->my & 0xFFFFFF);
return key % queue->hashidxSize;
}
static struct item * lookup_item_idx(struct request_queue * queue, struct item * item)
{
struct item_idx * nextItem;
struct item * test;
int key = calcHashKey(queue, item);
if (queue->item_hashidx[key].item == NULL) {
return NULL;
} else {
nextItem = &(queue->item_hashidx[key]);
while (nextItem != NULL) {
test = nextItem->item;
if ((item->mx == test->mx) && (item->my == test->my)
&& (item->req.z == test->req.z) && (!strcmp(
item->req.xmlname, test->req.xmlname))) {
return test;
} else {
nextItem = nextItem->next;
}
}
}
return NULL;
}
static void insert_item_idx(struct request_queue * queue, struct item *item)
{
struct item_idx * nextItem;
struct item_idx * prevItem;
int key = calcHashKey(queue, item);
if (queue->item_hashidx[key].item == NULL) {
queue->item_hashidx[key].item = item;
} else {
prevItem = &(queue->item_hashidx[key]);
nextItem = queue->item_hashidx[key].next;
while (nextItem) {
prevItem = nextItem;
nextItem = nextItem->next;
}
nextItem = (struct item_idx *)malloc(sizeof(struct item_idx));
nextItem->item = item;
nextItem->next = NULL;
prevItem->next = nextItem;
}
}
static void remove_item_idx(struct request_queue * queue, struct item * item)
{
int key = calcHashKey(queue, item);
struct item_idx * nextItem;
struct item_idx * prevItem;
struct item * test;
if (queue->item_hashidx[key].item == NULL) {
//item not in index;
return;
}
prevItem = &(queue->item_hashidx[key]);
nextItem = &(queue->item_hashidx[key]);
while (nextItem != NULL) {
test = nextItem->item;
if ((item->mx == test->mx) && (item->my == test->my) && (item->req.z
== test->req.z) && (!strcmp(item->req.xmlname,
test->req.xmlname))) {
/*
* Found item, removing it from list
*/
nextItem->item = NULL;
if (nextItem->next != NULL) {
if (nextItem == &(queue->item_hashidx[key])) {
prevItem = nextItem->next;
memcpy(&(queue->item_hashidx[key]), nextItem->next,
sizeof(struct item_idx));
free(prevItem);
} else {
prevItem->next = nextItem->next;
}
} else {
prevItem->next = NULL;
}
if (nextItem != &(queue->item_hashidx[key])) {
free(nextItem);
}
return;
} else {
prevItem = nextItem;
nextItem = nextItem->next;
}
}
}
static enum protoCmd pending(struct request_queue * queue, struct item *test)
{
// check all queues and render list to see if this request already queued
// If so, add this new request as a duplicate
// call with qLock held
struct item *item;
item = lookup_item_idx(queue, test);
if (item != NULL) {
if ((item->inQueue == queueRender) || (item->inQueue == queueRequest) || (item->inQueue == queueRequestPrio) || (item->inQueue == queueRequestLow)) {
test->duplicates = item->duplicates;
item->duplicates = test;
test->inQueue = queueDuplicate;
return cmdIgnore;
} else if ((item->inQueue == queueDirty) || (item->inQueue == queueRequestBulk)) {
return cmdNotDone;
}
}
return cmdRender;
}
struct item *request_queue_fetch_request(struct request_queue * queue)
{
struct item *item = NULL;
pthread_mutex_lock(&(queue->qLock));
while ((queue->reqNum == 0) && (queue->dirtyNum == 0) && (queue->reqLowNum == 0) && (queue->reqPrioNum == 0) && (queue->reqBulkNum == 0)) {
pthread_cond_wait(&(queue->qCond), &(queue->qLock));
}
if (queue->reqPrioNum) {
item = queue->reqPrioHead.next;
queue->reqPrioNum--;
queue->stats.noReqPrioRender++;
} else if (queue->reqNum) {
item = queue->reqHead.next;
queue->reqNum--;
queue->stats.noReqRender++;
} else if (queue->reqLowNum) {
item = queue->reqLowHead.next;
queue->reqLowNum--;
queue->stats.noReqLowRender++;
} else if (queue->dirtyNum) {
item = queue->dirtyHead.next;
queue->dirtyNum--;
queue->stats.noDirtyRender++;
} else if (queue->reqBulkNum) {
item = queue->reqBulkHead.next;
queue->reqBulkNum--;
queue->stats.noReqBulkRender++;
}
if (item) {
item->next->prev = item->prev;
item->prev->next = item->next;
//Add item to render queue
item->prev = &(queue->renderHead);
item->next = queue->renderHead.next;
queue->renderHead.next->prev = item;
queue->renderHead.next = item;
item->inQueue = queueRender;
}
pthread_mutex_unlock(&queue->qLock);
return item;
}
/* If a fd becomes invalid for returning request information, remove it from all
* requests to not send feedback to invalid FDs
*/
void request_queue_clear_requests_by_fd(struct request_queue * queue, int fd)
{
struct item *item, *dupes, *queueHead;
/**Only need to look up on the shorter request and render queue,
* as the all requests on the dirty queue already have a FD_INVALID
* as a file descriptor, so using the linear list shouldn't be a problem
*/
pthread_mutex_lock(&(queue->qLock));
for (int i = 0; i < 4; i++) {
switch (i) {
case 0: {
queueHead = &(queue->reqHead);
break;
}
case 1: {
queueHead = &(queue->renderHead);
break;
}
case 2: {
queueHead = &(queue->reqPrioHead);
break;
}
case 3: {
queueHead = &(queue->reqBulkHead);
break;
}
}
item = queueHead->next;
while (item != queueHead) {
if (item->fd == fd) {
item->fd = FD_INVALID;
}
dupes = item->duplicates;
while (dupes) {
if (dupes->fd == fd) {
dupes->fd = FD_INVALID;
}
dupes = dupes->duplicates;
}
item = item->next;
}
}
pthread_mutex_unlock(&(queue->qLock));
}
enum protoCmd request_queue_add_request(struct request_queue * queue, struct item *item)
{
enum protoCmd status;
const struct protocol *req;
struct item *list = NULL;
req = &(item->req);
if (queue == NULL) {
g_logger(G_LOG_LEVEL_CRITICAL, "queue os NULL");
exit(3);
}
pthread_mutex_lock(&(queue->qLock));
// Check for a matching request in the current rendering or dirty queues
status = pending(queue, item);
if (status == cmdNotDone) {
// We found a match in the dirty queue, can not wait for it
pthread_mutex_unlock(&(queue->qLock));
free(item);
return cmdNotDone;
}
if (status == cmdIgnore) {
// Found a match in render queue, item added as duplicate
pthread_mutex_unlock(&(queue->qLock));
return cmdIgnore;
}
// New request, add it to render or dirty queue
if ((req->cmd == cmdRender) && (queue->reqNum < REQ_LIMIT)) {
list = &(queue->reqHead);
item->inQueue = queueRequest;
item->originatedQueue = queueRequest;
queue->reqNum++;
} else if ((req->cmd == cmdRenderPrio) && (queue->reqPrioNum < REQ_LIMIT)) {
list = &(queue->reqPrioHead);
item->inQueue = queueRequestPrio;
item->originatedQueue = queueRequestPrio;
queue->reqPrioNum++;
} else if ((req->cmd == cmdRenderLow) && (queue->reqLowNum < REQ_LIMIT)) {
list = &(queue->reqLowHead);
item->inQueue = queueRequestLow;
item->originatedQueue = queueRequestLow;
queue->reqLowNum++;
} else if ((req->cmd == cmdRenderBulk) && (queue->reqBulkNum < REQ_LIMIT)) {
list = &(queue->reqBulkHead);
item->inQueue = queueRequestBulk;
item->originatedQueue = queueRequestBulk;
queue->reqBulkNum++;
} else if (queue->dirtyNum < DIRTY_LIMIT) {
list = &(queue->dirtyHead);
item->inQueue = queueDirty;
item->originatedQueue = queueDirty;
queue->dirtyNum++;
item->fd = FD_INVALID; // No response after render
} else {
// The queue is severely backlogged. Drop request
queue->stats.noReqDroped++;
pthread_mutex_unlock(&(queue->qLock));
free(item);
return cmdNotDone;
}
if (list) {
item->next = list;
item->prev = list->prev;
item->prev->next = item;
list->prev = item;
/* In addition to the linked list, add item to a hash table index
* for faster lookup of pending requests.
*/
insert_item_idx(queue, item);
pthread_cond_signal(&queue->qCond);
} else {
free(item);
}
pthread_mutex_unlock(&queue->qLock);
return (list == &(queue->dirtyHead)) ? cmdNotDone : cmdIgnore;
}
void request_queue_remove_request(struct request_queue * queue, struct item * request, int render_time)
{
pthread_mutex_lock(&(queue->qLock));
if (request->inQueue != queueRender) {
g_logger(G_LOG_LEVEL_WARNING, "Removing request from queue, even though not on rendering queue");
}
if (render_time > 0) {
switch (request->originatedQueue) {
case queueRequestPrio: {
queue->stats.timeReqPrioRender += render_time;
break;
}
case queueRequest: {
queue->stats.timeReqRender += render_time;
break;
}
case queueRequestLow: {
queue->stats.timeReqLowRender += render_time;
break;
}
case queueDirty: {
queue->stats.timeReqDirty += render_time;
break;
}
case queueRequestBulk: {
queue->stats.timeReqBulkRender += render_time;
break;
}
}
queue->stats.noZoomRender[request->req.z]++;
queue->stats.timeZoomRender[request->req.z] += render_time;
}
request->next->prev = request->prev;
request->prev->next = request->next;
remove_item_idx(queue, request);
pthread_mutex_unlock(&(queue->qLock));
}
int request_queue_no_requests_queued(struct request_queue * queue, enum protoCmd priority)
{
int noReq = -1;
pthread_mutex_lock(&(queue->qLock));
switch (priority) {
case cmdRenderPrio:
noReq = queue->reqPrioNum;
break;
case cmdRender:
noReq = queue->reqNum;
break;
case cmdRenderLow:
noReq = queue->reqLowNum;
break;
case cmdDirty:
noReq = queue->dirtyNum;
break;
case cmdRenderBulk:
noReq = queue->reqBulkNum;
break;
}
pthread_mutex_unlock(&queue->qLock);
return noReq;
}
void request_queue_copy_stats(struct request_queue * queue, stats_struct * stats)
{
pthread_mutex_lock(&(queue->qLock));
memcpy(stats, &(queue->stats), sizeof(stats_struct));
pthread_mutex_unlock(&queue->qLock);
}
struct request_queue * request_queue_init()
{
int res;
struct request_queue * queue = calloc(1, sizeof(struct request_queue));
if (queue == NULL) {
return NULL;
}
res = pthread_mutex_init(&(queue->qLock), NULL);
if (res != 0) {
g_logger(G_LOG_LEVEL_ERROR, "Failed to create mutex for request_queue");
free(queue);
return NULL;
}
res = pthread_cond_init(&(queue->qCond), NULL);
if (res != 0) {
g_logger(G_LOG_LEVEL_ERROR, "Failed to create condition variable for request_queue");
pthread_mutex_destroy(&(queue->qLock));
free(queue);
return NULL;
}
queue->stats.noDirtyRender = 0;
queue->stats.noReqDroped = 0;
queue->stats.noReqRender = 0;
queue->stats.noReqPrioRender = 0;
queue->stats.noReqLowRender = 0;
queue->stats.noReqBulkRender = 0;
queue->reqHead.next = queue->reqHead.prev = &(queue->reqHead);
queue->reqPrioHead.next = queue->reqPrioHead.prev = &(queue->reqPrioHead);
queue->reqLowHead.next = queue->reqLowHead.prev = &(queue->reqLowHead);
queue->reqBulkHead.next = queue->reqBulkHead.prev = &(queue->reqBulkHead);
queue->dirtyHead.next = queue->dirtyHead.prev = &(queue->dirtyHead);
queue->renderHead.next = queue->renderHead.prev = &(queue->renderHead);
queue->hashidxSize = HASHIDX_SIZE;
queue->item_hashidx = (struct item_idx *) malloc(sizeof(struct item_idx) * queue->hashidxSize);
bzero(queue->item_hashidx, sizeof(struct item_idx) * queue->hashidxSize);
return queue;
}
void request_queue_close(struct request_queue * queue)
{
//TODO: Free items if the queues are not empty at closing time
pthread_mutex_destroy(&(queue->qLock));
free(queue->item_hashidx);
free(queue);
}