Class: Cod::WorkQueue
- Inherits: Object
  - Object
  - Cod::WorkQueue
- Defined in: lib/cod/work_queue.rb
Overview
Describes a queue that stores work items given through #schedule and works through those items in order for as long as the block given to #predicate evaluates to true.
Synopsis:
queue = WorkQueue.new
queue.predicate { true }
queue.schedule {
# some work
}
# Will try to work through items right now.
queue.try_work
# Will cleanly shut down the background thread, but will not finish pending work.
queue.shutdown
Instance Attribute Summary
- #thread ⇒ Object (readonly)
  The internal thread that is used to work on scheduled items in the background.
Instance Method Summary
- #clear_thread_semaphore ⇒ Object
- #initialize ⇒ WorkQueue (constructor)
- #predicate(&predicate) ⇒ Object
  Before any kind of work is attempted, this predicate must evaluate to true.
- #schedule(&work) ⇒ Object
  Schedules a piece of work.
- #shutdown ⇒ Object
  Shuts down the queue properly, without waiting for work to be completed.
- #size ⇒ Object
  Returns the size of the queue.
- #thread_semaphore_set? ⇒ Boolean
- #try_work ⇒ Object
Constructor Details
#initialize ⇒ WorkQueue
# File 'lib/cod/work_queue.rb', line 21
def initialize
  # NOTE: This is an array that is protected by careful coding, rather
  # than a mutex. Queue would be right, but Ruby's GIL will interfere with
  # that, producing more deadlocks than I would like.
  @queue = Array.new
  @try_work_exclusive_section = ExclusiveSection.new
  @thread = Thread.start(&method(:thread_main))
end
Instance Attribute Details
#thread ⇒ Object (readonly)
The internal thread that is used to work on scheduled items in the background.
# File 'lib/cod/work_queue.rb', line 34
def thread
  @thread
end
Instance Method Details
#clear_thread_semaphore ⇒ Object
# File 'lib/cod/work_queue.rb', line 78
def clear_thread_semaphore
  @one_turn = false
end
#predicate(&predicate) ⇒ Object
Before any kind of work is attempted, this predicate must evaluate to true. It is tested repeatedly.
Example:
work_queue.predicate { connection.established? }
# File 'lib/cod/work_queue.rb', line 53
def predicate(&predicate)
  @predicate = predicate
end
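"Tested repeatedly" means the predicate is called again before every item, as the loop in #try_work shows. A predicate that turns false partway through therefore leaves the remaining items queued. The sketch below reproduces that loop with a plain array and lambdas; `drain`, `budget`, and `log` are hypothetical names used only for illustration and are not part of Cod.

```ruby
# Stand-in for the loop in WorkQueue#try_work; this is not Cod's class.
def drain(queue, predicate)
  # Same condition as the documented loop: stop when the queue is empty,
  # the predicate is missing, or the predicate returns a falsy value.
  while !queue.empty? && predicate && predicate.call
    work = queue.shift
    work.call
  end
end

log = []
budget = 2 # hypothetical limit: allow only two items in this pass
queue = []
3.times { |i| queue << -> { log << i; budget -= 1 } }

drain(queue, -> { budget > 0 })

p log        # items 0 and 1 ran, in scheduling order
p queue.size # one item is still waiting
```

The third item stays in the queue and would run on a later pass once the predicate holds again.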
#schedule(&work) ⇒ Object
Schedules a piece of work. Example:
work_queue.schedule { a_piece_of_work }
# File 'lib/cod/work_queue.rb', line 61
def schedule(&work)
  @queue << work
end
#shutdown ⇒ Object
Shuts down the queue properly, without waiting for work to be completed.
# File 'lib/cod/work_queue.rb', line 67
def shutdown
  @shutdown_requested = true
  @thread.join
end
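The listing above sets a flag and then joins the worker thread. The body of thread_main is not shown on this page, so the following is only a sketch of the flag-and-join pattern, assuming a loop that polls the flag between short sleeps. `FlagWorker`, `run`, `pending`, and the 0.01 second interval are all hypothetical.

```ruby
# Hypothetical stand-in; the real thread_main in Cod may look different.
class FlagWorker
  attr_reader :thread, :pending

  def initialize
    @pending = [:unfinished_item] # work that this sketch never processes
    @shutdown_requested = false
    @thread = Thread.start { run }
  end

  # Request shutdown, then block until the worker thread has exited.
  def shutdown
    @shutdown_requested = true
    @thread.join
  end

  private

  # Assumed loop shape: check the flag, sleep briefly, repeat.
  def run
    sleep 0.01 until @shutdown_requested
  end
end

worker = FlagWorker.new
worker.shutdown
p worker.thread.alive? # false once join has returned
p worker.pending.size  # pending work is left untouched
```

Joining after setting the flag is what makes the shutdown clean: the caller returns only after the background thread has finished its current turn, while unprocessed items simply remain in the queue.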
#size ⇒ Object
Returns the size of the queue.
# File 'lib/cod/work_queue.rb', line 74
def size
  @queue.size
end
#thread_semaphore_set? ⇒ Boolean
# File 'lib/cod/work_queue.rb', line 81
def thread_semaphore_set?
  @one_turn
end
#try_work ⇒ Object
# File 'lib/cod/work_queue.rb', line 36
def try_work
  @try_work_exclusive_section.enter {
    # NOTE if predicate is nil or not set, no work will be accomplished.
    # This is the way I need it.
    while !@queue.empty? && @predicate && @predicate.call
      wi = @queue.shift
      wi.call
    end
  }
end
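As the note in the listing says, a queue without a predicate never runs anything. The sketch below shows that behavior with a small stand-in class that has no background thread. `SketchQueue` is hypothetical, and it uses Mutex#synchronize in place of ExclusiveSection, whose implementation is not shown on this page and may behave differently (it might, for instance, skip instead of wait when another caller is already inside).

```ruby
# Hypothetical stand-in for Cod::WorkQueue, without the background thread.
class SketchQueue
  def initialize
    @queue = []
    @lock = Mutex.new # assumption: substitutes for ExclusiveSection
    @predicate = nil
  end

  def predicate(&predicate)
    @predicate = predicate
  end

  def schedule(&work)
    @queue << work
  end

  def size
    @queue.size
  end

  def try_work
    @lock.synchronize do
      # A nil predicate short-circuits the condition, so nothing runs.
      while !@queue.empty? && @predicate && @predicate.call
        @queue.shift.call
      end
    end
  end
end

results = []
queue = SketchQueue.new
queue.schedule { results << :first }
queue.schedule { results << :second }

queue.try_work # no predicate set yet, so both items stay queued
size_without_predicate = queue.size

queue.predicate { true }
queue.try_work # now drains the queue in scheduling order

p size_without_predicate # 2
p results                # [:first, :second]
p queue.size             # 0
```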