summaryrefslogtreecommitdiff
path: root/rapid/idletube.py
diff options
context:
space:
mode:
authorJulien Valroff <julien@kirya.net>2011-04-08 07:12:47 +0200
committerJulien Valroff <julien@kirya.net>2011-04-08 07:12:47 +0200
commit5168fdb07d6dc2b77f0ef9c7502940ce4a02e9aa (patch)
tree92ee1b0789e6527052973d100ea9d6426afc70cc /rapid/idletube.py
parenteb4c5cc4472b16ce10401611140381e5ba5b6aca (diff)
Imported Upstream version 0.3.6upstream/0.3.6
Diffstat (limited to 'rapid/idletube.py')
-rw-r--r--rapid/idletube.py205
1 files changed, 205 insertions, 0 deletions
diff --git a/rapid/idletube.py b/rapid/idletube.py
new file mode 100644
index 0000000..0b07536
--- /dev/null
+++ b/rapid/idletube.py
@@ -0,0 +1,205 @@
+
+# Copyright (c) 2005 Antoon Pardon
+#
+# Modified 2010 by Damon Lynch to use python's higher performance deque, rather than a regular list
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the "Software"),
+# to deal in the Software without restriction, including without limitation
+# the rights to use, copy, modify, merge, publish, distribute, sublicense,
+# and/or sell copies of the Software, and to permit persons to whom the
+# Software is furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included
+# in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+
+import collections
+from Queue import Queue
+
+from threading import Lock
+from thread import get_ident
+
+from types import BooleanType as UnConnected
+
+UnRegistered, Registered = False, True
+
+
+class EOInformation(Exception):
+ pass
+
+class TubeAccess(Exception):
+ pass
+
+
+class Fifo:
+
+ def __init__(self):
+ self.fifo = collections.deque()
+
+ def put(self, item):
+ self.fifo.append(item)
+
+ def get(self):
+ return self.fifo.popleft()
+
+ def size(self):
+ return len(self.fifo)
+
+
+class Tube:
+
+ def __init__(self, maxsize, lck = Lock, container = None):
+ if container is None:
+ container = Fifo()
+ self.readers = set()
+ self.writers = set()
+ self.container = container
+ self.maxsize = maxsize
+ self.cb_arglst = []
+ self.cb_src = UnRegistered
+ self.in_use = Lock()
+ self.nowriter = lck()
+ self.full = lck()
+ self.empty = lck()
+ self.empty.acquire()
+ self.nowriter.acquire()
+
+ def open(self, access = 'r', *to):
+ thrd = get_ident()
+ access = access.lower()
+ self.in_use.acquire()
+ if 'w' in access:
+ if len(self.writers) == 0:
+ for _ in self.readers:
+ self.nowriter.release()
+ self.writers.add(thrd)
+ if 'r' in access:
+ self.readers.add(thrd)
+ if len(self.writers) == 0:
+ self.in_use.release()
+ self.nowriter.acquire(*to)
+ else:
+ self.in_use.release()
+ else:
+ self.in_use.release()
+
+ def close(self, access = 'rw'):
+ thrd = get_ident()
+ access = access.lower()
+ self.in_use.acquire()
+ if 'r' in access:
+ self.readers.discard(thrd)
+ if 'w' in access:
+ self.writers.discard(thrd)
+ if len(self.writers) == 0:
+ if self.container.size() == 0:
+ self.empty.release()
+ if self.cb_src is Registered and len(self.readers) > 0:
+ self.cb_src = gob.idle_add(self._idle_callback)
+ for _ in self.readers:
+ self.container.put(EOInformation)
+ self.in_use.release()
+
+ def size(self):
+ self.in_use.acquire()
+ size = self.container.size()
+ self.in_use.release()
+ return size
+
+ def get(self, *to):
+ thrd = get_ident()
+ if thrd not in self.readers:
+ raise TubeAccess, "Thread has no read access for tube"
+ self.empty.acquire(*to)
+ self.in_use.acquire()
+ size = self.container.size()
+ if size == self.maxsize:
+ self.full.release()
+ item = self.container.get()
+ if size != 1:
+ self.empty.release()
+ elif type(self.cb_src) is not UnConnected:
+ gob.source_remove(self.cb_src)
+ self.cb_src = Registered
+ self.in_use.release()
+ if item is EOInformation:
+ raise EOInformation
+ else:
+ return item
+
+ def put(self, item, *to):
+ thrd = get_ident()
+ if thrd not in self.writers:
+ raise TubeAccess, "Thread has no write access for tube"
+ if thrd in self.readers:
+ self._put_rw(item)
+ else:
+ self._put_wo(item, *to)
+
+ def _put_wo(self, item, *to):
+ self.full.acquire(*to)
+ self.in_use.acquire()
+ size = self.container.size()
+ if size == 0:
+ self.empty.release()
+ if self.cb_src is Registered:
+ self.cb_src = gob.idle_add(self._idle_callback)
+ self.container.put(item)
+ if size + 1 < self.maxsize:
+ self.full.release()
+ self.in_use.release()
+
+ def _put_rw(self, item):
+ self.in_use.acquire()
+ size = self.container.size()
+ if size == 0:
+ self.empty.release()
+ if self.cb_src is Registered:
+ self.cb_src = gob.idle_add(self._idle_callback)
+ self.container.put(item)
+ self.in_use.release()
+
+ def _idle_callback(self):
+ self.in_use.acquire()
+ lst = self.cb_arglst.pop(0)
+ self.in_use.release()
+ func = lst[0]
+ lst[0] = self
+ ret_val = func(*lst)
+ self.in_use.acquire()
+ if ret_val:
+ lst[0] = func
+ self.cb_arglst.append(lst)
+ elif self.cb_arglst == []:
+ self.cb_src = UnRegistered
+ self.in_use.release()
+ return self.cb_src is not UnRegistered
+
+
+def tube_add_watch(tube, callback, *args):
+
+ global gob
+ import gobject as gob
+
+ tube.in_use.acquire()
+ tube.cb_arglst.append([callback] + list(args))
+ if tube.cb_src is UnRegistered:
+ if tube.container.size() == 0:
+ tube.cb_src = Registered
+ else:
+ tube.cb_src = gob.idle_add(tube._idle_callback)
+ tube.in_use.release()
+
+def tube_remove_watch(tube):
+## tube.in_use.acquire()
+## gob.source_remove(tube.cb_src)
+## tube._idle_callback.handler_block(tube.cb_src)
+ pass