diff options
Diffstat (limited to 'rapid/idletube.py')
-rw-r--r-- | rapid/idletube.py | 205 |
1 files changed, 205 insertions, 0 deletions
diff --git a/rapid/idletube.py b/rapid/idletube.py new file mode 100644 index 0000000..86ff1a4 --- /dev/null +++ b/rapid/idletube.py @@ -0,0 +1,205 @@ + +# Copyright (c) 2005 Antoon Pardon +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the "Software"), +# to deal in the Software without restriction, including without limitation +# the rights to use, copy, modify, merge, publish, distribute, sublicense, +# and/or sell copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +from threading import Lock +from thread import get_ident + +from types import BooleanType as UnConnected + +UnRegistered, Registered = False, True + +class EOInformation(Exception): + pass + +class TubeAccess(Exception): + pass + +class Fifo: + + def __init__(self): + self.fifo = [] + + def put(self, item): + self.fifo.append(item) + + def get(self): + return self.fifo.pop(0) + + def size(self): + return len(self.fifo) + +class Tube: + + def __init__(self, maxsize, lck = Lock, container = None): + if container is None: + container = Fifo() + self.readers = set() + self.writers = set() + self.container = container + self.maxsize = maxsize + self.cb_arglst = [] + self.cb_src = UnRegistered + self.in_use = Lock() + self.nowriter = lck() + self.full = lck() + self.empty = lck() + self.empty.acquire() + self.nowriter.acquire() + + def open(self, access = 'r', *to): + thrd = get_ident() + access = access.lower() + self.in_use.acquire() + if 'w' in access: + if len(self.writers) == 0: + for _ in self.readers: + self.nowriter.release() + self.writers.add(thrd) + if 'r' in access: + self.readers.add(thrd) + if len(self.writers) == 0: + self.in_use.release() + self.nowriter.acquire(*to) + else: + self.in_use.release() + else: + self.in_use.release() + + def close(self, access = 'rw'): + thrd = get_ident() + access = access.lower() + self.in_use.acquire() + if 'r' in access: + self.readers.discard(thrd) + if 'w' in access: + self.writers.discard(thrd) +## print "have", self.writers, "writers" + if len(self.writers) == 0: + if self.container.size() == 0: +## print "emptying container, as size is", self.container.size() + self.empty.release() + if self.cb_src is Registered and len(self.readers) > 0: +## print "adding callback" + self.cb_src = gob.idle_add(self._idle_callback) +## else: +## print "container size not empty, is", self.container.size() + for _ in self.readers: +## print "putting EOInformation" + self.container.put(EOInformation) + self.in_use.release() + + def size(self): + self.in_use.acquire() + size = self.container.size() + self.in_use.release() + return size + + def get(self, *to): + thrd = get_ident() + if thrd not in self.readers: + raise TubeAccess, "Thread has no read access for tube" + self.empty.acquire(*to) + self.in_use.acquire() + size = self.container.size() + if size == self.maxsize: + self.full.release() + item = self.container.get() + if size != 1: + self.empty.release() + elif type(self.cb_src) is not UnConnected: + gob.source_remove(self.cb_src) + self.cb_src = Registered + self.in_use.release() + if item is EOInformation: + raise EOInformation + else: + return item + + def put(self, item, *to): + thrd = get_ident() + if thrd not in self.writers: + raise TubeAccess, "Thread has no write access for tube" + if thrd in self.readers: + self._put_rw(item) + else: + self._put_wo(item, *to) + + def _put_wo(self, item, *to): + self.full.acquire(*to) + self.in_use.acquire() + size = self.container.size() + if size == 0: + self.empty.release() + if self.cb_src is Registered: + #gdk.threads_enter() + self.cb_src = gob.idle_add(self._idle_callback) + #gdk.threads_leave() + self.container.put(item) + if size + 1 < self.maxsize: + self.full.release() + self.in_use.release() + + def _put_rw(self, item): + self.in_use.acquire() + size = self.container.size() + if size == 0: + self.empty.release() + if self.cb_src is Registered: + self.cb_src = gob.idle_add(self._idle_callback) + self.container.put(item) + self.in_use.release() + + def _idle_callback(self): + self.in_use.acquire() + lst = self.cb_arglst.pop(0) + self.in_use.release() + func = lst[0] + lst[0] = self + ret_val = func(*lst) + self.in_use.acquire() + if ret_val: + lst[0] = func + self.cb_arglst.append(lst) + elif self.cb_arglst == []: + self.cb_src = UnRegistered + self.in_use.release() + return self.cb_src is not UnRegistered + + +def tube_add_watch(tube, callback, *args): + + global gob #, gdk + import gobject as gob + #import gtk.gdk as gdk + + tube.in_use.acquire() + tube.cb_arglst.append([callback] + list(args)) + if tube.cb_src is UnRegistered: + if tube.container.size() == 0: + tube.cb_src = Registered + else: + tube.cb_src = gob.idle_add(tube._idle_callback) + tube.in_use.release() + +def tube_remove_watch(tube): +## tube.in_use.acquire() +## gob.source_remove(tube.cb_src) +## tube._idle_callback.handler_block(tube.cb_src) + pass |