diff options
Diffstat (limited to 'src/bitz')
-rw-r--r-- | src/bitz/config.cpp | 11 | ||||
-rw-r--r-- | src/bitz/config.h | 1 | ||||
-rw-r--r-- | src/bitz/manager.cpp | 40 | ||||
-rw-r--r-- | src/bitz/manager.h | 14 | ||||
-rw-r--r-- | src/bitz/options_request_handler.cpp | 2 | ||||
-rw-r--r-- | src/bitz/options_request_handler.h | 2 | ||||
-rw-r--r-- | src/bitz/request_handler.cpp | 6 | ||||
-rw-r--r-- | src/bitz/request_handler.h | 8 | ||||
-rw-r--r-- | src/bitz/worker.cpp | 139 | ||||
-rw-r--r-- | src/bitz/worker.h | 6 |
10 files changed, 155 insertions, 74 deletions
diff --git a/src/bitz/config.cpp b/src/bitz/config.cpp index a9ca856..aaa247f 100644 --- a/src/bitz/config.cpp +++ b/src/bitz/config.cpp @@ -39,6 +39,7 @@ namespace bitz { _config.max_workers = 0; _config.max_worker_requests = 0; + _config.comm_timeout = 0; // defaults _lconfig = NULL; @@ -70,11 +71,11 @@ namespace bitz { try { config->readFile( config_file.c_str() ); - } catch( const libconfig::FileIOException &ex ) { + } catch ( const libconfig::FileIOException &ex ) { std::cerr << "[config] failed to read config file: " << config_file << ", exception: " << ex.what() << std::endl; exit( EXIT_FAILURE ); - } catch( const libconfig::ParseException &pex ) { + } catch ( const libconfig::ParseException &pex ) { std::cerr << "[config] parse error at " << pex.getFile() << ":" << pex.getLine() << " - " << pex.getError() << std::endl; exit( EXIT_FAILURE ); @@ -91,7 +92,9 @@ namespace bitz { config->lookupValue( "max_workers", _config.max_workers ); config->lookupValue( "max_worker_requests", _config.max_worker_requests ); - } catch( const libconfig::SettingNotFoundException &e ) { + config->lookupValue( "comm_timeout", _config.comm_timeout ); + + } catch ( const libconfig::SettingNotFoundException &e ) { std::cerr << "[config] failed to load core configs, " << e.getPath() << " : " << e.what() << std::endl; } @@ -121,7 +124,7 @@ namespace bitz { try { libconfig::Setting &setting = _lconfig->lookup( std::string( "modules." ).append( module ) ); setting.lookupValue( config, config_value ); - } catch( const libconfig::SettingNotFoundException &e ) { + } catch ( const libconfig::SettingNotFoundException &e ) { // TODO: log errors ?? std::cerr << "[config] " << e.getPath() << " : " << e.what() << std::endl; } diff --git a/src/bitz/config.h b/src/bitz/config.h index 453d29d..021384b 100644 --- a/src/bitz/config.h +++ b/src/bitz/config.h @@ -47,6 +47,7 @@ namespace bitz { int port; int max_workers; int max_worker_requests; + int comm_timeout; std::string pid_file; std::string log_file; std::string log_category; diff --git a/src/bitz/manager.cpp b/src/bitz/manager.cpp index ffa2dae..14033a5 100644 --- a/src/bitz/manager.cpp +++ b/src/bitz/manager.cpp @@ -31,8 +31,11 @@ namespace bitz { Manager::Manager( unsigned short port, const std::string &address, int backlog ) throw( ManagerException ) { // initialise manager - _manager.worker = false; - _manager.max_workers = 0; + _manager.worker = false; + _manager.max_workers = 0; + _manager.max_worker_requests = 0; + _manager.comm_timeout = 0; + _manager.workers_count = 0; _manager.worker_id = 0; _manager.socket = NULL; @@ -40,13 +43,16 @@ namespace bitz { // initialise listening socket try { - if ( address.empty() ) { - _manager.socket = new socketlibrary::TCPServerSocket( port, backlog ); - } else { - _manager.socket = new socketlibrary::TCPServerSocket( address, port, backlog ); - } - } catch ( socketlibrary::SocketException &sex ) { - throw ManagerException( "failed to initialise socket" ); + + // network socket address + psocksxx::nsockaddr naddr( address.c_str(), port ); + + _manager.socket = new psocksxx::tcpnsockstream(); + _manager.socket->bind( &naddr, true ); + _manager.socket->listen( backlog ); + + } catch ( psocksxx::sockexception &e ) { + throw ManagerException( std::string( "failed to initialise socket, " ).append( e.what() ) ); } Logger &logger = Logger::instance(); @@ -71,10 +77,11 @@ namespace bitz { } - void Manager::spawn( unsigned int max_workers, unsigned int max_worker_requests ) throw( ManagerException ) { + void Manager::spawn( unsigned int max_workers, unsigned int max_worker_requests, unsigned int comm_timeout ) throw( ManagerException ) { _manager.max_workers = max_workers; _manager.max_worker_requests = max_worker_requests; + _manager.comm_timeout = comm_timeout; _manager.worker_pool = new worker_pool_t[max_workers]; // pre-fork workers @@ -114,7 +121,7 @@ namespace bitz { _manager.worker_pool[worker_id].worker_id = worker_id; _manager.worker_pool[worker_id].worker_pid = worker_pid; - _manager.worker_pool[worker_id].worker->run( _manager.socket, _manager.max_worker_requests ); + _manager.worker_pool[worker_id].worker->run( _manager.socket, _manager.max_worker_requests, _manager.comm_timeout ); logger.info( std::string( "end of cycle, worker[" ).append( util::itoa( worker_id ) ).append( "]" ) ); delete _manager.worker_pool[worker_id].worker; @@ -200,6 +207,9 @@ namespace bitz { void Manager::manager_workers() throw() { + // logger + Logger &logger = Logger::instance(); + if (! _manager.worker ) { // check the worker count @@ -208,8 +218,12 @@ namespace bitz { // we are missing workers, find out who for (unsigned int i = 0; i < _manager.max_workers; i++ ) { if ( _manager.worker_pool[i].worker_pid == 0 ) { - // spawn a worker for the missing - spawn_worker( i ); + try { + // spawn a worker for the missing + spawn_worker( i ); + } catch ( ManagerException &mex ) { + logger.warn( std::string( "[manager] failed to spawn worker[" ).append( util::itoa( i ) ).append( "], exception: ").append( mex.what() ) ); + } } } diff --git a/src/bitz/manager.h b/src/bitz/manager.h index cc15f9c..ce49e9e 100644 --- a/src/bitz/manager.h +++ b/src/bitz/manager.h @@ -20,8 +20,8 @@ #ifndef BITZ_MANAGER_H #define BITZ_MANAGER_H -#include <unistd.h> // pid_t, fork() etc. -#include <socket/socket.h> // socket-library +#include <unistd.h> +#include <psocksxx/tcpnsockstream.h> #include "manager_exception.h" #include "worker.h" @@ -50,10 +50,11 @@ namespace bitz { bool worker; unsigned int max_workers; unsigned int max_worker_requests; + unsigned int comm_timeout; unsigned int workers_count; unsigned int worker_id; - socketlibrary::TCPServerSocket * socket; + psocksxx::tcpnsockstream * socket; worker_pool_t * worker_pool; }; @@ -61,14 +62,17 @@ namespace bitz { /** * Note: backlog = SOMAXCONN (from sys/socket.h) */ - Manager( unsigned short port, const std::string &address = "", int backlog = 128 ) throw( ManagerException ); + Manager( unsigned short port, const std::string &address = "0.0.0.0", int backlog = 128 ) throw( ManagerException ); /** * deconstructor */ virtual ~Manager(); - virtual void spawn( unsigned int max_workers = BITZ_MAX_WORKERS, unsigned int max_worker_requests = BITZ_MAX_WORKER_REQUESTS ) throw( ManagerException ); + virtual void spawn( unsigned int max_workers = BITZ_MAX_WORKERS, + unsigned int max_worker_requests = BITZ_MAX_WORKER_REQUESTS, + unsigned int comm_timeout = 0 ) throw( ManagerException ); + virtual void shutdown( bool graceful = true ) throw(); virtual void reap_worker( pid_t worker_pid ) throw(); virtual void manager_workers() throw(); diff --git a/src/bitz/options_request_handler.cpp b/src/bitz/options_request_handler.cpp index 6872b5c..81def32 100644 --- a/src/bitz/options_request_handler.cpp +++ b/src/bitz/options_request_handler.cpp @@ -26,7 +26,7 @@ namespace bitz { OptionsRequestHandler::~OptionsRequestHandler() { } - icap::Response * OptionsRequestHandler::process( icap::RequestHeader * req_header, socketlibrary::TCPSocket * socket ) throw() { + icap::Response * OptionsRequestHandler::process( icap::RequestHeader * req_header, psocksxx::iosockstream * socket ) throw() { icap::ResponseHeader * header; icap::Response * response; diff --git a/src/bitz/options_request_handler.h b/src/bitz/options_request_handler.h index a81377f..136d111 100644 --- a/src/bitz/options_request_handler.h +++ b/src/bitz/options_request_handler.h @@ -30,7 +30,7 @@ namespace bitz { OptionsRequestHandler(); virtual ~OptionsRequestHandler(); - icap::Response * process( icap::RequestHeader * req_header, socketlibrary::TCPSocket * socket ) throw(); + icap::Response * process( icap::RequestHeader * req_header, psocksxx::iosockstream * socket ) throw(); /** * Register a request handler so it is known to the OPTIONS diff --git a/src/bitz/request_handler.cpp b/src/bitz/request_handler.cpp index 3665cba..56f9bc0 100644 --- a/src/bitz/request_handler.cpp +++ b/src/bitz/request_handler.cpp @@ -63,7 +63,7 @@ namespace bitz { } - icap::Response * RequestHandler::process( icap::RequestHeader * req_header, socketlibrary::TCPSocket * socket ) throw() { + icap::Response * RequestHandler::process( icap::RequestHeader * req_header, psocksxx::iosockstream * socket ) throw() { icap::Request * request; @@ -238,7 +238,7 @@ namespace bitz { } - icap::Response * RequestHandler::process_preview( icap::Request * request, socketlibrary::TCPSocket * socket ) throw() { + icap::Response * RequestHandler::process_preview( icap::Request * request, psocksxx::iosockstream * socket ) throw() { icap::Response * response = NULL; Modifier * modifier; @@ -365,7 +365,7 @@ namespace bitz { } - bool RequestHandler::preview_continue( icap::Response * response, icap::Request * request, socketlibrary::TCPSocket * socket ) throw() { + bool RequestHandler::preview_continue( icap::Response * response, icap::Request * request, psocksxx::iosockstream * socket ) throw() { bool status = false; diff --git a/src/bitz/request_handler.h b/src/bitz/request_handler.h index 3fe99f1..2c31090 100644 --- a/src/bitz/request_handler.h +++ b/src/bitz/request_handler.h @@ -24,7 +24,7 @@ #include <icap/response.h> #include <icap/request.h> -#include <socket/socket.h> +#include <psocksxx/iosockstream.h> namespace bitz { @@ -58,7 +58,7 @@ namespace bitz { * @param socket socket object to read the data from * @return response object */ - virtual icap::Response * process( icap::RequestHeader * req_header, socketlibrary::TCPSocket * socket ) throw(); + virtual icap::Response * process( icap::RequestHeader * req_header, psocksxx::iosockstream * socket ) throw(); protected: @@ -101,7 +101,7 @@ namespace bitz { * @param socket socket object to read data from * @return preview response (response object) */ - icap::Response * process_preview( icap::Request * request, socketlibrary::TCPSocket * socket ) throw(); + icap::Response * process_preview( icap::Request * request, psocksxx::iosockstream * socket ) throw(); /** * This method will use the loaded handler modules to get a response to the request. @@ -119,7 +119,7 @@ namespace bitz { * @param socket socket object to read / write data * @return */ - bool preview_continue( icap::Response * response, icap::Request * request, socketlibrary::TCPSocket * socket ) throw(); + bool preview_continue( icap::Response * response, icap::Request * request, psocksxx::iosockstream * socket ) throw(); private: diff --git a/src/bitz/worker.cpp b/src/bitz/worker.cpp index efbeef3..42fb3d0 100644 --- a/src/bitz/worker.cpp +++ b/src/bitz/worker.cpp @@ -51,65 +51,51 @@ namespace bitz { } - void Worker::run( socketlibrary::TCPServerSocket * server_sock, unsigned int max_requests ) throw() { + void Worker::run( psocksxx::tcpnsockstream * server_sock, unsigned int max_requests, + unsigned int comm_timeout ) throw() { + // logger Logger &logger = Logger::instance(); - socketlibrary::TCPSocket * client_sock; - icap::RequestHeader * req_header; - icap::Response * response; - RequestHandler * req_handler; - + // client socket stream + psocksxx::nsockstream * client_sock; - try { + while ( max_requests > 0 ) { - while ( max_requests > 0 ) { + logger.debug( std::string( "[worker] waiting for a connection" ) ); - logger.debug( std::string( "[worker] waiting for a connection" ) ); + try { + // accept a client connection client_sock = server_sock->accept(); - logger.debug( std::string( "[worker] new connection accepted on " ).append( client_sock->getForeignAddress() ) - .append( ":" ).append( util::itoa( client_sock->getForeignPort() ) ) ); - - // request header - req_header = icap::util::read_req_header( client_sock ); - logger.debug( std::string( "[worker] request header:\r\n" ).append( req_header->raw_data() ) ); - - // try to find a handler for the request - req_handler = util::find_req_handler( _req_handlers, req_header->method() ); - - if ( req_handler != NULL ) { - logger.debug( std::string( "[worker] handling request: " ).append( req_header->method() ) ); - - // process the request and grab the response - response = req_handler->process( req_header, client_sock ); - - } else { - - // unsupported request - logger.info( std::string( "[worker] unsupported request: " ).append( req_header->method() ) ); - response = new icap::Response( new icap::ResponseHeader( icap::ResponseHeader::NOT_ALLOWED ) ); + // FIXME: log accepted client details (e.g. address, port etc.) + logger.debug( std::string( "[worker] new connection accepted on [...]" ) ); + // set timeout value + if ( comm_timeout > 0 ) { + client_sock->timeout( comm_timeout, 0 ); + // TODO: add debug logging } - // send the response back to the client - icap::util::send_response( response, client_sock ); + } catch ( psocksxx::sockexception &e ) { - // cleanup - delete response; - delete req_header; - - // destroy / close connection - delete client_sock; + // failed to accept client connection + logger.error( std::string( "[worker] failed to accept connection: " ).append( e.what() ) ); + // update request count max_requests--; + continue; } - } catch( socketlibrary::SocketException &sex ) { - logger.error( std::string( "[worker] ERROR: " ).append( sex.what() ) ); - } + // handle client request(s) + max_requests = serve_client( client_sock, max_requests ); + + // destroy / close connection + delete client_sock; + + }; } @@ -134,5 +120,76 @@ namespace bitz { } + + unsigned int Worker::serve_client( psocksxx::nsockstream * client_sock, unsigned int max_requests ) throw() { + + // logger + Logger &logger = Logger::instance(); + + icap::RequestHeader * req_header = NULL; + icap::Response * response; + RequestHandler * req_handler; + + + do { + + // cleanup request header + if ( req_header != NULL ) { + delete req_header; + } + + // request header + req_header = icap::util::read_req_header( client_sock ); + logger.debug( std::string( "[worker] request header:\r\n" ).append( req_header->raw_data() ) ); + + // check timeout + if ( client_sock->timedout() ) { + logger.warn( "[worker] communication timed out..." ); + return --max_requests; + } + + // try to find a handler for the request + req_handler = util::find_req_handler( _req_handlers, req_header->method() ); + + // sanity check + if ( req_handler != NULL ) { + + logger.debug( std::string( "[worker] handling request: " ).append( req_header->method() ) ); + + // process the request and grab the response + response = req_handler->process( req_header, client_sock ); + + } else { + + // unsupported request + logger.info( std::string( "[worker] unsupported request: " ).append( req_header->method() ) ); + response = new icap::Response( new icap::ResponseHeader( icap::ResponseHeader::NOT_ALLOWED ) ); + + } + + // FIXME: this should be configurable and should default to close + // connection if the client doesn't send a connection header + if ( req_header->value( "Connection" ) == "keep-alive" ) { + response->header()->attach( "Connection" , "keep-alive" ); + } + + // send the response back to the client + icap::util::send_response( response, client_sock ); + + // cleanup + delete response; + + } while ( ( --max_requests > 0 ) && ( req_header->value( "Connection" ) == "keep-alive" ) ); + + + // cleanup request header + if ( req_header != NULL ) { + delete req_header; + } + + return max_requests; + + } + } /* end of namespace bitz */ diff --git a/src/bitz/worker.h b/src/bitz/worker.h index 69edbca..25b7bac 100644 --- a/src/bitz/worker.h +++ b/src/bitz/worker.h @@ -20,7 +20,7 @@ #ifndef BITZ_WORKER_H #define BITZ_WORKER_H -#include <socket/socket.h> +#include <psocksxx/tcpnsockstream.h> #include "common.h" @@ -32,12 +32,14 @@ namespace bitz { Worker(); virtual ~Worker(); - virtual void run( socketlibrary::TCPServerSocket * server_sock, unsigned int max_requests ) throw(); + virtual void run( psocksxx::tcpnsockstream * server_sock, + unsigned int max_requests, unsigned int comm_timeout = 0 ) throw(); private: req_handlers_t _req_handlers; virtual void load_req_handlers() throw(); + virtual unsigned int serve_client( psocksxx::nsockstream * client_sock, unsigned int max_requests ) throw(); }; |