Work in Progress.

Convert the 100ms timed callback to a single cleanup callback that is added
when the request is ready to finish. Basically works, though it has
some issues with flushing and closing the connection.


git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@759460 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Paul Querna
2009-03-28 13:24:18 +00:00
parent c9c5ae3ce6
commit a51ec65a24

View File

@ -20,6 +20,7 @@
#include "http_core.h"
#include "http_config.h"
#include "http_protocol.h"
#include "http_request.h"
#include "http_log.h"
#include "serf.h"
@ -53,12 +54,41 @@ typedef struct {
int done_headers;
int keep_reading;
request_rec *r;
apr_pool_t *serf_pool;
apr_bucket_brigade *tmpbb;
serf_config_t *conf;
serf_ssl_context_t *ssl_ctx;
serf_bucket_alloc_t *bkt_alloc;
} s_baton_t;
/**
* This works right now because all timers are invoked in the single listener
* thread in the Event MPM -- the same thread that serf callbacks are made
* from, so we don't technically need a mutex yet, but with the Simple MPM,
* invocations are made from worker threads, and we need to figure out locking
*/
static void timed_cleanup_callback(void *baton)
{
s_baton_t *ctx = baton;
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: timed_cleanup_callback");
/* Causes all serf connections to unregister from the event mpm */
apr_pool_destroy(ctx->serf_pool);
if (ctx->rstatus) {
ap_log_rerror(APLOG_MARK, APLOG_ERR, ctx->rstatus, ctx->r,
"serf: request returned: %d", ctx->rstatus);
ctx->r->status = HTTP_OK;
ap_die(ctx->rstatus, ctx->r);
}
else {
ap_finalize_request_protocol(ctx->r);
ap_process_request_after_handler(ctx->r);
return;
}
}
static void closed_connection(serf_connection_t *conn,
void *closed_baton,
apr_status_t why,
@ -66,12 +96,18 @@ static void closed_connection(serf_connection_t *conn,
{
s_baton_t *ctx = closed_baton;
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: closed_connection");
if (why) {
/* justin says that error handling isn't done yet. hah. */
/* XXXXXX: review */
ap_log_rerror(APLOG_MARK, APLOG_ERR, why, ctx->r, "Closed Connection Error");
ctx->rstatus = HTTP_INTERNAL_SERVER_ERROR;
return;
}
if (mpm_supprts_serf) {
ap_mpm_register_timed_callback(apr_time_from_msec(1),
timed_cleanup_callback, ctx);
}
}
@ -82,6 +118,8 @@ static serf_bucket_t* conn_setup(apr_socket_t *sock,
serf_bucket_t *c;
s_baton_t *ctx = setup_baton;
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: conn_setup ");
c = serf_bucket_socket_create(sock, ctx->bkt_alloc);
if (ctx->want_ssl) {
c = serf_bucket_ssl_decrypt_create(c, ctx->ssl_ctx, ctx->bkt_alloc);
@ -196,9 +234,12 @@ static serf_bucket_t* accept_response(serf_request_t *request,
void *acceptor_baton,
apr_pool_t *pool)
{
s_baton_t *ctx = acceptor_baton;
serf_bucket_t *c;
serf_bucket_alloc_t *bkt_alloc;
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: accept_response");
/* get the per-request bucket allocator */
bkt_alloc = serf_request_get_alloc(request);
@ -219,6 +260,8 @@ static apr_status_t handle_response(serf_request_t *request,
apr_size_t len;
serf_status_line sl;
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: handle_response");
/* XXXXXXX: Create better error message. */
rv = serf_bucket_response_status(response, &sl);
if (rv) {
@ -230,6 +273,11 @@ static apr_status_t handle_response(serf_request_t *request,
ctx->rstatus = HTTP_INTERNAL_SERVER_ERROR;
if (mpm_supprts_serf) {
ap_mpm_register_timed_callback(apr_time_from_msec(1),
timed_cleanup_callback, ctx);
}
return rv;
}
@ -239,6 +287,8 @@ static apr_status_t handle_response(serf_request_t *request,
**/
do {
apr_bucket *e;
apr_brigade_cleanup(ctx->tmpbb);
rv = serf_bucket_read(response, AP_IOBUFSIZE, &data, &len);
if (SERF_BUCKET_READ_ERROR(rv)) {
@ -252,15 +302,32 @@ static apr_status_t handle_response(serf_request_t *request,
serf_bucket_headers_do(hdrs, copy_headers_out, ctx);
ctx->done_headers = 1;
}
/* XXXX: write to brigades and stuff. meh */
ap_rwrite(data, len, ctx->r);
if (len > 0) {
/* TODO: make APR bucket <-> serf bucket stuff more magical. */
e = apr_bucket_immortal_create(data, len, ctx->r->connection->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(ctx->tmpbb, e);
}
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: writing %"APR_SIZE_T_FMT" bytes", len);
if (APR_STATUS_IS_EOF(rv)) {
ctx->keep_reading = 0;
e = apr_bucket_flush_create(ctx->r->connection->bucket_alloc);
APR_BRIGADE_INSERT_TAIL(ctx->tmpbb, e);
ctx->rstatus = ap_pass_brigade(ctx->r->output_filters, ctx->tmpbb);
if (mpm_supprts_serf) {
ap_mpm_register_timed_callback(apr_time_from_msec(1),
timed_cleanup_callback, ctx);
}
return APR_EOF;
}
ctx->rstatus = ap_pass_brigade(ctx->r->output_filters, ctx->tmpbb);
/* XXXX: Should we send a flush now? */
if (APR_STATUS_IS_EAGAIN(rv)) {
return APR_SUCCESS;
@ -283,6 +350,7 @@ static apr_status_t setup_request(serf_request_t *request,
serf_bucket_t *hdrs_bkt;
serf_bucket_t *body_bkt = NULL;
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, ctx->r, "serf: setup_request");
/* XXXXX: handle incoming request bodies */
*req_bkt = serf_bucket_request_create(ctx->r->method, ctx->r->unparsed_uri, body_bkt,
@ -326,28 +394,6 @@ static apr_status_t setup_request(serf_request_t *request,
return APR_SUCCESS;
}
static void
timed_callback(void *baton)
{
s_baton_t *ctx = baton;
if (ctx->keep_reading) {
ap_mpm_register_timed_callback(apr_time_from_msec(100), timed_callback, baton);
}
else if (ctx->rstatus) {
ap_log_rerror(APLOG_MARK, APLOG_ERR, ctx->rstatus, ctx->r,
"serf: request returned: %d", ctx->rstatus);
ctx->r->status = HTTP_OK;
ap_die(ctx->rstatus, ctx->r);
}
else {
ap_finalize_request_protocol(ctx->r);
ap_process_request_after_handler(ctx->r);
return;
}
}
#ifndef apr_time_from_msec
#define apr_time_from_msec(x) (x * 1000)
#endif
@ -356,7 +402,7 @@ timed_callback(void *baton)
static int drive_serf(request_rec *r, serf_config_t *conf)
{
apr_status_t rv;
apr_pool_t *pool = r->pool;
apr_pool_t *pool;
apr_sockaddr_t *address;
s_baton_t *baton = apr_palloc(r->pool, sizeof(s_baton_t));
/* XXXXX: make persistent/per-process or something.*/
@ -366,7 +412,12 @@ static int drive_serf(request_rec *r, serf_config_t *conf)
serf_server_config_t *ctx =
(serf_server_config_t *)ap_get_module_config(r->server->module_config,
&serf_module);
/* Allocate everything out of a subpool, with a shorter lifetime than
* the main request, so that we can cleanup safely and remove our events
* from the main serf context in the async mpm mode.
*/
apr_pool_create(&pool, r->pool);
if (strcmp(conf->url.scheme, "cluster") == 0) {
int rc;
ap_serf_cluster_provider_t *cp;
@ -449,10 +500,12 @@ static int drive_serf(request_rec *r, serf_config_t *conf)
baton->r = r;
baton->conf = conf;
baton->serf_pool = pool;
baton->bkt_alloc = serf_bucket_allocator_create(pool, NULL, NULL);
baton->ssl_ctx = NULL;
baton->rstatus = OK;
baton->tmpbb = apr_brigade_create(r->pool, r->connection->bucket_alloc);
baton->done_headers = 0;
baton->keep_reading = 1;
@ -472,14 +525,7 @@ static int drive_serf(request_rec *r, serf_config_t *conf)
baton);
if (mpm_supprts_serf) {
rv = ap_mpm_register_timed_callback(apr_time_from_msec(100), timed_callback, baton);
if (rv != APR_SUCCESS) {
ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, "ap_mpm_register_timed_callback failed.");
return HTTP_INTERNAL_SERVER_ERROR;
}
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, baton->r, "handing off serf request to mpm");
return SUSPENDED;
}
else {