summaryrefslogtreecommitdiff
path: root/src/bitz
diff options
context:
space:
mode:
Diffstat (limited to 'src/bitz')
-rw-r--r--src/bitz/config.cpp11
-rw-r--r--src/bitz/config.h1
-rw-r--r--src/bitz/manager.cpp40
-rw-r--r--src/bitz/manager.h14
-rw-r--r--src/bitz/options_request_handler.cpp2
-rw-r--r--src/bitz/options_request_handler.h2
-rw-r--r--src/bitz/request_handler.cpp6
-rw-r--r--src/bitz/request_handler.h8
-rw-r--r--src/bitz/worker.cpp139
-rw-r--r--src/bitz/worker.h6
10 files changed, 155 insertions, 74 deletions
diff --git a/src/bitz/config.cpp b/src/bitz/config.cpp
index a9ca856..aaa247f 100644
--- a/src/bitz/config.cpp
+++ b/src/bitz/config.cpp
@@ -39,6 +39,7 @@ namespace bitz {
_config.max_workers = 0;
_config.max_worker_requests = 0;
+ _config.comm_timeout = 0;
// defaults
_lconfig = NULL;
@@ -70,11 +71,11 @@ namespace bitz {
try {
config->readFile( config_file.c_str() );
- } catch( const libconfig::FileIOException &ex ) {
+ } catch ( const libconfig::FileIOException &ex ) {
std::cerr << "[config] failed to read config file: " << config_file
<< ", exception: " << ex.what() << std::endl;
exit( EXIT_FAILURE );
- } catch( const libconfig::ParseException &pex ) {
+ } catch ( const libconfig::ParseException &pex ) {
std::cerr << "[config] parse error at " << pex.getFile()
<< ":" << pex.getLine() << " - " << pex.getError() << std::endl;
exit( EXIT_FAILURE );
@@ -91,7 +92,9 @@ namespace bitz {
config->lookupValue( "max_workers", _config.max_workers );
config->lookupValue( "max_worker_requests", _config.max_worker_requests );
- } catch( const libconfig::SettingNotFoundException &e ) {
+ config->lookupValue( "comm_timeout", _config.comm_timeout );
+
+ } catch ( const libconfig::SettingNotFoundException &e ) {
std::cerr << "[config] failed to load core configs, "
<< e.getPath() << " : " << e.what() << std::endl;
}
@@ -121,7 +124,7 @@ namespace bitz {
try {
libconfig::Setting &setting = _lconfig->lookup( std::string( "modules." ).append( module ) );
setting.lookupValue( config, config_value );
- } catch( const libconfig::SettingNotFoundException &e ) {
+ } catch ( const libconfig::SettingNotFoundException &e ) {
// TODO: log errors ??
std::cerr << "[config] " << e.getPath() << " : " << e.what() << std::endl;
}
diff --git a/src/bitz/config.h b/src/bitz/config.h
index 453d29d..021384b 100644
--- a/src/bitz/config.h
+++ b/src/bitz/config.h
@@ -47,6 +47,7 @@ namespace bitz {
int port;
int max_workers;
int max_worker_requests;
+ int comm_timeout;
std::string pid_file;
std::string log_file;
std::string log_category;
diff --git a/src/bitz/manager.cpp b/src/bitz/manager.cpp
index ffa2dae..14033a5 100644
--- a/src/bitz/manager.cpp
+++ b/src/bitz/manager.cpp
@@ -31,8 +31,11 @@ namespace bitz {
Manager::Manager( unsigned short port, const std::string &address, int backlog ) throw( ManagerException ) {
// initialise manager
- _manager.worker = false;
- _manager.max_workers = 0;
+ _manager.worker = false;
+ _manager.max_workers = 0;
+ _manager.max_worker_requests = 0;
+ _manager.comm_timeout = 0;
+
_manager.workers_count = 0;
_manager.worker_id = 0;
_manager.socket = NULL;
@@ -40,13 +43,16 @@ namespace bitz {
// initialise listening socket
try {
- if ( address.empty() ) {
- _manager.socket = new socketlibrary::TCPServerSocket( port, backlog );
- } else {
- _manager.socket = new socketlibrary::TCPServerSocket( address, port, backlog );
- }
- } catch ( socketlibrary::SocketException &sex ) {
- throw ManagerException( "failed to initialise socket" );
+
+ // network socket address
+ psocksxx::nsockaddr naddr( address.c_str(), port );
+
+ _manager.socket = new psocksxx::tcpnsockstream();
+ _manager.socket->bind( &naddr, true );
+ _manager.socket->listen( backlog );
+
+ } catch ( psocksxx::sockexception &e ) {
+ throw ManagerException( std::string( "failed to initialise socket, " ).append( e.what() ) );
}
Logger &logger = Logger::instance();
@@ -71,10 +77,11 @@ namespace bitz {
}
- void Manager::spawn( unsigned int max_workers, unsigned int max_worker_requests ) throw( ManagerException ) {
+ void Manager::spawn( unsigned int max_workers, unsigned int max_worker_requests, unsigned int comm_timeout ) throw( ManagerException ) {
_manager.max_workers = max_workers;
_manager.max_worker_requests = max_worker_requests;
+ _manager.comm_timeout = comm_timeout;
_manager.worker_pool = new worker_pool_t[max_workers];
// pre-fork workers
@@ -114,7 +121,7 @@ namespace bitz {
_manager.worker_pool[worker_id].worker_id = worker_id;
_manager.worker_pool[worker_id].worker_pid = worker_pid;
- _manager.worker_pool[worker_id].worker->run( _manager.socket, _manager.max_worker_requests );
+ _manager.worker_pool[worker_id].worker->run( _manager.socket, _manager.max_worker_requests, _manager.comm_timeout );
logger.info( std::string( "end of cycle, worker[" ).append( util::itoa( worker_id ) ).append( "]" ) );
delete _manager.worker_pool[worker_id].worker;
@@ -200,6 +207,9 @@ namespace bitz {
void Manager::manager_workers() throw() {
+ // logger
+ Logger &logger = Logger::instance();
+
if (! _manager.worker ) {
// check the worker count
@@ -208,8 +218,12 @@ namespace bitz {
// we are missing workers, find out who
for (unsigned int i = 0; i < _manager.max_workers; i++ ) {
if ( _manager.worker_pool[i].worker_pid == 0 ) {
- // spawn a worker for the missing
- spawn_worker( i );
+ try {
+ // spawn a worker for the missing
+ spawn_worker( i );
+ } catch ( ManagerException &mex ) {
+ logger.warn( std::string( "[manager] failed to spawn worker[" ).append( util::itoa( i ) ).append( "], exception: ").append( mex.what() ) );
+ }
}
}
diff --git a/src/bitz/manager.h b/src/bitz/manager.h
index cc15f9c..ce49e9e 100644
--- a/src/bitz/manager.h
+++ b/src/bitz/manager.h
@@ -20,8 +20,8 @@
#ifndef BITZ_MANAGER_H
#define BITZ_MANAGER_H
-#include <unistd.h> // pid_t, fork() etc.
-#include <socket/socket.h> // socket-library
+#include <unistd.h>
+#include <psocksxx/tcpnsockstream.h>
#include "manager_exception.h"
#include "worker.h"
@@ -50,10 +50,11 @@ namespace bitz {
bool worker;
unsigned int max_workers;
unsigned int max_worker_requests;
+ unsigned int comm_timeout;
unsigned int workers_count;
unsigned int worker_id;
- socketlibrary::TCPServerSocket * socket;
+ psocksxx::tcpnsockstream * socket;
worker_pool_t * worker_pool;
};
@@ -61,14 +62,17 @@ namespace bitz {
/**
* Note: backlog = SOMAXCONN (from sys/socket.h)
*/
- Manager( unsigned short port, const std::string &address = "", int backlog = 128 ) throw( ManagerException );
+ Manager( unsigned short port, const std::string &address = "0.0.0.0", int backlog = 128 ) throw( ManagerException );
/**
* deconstructor
*/
virtual ~Manager();
- virtual void spawn( unsigned int max_workers = BITZ_MAX_WORKERS, unsigned int max_worker_requests = BITZ_MAX_WORKER_REQUESTS ) throw( ManagerException );
+ virtual void spawn( unsigned int max_workers = BITZ_MAX_WORKERS,
+ unsigned int max_worker_requests = BITZ_MAX_WORKER_REQUESTS,
+ unsigned int comm_timeout = 0 ) throw( ManagerException );
+
virtual void shutdown( bool graceful = true ) throw();
virtual void reap_worker( pid_t worker_pid ) throw();
virtual void manager_workers() throw();
diff --git a/src/bitz/options_request_handler.cpp b/src/bitz/options_request_handler.cpp
index 6872b5c..81def32 100644
--- a/src/bitz/options_request_handler.cpp
+++ b/src/bitz/options_request_handler.cpp
@@ -26,7 +26,7 @@ namespace bitz {
OptionsRequestHandler::~OptionsRequestHandler() { }
- icap::Response * OptionsRequestHandler::process( icap::RequestHeader * req_header, socketlibrary::TCPSocket * socket ) throw() {
+ icap::Response * OptionsRequestHandler::process( icap::RequestHeader * req_header, psocksxx::iosockstream * socket ) throw() {
icap::ResponseHeader * header;
icap::Response * response;
diff --git a/src/bitz/options_request_handler.h b/src/bitz/options_request_handler.h
index a81377f..136d111 100644
--- a/src/bitz/options_request_handler.h
+++ b/src/bitz/options_request_handler.h
@@ -30,7 +30,7 @@ namespace bitz {
OptionsRequestHandler();
virtual ~OptionsRequestHandler();
- icap::Response * process( icap::RequestHeader * req_header, socketlibrary::TCPSocket * socket ) throw();
+ icap::Response * process( icap::RequestHeader * req_header, psocksxx::iosockstream * socket ) throw();
/**
* Register a request handler so it is known to the OPTIONS
diff --git a/src/bitz/request_handler.cpp b/src/bitz/request_handler.cpp
index 3665cba..56f9bc0 100644
--- a/src/bitz/request_handler.cpp
+++ b/src/bitz/request_handler.cpp
@@ -63,7 +63,7 @@ namespace bitz {
}
- icap::Response * RequestHandler::process( icap::RequestHeader * req_header, socketlibrary::TCPSocket * socket ) throw() {
+ icap::Response * RequestHandler::process( icap::RequestHeader * req_header, psocksxx::iosockstream * socket ) throw() {
icap::Request * request;
@@ -238,7 +238,7 @@ namespace bitz {
}
- icap::Response * RequestHandler::process_preview( icap::Request * request, socketlibrary::TCPSocket * socket ) throw() {
+ icap::Response * RequestHandler::process_preview( icap::Request * request, psocksxx::iosockstream * socket ) throw() {
icap::Response * response = NULL;
Modifier * modifier;
@@ -365,7 +365,7 @@ namespace bitz {
}
- bool RequestHandler::preview_continue( icap::Response * response, icap::Request * request, socketlibrary::TCPSocket * socket ) throw() {
+ bool RequestHandler::preview_continue( icap::Response * response, icap::Request * request, psocksxx::iosockstream * socket ) throw() {
bool status = false;
diff --git a/src/bitz/request_handler.h b/src/bitz/request_handler.h
index 3fe99f1..2c31090 100644
--- a/src/bitz/request_handler.h
+++ b/src/bitz/request_handler.h
@@ -24,7 +24,7 @@
#include <icap/response.h>
#include <icap/request.h>
-#include <socket/socket.h>
+#include <psocksxx/iosockstream.h>
namespace bitz {
@@ -58,7 +58,7 @@ namespace bitz {
* @param socket socket object to read the data from
* @return response object
*/
- virtual icap::Response * process( icap::RequestHeader * req_header, socketlibrary::TCPSocket * socket ) throw();
+ virtual icap::Response * process( icap::RequestHeader * req_header, psocksxx::iosockstream * socket ) throw();
protected:
@@ -101,7 +101,7 @@ namespace bitz {
* @param socket socket object to read data from
* @return preview response (response object)
*/
- icap::Response * process_preview( icap::Request * request, socketlibrary::TCPSocket * socket ) throw();
+ icap::Response * process_preview( icap::Request * request, psocksxx::iosockstream * socket ) throw();
/**
* This method will use the loaded handler modules to get a response to the request.
@@ -119,7 +119,7 @@ namespace bitz {
* @param socket socket object to read / write data
* @return
*/
- bool preview_continue( icap::Response * response, icap::Request * request, socketlibrary::TCPSocket * socket ) throw();
+ bool preview_continue( icap::Response * response, icap::Request * request, psocksxx::iosockstream * socket ) throw();
private:
diff --git a/src/bitz/worker.cpp b/src/bitz/worker.cpp
index efbeef3..42fb3d0 100644
--- a/src/bitz/worker.cpp
+++ b/src/bitz/worker.cpp
@@ -51,65 +51,51 @@ namespace bitz {
}
- void Worker::run( socketlibrary::TCPServerSocket * server_sock, unsigned int max_requests ) throw() {
+ void Worker::run( psocksxx::tcpnsockstream * server_sock, unsigned int max_requests,
+ unsigned int comm_timeout ) throw() {
+ // logger
Logger &logger = Logger::instance();
- socketlibrary::TCPSocket * client_sock;
- icap::RequestHeader * req_header;
- icap::Response * response;
- RequestHandler * req_handler;
-
+ // client socket stream
+ psocksxx::nsockstream * client_sock;
- try {
+ while ( max_requests > 0 ) {
- while ( max_requests > 0 ) {
+ logger.debug( std::string( "[worker] waiting for a connection" ) );
- logger.debug( std::string( "[worker] waiting for a connection" ) );
+ try {
+ // accept a client connection
client_sock = server_sock->accept();
- logger.debug( std::string( "[worker] new connection accepted on " ).append( client_sock->getForeignAddress() )
- .append( ":" ).append( util::itoa( client_sock->getForeignPort() ) ) );
-
- // request header
- req_header = icap::util::read_req_header( client_sock );
- logger.debug( std::string( "[worker] request header:\r\n" ).append( req_header->raw_data() ) );
-
- // try to find a handler for the request
- req_handler = util::find_req_handler( _req_handlers, req_header->method() );
-
- if ( req_handler != NULL ) {
- logger.debug( std::string( "[worker] handling request: " ).append( req_header->method() ) );
-
- // process the request and grab the response
- response = req_handler->process( req_header, client_sock );
-
- } else {
-
- // unsupported request
- logger.info( std::string( "[worker] unsupported request: " ).append( req_header->method() ) );
- response = new icap::Response( new icap::ResponseHeader( icap::ResponseHeader::NOT_ALLOWED ) );
+ // FIXME: log accepted client details (e.g. address, port etc.)
+ logger.debug( std::string( "[worker] new connection accepted on [...]" ) );
+ // set timeout value
+ if ( comm_timeout > 0 ) {
+ client_sock->timeout( comm_timeout, 0 );
+ // TODO: add debug logging
}
- // send the response back to the client
- icap::util::send_response( response, client_sock );
+ } catch ( psocksxx::sockexception &e ) {
- // cleanup
- delete response;
- delete req_header;
-
- // destroy / close connection
- delete client_sock;
+ // failed to accept client connection
+ logger.error( std::string( "[worker] failed to accept connection: " ).append( e.what() ) );
+ // update request count
max_requests--;
+ continue;
}
- } catch( socketlibrary::SocketException &sex ) {
- logger.error( std::string( "[worker] ERROR: " ).append( sex.what() ) );
- }
+ // handle client request(s)
+ max_requests = serve_client( client_sock, max_requests );
+
+ // destroy / close connection
+ delete client_sock;
+
+ };
}
@@ -134,5 +120,76 @@ namespace bitz {
}
+
+ unsigned int Worker::serve_client( psocksxx::nsockstream * client_sock, unsigned int max_requests ) throw() {
+
+ // logger
+ Logger &logger = Logger::instance();
+
+ icap::RequestHeader * req_header = NULL;
+ icap::Response * response;
+ RequestHandler * req_handler;
+
+
+ do {
+
+ // cleanup request header
+ if ( req_header != NULL ) {
+ delete req_header;
+ }
+
+ // request header
+ req_header = icap::util::read_req_header( client_sock );
+ logger.debug( std::string( "[worker] request header:\r\n" ).append( req_header->raw_data() ) );
+
+ // check timeout
+ if ( client_sock->timedout() ) {
+ logger.warn( "[worker] communication timed out..." );
+ return --max_requests;
+ }
+
+ // try to find a handler for the request
+ req_handler = util::find_req_handler( _req_handlers, req_header->method() );
+
+ // sanity check
+ if ( req_handler != NULL ) {
+
+ logger.debug( std::string( "[worker] handling request: " ).append( req_header->method() ) );
+
+ // process the request and grab the response
+ response = req_handler->process( req_header, client_sock );
+
+ } else {
+
+ // unsupported request
+ logger.info( std::string( "[worker] unsupported request: " ).append( req_header->method() ) );
+ response = new icap::Response( new icap::ResponseHeader( icap::ResponseHeader::NOT_ALLOWED ) );
+
+ }
+
+ // FIXME: this should be configurable and should default to close
+ // connection if the client doesn't send a connection header
+ if ( req_header->value( "Connection" ) == "keep-alive" ) {
+ response->header()->attach( "Connection" , "keep-alive" );
+ }
+
+ // send the response back to the client
+ icap::util::send_response( response, client_sock );
+
+ // cleanup
+ delete response;
+
+ } while ( ( --max_requests > 0 ) && ( req_header->value( "Connection" ) == "keep-alive" ) );
+
+
+ // cleanup request header
+ if ( req_header != NULL ) {
+ delete req_header;
+ }
+
+ return max_requests;
+
+ }
+
} /* end of namespace bitz */
diff --git a/src/bitz/worker.h b/src/bitz/worker.h
index 69edbca..25b7bac 100644
--- a/src/bitz/worker.h
+++ b/src/bitz/worker.h
@@ -20,7 +20,7 @@
#ifndef BITZ_WORKER_H
#define BITZ_WORKER_H
-#include <socket/socket.h>
+#include <psocksxx/tcpnsockstream.h>
#include "common.h"
@@ -32,12 +32,14 @@ namespace bitz {
Worker();
virtual ~Worker();
- virtual void run( socketlibrary::TCPServerSocket * server_sock, unsigned int max_requests ) throw();
+ virtual void run( psocksxx::tcpnsockstream * server_sock,
+ unsigned int max_requests, unsigned int comm_timeout = 0 ) throw();
private:
req_handlers_t _req_handlers;
virtual void load_req_handlers() throw();
+ virtual unsigned int serve_client( psocksxx::nsockstream * client_sock, unsigned int max_requests ) throw();
};