summaryrefslogtreecommitdiff
path: root/misc/mon_ami/freeswitch.py
diff options
context:
space:
mode:
authorStefan Wintermeyer <stefan.wintermeyer@amooma.de>2012-12-17 12:01:45 +0100
committerStefan Wintermeyer <stefan.wintermeyer@amooma.de>2012-12-17 12:01:45 +0100
commitb80bd744ad873f6fc43018bc4bfb90677de167bd (patch)
tree072c4b0e33d442528555b82c415f5e7a1712b2b0 /misc/mon_ami/freeswitch.py
parent3e706c2025ecc5523e81ad649639ef2ff75e7bac (diff)
Start of GS5.
Diffstat (limited to 'misc/mon_ami/freeswitch.py')
-rw-r--r--misc/mon_ami/freeswitch.py194
1 files changed, 194 insertions, 0 deletions
diff --git a/misc/mon_ami/freeswitch.py b/misc/mon_ami/freeswitch.py
new file mode 100644
index 0000000..eab9bb6
--- /dev/null
+++ b/misc/mon_ami/freeswitch.py
@@ -0,0 +1,194 @@
+# -*- coding: utf-8 -*-
+# MonAMI Asterisk Manger Interface server
+# FreeSWITCH event socket interface
+# (c) AMOOMA GmbH 2012
+
+from threading import Thread, Lock
+from log import ldebug, linfo, lwarn, lerror, lcritic
+from collections import deque
+from time import sleep, time
+from random import random
+from helper import to_hash
+from traceback import format_exc
+import socket
+import sys
+import hashlib
+
+
+class FreeswitchEventSocket(Thread):
+
+ def __init__(self, host, port, password):
+ Thread.__init__(self)
+ self.LINE_SEPARATOR = "\n"
+ self.SOCKET_TIMEOUT = 1
+ self.MESSAGE_PIPE_MAX_LENGTH = 128
+ self.write_lock = Lock()
+ self.host = host
+ self.port = port
+ self.password = password
+ self.runthread = True
+ self.fs = None
+ self.client_queues = {}
+
+
+ def stop(self):
+ ldebug('thread stop', self)
+ self.runthread = False
+
+
+ def run(self):
+ ldebug('starting FreeSWITCH event_socket thread', self)
+
+ while self.runthread:
+ if not self.connect():
+ ldebug('could not connect to FreeSWITCH - retry', self)
+ sleep(self.SOCKET_TIMEOUT)
+ continue
+ ldebug('opening event_socket connection', self)
+
+ data = ''
+ while self.runthread and self.fs:
+
+ try:
+ recv_buffer = self.fs.recv(128)
+ except socket.timeout as exception:
+ # Socket timeout occured
+ continue
+ except:
+ lerror(format_exc(), self)
+ self.runthread = False
+ break
+
+ if not recv_buffer:
+ ldebug('event_socket connection lost', self)
+ break
+
+ data += recv_buffer
+ messages = data.split(self.LINE_SEPARATOR * 2)
+ data = messages.pop()
+
+ for message_str in messages:
+ if not message_str:
+ continue
+ message_body = None
+
+ message = to_hash(message_str.split(self.LINE_SEPARATOR))
+
+ if not 'Content-Type' in message:
+ ldebug('message without Content-Type', self)
+ continue
+
+ if 'Content-Length' in message and int(message['Content-Length']) > 0:
+ content_length = int(message['Content-Length'])
+ while len(data) < int(message['Content-Length']):
+ try:
+ data += self.fs.recv(content_length - len(data))
+ except socket.timeout as exception:
+ ldebug('Socket timeout in message body', self)
+ continue
+ except:
+ lerror(format_exc(), self)
+ break
+ message_body = data.strip()
+ data = ''
+ else:
+ content_length = 0
+
+ self.process_message(message['Content-Type'], message, content_length, message_body)
+
+
+ ldebug('closing event_socket connection', self)
+ if self.fs:
+ self.fs.close()
+
+
+ def connect(self):
+ fs = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ fs.connect((self.host, self.port))
+ except:
+ lerror(format_exc(), self)
+ return False
+
+ fs.settimeout(self.SOCKET_TIMEOUT)
+ self.fs = fs
+ return True
+
+
+ def authenticate(self):
+ ldebug('send authentication to FreeSWITCH', self)
+ self.send_message("auth %s" % self.password)
+
+
+ def send(self, send_buffer):
+ try:
+ self.write_lock.acquire()
+ self.fs.send(send_buffer)
+ self.write_lock.release()
+ return True
+ except:
+ return False
+
+
+ def send_message(self, *message):
+ if len(message) == 1 and type(message[0]) == list:
+ self.send(self.LINE_SEPARATOR.join(message[0]) + (self.LINE_SEPARATOR * 2))
+ else:
+ self.send(self.LINE_SEPARATOR.join(message) + (self.LINE_SEPARATOR * 2))
+
+
+ def process_message(self, content_type, message_head, content_length, message_body):
+
+ if content_type == 'auth/request':
+ self.authenticate()
+ if content_type == 'command/reply':
+ if 'Reply-Text' in message_head:
+ ldebug('FreeSWITCH command reply: %s' % message_head['Reply-Text'], self)
+ elif content_type == 'text/event-plain':
+ event = to_hash(message_body.split(self.LINE_SEPARATOR))
+
+ if 'Event-Name' in event and event['Event-Name'] in self.client_queues:
+ event_type = event['Event-Name']
+ for entry_id, message_pipe in self.client_queues[event_type].items():
+ if type(message_pipe) == deque:
+ if len(message_pipe) < self.MESSAGE_PIPE_MAX_LENGTH:
+ message_pipe.appendleft({'type': 'freeswitch_event', 'body': event})
+ else:
+ lwarn("event queue %d full" % entry_id)
+ else:
+ ldebug("force-deregister event queue %d for event type %s" % (entry_id, event_type), self)
+ del self.client_queues[event_type][entry_id]
+
+ def register_client_queue(self, queue, event_type):
+ if not event_type in self.client_queues:
+ self.client_queues[event_type] = {}
+ self.send_message("event plain all %s" % event_type)
+ ldebug("we are listening now to events of type: %s" % event_type, self)
+ self.client_queues[event_type][id(queue)] = queue
+ ldebug("event queue %d registered for event type: %s" % (id(queue), event_type), self)
+
+
+ def deregister_client_queue(self, queue, event_type):
+ ldebug("deregister event queue %d for event type %s" % (id(queue), event_type), self)
+ del self.client_queues[event_type][id(queue)]
+
+ def deregister_client_queue_all(self, queue):
+ for event_type, event_queues in self.client_queues.items():
+ if id(queue) in event_queues:
+ ldebug("deregister event queue %d for all registered event types - event type %s" % (id(queue), event_type), self)
+ del self.client_queues[event_type][id(queue)]
+
+
+ def hangup(self, uuid, hangup_cause = 'NORMAL_CLEARING'):
+ ldebug('hangup channel: %s' % uuid, self)
+ self.send_message('SendMsg %s' % uuid, 'call-command: hangup', 'hangup-cause: %s' % hangup_cause)
+
+ return True
+
+
+ def originate(self, sip_account, extension, action_id = ''):
+ uuid = hashlib.md5('%s%f' % (sip_account, random() * 65534)).hexdigest()
+ ldebug('originate call - from: %s, to: %s, uuid: %s' % (sip_account, extension, uuid), self)
+ self.send_message('bgapi originate {origination_uuid=%s,origination_action=%s,origination_caller_id_number=%s}user/%s %s' % (uuid, action_id, sip_account, sip_account, extension))
+
+ return uuid