From b80bd744ad873f6fc43018bc4bfb90677de167bd Mon Sep 17 00:00:00 2001 From: Stefan Wintermeyer Date: Mon, 17 Dec 2012 12:01:45 +0100 Subject: Start of GS5. --- misc/mon_ami/asterisk.py | 299 +++++++++++++++++++++++++++ misc/mon_ami/freeswitch.py | 194 ++++++++++++++++++ misc/mon_ami/helper.py | 21 ++ misc/mon_ami/log.py | 71 +++++++ misc/mon_ami/mon-ami | 58 ++++++ misc/mon_ami/mon_ami | 10 + misc/mon_ami/mon_ami_handler.py | 434 ++++++++++++++++++++++++++++++++++++++++ misc/mon_ami/mon_ami_main.py | 117 +++++++++++ misc/mon_ami/mon_ami_server.py | 85 ++++++++ misc/mon_ami/sqliter.py | 132 ++++++++++++ misc/mon_ami/tcp_server.py | 39 ++++ 11 files changed, 1460 insertions(+) create mode 100644 misc/mon_ami/asterisk.py create mode 100644 misc/mon_ami/freeswitch.py create mode 100644 misc/mon_ami/helper.py create mode 100644 misc/mon_ami/log.py create mode 100755 misc/mon_ami/mon-ami create mode 100755 misc/mon_ami/mon_ami create mode 100644 misc/mon_ami/mon_ami_handler.py create mode 100644 misc/mon_ami/mon_ami_main.py create mode 100644 misc/mon_ami/mon_ami_server.py create mode 100644 misc/mon_ami/sqliter.py create mode 100644 misc/mon_ami/tcp_server.py (limited to 'misc/mon_ami') diff --git a/misc/mon_ami/asterisk.py b/misc/mon_ami/asterisk.py new file mode 100644 index 0000000..ffcff06 --- /dev/null +++ b/misc/mon_ami/asterisk.py @@ -0,0 +1,299 @@ +# -*- coding: utf-8 -*- +# MonAMI Asterisk Manger Interface Server +# Asterisk AMI client connector +# (c) AMOOMA GmbH 2012 + +from threading import Thread, Lock +from log import ldebug, linfo, lwarn, lerror, lcritic +from time import sleep +from traceback import format_exc +from helper import to_hash +import socket + +class AsteriskAMIServer(Thread): + + def __init__(self, client_socket, address, message_queue): + Thread.__init__(self) + self.runthread = True + self.LINE_SEPARATOR = "\r\n" + self.GREETING_STRING = 'Asterisk Call Manager/1.1' + self.ASTERISK_VERSION_STRING = 'Asterisk 1.6.2.9-2' + self.ASTERISK_CHANNEL_STATES = ( + 'Down', + 'Reserved', + 'Offhook', + 'Dialing', + 'Ring', + 'Ringing', + 'Up', + 'Busy', + 'Dialing_Offhook', + 'Pprering', + 'Mute', + ) + self.ASTERISK_PRESENTATION_INDICATOR = ( + 'Presentation allowed', + 'Presentation restricted', + 'Number not available due to interworking', + 'Reserved', + ) + self.ASTERISK_SCREENING_INDICATOR = ( + 'not screened', + 'verified and passed', + 'verified and failed', + 'Network provided', + ) + + self.write_lock = Lock() + self.socket = client_socket + self.address = address + self.message_queue = message_queue + + + def stop(self): + ldebug('thread stop', self) + self.runthread = False + + + def run(self): + ldebug('starting AMI server thread', self) + + data = '' + while self.runthread and self.socket: + try: + recv_buffer = self.socket.recv(128) + except socket.timeout as exception: + # Socket timeout occured + continue + except: + lerror(format_exc(), self) + self.runthread = False + break + + if not recv_buffer: + ldebug('client connection lost', self) + break + + data += recv_buffer + messages = data.split(self.LINE_SEPARATOR * 2) + data = messages.pop() + + for message_str in messages: + if not message_str: + continue + + message = to_hash(message_str.split(self.LINE_SEPARATOR)) + self.message_queue.appendleft({'type': 'ami_client_message', 'body': message}) + + ldebug('exiting AMI server thread', self) + + + def send(self, send_buffer): + try: + self.write_lock.acquire() + self.socket.send(send_buffer) + self.write_lock.release() + return True + except: + return False + + + def send_message(self, *message): + if len(message) == 1 and type(message[0]) == list: + self.send(self.LINE_SEPARATOR.join(message[0]) + (self.LINE_SEPARATOR * 2)) + else: + self.send(self.LINE_SEPARATOR.join(message) + (self.LINE_SEPARATOR * 2)) + + def send_greeting(self): + self.send_message(self.GREETING_STRING) + + def send_message_unknown(self, command): + self.send_message('Response: Error', 'Message: Invalid/unknown command: %s.' % command) + + def send_login_ack(self): + self.send_message('Response: Success', 'Message: Authentication accepted') + + def send_login_nack(self): + self.send_message('Response: Error', 'Message: Authentication failed') + + def send_logout_ack(self): + self.send_message('Response: Goodbye', 'Message: Thank you for flying MonAMI') + + def send_pong(self, action_id): + self.send_message('Response: Pong', "ActionID: %s" % str(action_id), 'Server: localhost') + + def send_asterisk_version(self, action_id): + self.send_message( + 'Response: Follows', + 'Privilege: Command', + "ActionID: %s" % str(action_id), + self.ASTERISK_VERSION_STRING, + '--END COMMAND--' + ) + + def send_hangup_ack(self): + self.send_message('Response: Success', 'Message: Channel Hungup') + + + def send_originate_ack(self, action_id): + self.send_message('Response: Success', "ActionID: %s" % str(action_id), 'Message: Originate successfully queued') + + + def send_status_ack(self, action_id): + self.send_message( + 'Response: Success', + "ActionID: %s" % str(action_id), + 'Message: Channel status will follow' + ) + self.send_message( + 'Event: StatusComplete', + "ActionID: %s" % action_id, + 'Items: 0' + ) + + def send_extension_state(self, action_id, extension, context = 'default', status = -1, hint = ''): + self.send_message( + 'Response: Success', + "ActionID: %s" % str(action_id), + 'Message: Extension Status', + 'Exten: %s' % extension, + 'Context: %s' % context, + 'Hint: %s' % hint, + 'Status: %d' % status, + ) + + + def send_event_newchannel(self, uuid, channel_name, channel_state, caller_id_number = '', caller_id_name = '', destination_number = ''): + self.send_message( + 'Event: Newchannel', + 'Privilege: call,all', + 'Channel: %s' % str(channel_name), + 'ChannelState: %d' % channel_state, + 'ChannelStateDesc: %s' % self.ASTERISK_CHANNEL_STATES[channel_state], + 'CallerIDNum: %s' % str(caller_id_number), + 'CallerIDName: %s' % str(caller_id_name), + 'AccountCode:', + 'Exten: %s' % str(destination_number), + 'Context: default', + 'Uniqueid: %s' % str(uuid), + ) + + + def send_event_newstate(self, uuid, channel_name, channel_state, caller_id_number = '', caller_id_name = ''): + self.send_message( + 'Event: Newstate', + 'Privilege: call,all', + 'Channel: %s' % str(channel_name), + 'ChannelState: %d' % channel_state, + 'ChannelStateDesc: %s' % self.ASTERISK_CHANNEL_STATES[channel_state], + 'CallerIDNum: %s' % str(caller_id_number), + 'CallerIDName: %s' % str(caller_id_name), + 'Uniqueid: %s' % str(uuid), + ) + + + def send_event_newcallerid(self, uuid, channel_name, caller_id_number = '', caller_id_name = '', calling_pres = 0): + + presentation = self.ASTERISK_PRESENTATION_INDICATOR[calling_pres >> 6] + screening = self.ASTERISK_SCREENING_INDICATOR[calling_pres & 3] + + self.send_message( + 'Event: NewCallerid', + 'Privilege: call,all', + 'Channel: %s' % str(channel_name), + 'CallerIDNum: %s' % str(caller_id_number), + 'CallerIDName: %s' % str(caller_id_name), + 'Uniqueid: %s' % str(uuid), + 'CID-CallingPres: %d (%s, %s)' % (calling_pres, presentation, screening), + ) + + + def send_event_hangup(self, uuid, channel_name, caller_id_number = '', caller_id_name = '', cause = 0): + self.send_message( + 'Event: Hangup', + 'Privilege: call,all', + 'Channel: %s' % str(channel_name), + 'CallerIDNum: %s' % str(caller_id_number), + 'CallerIDName: %s' % str(caller_id_name), + 'Cause: %d' % cause, + 'Cause-txt: Unknown', + 'Uniqueid: %s' % str(uuid) + ) + + + def send_event_dial_begin(self, uuid, channel_name, caller_id_number, caller_id_name, destination_channel, destination_uuid, destination_number): + self.send_message( + 'Event: Dial', + 'Privilege: call,all', + 'SubEvent: Begin', + "Channel: %s" % str(channel_name), + "Destination: %s" % str(destination_channel), + 'CallerIDNum: %s' % str(caller_id_number), + 'CallerIDName: %s' % str(caller_id_name), + 'Uniqueid: %s' % str(uuid), + 'DestUniqueid: %s' % str(destination_uuid), + 'Dialstring: %s@default' % str(destination_number) + ) + + + def send_event_dial_end(self, uuid, channel_name, dial_status = 'UNKNOWN'): + self.send_message( + 'Event: Dial', + 'Privilege: call,all', + 'SubEvent: End', + "Channel: %s" % str(channel_name), + 'Uniqueid: %s' % str(uuid), + "DialStatus: %s" % str(dial_status), + ) + + + def send_event_originate_response(self, uuid, channel_name, caller_id_number, caller_id_name, destination_number, action_id, reason): + #reasons: + #0: no such extension or number + #1: no answer + #4: answered + #8: congested or not available + + if reason == 4: + response = 'Success' + else: + response = 'Failure' + + self.send_message( + 'Event: OriginateResponse', + 'Privilege: call,all', + 'ActionID: %s' % str(action_id), + 'Response: %s' % response, + 'Channel: %s' % str(channel_name), + 'Context: default', + 'Exten: %s' % str(destination_number), + 'Reason: %d' % reason, + 'CallerIDNum: %s' % str(caller_id_number), + 'CallerIDName: %s' % str(caller_id_name), + 'Uniqueid: %s' % str(uuid), + ) + + + def send_event_bridge(self, uuid, channel_name, caller_id, o_uuid, o_channel_name, o_caller_id): + self.send_message( + 'Event: Bridge', + 'Privilege: call,all', + 'Bridgestate: Link', + 'Bridgetype: core', + 'Channel1: %s' % str(channel_name), + 'Channel2: %s' % str(o_channel_name), + 'Uniqueid1: %s' % str(uuid), + 'Uniqueid2: %s' % str(o_uuid), + 'CallerID1: %s' % str(caller_id), + 'CallerID2: %s' % str(o_caller_id), + ) + + def send_event_newaccountcode(self, uuid, channel_name): + self.send_message( + 'Event: NewAccountCode', + 'Privilege: call,all', + "Channel: %s" % str(channel_name), + 'Uniqueid: %s' % str(uuid), + 'AccountCode:', + 'OldAccountCode:', + ) diff --git a/misc/mon_ami/freeswitch.py b/misc/mon_ami/freeswitch.py new file mode 100644 index 0000000..eab9bb6 --- /dev/null +++ b/misc/mon_ami/freeswitch.py @@ -0,0 +1,194 @@ +# -*- coding: utf-8 -*- +# MonAMI Asterisk Manger Interface server +# FreeSWITCH event socket interface +# (c) AMOOMA GmbH 2012 + +from threading import Thread, Lock +from log import ldebug, linfo, lwarn, lerror, lcritic +from collections import deque +from time import sleep, time +from random import random +from helper import to_hash +from traceback import format_exc +import socket +import sys +import hashlib + + +class FreeswitchEventSocket(Thread): + + def __init__(self, host, port, password): + Thread.__init__(self) + self.LINE_SEPARATOR = "\n" + self.SOCKET_TIMEOUT = 1 + self.MESSAGE_PIPE_MAX_LENGTH = 128 + self.write_lock = Lock() + self.host = host + self.port = port + self.password = password + self.runthread = True + self.fs = None + self.client_queues = {} + + + def stop(self): + ldebug('thread stop', self) + self.runthread = False + + + def run(self): + ldebug('starting FreeSWITCH event_socket thread', self) + + while self.runthread: + if not self.connect(): + ldebug('could not connect to FreeSWITCH - retry', self) + sleep(self.SOCKET_TIMEOUT) + continue + ldebug('opening event_socket connection', self) + + data = '' + while self.runthread and self.fs: + + try: + recv_buffer = self.fs.recv(128) + except socket.timeout as exception: + # Socket timeout occured + continue + except: + lerror(format_exc(), self) + self.runthread = False + break + + if not recv_buffer: + ldebug('event_socket connection lost', self) + break + + data += recv_buffer + messages = data.split(self.LINE_SEPARATOR * 2) + data = messages.pop() + + for message_str in messages: + if not message_str: + continue + message_body = None + + message = to_hash(message_str.split(self.LINE_SEPARATOR)) + + if not 'Content-Type' in message: + ldebug('message without Content-Type', self) + continue + + if 'Content-Length' in message and int(message['Content-Length']) > 0: + content_length = int(message['Content-Length']) + while len(data) < int(message['Content-Length']): + try: + data += self.fs.recv(content_length - len(data)) + except socket.timeout as exception: + ldebug('Socket timeout in message body', self) + continue + except: + lerror(format_exc(), self) + break + message_body = data.strip() + data = '' + else: + content_length = 0 + + self.process_message(message['Content-Type'], message, content_length, message_body) + + + ldebug('closing event_socket connection', self) + if self.fs: + self.fs.close() + + + def connect(self): + fs = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + fs.connect((self.host, self.port)) + except: + lerror(format_exc(), self) + return False + + fs.settimeout(self.SOCKET_TIMEOUT) + self.fs = fs + return True + + + def authenticate(self): + ldebug('send authentication to FreeSWITCH', self) + self.send_message("auth %s" % self.password) + + + def send(self, send_buffer): + try: + self.write_lock.acquire() + self.fs.send(send_buffer) + self.write_lock.release() + return True + except: + return False + + + def send_message(self, *message): + if len(message) == 1 and type(message[0]) == list: + self.send(self.LINE_SEPARATOR.join(message[0]) + (self.LINE_SEPARATOR * 2)) + else: + self.send(self.LINE_SEPARATOR.join(message) + (self.LINE_SEPARATOR * 2)) + + + def process_message(self, content_type, message_head, content_length, message_body): + + if content_type == 'auth/request': + self.authenticate() + if content_type == 'command/reply': + if 'Reply-Text' in message_head: + ldebug('FreeSWITCH command reply: %s' % message_head['Reply-Text'], self) + elif content_type == 'text/event-plain': + event = to_hash(message_body.split(self.LINE_SEPARATOR)) + + if 'Event-Name' in event and event['Event-Name'] in self.client_queues: + event_type = event['Event-Name'] + for entry_id, message_pipe in self.client_queues[event_type].items(): + if type(message_pipe) == deque: + if len(message_pipe) < self.MESSAGE_PIPE_MAX_LENGTH: + message_pipe.appendleft({'type': 'freeswitch_event', 'body': event}) + else: + lwarn("event queue %d full" % entry_id) + else: + ldebug("force-deregister event queue %d for event type %s" % (entry_id, event_type), self) + del self.client_queues[event_type][entry_id] + + def register_client_queue(self, queue, event_type): + if not event_type in self.client_queues: + self.client_queues[event_type] = {} + self.send_message("event plain all %s" % event_type) + ldebug("we are listening now to events of type: %s" % event_type, self) + self.client_queues[event_type][id(queue)] = queue + ldebug("event queue %d registered for event type: %s" % (id(queue), event_type), self) + + + def deregister_client_queue(self, queue, event_type): + ldebug("deregister event queue %d for event type %s" % (id(queue), event_type), self) + del self.client_queues[event_type][id(queue)] + + def deregister_client_queue_all(self, queue): + for event_type, event_queues in self.client_queues.items(): + if id(queue) in event_queues: + ldebug("deregister event queue %d for all registered event types - event type %s" % (id(queue), event_type), self) + del self.client_queues[event_type][id(queue)] + + + def hangup(self, uuid, hangup_cause = 'NORMAL_CLEARING'): + ldebug('hangup channel: %s' % uuid, self) + self.send_message('SendMsg %s' % uuid, 'call-command: hangup', 'hangup-cause: %s' % hangup_cause) + + return True + + + def originate(self, sip_account, extension, action_id = ''): + uuid = hashlib.md5('%s%f' % (sip_account, random() * 65534)).hexdigest() + ldebug('originate call - from: %s, to: %s, uuid: %s' % (sip_account, extension, uuid), self) + self.send_message('bgapi originate {origination_uuid=%s,origination_action=%s,origination_caller_id_number=%s}user/%s %s' % (uuid, action_id, sip_account, sip_account, extension)) + + return uuid diff --git a/misc/mon_ami/helper.py b/misc/mon_ami/helper.py new file mode 100644 index 0000000..bf286de --- /dev/null +++ b/misc/mon_ami/helper.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +# MonAMI Asterisk Manger Interface server +# helper functions +# (c) AMOOMA GmbH 2012 + + +def to_hash(message): + message_hash = {} + for line in message: + keyword, delimeter, value = line.partition(": ") + if (keyword): + message_hash[keyword] = value.strip() + + return message_hash + + +def sval(array, key): + try: + return array[key] + except: + return None diff --git a/misc/mon_ami/log.py b/misc/mon_ami/log.py new file mode 100644 index 0000000..92709ad --- /dev/null +++ b/misc/mon_ami/log.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- +# Log library +# (c) AMOOMA GmbH 2012 + +import logging + +def ldebug(entry, initiator = None): + global logger + logger.debug('%s(%d) %s' % (type(initiator).__name__, id(initiator), entry)) + +def lwarn(entry, initiator = None): + global logger + logger.warning('%s(%d) %s' % (type(initiator).__name__, id(initiator), entry)) + +def lerror(entry, initiator = None): + global logger + logger.error('%s(%d) %s' % (type(initiator).__name__, id(initiator), entry)) + +def linfo(entry, initiator = None): + global logger + logger.info('%s(%d) %s' % (type(initiator).__name__, id(initiator), entry)) + +def lcritic(entry, initiator = None): + global logger + logger.critical('%s(%d) %s' % (type(initiator).__name__, id(initiator), entry)) + +def setup_log(file_name = None, loglevel = 5, logformat = None): + from sys import stdout + global logger + + if file_name: + try: + logfile = logging.FileHandler(file_name) + except: + logfile = logging.StreamHandler(stdout) + else: logfile = logging.StreamHandler(stdout) + + loglevel = int(loglevel) + + if (loglevel == 0): + logfile.setLevel(logging.NOTSET) + logger.setLevel(logging.NOTSET) + elif (loglevel == 1): + logfile.setLevel(logging.CRITICAL) + logger.setLevel(logging.CRITICAL) + elif (loglevel == 2): + logfile.setLevel(logging.ERROR) + logger.setLevel(logging.ERROR) + elif (loglevel == 3): + logfile.setLevel(logging.WARNING) + logger.setLevel(logging.WARNING) + elif (loglevel == 4): + logfile.setLevel(logging.INFO) + logger.setLevel(logging.INFO) + elif (loglevel >= 5): + logfile.setLevel(logging.DEBUG) + logger.setLevel(logging.DEBUG) + + if not logformat: + logformat = '%(asctime)s-%(name)s-%(levelname)s-%(message)s' + + try: + format = logging.Formatter(logformat) + logfile.setFormatter(format) + except: + format = logging.Formatter('%(asctime)s-%(name)s-%(levelname)s-%(message)s') + logfile.setFormatter(format) + + logger.addHandler(logfile) + +logger = logging.getLogger('#') diff --git a/misc/mon_ami/mon-ami b/misc/mon_ami/mon-ami new file mode 100755 index 0000000..a630140 --- /dev/null +++ b/misc/mon_ami/mon-ami @@ -0,0 +1,58 @@ +#!/bin/sh + +##################################################################### +# MonAMI Asterisk Manger Interface Emulator +# Start Script +# (c) AMOOMA GmbH 2012 +##################################################################### + +### BEGIN INIT INFO +# Provides: mon_ami +# Required-Start: freeswitch +# Required-Stop: freeswitch +# Default-Start: 2 3 4 5 +# Default-Stop: 0 1 6 +# Short-Description: MonAMI Asterisk Manger Interface Emulator +# Description: +# +### END INIT INFO + +DAEMON=/opt/GS5/misc/mon_ami/mon_ami +EXECUTABLE=`basename 'mon_ami'` +DESC="MonAMI Asterisk Manger Interface Emulator" +ARGS="--log-file=/var/log/mon_ami.log" + +if ! [ -x $DAEMON ] ; then + echo "ERROR: $DAEMON not found" + exit 1 +fi + +case "$1" in + start) + echo -n "Starting $DESC: " + start-stop-daemon --start --pidfile /var/run/$EXECUTABLE.pid \ + --make-pidfile --background --startas $DAEMON -- $ARGS + echo "$EXECUTABLE." + ;; + + stop) + echo -n "Stopping $DESC: " + start-stop-daemon --stop --quiet --oknodo --retry=TERM/30/KILL/5 \ + --pidfile /var/run/$EXECUTABLE.pid + rm -f /var/run/$NAME.pid + echo "$EXECUTABLE." + ;; + + reload|restart|force-reload) + $0 stop + sleep 2 + $0 start + ;; + + *) + echo "Usage: $0 {start|stop|restart|reload|force-reload}" >&2 + exit 1 + ;; +esac + +exit 0 diff --git a/misc/mon_ami/mon_ami b/misc/mon_ami/mon_ami new file mode 100755 index 0000000..a212cfe --- /dev/null +++ b/misc/mon_ami/mon_ami @@ -0,0 +1,10 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +# MonAMI Asterisk Manger Interface Server +# (c) AMOOMA GmbH 2012 +from mon_ami_main import main +from sys import exit + +if (__name__ == "__main__"): + result = main() + exit(result) diff --git a/misc/mon_ami/mon_ami_handler.py b/misc/mon_ami/mon_ami_handler.py new file mode 100644 index 0000000..59e9225 --- /dev/null +++ b/misc/mon_ami/mon_ami_handler.py @@ -0,0 +1,434 @@ +# -*- coding: utf-8 -*- +# MonAMI Asterisk Manger Interface Server +# Asterisk AMI Emulator Handler Process +# (c) AMOOMA GmbH 2012 + +from threading import Thread +from log import ldebug, linfo, lwarn, lerror, lcritic +from time import sleep +from traceback import format_exc +from collections import deque +from urllib import unquote +from asterisk import AsteriskAMIServer +from socket import SHUT_RDWR +from helper import sval + + +class MonAMIHandler(Thread): + + def __init__(self, socket, address, event_socket=None): + Thread.__init__(self) + self.runthread = True + self.socket = socket + self.address = address + self.event_socket = event_socket + self.ami = None + self.deregister_at_server = None + self.message_pipe = deque() + self.channels = {} + self.user_password_authentication = None + self.account_name = '' + + + def stop(self): + ldebug('thread stop', self) + self.ami.stop() + self.runthread = False + + + def shutdown(self): + self.deregister_at_server(self) + ldebug('closing connection to %s:%d' % self.address) + try: + self.socket.shutdown(SHUT_RDWR) + self.socket.close() + ldebug('connection closed ', self) + except: + ldebug('connection closed by foreign host', self) + + def run(self): + ldebug('starting MonAMI handler thread', self) + + # starting asterisk AMI thread + self.ami = AsteriskAMIServer(self.socket, self.address, self.message_pipe) + self.ami.start() + self.ami.send_greeting() + + # register for events + self.event_socket.register_client_queue(self.message_pipe, 'CHANNEL_CREATE') + self.event_socket.register_client_queue(self.message_pipe, 'CHANNEL_DESTROY') + self.event_socket.register_client_queue(self.message_pipe, 'CHANNEL_STATE') + self.event_socket.register_client_queue(self.message_pipe, 'CHANNEL_ANSWER') + self.event_socket.register_client_queue(self.message_pipe, 'CHANNEL_BRIDGE') + + while self.runthread and self.ami.isAlive(): + if self.message_pipe: + message = self.message_pipe.pop() + message_type = sval(message, 'type') + if message_type == 'freeswitch_event': + self.handle_fs_event(message['body']) + elif message_type == 'ami_client_message': + self.handle_ami_client_message(message['body']) + else: + sleep(0.1) + + self.event_socket.deregister_client_queue_all(self.message_pipe) + + ldebug('exiting MonAMI handler thread', self) + self.shutdown() + + + def handle_ami_client_message(self, message): + + if 'Action' in message: + action = message['Action'].lower() + + if action == 'login': + if 'UserName' in message: + self.account_name = message['UserName'] + if 'Secret' in message and self.user_password_authentication and self.user_password_authentication(self.account_name, message['Secret']): + self.ami.send_login_ack() + ldebug('AMI connection authenticated - account: %s' % self.account_name, self) + else: + self.ami.send_login_nack() + linfo('AMI authentication failed - account: %s' % sval(message, 'UserName'), self) + self.ami.stop() + self.stop() + elif action == 'logoff': + self.ami.send_logout_ack() + ldebug('AMI logout', self) + self.ami.stop() + self.stop() + elif action == 'ping': + self.ami.send_pong(sval(message, 'ActionID')) + elif action == 'status': + self.ami.send_status_ack(sval(message, 'ActionID')) + elif action == 'command' and sval(message, 'Command') == 'core show version': + self.ami.send_asterisk_version(sval(message, 'ActionID')) + elif action == 'hangup': + account_name, separator, uuid = str(sval(message, 'Channel')).rpartition('-uuid-') + if account_name != '': + self.event_socket.hangup(uuid) + self.ami.send_hangup_ack() + elif action == 'originate': + self.message_originate(message) + elif action == 'extensionstate': + self.ami.send_extension_state(sval(message, 'ActionID'), sval(message, 'Exten'), sval(message, 'Context')) + else: + ldebug('unknown asterisk message received: %s' % message, self) + self.ami.send_message_unknown(message['Action']) + + + def to_unique_channel_name(self, uuid, channel_name): + + # strip anything left of sip_account_name + path, separator, contact_part = channel_name.rpartition('/sip:') + if path == '': + path, separator, contact_part = channel_name.rpartition('/') + + # if failed return name unchanged + if path == '': + return channel_name + + + # strip domain part + account_name = contact_part.partition('@')[0] + + # if failed return name unchanged + if account_name == '': + return channel_name + + # create unique channel name + return 'SIP/%s-uuid-%s' % (account_name, uuid) + + def message_originate(self, message): + destination_number = str(sval(message, 'Exten')) + action_id = sval(message, 'ActionID') + self.ami.send_originate_ack(action_id) + uuid = self.event_socket.originate(self.account_name, destination_number, action_id) + + + def handle_fs_event(self, event): + event_type = event['Event-Name'] + #ldebug('event type received: %s' % event_type, self) + + event_types = { + 'CHANNEL_CREATE': self.event_channel_create, + 'CHANNEL_DESTROY': self.event_channel_destroy, + 'CHANNEL_STATE': self.event_channel_state, + 'CHANNEL_ANSWER': self.event_channel_answer, + 'CHANNEL_BRIDGE': self.event_channel_bridge, + } + + uuid = event_types[event_type](event) + + if not uuid: + return False + + channel = sval(self.channels, uuid); + + if not channel: + return False + + o_uuid = channel['o_uuid'] + o_channel = sval(self.channels, o_uuid); + + if sval(channel, 'origination_action') or sval(o_channel, 'origination_action'): + if not sval(channel, 'ami_start') and not sval(o_channel, 'ami_start'): + if sval(channel, 'owned') and sval(channel, 'origination_action'): + ldebug('sending AMI events for origitate call start (on this channel): %s' % uuid, self) + self.ami_send_originate_start(channel) + self.channels[uuid]['ami_start'] = True + elif sval(o_channel, 'owned') and sval(o_channel, 'origination_action'): + ldebug('sending AMI events for origitate call start (on other channel): %s' % uuid, self) + self.ami_send_originate_start(o_channel) + self.channels[o_uuid]['ami_start'] = True + elif o_channel: + if sval(channel, 'owned') and sval(channel, 'origination_action'): + ldebug('sending AMI events for origitate call progress (on this channel): %s' % uuid, self) + self.ami_send_originate_outbound(channel) + self.channels[uuid]['origination_action'] = False + elif sval(o_channel, 'owned') and sval(o_channel, 'origination_action'): + ldebug('sending AMI events for origitate call progress (on other channel): %s' % uuid, self) + self.ami_send_originate_outbound(o_channel) + self.channels[o_uuid]['origination_action'] = False + elif o_channel: + if not sval(channel, 'ami_start') and not sval(o_channel, 'ami_start'): + if sval(channel, 'owned') and sval(channel, 'direction') == 'inbound': + ldebug('sending AMI events for outbound call start (on this channel): %s' % uuid, self) + self.ami_send_outbound_start(channel) + self.channels[uuid]['ami_start'] = True + elif sval(o_channel, 'owned') and sval(channel, 'direction') == 'outbound': + ldebug('sending AMI events for outbound call start (on other channel): %s' % uuid, self) + self.ami_send_outbound_start(o_channel) + self.channels[o_uuid]['ami_start'] = True + + if not sval(channel, 'ami_start')and not sval(o_channel, 'ami_start'): + if sval(channel, 'owned') and sval(channel, 'direction') == 'outbound': + ldebug('sending AMI events for inbound call start (on this channel): %s' % uuid, self) + self.ami_send_inbound_start(channel) + self.channels[uuid]['ami_start'] = True + elif sval(o_channel, 'owned') and sval(channel, 'direction') == 'inbound': + ldebug('sending AMI events for inbound call start (on other channel): %s' % uuid, self) + self.ami_send_inbound_start(o_channel) + self.channels[o_uuid]['ami_start'] = True + + + def event_channel_create(self, event): + uuid = sval(event, 'Unique-ID') + o_uuid = sval(event, 'Other-Leg-Unique-ID') + + if uuid in self.channels: + ldebug('channel already listed: %s' % uuid, self) + return false + + channel_name = self.to_unique_channel_name(uuid, unquote(str(sval(event, 'Channel-Name')))) + o_channel_name = self.to_unique_channel_name(o_uuid, unquote(str(sval(event, 'Other-Leg-Channel-Name')))) + + if self.account_name in channel_name: + channel_owned = True + else: + channel_owned = False + + if self.account_name in o_channel_name: + channel_related = True + else: + channel_related = False + + if not channel_owned and not channel_related: + ldebug('channel neither owned nor reladed to account: %s' % uuid, self) + return False + + channel = { + 'uuid': uuid, + 'name': channel_name, + 'direction': sval(event, 'Call-Direction'), + 'channel_state': sval(event, 'Channel-State'), + 'call_state': sval(event, 'Channel-Call-State'), + 'answer_state': sval(event, 'Answer-State'), + 'owned': channel_owned, + 'related': channel_related, + 'caller_id_name': unquote(str(sval(event, 'Caller-Caller-ID-Name'))), + 'caller_id_number': unquote(str(sval(event, 'Caller-Caller-ID-Number'))), + 'callee_id_name': unquote(str(sval(event, 'Caller-Callee-ID-Name'))), + 'callee_id_number': unquote(str(sval(event, 'Caller-Callee-ID-Number'))), + 'destination_number': str(sval(event, 'Caller-Destination-Number')), + 'origination_action': sval(event, 'variable_origination_action'), + 'o_uuid': o_uuid, + 'o_name': o_channel_name, + } + + if channel['answer_state'] == 'ringing': + if channel['direction'] == 'inbound': + asterisk_channel_state = 4 + else: + asterisk_channel_state = 5 + else: + asterisk_channel_state = 0 + + if not o_uuid: + ldebug('one legged call, channel: %s' % uuid, self) + elif o_uuid not in self.channels: + o_channel = { + 'uuid': o_uuid, + 'name': o_channel_name, + 'direction': sval(event, 'Other-Leg-Direction'), + 'channel_state': sval(event, 'Channel-State'), + 'call_state': sval(event, 'Channel-Call-State'), + 'answer_state': sval(event, 'Answer-State'), + 'owned': channel_related, + 'related': channel_owned, + 'caller_id_name': unquote(str(sval(event, 'Caller-Caller-ID-Name'))), + 'caller_id_number': unquote(str(sval(event, 'Caller-Caller-ID-Number'))), + 'callee_id_name': unquote(str(sval(event, 'Caller-Callee-ID-Name'))), + 'callee_id_number': unquote(str(sval(event, 'Caller-Callee-ID-Number'))), + 'destination_number': str(sval(event, 'Other-Leg-Destination-Number')), + 'o_uuid': uuid, + 'o_name': channel_name, + } + + if o_channel['answer_state'] == 'ringing': + if o_channel['direction'] == 'inbound': + asterisk_o_channel_state = 4 + else: + asterisk_o_channel_state = 5 + else: + asterisk_o_channel_state = 0 + + ldebug('create channel list entry for related channel: %s, name: %s' % (o_uuid, o_channel_name), self) + self.channels[o_uuid] = o_channel + else: + ldebug('updating channel: %s, name: %s, o_uuid: %s, o_name %s' % (o_uuid, o_channel_name, uuid, channel_name), self) + self.channels[o_uuid]['o_uuid'] = uuid + self.channels[o_uuid]['o_name'] = channel_name + o_channel = self.channels[o_uuid] + + if channel_owned: + ldebug('create channel list entry for own channel: %s, name: %s' % (uuid, channel_name), self) + elif channel_related: + ldebug('create channel list entry for related channel: %s, name: %s' % (uuid, channel_name), self) + + self.channels[uuid] = channel + + return uuid + + + def event_channel_destroy(self, event): + uuid = sval(event, 'Unique-ID') + hangup_cause_code = int(sval(event, 'variable_hangup_cause_q850')) + channel = sval(self.channels, uuid) + + if channel: + channel['hangup_cause_code'] = hangup_cause_code + if sval(channel, 'ami_start'): + self.ami_send_outbound_end(channel) + del self.channels[uuid] + ldebug('channel removed from list: %s, cause %d' % (uuid, hangup_cause_code), self) + + return uuid + + + def event_channel_state(self, event): + uuid = sval(event, 'Unique-ID') + channel_state = sval(event, 'Channel-State') + call_state = sval(event, 'Channel-Call-State') + answer_state = sval(event, 'Answer-State') + + if sval(self.channels, uuid) and False: + ldebug('updating channel state - channel: %s, channel_state: %s, call_state %s, answer_state: %s' % (uuid, channel_state, call_state, answer_state), self) + self.channels[uuid]['channel_state'] = channel_state + self.channels[uuid]['call_state'] = call_state + self.channels[uuid]['answer_state'] = answer_state + + return uuid + + + def event_channel_answer(self, event): + uuid = sval(event, 'Unique-ID') + o_uuid = sval(event, 'Other-Leg-Unique-ID') + channel = sval(self.channels, uuid) + if not o_uuid: + o_uuid = sval(channel, 'o_uuid') + o_channel = sval(self.channels, o_uuid) + origination_action = sval(channel, 'origination_action') + + if channel: + channel_state = sval(event, 'Channel-State') + call_state = sval(event, 'Channel-Call-State') + answer_state = sval(event, 'Answer-State') + ldebug('channel answered - channel: %s, owned: %s, channel_state: %s, call_state %s, answer_state: %s, other leg: %s' % (uuid, sval(channel, 'owned'), channel_state, call_state, answer_state, o_uuid), self) + self.ami.send_event_newstate(uuid, sval(channel, 'name'), 6, sval(channel, 'caller_id_number'), sval(channel, 'caller_id_name')) + + self.channels[uuid]['channel_state'] = channel_state + self.channels[uuid]['call_state'] = call_state + self.channels[uuid]['answer_state'] = answer_state + + if sval(channel, 'origination_action'): + if sval(channel, 'owned'): + ldebug('sending AMI originate response - success: %s' % uuid, self) + self.ami.send_event_originate_response(sval(channel, 'uuid'), sval(channel, 'name'), sval(channel, 'caller_id_number'), sval(channel, 'caller_id_name'), '101', sval(channel, 'origination_action'), 4) + elif not o_uuid: + ldebug('sending AMI events for outbound call start on one legged call (this channel): %s' % uuid, self) + self.ami_send_outbound_start(channel) + self.ami.send_event_bridge(uuid, sval(channel, 'name'), sval(channel, 'caller_id_number'), o_uuid, sval(o_channel, 'name'), sval(o_channel, 'caller_id_number')) + + self.channels[uuid]['ami_start'] = True + + return uuid + + return False + + + def event_channel_bridge(self, event): + uuid = sval(event, 'Unique-ID') + o_uuid = sval(event, 'Other-Leg-Unique-ID') + + ldebug('bridge channel: %s to %s' % (uuid, o_uuid), self) + channel = sval(self.channels, uuid) + o_channel = sval(self.channels, o_uuid) + + if sval(channel, 'owned') or sval(o_channel, 'owned'): + ldebug('sending AMI bridge response: %s -> %s' % (uuid, o_uuid), self) + self.ami.send_event_bridge(uuid, sval(channel, 'name'), sval(channel, 'caller_id_number'), o_uuid, sval(o_channel, 'name'), sval(o_channel, 'caller_id_number')) + + + def ami_send_outbound_start(self, channel): + self.ami.send_event_newchannel(sval(channel, 'uuid'), sval(channel, 'name'), 0, sval(channel, 'caller_id_number'), sval(channel, 'caller_id_name'), sval(channel, 'destination_number')) + self.ami.send_event_newstate(sval(channel, 'uuid'), sval(channel, 'name'), 4, sval(channel, 'caller_id_number'), sval(channel, 'caller_id_name')) + self.ami.send_event_newchannel(sval(channel, 'o_uuid'), sval(channel, 'o_name'), 0, '', '', '') + self.ami.send_event_dial_begin(sval(channel, 'uuid'), sval(channel, 'name'), sval(channel, 'caller_id_number'), sval(channel, 'caller_id_name'), sval(channel, 'o_name'), sval(channel, 'o_uuid'), sval(channel, 'destination_number')) + self.ami.send_event_newcallerid(sval(channel, 'o_uuid'), sval(channel, 'o_name'), sval(channel, 'destination_number'), '', 0) + self.ami.send_event_newstate(sval(channel, 'o_uuid'), sval(channel, 'o_name'), 5, sval(channel, 'destination_number'), '') + + + def ami_send_outbound_end(self, channel): + self.ami.send_event_hangup(sval(channel, 'o_uuid'), sval(channel, 'o_name'), sval(channel, 'destination_number'), '', sval(channel, 'hangup_cause_code')) + self.ami.send_event_dial_end(sval(channel, 'uuid'), sval(channel, 'name')) + self.ami.send_event_hangup(sval(channel, 'uuid'), sval(channel, 'name'), sval(channel, 'caller_id_number'), sval(channel, 'caller_id_name'), sval(channel, 'hangup_cause_code')) + + if sval(channel, 'origination_action'): + self.ami.send_event_originate_response(sval(channel, 'uuid'), sval(channel, 'name'), sval(channel, 'caller_id_number'), sval(channel, 'caller_id_name'), sval(channel, 'destination_number'), sval(channel, 'origination_action'), 1) + + + def ami_send_inbound_start(self, channel): + self.ami.send_event_newchannel(sval(channel, 'o_uuid'), sval(channel, 'o_name'), 0, sval(channel, 'caller_id_number'), sval(channel, 'caller_id_name'), sval(channel, 'callee_id_number')) + self.ami.send_event_newstate(sval(channel, 'o_uuid'), sval(channel, 'o_name'), 4, sval(channel, 'caller_id_number'), sval(channel, 'caller_id_name')) + self.ami.send_event_newchannel(sval(channel, 'uuid'), sval(channel, 'name'), 0, '', '', '') + self.ami.send_event_dial_begin(sval(channel, 'o_uuid'), sval(channel, 'o_name'), sval(channel, 'caller_id_number'), sval(channel, 'caller_id_name'), sval(channel, 'name'), sval(channel, 'uuid'), sval(channel, 'destination_number')) + self.ami.send_event_newstate(sval(channel, 'uuid'), sval(channel, 'name'), 5, sval(channel, 'caller_id_number'), sval(channel, 'caller_id_name')) + self.ami.send_event_newcallerid(sval(channel, 'uuid'), sval(channel, 'name'), sval(channel, 'destination_number'), '', 0) + + + def ami_send_originate_start(self, channel): + self.ami.send_event_newchannel(sval(channel, 'uuid'), sval(channel, 'name'), 0, '', '', '') + self.ami.send_event_newcallerid(sval(channel, 'uuid'), sval(channel, 'name'), sval(channel, 'caller_id_number'), sval(channel, 'caller_id_name'), 0) + self.ami.send_event_newaccountcode(sval(channel, 'uuid'), sval(channel, 'name')) + self.ami.send_event_newcallerid(sval(channel, 'uuid'), sval(channel, 'name'), sval(channel, 'caller_id_number'), sval(channel, 'caller_id_name'), 0) + self.ami.send_event_newstate(sval(channel, 'uuid'), sval(channel, 'name'), 5, sval(channel, 'caller_id_number'), sval(channel, 'caller_id_name')) + + + def ami_send_originate_outbound(self, channel): + self.ami.send_event_newchannel(sval(channel, 'o_uuid'), sval(channel, 'o_name'), 0, '', '', '') + self.ami.send_event_dial_begin(sval(channel, 'uuid'), sval(channel, 'name'), sval(channel, 'caller_id_number'), sval(channel, 'caller_id_name'), sval(channel, 'o_name'), sval(channel, 'o_uuid'), sval(channel, 'destination_number')) + self.ami.send_event_newcallerid(sval(channel, 'o_uuid'), sval(channel, 'o_name'), sval(channel, 'destination_number'), '', 0) + self.ami.send_event_newstate(sval(channel, 'o_uuid'), sval(channel, 'o_name'), 5, sval(channel, 'destination_number'), '') diff --git a/misc/mon_ami/mon_ami_main.py b/misc/mon_ami/mon_ami_main.py new file mode 100644 index 0000000..13dd4bb --- /dev/null +++ b/misc/mon_ami/mon_ami_main.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +# MonAMI Asterisk Manger Interface Server +# Main Programm +# (c) AMOOMA GmbH 2012 + +from log import ldebug, linfo, lwarn, lerror, lcritic, setup_log +from time import sleep +from signal import signal, SIGHUP, SIGTERM, SIGINT +from optparse import OptionParser +from freeswitch import FreeswitchEventSocket +from mon_ami_server import MonAMIServer +from sqliter import SQLiteR + +def signal_handler(signal_number, frame): + global event_socket + global mon_ami_server + + ldebug('signal %d received ' % signal_number, frame) + + if (signal_number == SIGTERM): + ldebug('shutdown signal (%d) received ' % signal_number, frame) + event_socket.stop() + mon_ami_server.stop() + elif (signal_number == SIGINT): + ldebug('interrupt signal (%d) received ' % signal_number, frame) + event_socket.stop() + mon_ami_server.stop() + elif (signal_number == SIGHUP): + ldebug('hangup signal (%d) received - ignore' % signal_number, frame) + +def user_password_authentication(user_name, password): + global configuration_options + + if configuration_options.user_ignore_name and configuration_options.user_ignore_password: + ldebug('user-password authentication credentials provided but ignored - user: %s, password: %s' % (user_name, '*' * len(str(password)))) + return True + + if configuration_options.user_override_name != None and configuration_options.user_override_password != None: + if user_name == configuration_options.user_override_name and password == configuration_options.user_override_password: + return True + return False + + db = SQLiteR(configuration_options.user_db_name) + if not db.connect(): + lerror('cound not connect to user database "%s"' % configuration_options.user_db_name) + return False + + user = db.find(configuration_options.user_db_table, {configuration_options.user_db_name_row: user_name, configuration_options.user_db_password_row: password}) + db.disconnect() + + if user: + ldebug('user-password authentication accepted - user: %s, password: %s' % (user_name, '*' * len(str(password)))) + return True + + linfo('user-password authentication failed - user: %s, password: %s' % (user_name, '*' * len(str(password)))) + return False + +def main(): + global event_socket + global mon_ami_server + global configuration_options + + option_parser = OptionParser() + + # Log options + option_parser.add_option("--log-file", action="store", type="string", dest="log_file", default=None) + option_parser.add_option("--log-level", action="store", type="int", dest="log_level", default=5) + + # FreeSWITCH event_socket + option_parser.add_option("--freeswitch-address", action="store", type="string", dest="freeswitch_address", default='127.0.0.1') + option_parser.add_option("--freeswitch-port", action="store", type="int", dest="freeswitch_port", default=8021) + option_parser.add_option("--freeswitch-password", action="store", type="string", dest="freeswitch_password", default='ClueCon') + + # Asterisk Manager Interface + option_parser.add_option("-a", "--address", "--ami-address", action="store", type="string", dest="ami_address", default='0.0.0.0') + option_parser.add_option("-p", "--port", "--ami-port", action="store", type="int", dest="ami_port", default=5038) + + # User database + option_parser.add_option("--user-db-name", action="store", type="string", dest="user_db_name", default='/opt/GS5/db/development.sqlite3') + option_parser.add_option("--user-db-table", action="store", type="string", dest="user_db_table", default='sip_accounts') + option_parser.add_option("--user-db-name-row", action="store", type="string", dest="user_db_name_row", default='auth_name') + option_parser.add_option("--user-db-password-row", action="store", type="string", dest="user_db_password_row", default='password') + + # Define common User/Password options + option_parser.add_option("--user-override-name", action="store", type="string", dest="user_override_name", default=None) + option_parser.add_option("--user-override-password", action="store", type="string", dest="user_override_password", default=None) + option_parser.add_option("--user-ignore-name", action="store_true", dest="user_ignore_name", default=False) + option_parser.add_option("--user-ignore-password", action="store_true", dest="user_ignore_password", default=False) + + (configuration_options, args) = option_parser.parse_args() + + setup_log(configuration_options.log_file, configuration_options.log_level) + ldebug('starting MonAMI main process') + + # Catch signals + signal(SIGHUP, signal_handler) + signal(SIGTERM, signal_handler) + signal(SIGINT, signal_handler) + + # Starting FreeSWITCH event_socket thread + event_socket = FreeswitchEventSocket(configuration_options.freeswitch_address, configuration_options.freeswitch_port, configuration_options.freeswitch_password) + event_socket.start() + + if event_socket.isAlive(): + # Starting Asterisk manager thread + mon_ami_server = MonAMIServer(configuration_options.ami_address, configuration_options.ami_port, event_socket) + mon_ami_server.user_password_authentication = user_password_authentication + mon_ami_server.start() + + while mon_ami_server.isAlive(): + sleep(1) + + if event_socket.isAlive(): + ldebug('killing event_socket thread') + event_socket.stop() + + ldebug('exiting MonAMI main process') diff --git a/misc/mon_ami/mon_ami_server.py b/misc/mon_ami/mon_ami_server.py new file mode 100644 index 0000000..68e72c8 --- /dev/null +++ b/misc/mon_ami/mon_ami_server.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +# MonAMI Asterisk Manger Interface Server +# Asterisk AMI Emulator server thread +# (c) AMOOMA GmbH 2012 + +from threading import Thread +from log import ldebug, linfo, lwarn, lerror, lcritic +from time import sleep +from traceback import format_exc +from tcp_server import TCPServer +from mon_ami_handler import MonAMIHandler +import socket + +class MonAMIServer(Thread): + + def __init__(self, address=None, port=None, event_socket=None): + Thread.__init__(self) + self.runthread = True + self.port = port + self.address = address + self.event_socket = event_socket + self.handler_threads = {} + self.user_password_authentication = None + + def stop(self): + ldebug('thread stop', self) + ldebug('client connections: %s' % len(self.handler_threads), self) + for thread_id, handler_thread in self.handler_threads.items(): + if handler_thread.isAlive(): + handler_thread.stop() + self.runthread = False + + def register_handler_thread(self, handler_thread): + if handler_thread.isAlive(): + ldebug('registering handler thread %d ' % id(handler_thread), self) + self.handler_threads[id(handler_thread)] = handler_thread + else: + lwarn('handler thread passed away: %d' % id(handler_thread), self) + + + def deregister_handler_thread(self, handler_thread): + if id(handler_thread) in self.handler_threads: + ldebug('deregistering handler thread %d ' % id(handler_thread), self) + del self.handler_threads[id(handler_thread)] + else: + lwarn('handler thread %d not registered' % id(handler_thread), self) + + + def run(self): + ldebug('starting MonAMI server thread', self) + serversocket = TCPServer(self.address, self.port).listen() + #serversocket.setblocking(0) + + if not serversocket: + ldebug('server socket could not be bound', self) + return 1 + + while self.runthread: + try: + client_socket, client_address = serversocket.accept() + except socket.timeout as exception: + # Socket timeout occured + continue + except socket.error as exception: + lerror('socket error (%s): %s - ' % (exception, format_exc()), self) + sleep(1) + continue + except: + lerror('general error: %s - ' % format_exc(), self) + sleep(1) + continue + + ldebug('connected to %s:%d' % client_address, self) + + client_thread = MonAMIHandler(client_socket, client_address, self.event_socket) + client_thread.deregister_at_server = self.deregister_handler_thread + client_thread.user_password_authentication = self.user_password_authentication + client_thread.start() + if client_thread.isAlive(): + self.register_handler_thread(client_thread) + + ldebug('registered handler threads: %d' % len(self.handler_threads), self) + + ldebug('exiting MonAMI server thread', self) + diff --git a/misc/mon_ami/sqliter.py b/misc/mon_ami/sqliter.py new file mode 100644 index 0000000..5b03729 --- /dev/null +++ b/misc/mon_ami/sqliter.py @@ -0,0 +1,132 @@ +# -*- coding: utf-8 -*- +# SQLite library + +import sqlite3 + +class SQLiteR(): + + def __init__(self, database = None): + self.db_name = database + if (self.db_name == None): + self.db_name = ':memory:' + self.db_conn = None + self.db_cursor = None + + def record_factory(self, cursor, row): + record = dict() + for index, column in enumerate(cursor.description): + record[column[0]] = row[index] + return record + + def connect(self, isolation_level = None): + try: + self.db_conn = sqlite3.connect(self.db_name) + self.db_conn.row_factory = self.record_factory + self.db_cursor = self.db_conn.cursor() + except: + return False + + self.db_conn.isolation_level = isolation_level + return True + + def disconnect(self): + try: + self.db_nonn.close() + except: + return False + return True + + def execute(self, query, parameters = []): + try: + return self.db_cursor.execute(query, parameters) + except: + return False + + def fetch_row(self): + return self.db_cursor.fetchone() + + def fetch_rows(self): + return self.db_cursor.fetchall() + + def execute_get_rows(self, query, parameters = []): + if (self.execute(query, parameters)): + return self.fetch_rows() + else: + return False + + def execute_get_row(self, query, parameters = []): + query = "%s LIMIT 1" % query + if (self.execute(query, parameters)): + return self.fetch_row() + else: + return False + + def execute_get_value(self, query, parameters = []): + row = self.execute_get_row(query, parameters) + if (row): + return row[0] + else: + return row + + def create_table(self, table, structure, primary_key = None): + columns = list() + for row in structure: + key, value = row.items()[0] + sql_type = "VARCHAR(255)" + sql_key = '' + if (key == primary_key): + sql_key = 'PRIMARY KEY' + type_r = value.split(':', 1) + type_n = type_r[0] + if (type_n == 'integer'): + sql_type = 'INTEGER' + elif (type_n == 'string'): + try: + sql_type = "VARCHAR(%s)" % type_r[1] + except IndexError, e: + sql_type = "VARCHAR(255)" + + columns.append('"%s" %s %s' % (key, sql_type, sql_key)) + + query = 'CREATE TABLE "%s" (%s)' % (table, ', '.join(columns)) + return self.execute(query) + + def save(self, table, row): + keys = row.keys() + query = 'INSERT OR REPLACE INTO "%s" (%s) VALUES (:%s)' % (table, ', '.join(keys), ', :'.join(keys)) + + return self.execute(query, row) + + def find_sql(self, table, rows = None): + values = list() + if (rows): + if (type(rows) == type(list())): + rows_list = rows + else: + rows_list = list() + rows_list.append(rows) + + query_parts = list() + + for row in rows_list: + statements = list() + for key, value in row.items(): + if (value == None): + statements.append("\"%s\" IS ?" % (key)) + else: + statements.append("\"%s\" = ?" % (key)) + values.append(value) + query_parts.append('(%s)' % ' AND '.join(statements)) + + query = 'SELECT * FROM "%s" WHERE %s' % (table, ' OR '.join(query_parts)) + else: + query = 'SELECT * FROM "%s"' % table + return query, values + + def find(self, table, row = None): + query, value = self.find_sql(table, row) + return self.execute_get_row(query, value) + + def findall(self, table, row = None): + query, values = self.find_sql(table, row) + return self.execute_get_rows(query, values) diff --git a/misc/mon_ami/tcp_server.py b/misc/mon_ami/tcp_server.py new file mode 100644 index 0000000..5536282 --- /dev/null +++ b/misc/mon_ami/tcp_server.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# MonAMI Asterisk Manger Interface Server +# TCP Server +# (c) AMOOMA GmbH 2012 + +import socket +from traceback import format_exc +from log import ldebug, linfo, lwarn, lerror, lcritic + +class TCPServer(): + + def __init__(self, address=None, port=None, timeout=1): + self.SOCKET_BACKLOG = 5 + self.port = port + self.address = address + self.socket_timeout = timeout + + def listen(self): + tcpsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + tcpsocket.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 ) + + ldebug('binding server to %s:%d, timeout: %d' % (self.address, self.port, self.socket_timeout), self) + + try: + tcpsocket.bind((self.address, self.port)) + except ValueError as exception: + lerror('server socket address error: %s - %s' % (exception, format_exc()), self) + return False + except socket.error as exception: + lerror('server socket error (%d): %s - %s' % (exception[0], exception[1], format_exc()), self) + return False + except: + lerror('general server socket error: %s' % format_exc(), self) + return False + + tcpsocket.listen(self.SOCKET_BACKLOG) + tcpsocket.settimeout(self.socket_timeout) + + return tcpsocket -- cgit v1.2.3