summaryrefslogtreecommitdiff
path: root/src/threads/Workers.vala
diff options
context:
space:
mode:
authorJörg Frings-Fürst <debian@jff-webhosting.net>2014-07-23 09:06:59 +0200
committerJörg Frings-Fürst <debian@jff-webhosting.net>2014-07-23 09:06:59 +0200
commit4ea2cc3bd4a7d9b1c54a9d33e6a1cf82e7c8c21d (patch)
treed2e54377d14d604356c86862a326f64ae64dadd6 /src/threads/Workers.vala
Imported Upstream version 0.18.1upstream/0.18.1
Diffstat (limited to 'src/threads/Workers.vala')
-rw-r--r--src/threads/Workers.vala104
1 files changed, 104 insertions, 0 deletions
diff --git a/src/threads/Workers.vala b/src/threads/Workers.vala
new file mode 100644
index 0000000..756eb01
--- /dev/null
+++ b/src/threads/Workers.vala
@@ -0,0 +1,104 @@
+/* Copyright 2009-2014 Yorba Foundation
+ *
+ * This software is licensed under the GNU LGPL (version 2.1 or later).
+ * See the COPYING file in this distribution.
+ */
+
+
+public class BackgroundJobBatch : SortedList<BackgroundJob> {
+ public BackgroundJobBatch() {
+ base (BackgroundJob.priority_comparator);
+ }
+}
+
+// Workers wraps some of ThreadPool's oddities up into an interface that emphasizes BackgroundJobs.
+public class Workers {
+ public const int UNLIMITED_THREADS = -1;
+
+ private ThreadPool<void *> thread_pool;
+ private AsyncQueue<BackgroundJob> queue = new AsyncQueue<BackgroundJob>();
+ private EventSemaphore empty_event = new EventSemaphore();
+ private int enqueued = 0;
+
+ public Workers(int max_threads, bool exclusive) {
+ if (max_threads <= 0 && max_threads != UNLIMITED_THREADS)
+ max_threads = 1;
+
+ // event starts as set because queue is empty
+ empty_event.notify();
+
+ try {
+ thread_pool = new ThreadPool<void *>.with_owned_data(thread_start, max_threads, exclusive);
+ } catch (ThreadError err) {
+ error("Unable to create thread pool: %s", err.message);
+ }
+ }
+
+ public static int threads_per_cpu(int per = 1, int max = -1) requires (per > 0) ensures (result > 0) {
+ int count = number_of_processors() * per;
+
+ return (max < 0) ? count : count.clamp(0, max);
+ }
+
+ // This is useful when the intent is for the worker threads to use all the CPUs minus one for
+ // the main/UI thread. (No guarantees, of course.)
+ public static int thread_per_cpu_minus_one() ensures (result > 0) {
+ return (number_of_processors() - 1).clamp(1, int.MAX);
+ }
+
+ // Enqueues a BackgroundJob for work in a thread context. BackgroundJob.execute() is called
+ // within the thread's context, while its CompletionCallback is called within the Gtk event loop.
+ public void enqueue(BackgroundJob job) {
+ empty_event.reset();
+
+ lock (queue) {
+ queue.push_sorted(job, BackgroundJob.priority_compare_func);
+ enqueued++;
+ }
+
+ try {
+ thread_pool.add(job);
+ } catch (ThreadError err) {
+ // error should only occur when a thread could not be created, in which case, the
+ // BackgroundJob is queued up
+ warning("Unable to create worker thread: %s", err.message);
+ }
+ }
+
+ public void enqueue_many(BackgroundJobBatch batch) {
+ foreach (BackgroundJob job in batch)
+ enqueue(job);
+ }
+
+ public void wait_for_empty_queue() {
+ empty_event.wait();
+ }
+
+ // Returns the number of BackgroundJobs on the queue, not including active jobs.
+ public int get_pending_job_count() {
+ lock (queue) {
+ return enqueued;
+ }
+ }
+
+ private void thread_start(void *ignored) {
+ BackgroundJob? job;
+ bool empty;
+ lock (queue) {
+ job = queue.try_pop();
+ assert(job != null);
+
+ assert(enqueued > 0);
+ empty = (--enqueued == 0);
+ }
+
+ if (!job.is_cancelled())
+ job.execute();
+
+ job.internal_notify_completion();
+
+ if (empty)
+ empty_event.notify();
+ }
+}
+