Class: Qpid::Proton::WorkQueue
- Inherits:
-
Object
- Object
- Qpid::Proton::WorkQueue
- Defined in:
- lib/core/work_queue.rb
Overview
A thread-safe queue of work for multi-threaded programs.
A Container can have multiple threads calling Container#run The container ensures that work associated with a single Connection or Listener is serialized - two threads will never concurrently call handlers associated with the same object.
To have your own code serialized in the same, add a block to the connection’s WorkQueue. The block will be invoked as soon as it is safe to do so.
A Connection and the objects associated with it (Session, Sender, Receiver, Delivery, Tracker) are not thread safe, so if you have multiple threads calling Container#run or if you want to affect objects managed by the container from non-container threads you need to use the WorkQueue
Defined Under Namespace
Classes: StoppedError
Instance Method Summary collapse
-
#add {|| ... } ⇒ void
Add a block of code to be invoked in sequence.
- #clear ⇒ Object
- #close ⇒ Object
- #empty? ⇒ Boolean
-
#initialize(container) ⇒ WorkQueue
constructor
A new instance of WorkQueue.
- #next_tick ⇒ Object
- #process(now) ⇒ Object
-
#schedule(at) {|| ... } ⇒ void
Schedule a block to be invoked at a certain time.
Constructor Details
Instance Method Details
#add {|| ... } ⇒ void
Thread Safe: may be called in any thread.
This method returns an undefined value.
Add a block of code to be invoked in sequence.
49 50 51 |
# File 'lib/core/work_queue.rb', line 49 def add(&block) schedule(0, &block) end |
#clear ⇒ Object
95 |
# File 'lib/core/work_queue.rb', line 95 def clear() @lock.synchronize { @schedule.clear } end |
#close ⇒ Object
79 |
# File 'lib/core/work_queue.rb', line 79 def close() @lock.synchronize { @closed = StoppedError.new } end |
#empty? ⇒ Boolean
92 |
# File 'lib/core/work_queue.rb', line 92 def empty?() @lock.synchronize { @schedule.empty? } end |
#next_tick ⇒ Object
89 |
# File 'lib/core/work_queue.rb', line 89 def next_tick() @lock.synchronize { @schedule.next_tick } end |
#process(now) ⇒ Object
82 83 84 85 86 |
# File 'lib/core/work_queue.rb', line 82 def process(now) while p = @lock.synchronize { @schedule.pop(now) } p.call end end |
#schedule(at) {|| ... } ⇒ void
Thread Safe: may be called in any thread.
This method returns an undefined value.
Schedule a block to be invoked at a certain time.
61 62 63 64 65 66 67 68 |
# File 'lib/core/work_queue.rb', line 61 def schedule(at, &block) raise ArgumentError, "no block" unless block_given? @lock.synchronize do raise @closed if @closed @schedule.insert(at, block) end @container.send :wake end |