mirror of
https://github.com/apache/httpd.git
synced 2025-08-03 16:33:59 +00:00

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@1876599 13f79535-47bb-0310-9956-ffa450edef68
1130 lines
32 KiB
C
1130 lines
32 KiB
C
/* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
* contributor license agreements. See the NOTICE file distributed with
|
|
* this work for additional information regarding copyright ownership.
|
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
* (the "License"); you may not use this file except in compliance with
|
|
* the License. You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
#include "mod_serf.h"
|
|
|
|
#include "httpd.h"
|
|
#include "http_core.h"
|
|
#include "http_config.h"
|
|
#include "http_protocol.h"
|
|
#include "http_request.h"
|
|
#include "http_log.h"
|
|
|
|
#include "serf.h"
|
|
#include "apr_uri.h"
|
|
#include "apr_strings.h"
|
|
#include "apr_version.h"
|
|
#include "ap_mpm.h"
|
|
|
|
module AP_MODULE_DECLARE_DATA serf_module;
|
|
static int mpm_supprts_serf = 0;
|
|
|
|
typedef struct {
|
|
int on;
|
|
int preservehost;
|
|
apr_uri_t url;
|
|
} serf_config_t;
|
|
|
|
typedef struct {
|
|
const char *name;
|
|
const char *provider;
|
|
apr_table_t *params;
|
|
} serf_cluster_t;
|
|
|
|
typedef struct {
|
|
/* name -> serf_cluster_t* */
|
|
apr_hash_t *clusters;
|
|
} serf_server_config_t;
|
|
|
|
typedef struct {
|
|
int rstatus;
|
|
int want_ssl;
|
|
int done_headers;
|
|
int keep_reading;
|
|
request_rec *r;
|
|
apr_pool_t *serf_pool;
|
|
apr_bucket_brigade *tmpbb;
|
|
serf_config_t *conf;
|
|
serf_ssl_context_t *ssl_ctx;
|
|
serf_bucket_alloc_t *bkt_alloc;
|
|
serf_bucket_t *body_bkt;
|
|
} s_baton_t;
|
|
|
|
#if !APR_VERSION_AT_LEAST(1,4,0)
|
|
#define apr_time_from_msec(x) (x * 1000)
|
|
#endif
|
|
|
|
/**
|
|
* This works right now because all timers are invoked in the single listener
|
|
* thread in the Event MPM -- the same thread that serf callbacks are made
|
|
* from, so we don't technically need a mutex yet, but with the Simple MPM,
|
|
* invocations are made from worker threads, and we need to figure out locking
|
|
*/
|
|
static void timed_cleanup_callback(void *baton)
|
|
{
|
|
s_baton_t *ctx = baton;
|
|
|
|
/* Causes all serf connections to unregister from the event mpm */
|
|
if (ctx->rstatus) {
|
|
ap_log_rerror(APLOG_MARK, APLOG_ERR, ctx->rstatus, ctx->r, APLOGNO(01119)
|
|
"serf: request returned: %d", ctx->rstatus);
|
|
ctx->r->status = HTTP_OK;
|
|
apr_pool_destroy(ctx->serf_pool);
|
|
ap_die(ctx->rstatus, ctx->r);
|
|
}
|
|
else {
|
|
apr_bucket *e;
|
|
apr_brigade_cleanup(ctx->tmpbb);
|
|
e = apr_bucket_flush_create(ctx->r->connection->bucket_alloc);
|
|
APR_BRIGADE_INSERT_TAIL(ctx->tmpbb, e);
|
|
e = apr_bucket_eos_create(ctx->r->connection->bucket_alloc);
|
|
APR_BRIGADE_INSERT_TAIL(ctx->tmpbb, e);
|
|
|
|
/* TODO: return code? bleh */
|
|
ap_pass_brigade(ctx->r->output_filters, ctx->tmpbb);
|
|
|
|
apr_pool_destroy(ctx->serf_pool);
|
|
|
|
ap_finalize_request_protocol(ctx->r);
|
|
ap_process_request_after_handler(ctx->r);
|
|
return;
|
|
}
|
|
}
|
|
|
|
static void closed_connection(serf_connection_t *conn,
|
|
void *closed_baton,
|
|
apr_status_t why,
|
|
apr_pool_t *pool)
|
|
{
|
|
s_baton_t *ctx = closed_baton;
|
|
|
|
if (why) {
|
|
/* justin says that error handling isn't done yet. hah. */
|
|
/* XXXXXX: review */
|
|
ap_log_rerror(APLOG_MARK, APLOG_ERR, why, ctx->r, APLOGNO(01120) "Closed Connection Error");
|
|
ctx->rstatus = HTTP_INTERNAL_SERVER_ERROR;
|
|
}
|
|
|
|
if (mpm_supprts_serf) {
|
|
ap_mpm_register_timed_callback(apr_time_from_msec(1),
|
|
timed_cleanup_callback, ctx);
|
|
}
|
|
ctx->keep_reading = 0;
|
|
}
|
|
|
|
static serf_bucket_t* conn_setup(apr_socket_t *sock,
|
|
void *setup_baton,
|
|
apr_pool_t *pool)
|
|
{
|
|
serf_bucket_t *c;
|
|
s_baton_t *ctx = setup_baton;
|
|
|
|
c = serf_bucket_socket_create(sock, ctx->bkt_alloc);
|
|
if (ctx->want_ssl) {
|
|
c = serf_bucket_ssl_decrypt_create(c, ctx->ssl_ctx, ctx->bkt_alloc);
|
|
}
|
|
|
|
return c;
|
|
}
|
|
|
|
static int copy_headers_in(void *vbaton, const char *key, const char *value)
|
|
{
|
|
serf_bucket_t *hdrs_bkt = (serf_bucket_t *)vbaton;
|
|
|
|
/* XXXXX: List of headers not to copy to serf. serf's serf_bucket_headers_setn,
|
|
* doesn't actually overwrite a header if we set it once, so we need to ignore anything
|
|
* we might want to toggle or combine.
|
|
*/
|
|
switch (key[0]) {
|
|
case 'a':
|
|
case 'A':
|
|
if (ap_cstr_casecmp("Accept-Encoding", key) == 0) {
|
|
return 0;
|
|
}
|
|
break;
|
|
case 'c':
|
|
case 'C':
|
|
if (ap_cstr_casecmp("Connection", key) == 0) {
|
|
return 0;
|
|
}
|
|
break;
|
|
case 'h':
|
|
case 'H':
|
|
if (ap_cstr_casecmp("Host", key) == 0) {
|
|
return 0;
|
|
}
|
|
break;
|
|
case 'k':
|
|
case 'K':
|
|
if (ap_cstr_casecmp("Keep-Alive", key) == 0) {
|
|
return 0;
|
|
}
|
|
break;
|
|
case 't':
|
|
case 'T':
|
|
if (ap_cstr_casecmp("TE", key) == 0) {
|
|
return 0;
|
|
}
|
|
if (ap_cstr_casecmp("Trailer", key) == 0) {
|
|
return 0;
|
|
}
|
|
break;
|
|
case 'u':
|
|
case 'U':
|
|
if (ap_cstr_casecmp("Upgrade", key) == 0) {
|
|
return 0;
|
|
}
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
serf_bucket_headers_setn(hdrs_bkt, key, value);
|
|
return 0;
|
|
}
|
|
|
|
static int copy_headers_out(void *vbaton, const char *key, const char *value)
|
|
{
|
|
s_baton_t *ctx = vbaton;
|
|
int done = 0;
|
|
|
|
/* XXXXX: Special Treatment required for MANY other headers. fixme.*/
|
|
switch (key[0]) {
|
|
case 'c':
|
|
case 'C':
|
|
if (ap_cstr_casecmp("Content-Type", key) == 0) {
|
|
ap_set_content_type(ctx->r, value);
|
|
done = 1;
|
|
break;
|
|
}
|
|
else if (ap_cstr_casecmp("Connection", key) == 0) {
|
|
done = 1;
|
|
break;
|
|
}
|
|
else if (ap_cstr_casecmp("Content-Encoding", key) == 0) {
|
|
done = 1;
|
|
break;
|
|
}
|
|
else if (ap_cstr_casecmp("Content-Length", key) == 0) {
|
|
done = 1;
|
|
break;
|
|
}
|
|
break;
|
|
case 't':
|
|
case 'T':
|
|
if (ap_cstr_casecmp("Transfer-Encoding", key) == 0) {
|
|
done = 1;
|
|
break;
|
|
}
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
if (!done) {
|
|
apr_table_addn(ctx->r->headers_out, key, value);
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static serf_bucket_t* accept_response(serf_request_t *request,
|
|
serf_bucket_t *stream,
|
|
void *acceptor_baton,
|
|
apr_pool_t *pool)
|
|
{
|
|
serf_bucket_t *c;
|
|
serf_bucket_alloc_t *bkt_alloc;
|
|
|
|
/* get the per-request bucket allocator */
|
|
bkt_alloc = serf_request_get_alloc(request);
|
|
|
|
/* Create a barrier so the response doesn't eat us! */
|
|
c = serf_bucket_barrier_create(stream, bkt_alloc);
|
|
|
|
return serf_bucket_response_create(c, bkt_alloc);
|
|
}
|
|
|
|
static apr_status_t handle_response(serf_request_t *request,
|
|
serf_bucket_t *response,
|
|
void *vbaton,
|
|
apr_pool_t *pool)
|
|
{
|
|
apr_status_t rv;
|
|
s_baton_t *ctx = vbaton;
|
|
const char *data;
|
|
apr_size_t len;
|
|
serf_status_line sl;
|
|
|
|
if (response == NULL) {
|
|
ctx->rstatus = HTTP_INTERNAL_SERVER_ERROR;
|
|
return APR_EGENERAL;
|
|
}
|
|
|
|
/* XXXXXXX: Create better error message. */
|
|
rv = serf_bucket_response_status(response, &sl);
|
|
if (rv) {
|
|
if (APR_STATUS_IS_EAGAIN(rv)) {
|
|
return APR_SUCCESS;
|
|
}
|
|
|
|
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, ctx->r, APLOGNO(01121) "serf_bucket_response_status...");
|
|
|
|
ctx->rstatus = HTTP_INTERNAL_SERVER_ERROR;
|
|
|
|
if (mpm_supprts_serf) {
|
|
ap_mpm_register_timed_callback(apr_time_from_msec(1),
|
|
timed_cleanup_callback, ctx);
|
|
}
|
|
|
|
return rv;
|
|
}
|
|
|
|
/**
|
|
* XXXXX: If I understood serf buckets better, it might be possible to not
|
|
* copy all of the data here, and better stream it to the client.
|
|
**/
|
|
|
|
do {
|
|
apr_brigade_cleanup(ctx->tmpbb);
|
|
rv = serf_bucket_read(response, AP_IOBUFSIZE, &data, &len);
|
|
|
|
if (SERF_BUCKET_READ_ERROR(rv)) {
|
|
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, ctx->r, APLOGNO(01122) "serf_bucket_read(response)");
|
|
return rv;
|
|
}
|
|
|
|
if (!ctx->done_headers) {
|
|
serf_bucket_t *hdrs;
|
|
serf_status_line line;
|
|
|
|
/* TODO: improve */
|
|
serf_bucket_response_status(response, &line);
|
|
ctx->r->status = line.code;
|
|
|
|
hdrs = serf_bucket_response_get_headers(response);
|
|
serf_bucket_headers_do(hdrs, copy_headers_out, ctx);
|
|
ctx->done_headers = 1;
|
|
}
|
|
|
|
|
|
if (len > 0) {
|
|
/* TODO: make APR bucket <-> serf bucket stuff more magical. */
|
|
apr_brigade_write(ctx->tmpbb, NULL, NULL, data, len);
|
|
}
|
|
|
|
if (APR_STATUS_IS_EOF(rv)) {
|
|
ctx->keep_reading = 0;
|
|
|
|
ctx->rstatus = ap_pass_brigade(ctx->r->output_filters, ctx->tmpbb);
|
|
|
|
if (mpm_supprts_serf) {
|
|
ap_mpm_register_timed_callback(apr_time_from_msec(1),
|
|
timed_cleanup_callback, ctx);
|
|
}
|
|
return APR_EOF;
|
|
}
|
|
|
|
ctx->rstatus = ap_pass_brigade(ctx->r->output_filters, ctx->tmpbb);
|
|
|
|
/* XXXX: Should we send a flush now? */
|
|
if (APR_STATUS_IS_EAGAIN(rv)) {
|
|
return APR_SUCCESS;
|
|
}
|
|
|
|
} while (1);
|
|
}
|
|
|
|
|
|
static apr_status_t setup_request(serf_request_t *request,
|
|
void *vbaton,
|
|
serf_bucket_t **req_bkt,
|
|
serf_response_acceptor_t *acceptor,
|
|
void **acceptor_baton,
|
|
serf_response_handler_t *handler,
|
|
void **handler_baton,
|
|
apr_pool_t *pool)
|
|
{
|
|
s_baton_t *ctx = vbaton;
|
|
serf_bucket_t *hdrs_bkt;
|
|
|
|
*req_bkt = serf_bucket_request_create(ctx->r->method, ctx->r->unparsed_uri,
|
|
ctx->body_bkt,
|
|
serf_request_get_alloc(request));
|
|
|
|
hdrs_bkt = serf_bucket_request_get_headers(*req_bkt);
|
|
|
|
apr_table_do(copy_headers_in, hdrs_bkt, ctx->r->headers_in, NULL);
|
|
|
|
if (ctx->conf->preservehost) {
|
|
serf_bucket_headers_setn(hdrs_bkt, "Host",
|
|
apr_table_get(ctx->r->headers_in, "Host"));
|
|
}
|
|
else {
|
|
serf_bucket_headers_setn(hdrs_bkt, "Host", ctx->conf->url.hostname);
|
|
}
|
|
|
|
serf_bucket_headers_setn(hdrs_bkt, "Accept-Encoding", "gzip");
|
|
|
|
if (ctx->want_ssl) {
|
|
if (ctx->ssl_ctx == NULL) {
|
|
*req_bkt = serf_bucket_ssl_encrypt_create(*req_bkt, NULL,
|
|
ctx->bkt_alloc);
|
|
ctx->ssl_ctx = serf_bucket_ssl_encrypt_context_get(*req_bkt);
|
|
}
|
|
else {
|
|
*req_bkt = serf_bucket_ssl_encrypt_create(*req_bkt, ctx->ssl_ctx,
|
|
ctx->bkt_alloc);
|
|
}
|
|
}
|
|
|
|
*acceptor = accept_response;
|
|
*acceptor_baton = ctx;
|
|
*handler = handle_response;
|
|
*handler_baton = ctx;
|
|
|
|
return APR_SUCCESS;
|
|
}
|
|
|
|
/* TODO: rewrite drive_serf to make it async */
|
|
static int drive_serf(request_rec *r, serf_config_t *conf)
|
|
{
|
|
apr_status_t rv = 0;
|
|
apr_pool_t *pool;
|
|
apr_sockaddr_t *address;
|
|
s_baton_t *baton = apr_palloc(r->pool, sizeof(s_baton_t));
|
|
/* XXXXX: make persistent/per-process or something.*/
|
|
serf_context_t *serfme;
|
|
serf_connection_t *conn;
|
|
serf_server_config_t *ctx =
|
|
(serf_server_config_t *)ap_get_module_config(r->server->module_config,
|
|
&serf_module);
|
|
|
|
/* Allocate everything out of a subpool, with a shorter lifetime than
|
|
* the main request, so that we can cleanup safely and remove our events
|
|
* from the main serf context in the async mpm mode.
|
|
*/
|
|
apr_pool_create(&pool, r->pool);
|
|
apr_pool_tag(pool, "mod_serf_drive");
|
|
if (strcmp(conf->url.scheme, "cluster") == 0) {
|
|
int rc;
|
|
ap_serf_cluster_provider_t *cp;
|
|
serf_cluster_t *cluster;
|
|
apr_array_header_t *servers = NULL;
|
|
apr_uint32_t pick = 0;
|
|
ap_serf_server_t *choice;
|
|
|
|
/* TODO: could this be optimized in post-config to pre-setup the
|
|
* pointers to the right cluster inside the conf structure?
|
|
*/
|
|
cluster = apr_hash_get(ctx->clusters,
|
|
conf->url.hostname,
|
|
APR_HASH_KEY_STRING);
|
|
if (!cluster) {
|
|
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01123)
|
|
"SerfCluster: unable to find cluster %s", conf->url.hostname);
|
|
return HTTP_INTERNAL_SERVER_ERROR;
|
|
}
|
|
|
|
cp = ap_lookup_provider(AP_SERF_CLUSTER_PROVIDER, cluster->provider, "0");
|
|
|
|
if (cp == NULL) {
|
|
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01124)
|
|
"SerfCluster: unable to find provider %s", cluster->provider);
|
|
return HTTP_INTERNAL_SERVER_ERROR;
|
|
}
|
|
|
|
if (cp->list_servers == NULL) {
|
|
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01125)
|
|
"SerfCluster: %s is missing list servers provider.", cluster->provider);
|
|
return HTTP_INTERNAL_SERVER_ERROR;
|
|
}
|
|
|
|
rc = cp->list_servers(cp->baton,
|
|
r,
|
|
cluster->params,
|
|
&servers);
|
|
|
|
if (rc != OK) {
|
|
ap_log_rerror(APLOG_MARK, APLOG_ERR, rc, r, APLOGNO(01126)
|
|
"SerfCluster: %s list servers returned failure", cluster->provider);
|
|
return HTTP_INTERNAL_SERVER_ERROR;
|
|
}
|
|
|
|
if (servers == NULL || apr_is_empty_array(servers)) {
|
|
ap_log_rerror(APLOG_MARK, APLOG_ERR, rc, r, APLOGNO(01127)
|
|
"SerfCluster: %s failed to provide a list of servers", cluster->provider);
|
|
return HTTP_INTERNAL_SERVER_ERROR;
|
|
}
|
|
|
|
/* TODO: restructure try all servers in the array !! */
|
|
pick = ap_random_pick(0, servers->nelts-1);
|
|
choice = APR_ARRAY_IDX(servers, pick, ap_serf_server_t *);
|
|
|
|
rv = apr_sockaddr_info_get(&address, choice->ip,
|
|
APR_UNSPEC, choice->port, 0,
|
|
pool);
|
|
}
|
|
else {
|
|
/* XXXXX: cache dns? */
|
|
rv = apr_sockaddr_info_get(&address, conf->url.hostname,
|
|
APR_UNSPEC, conf->url.port, 0,
|
|
pool);
|
|
}
|
|
|
|
if (rv != APR_SUCCESS) {
|
|
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01128) "Unable to resolve: %s", conf->url.hostname);
|
|
return HTTP_INTERNAL_SERVER_ERROR;
|
|
}
|
|
|
|
if (mpm_supprts_serf) {
|
|
serfme = ap_lookup_provider("mpm_serf", "instance", "0");
|
|
if (!serfme) {
|
|
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01129) "mpm lied to us about supporting serf.");
|
|
return HTTP_INTERNAL_SERVER_ERROR;
|
|
}
|
|
}
|
|
else {
|
|
serfme = serf_context_create(pool);
|
|
}
|
|
|
|
baton->r = r;
|
|
baton->conf = conf;
|
|
baton->serf_pool = pool;
|
|
baton->bkt_alloc = serf_bucket_allocator_create(pool, NULL, NULL);
|
|
baton->body_bkt = NULL;
|
|
baton->ssl_ctx = NULL;
|
|
baton->rstatus = OK;
|
|
|
|
baton->tmpbb = apr_brigade_create(r->pool, r->connection->bucket_alloc);
|
|
baton->done_headers = 0;
|
|
baton->keep_reading = 1;
|
|
|
|
if (ap_cstr_casecmp(conf->url.scheme, "https") == 0) {
|
|
baton->want_ssl = 1;
|
|
}
|
|
else {
|
|
baton->want_ssl = 0;
|
|
}
|
|
|
|
rv = ap_setup_client_block(baton->r, REQUEST_CHUNKED_DECHUNK);
|
|
if (rv) {
|
|
return rv;
|
|
}
|
|
|
|
/* TODO: create custom serf bucket, which does async request body reads */
|
|
if (ap_should_client_block(r)) {
|
|
apr_size_t len;
|
|
apr_off_t flen = 0;
|
|
char buf[AP_IOBUFSIZE];
|
|
apr_file_t *fp;
|
|
|
|
rv = apr_file_mktemp(&fp, "mod_serf_buffer.XXXXXX", 0, pool);
|
|
if (rv != APR_SUCCESS) {
|
|
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01130) "Unable to create temp request body buffer file.");
|
|
return HTTP_INTERNAL_SERVER_ERROR;
|
|
}
|
|
|
|
do {
|
|
len = sizeof(buf);
|
|
/* FIXME: ap_get_client_block() returns long, not apr_status_t */
|
|
rv = ap_get_client_block(baton->r, buf, len);
|
|
if (rv > 0) {
|
|
rv = apr_file_write_full(fp, buf, rv, NULL);
|
|
if (rv) {
|
|
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01131) "failed to read request body");
|
|
return HTTP_INTERNAL_SERVER_ERROR;
|
|
}
|
|
}
|
|
} while(rv > 0);
|
|
|
|
if (rv < 0) {
|
|
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01132) "failed to read request body");
|
|
return HTTP_INTERNAL_SERVER_ERROR;
|
|
}
|
|
|
|
apr_file_seek(fp, APR_SET, &flen);
|
|
baton->body_bkt = serf_bucket_file_create(fp, baton->bkt_alloc);
|
|
}
|
|
|
|
conn = serf_connection_create(serfme, address,
|
|
conn_setup, baton,
|
|
closed_connection, baton,
|
|
pool);
|
|
|
|
/* XXX: Is it correct that we don't use the returned serf_request_t? */
|
|
serf_connection_request_create(conn, setup_request, baton);
|
|
|
|
if (mpm_supprts_serf) {
|
|
return SUSPENDED;
|
|
}
|
|
else {
|
|
do {
|
|
rv = serf_context_run(serfme, SERF_DURATION_FOREVER, pool);
|
|
|
|
/* XXXX: Handle timeouts */
|
|
if (APR_STATUS_IS_TIMEUP(rv)) {
|
|
continue;
|
|
}
|
|
|
|
if (rv != APR_SUCCESS) {
|
|
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01133) "serf_context_run() for %pI", address);
|
|
return HTTP_INTERNAL_SERVER_ERROR;
|
|
}
|
|
|
|
serf_debug__closed_conn(baton->bkt_alloc);
|
|
} while (baton->keep_reading);
|
|
|
|
return baton->rstatus;
|
|
}
|
|
}
|
|
|
|
static int serf_handler(request_rec *r)
|
|
{
|
|
serf_config_t *conf = ap_get_module_config(r->per_dir_config,
|
|
&serf_module);
|
|
|
|
if (conf->on == 0) {
|
|
return DECLINED;
|
|
}
|
|
|
|
return drive_serf(r, conf);
|
|
}
|
|
|
|
static int is_true(const char *w)
|
|
{
|
|
if (strcasecmp(w, "on") == 0 || strcmp(w, "1") == 0 ||
|
|
strcasecmp(w, "true") == 0)
|
|
{
|
|
return 1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
static const char *add_pass(cmd_parms *cmd, void *vconf,
|
|
int argc, char *const argv[])
|
|
{
|
|
int i;
|
|
apr_status_t rv;
|
|
serf_config_t *conf = (serf_config_t *) vconf;
|
|
|
|
if (argc < 1) {
|
|
return "SerfPass must have at least a URI.";
|
|
}
|
|
|
|
rv = apr_uri_parse(cmd->pool, argv[0], &conf->url);
|
|
|
|
if (rv != APR_SUCCESS) {
|
|
return "Unable to parse SerfPass url.";
|
|
}
|
|
|
|
if (!conf->url.scheme) {
|
|
return "Need scheme part in url.";
|
|
}
|
|
|
|
/* XXXX: These are bugs in apr_uri_parse. Fixme. */
|
|
if (!conf->url.port) {
|
|
conf->url.port = apr_uri_port_of_scheme(conf->url.scheme);
|
|
}
|
|
|
|
if (!conf->url.path) {
|
|
conf->url.path = "/";
|
|
}
|
|
|
|
for (i = 1; i < argc; i++) {
|
|
const char *p = argv[i];
|
|
const char *x = ap_strchr_c(p, '=');
|
|
|
|
if (x) {
|
|
if (strncmp(p, "preservehost", x-p) == 0) {
|
|
conf->preservehost = is_true(x+1);
|
|
}
|
|
}
|
|
}
|
|
|
|
conf->on = 1;
|
|
|
|
return NULL;
|
|
}
|
|
|
|
/* SerfCluster <name> <provider> <key=value_params_to_provider> ... */
|
|
|
|
static const char *add_cluster(cmd_parms *cmd, void *d,
|
|
int argc, char *const argv[])
|
|
{
|
|
const char *rv;
|
|
ap_serf_cluster_provider_t *backend;
|
|
int i;
|
|
serf_cluster_t *cluster = NULL;
|
|
serf_server_config_t *ctx =
|
|
(serf_server_config_t *)ap_get_module_config(cmd->server->module_config,
|
|
&serf_module);
|
|
|
|
const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
|
|
|
|
if (err != NULL) {
|
|
return err;
|
|
}
|
|
|
|
if (argc < 2) {
|
|
return "SerfCluster must have at least a name and provider.";
|
|
}
|
|
|
|
cluster = apr_palloc(cmd->pool, sizeof(serf_cluster_t));
|
|
cluster->name = apr_pstrdup(cmd->pool, argv[0]);
|
|
cluster->provider = apr_pstrdup(cmd->pool, argv[1]);
|
|
cluster->params = apr_table_make(cmd->pool, 6);
|
|
|
|
backend = ap_lookup_provider(AP_SERF_CLUSTER_PROVIDER, cluster->provider, "0");
|
|
|
|
if (backend == NULL) {
|
|
return apr_psprintf(cmd->pool, "SerfCluster: unable to find "
|
|
"provider '%s'", cluster->provider);
|
|
}
|
|
|
|
for (i = 2; i < argc; i++) {
|
|
const char *p = argv[i];
|
|
const char *x = ap_strchr_c(p, '=');
|
|
|
|
if (x && strlen(p) > 1) {
|
|
apr_table_addn(cluster->params,
|
|
apr_pstrmemdup(cmd->pool, p, x-p),
|
|
x+1);
|
|
}
|
|
else {
|
|
apr_table_addn(cluster->params,
|
|
apr_pstrdup(cmd->pool, p),
|
|
"");
|
|
}
|
|
}
|
|
|
|
if (backend->check_config == NULL) {
|
|
return apr_psprintf(cmd->pool, "SerfCluster: Provider '%s' failed to "
|
|
"provider a configuration checker",
|
|
cluster->provider);
|
|
}
|
|
|
|
rv = backend->check_config(backend->baton, cmd, cluster->params);
|
|
|
|
if (rv) {
|
|
return rv;
|
|
}
|
|
|
|
apr_hash_set(ctx->clusters, cluster->name, APR_HASH_KEY_STRING, cluster);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
static void *create_dir_config(apr_pool_t *p, char *dummy)
|
|
{
|
|
serf_config_t *new = (serf_config_t *) apr_pcalloc(p, sizeof(serf_config_t));
|
|
new->on = 0;
|
|
new->preservehost = 1;
|
|
return new;
|
|
}
|
|
|
|
static void *create_server_config(apr_pool_t *p, server_rec *s)
|
|
{
|
|
serf_server_config_t *ctx =
|
|
(serf_server_config_t *) apr_pcalloc(p, sizeof(serf_server_config_t));
|
|
|
|
ctx->clusters = apr_hash_make(p);
|
|
|
|
return ctx;
|
|
}
|
|
|
|
static void * merge_server_config(apr_pool_t *p, void *basev, void *overridesv)
|
|
{
|
|
serf_server_config_t *ctx = apr_pcalloc(p, sizeof(serf_server_config_t));
|
|
serf_server_config_t *base = (serf_server_config_t *) basev;
|
|
serf_server_config_t *overrides = (serf_server_config_t *) overridesv;
|
|
|
|
ctx->clusters = apr_hash_overlay(p, base->clusters, overrides->clusters);
|
|
return ctx;
|
|
}
|
|
|
|
static const command_rec serf_cmds[] =
|
|
{
|
|
AP_INIT_TAKE_ARGV("SerfCluster", add_cluster, NULL, RSRC_CONF,
|
|
"Configure a cluster backend"),
|
|
AP_INIT_TAKE_ARGV("SerfPass", add_pass, NULL, OR_INDEXES,
|
|
"URL to reverse proxy to"),
|
|
{NULL}
|
|
};
|
|
|
|
typedef struct hb_table_baton_t {
|
|
apr_pool_t *p;
|
|
const char *msg;
|
|
} hb_table_baton_t;
|
|
|
|
static int hb_table_check(void *rec, const char *key, const char *value)
|
|
{
|
|
hb_table_baton_t *b = (hb_table_baton_t*)rec;
|
|
if (strcmp(key, "path") != 0) {
|
|
b->msg = apr_psprintf(b->p,
|
|
"SerfCluster Heartbeat Invalid parameter '%s'",
|
|
key);
|
|
return 1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static const char* hb_config_check(void *baton,
|
|
cmd_parms *cmd,
|
|
apr_table_t *params)
|
|
{
|
|
hb_table_baton_t b;
|
|
|
|
if (apr_is_empty_table(params)) {
|
|
return "SerfCluster Heartbeat requires a path to the heartbeat information.";
|
|
}
|
|
|
|
b.p = cmd->pool;
|
|
b.msg = NULL;
|
|
|
|
apr_table_do(hb_table_check, &b, params, NULL);
|
|
|
|
if (b.msg) {
|
|
return b.msg;
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
typedef struct hb_server_t {
|
|
const char *ip;
|
|
int busy;
|
|
int ready;
|
|
int seen;
|
|
unsigned int port;
|
|
} hb_server_t;
|
|
|
|
static void
|
|
argstr_to_table(apr_pool_t *p, char *str, apr_table_t *parms)
|
|
{
|
|
char *key;
|
|
char *value;
|
|
char *strtok_state;
|
|
|
|
key = apr_strtok(str, "&", &strtok_state);
|
|
while (key) {
|
|
value = strchr(key, '=');
|
|
if (value) {
|
|
*value = '\0'; /* Split the string in two */
|
|
value++; /* Skip passed the = */
|
|
}
|
|
else {
|
|
value = "1";
|
|
}
|
|
ap_unescape_url(key);
|
|
ap_unescape_url(value);
|
|
apr_table_set(parms, key, value);
|
|
key = apr_strtok(NULL, "&", &strtok_state);
|
|
}
|
|
}
|
|
|
|
static apr_status_t read_heartbeats(const char *path,
|
|
apr_array_header_t *servers,
|
|
apr_pool_t *pool)
|
|
{
|
|
apr_finfo_t fi;
|
|
apr_status_t rv;
|
|
apr_file_t *fp;
|
|
|
|
if (!path) {
|
|
return APR_SUCCESS;
|
|
}
|
|
|
|
rv = apr_file_open(&fp, path, APR_READ|APR_BINARY|APR_BUFFERED,
|
|
APR_OS_DEFAULT, pool);
|
|
|
|
if (rv) {
|
|
return rv;
|
|
}
|
|
|
|
rv = apr_file_info_get(&fi, APR_FINFO_SIZE, fp);
|
|
|
|
if (rv) {
|
|
return rv;
|
|
}
|
|
|
|
{
|
|
char *t;
|
|
int lineno = 0;
|
|
apr_table_t *hbt = apr_table_make(pool, 10);
|
|
char buf[4096];
|
|
|
|
while (apr_file_gets(buf, sizeof(buf), fp) == APR_SUCCESS) {
|
|
hb_server_t *server;
|
|
const char *ip;
|
|
lineno++;
|
|
|
|
/* comment */
|
|
if (buf[0] == '#') {
|
|
continue;
|
|
}
|
|
|
|
|
|
/* line format: <IP> <query_string>\n */
|
|
t = strchr(buf, ' ');
|
|
if (!t) {
|
|
continue;
|
|
}
|
|
|
|
ip = apr_pstrmemdup(pool, buf, t - buf);
|
|
t++;
|
|
server = apr_pcalloc(pool, sizeof(hb_server_t));
|
|
server->ip = ip;
|
|
server->port = 80;
|
|
server->seen = -1;
|
|
apr_table_clear(hbt);
|
|
|
|
argstr_to_table(pool, apr_pstrdup(pool, t), hbt);
|
|
|
|
if (apr_table_get(hbt, "busy")) {
|
|
server->busy = atoi(apr_table_get(hbt, "busy"));
|
|
}
|
|
|
|
if (apr_table_get(hbt, "ready")) {
|
|
server->ready = atoi(apr_table_get(hbt, "ready"));
|
|
}
|
|
|
|
if (apr_table_get(hbt, "lastseen")) {
|
|
server->seen = atoi(apr_table_get(hbt, "lastseen"));
|
|
}
|
|
|
|
if (apr_table_get(hbt, "port")) {
|
|
server->port = atoi(apr_table_get(hbt, "port"));
|
|
}
|
|
|
|
if (server->busy == 0 && server->ready != 0) {
|
|
/* Server has zero threads active, but lots of them ready,
|
|
* it likely just started up, so lets /4 the number ready,
|
|
* to prevent us from completely flooding it with all new
|
|
* requests.
|
|
*/
|
|
server->ready = server->ready / 4;
|
|
}
|
|
|
|
APR_ARRAY_PUSH(servers, hb_server_t *) = server;
|
|
}
|
|
}
|
|
|
|
return APR_SUCCESS;
|
|
}
|
|
|
|
static int hb_server_sort(const void *a_, const void *b_)
|
|
{
|
|
hb_server_t *a = (hb_server_t*)a_;
|
|
hb_server_t *b = (hb_server_t*)b_;
|
|
if (a->ready == b->ready) {
|
|
return 0;
|
|
}
|
|
else if (a->ready > b->ready) {
|
|
return -1;
|
|
}
|
|
else {
|
|
return 1;
|
|
}
|
|
}
|
|
|
|
static int hb_list_servers(void *baton,
|
|
request_rec *r,
|
|
apr_table_t *params,
|
|
apr_array_header_t **out_servers)
|
|
{
|
|
int i;
|
|
hb_server_t *hbs;
|
|
apr_status_t rv;
|
|
apr_pool_t *tpool;
|
|
apr_array_header_t *tmpservers;
|
|
apr_array_header_t *servers;
|
|
const char *path = apr_table_get(params, "path");
|
|
|
|
apr_pool_create(&tpool, r->pool);
|
|
apr_pool_tag(tpool, "mod_serf_hb");
|
|
|
|
path = ap_server_root_relative(tpool, path);
|
|
|
|
tmpservers = apr_array_make(tpool, 32, sizeof(hb_server_t *));
|
|
rv = read_heartbeats(path, tmpservers, tpool);
|
|
|
|
if (rv) {
|
|
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01134)
|
|
"SerfCluster: Heartbeat unable to read '%s'", path);
|
|
apr_pool_destroy(tpool);
|
|
return HTTP_INTERNAL_SERVER_ERROR;
|
|
}
|
|
|
|
qsort(tmpservers->elts, tmpservers->nelts, sizeof(hb_server_t *),
|
|
hb_server_sort);
|
|
|
|
servers = apr_array_make(r->pool, tmpservers->nelts, sizeof(ap_serf_server_t *));
|
|
for (i = 0;
|
|
i < tmpservers->nelts;
|
|
i++)
|
|
{
|
|
ap_serf_server_t *x;
|
|
|
|
hbs = APR_ARRAY_IDX(tmpservers, i, hb_server_t *);
|
|
if (hbs->ready > 0) {
|
|
x = apr_palloc(r->pool, sizeof(ap_serf_server_t));
|
|
x->ip = apr_pstrdup(r->pool, hbs->ip);
|
|
x->port = hbs->port;
|
|
APR_ARRAY_PUSH(servers, ap_serf_server_t *) = x;
|
|
}
|
|
}
|
|
|
|
*out_servers = servers;
|
|
apr_pool_destroy(tpool);
|
|
return OK;
|
|
}
|
|
|
|
static const ap_serf_cluster_provider_t builtin_heartbeat =
|
|
{
|
|
"heartbeat",
|
|
NULL,
|
|
&hb_config_check,
|
|
&hb_list_servers,
|
|
NULL,
|
|
NULL
|
|
};
|
|
|
|
static int static_table_check(void *rec, const char *key, const char *value)
|
|
{
|
|
hb_table_baton_t *b = (hb_table_baton_t*)rec;
|
|
if (strcmp(key, "hosts") != 0 &&
|
|
strcmp(key, "order") != 0) {
|
|
b->msg = apr_psprintf(b->p,
|
|
"SerfCluster Static Invalid parameter '%s'",
|
|
key);
|
|
return 1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static const char* static_config_check(void *baton,
|
|
cmd_parms *cmd,
|
|
apr_table_t *params)
|
|
{
|
|
hb_table_baton_t b;
|
|
|
|
if (apr_is_empty_table(params)) {
|
|
return "SerfCluster Static requires at least a host list.";
|
|
}
|
|
|
|
b.p = cmd->pool;
|
|
b.msg = NULL;
|
|
|
|
apr_table_do(static_table_check, &b, params, NULL);
|
|
|
|
if (b.msg) {
|
|
return b.msg;
|
|
}
|
|
|
|
if (apr_table_get(params, "hosts") == NULL) {
|
|
return "SerfCluster Static requires at least a hosts parameter";
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
static int static_list_servers(void *baton,
|
|
request_rec *r,
|
|
apr_table_t *params,
|
|
apr_array_header_t **out_servers)
|
|
{
|
|
apr_status_t rv;
|
|
char *ip;
|
|
char *strtok_state;
|
|
apr_array_header_t *servers;
|
|
const char *hosts = apr_table_get(params, "hosts");
|
|
const char *order = apr_table_get(params, "order");
|
|
|
|
servers = apr_array_make(r->pool, 10, sizeof(ap_serf_server_t *));
|
|
|
|
ip = apr_strtok(apr_pstrdup(r->pool, hosts), ",", &strtok_state);
|
|
while (ip) {
|
|
char *host_str;
|
|
char *scope_id;
|
|
apr_port_t port = 0;
|
|
|
|
rv = apr_parse_addr_port(&host_str, &scope_id, &port, ip, r->pool);
|
|
if (!rv) {
|
|
ap_serf_server_t *s = apr_palloc(r->pool, sizeof(ap_serf_server_t));
|
|
s->ip = host_str;
|
|
s->port = port ? port : 80;
|
|
APR_ARRAY_PUSH(servers, ap_serf_server_t *) = s;
|
|
}
|
|
ip = apr_strtok(NULL, ",", &strtok_state);
|
|
}
|
|
|
|
if (strcmp(order, "random") == 0) {
|
|
/* TODO: support order=random */
|
|
}
|
|
|
|
*out_servers = servers;
|
|
|
|
return OK;
|
|
}
|
|
|
|
static const ap_serf_cluster_provider_t builtin_static =
|
|
{
|
|
"static",
|
|
NULL,
|
|
&static_config_check,
|
|
&static_list_servers,
|
|
NULL,
|
|
NULL
|
|
};
|
|
|
|
static int serf_post_config(apr_pool_t *p, apr_pool_t *plog, apr_pool_t *ptemp, server_rec *s)
|
|
{
|
|
apr_status_t rv;
|
|
rv = ap_mpm_query(AP_MPMQ_HAS_SERF, &mpm_supprts_serf);
|
|
|
|
if (rv != APR_SUCCESS) {
|
|
mpm_supprts_serf = 0;
|
|
}
|
|
|
|
return OK;
|
|
}
|
|
|
|
static void register_hooks(apr_pool_t *p)
|
|
{
|
|
ap_register_provider(p, AP_SERF_CLUSTER_PROVIDER,
|
|
"heartbeat", "0", &builtin_heartbeat);
|
|
|
|
ap_register_provider(p, AP_SERF_CLUSTER_PROVIDER,
|
|
"static", "0", &builtin_static);
|
|
|
|
ap_hook_post_config(serf_post_config, NULL, NULL, APR_HOOK_MIDDLE);
|
|
|
|
ap_hook_handler(serf_handler, NULL, NULL, APR_HOOK_FIRST);
|
|
}
|
|
|
|
AP_DECLARE_MODULE(serf) =
|
|
{
|
|
STANDARD20_MODULE_STUFF,
|
|
create_dir_config,
|
|
NULL,
|
|
create_server_config,
|
|
merge_server_config,
|
|
serf_cmds,
|
|
register_hooks
|
|
};
|