Class: RSMP::Queue
- Inherits: Object (Object → RSMP::Queue)
- Includes: Receiver
- Defined in: lib/rsmp/collect/queue.rb
Instance Attribute Summary
- #messages ⇒ Object (readonly): Returns the value of attribute messages.
Instance Method Summary
- #clear ⇒ Object
- #handle_message(message) ⇒ Object
- #initialize(distributor, task:, filter: nil) ⇒ Queue (constructor): A new instance of Queue.
- #wait_for_message(timeout: nil) ⇒ Object
Methods included from Receiver
#accept_message?, #initialize_receiver, #receive, #receive_error, #reject_message?, #start_receiving, #stop_receiving
Methods included from Inspect
Constructor Details
#initialize(distributor, task:, filter: nil) ⇒ Queue
Returns a new instance of Queue.
    # File 'lib/rsmp/collect/queue.rb', line 10
    def initialize(distributor, task:, filter: nil)
      initialize_receiver distributor, filter: filter
      @condition = Async::Notification.new
      @task = task
      clear
    end
Instance Attribute Details
#messages ⇒ Object (readonly)
Returns the value of attribute messages.
    # File 'lib/rsmp/collect/queue.rb', line 8
    def messages
      @messages
    end
Instance Method Details
#clear ⇒ Object
Resets the queue to an empty array, discarding any buffered messages.
    # File 'lib/rsmp/collect/queue.rb', line 17
    def clear
      @messages = []
    end
#handle_message(message) ⇒ Object
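Appends the message to the queue and signals the condition so that a task waiting in #wait_for_message can resume.
    # File 'lib/rsmp/collect/queue.rb', line 34
    def handle_message(message)
      @messages << message
      @condition.signal
    end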
#wait_for_message(timeout: nil) ⇒ Object
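Removes and returns the oldest queued message. If the queue is empty, waits on the condition until a message arrives. When a timeout is given, the wait is bounded by it and an Async::TimeoutError is re-raised as RSMP::TimeoutError.
    # File 'lib/rsmp/collect/queue.rb', line 21
    def wait_for_message(timeout: nil)
      if @messages.empty?
        if timeout
          @task.with_timeout(timeout) { @condition.wait }
        else
          @condition.wait
        end
      end
      @messages.shift
    rescue Async::TimeoutError
      raise RSMP::TimeoutError
    end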
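To illustrate how #handle_message and #wait_for_message work together, here is a minimal, self-contained sketch of the same buffer-and-signal pattern. It is not RSMP::Queue itself. It assumes plain threads with the standard library's Mutex and ConditionVariable in place of Async tasks and Async::Notification, and the names SketchQueue and SketchTimeoutError are hypothetical stand-ins.

```ruby
# Illustration only: a thread-based analogue, NOT the RSMP::Queue implementation.
# SketchQueue and SketchTimeoutError are hypothetical names.
class SketchTimeoutError < StandardError; end

class SketchQueue
  def initialize
    @messages = []
    @mutex = Mutex.new
    @condition = ConditionVariable.new
  end

  # Analogue of #handle_message: buffer the message, then wake one waiter.
  def handle_message(message)
    @mutex.synchronize do
      @messages << message
      @condition.signal
    end
  end

  # Analogue of #wait_for_message: return the oldest message, blocking
  # until one arrives or until `timeout` seconds have elapsed.
  def wait_for_message(timeout: nil)
    deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)
    @mutex.synchronize do
      while @messages.empty?
        if deadline
          remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
          # Translate "waited too long" into a domain-specific error.
          raise SketchTimeoutError, "no message within #{timeout}s" if remaining <= 0
          @condition.wait(@mutex, remaining)
        else
          @condition.wait(@mutex)
        end
      end
      @messages.shift
    end
  end
end

queue = SketchQueue.new
queue.handle_message(:first)
queue.handle_message(:second)
queue.wait_for_message                      # buffered, returns immediately

producer = Thread.new { sleep 0.05; queue.handle_message(:late) }
queue.wait_for_message(timeout: 1)          # buffered, returns immediately
queue.wait_for_message(timeout: 1)          # blocks until the producer delivers
producer.join

begin
  queue.wait_for_message(timeout: 0.05)     # nothing left, so this times out
rescue SketchTimeoutError => e
  warn "timed out: #{e.message}"
end
```

One difference is deliberate: the sketch re-checks for an empty buffer in a loop because ConditionVariable can wake spuriously, whereas the documented method checks once before waiting.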