Class: Celluloid::ZMQ::Reactor
- Inherits:
-
Object
- Object
- Celluloid::ZMQ::Reactor
- Defined in:
- lib/celluloid/zmq/reactor.rb
Overview
React to incoming 0MQ and Celluloid events. This is kinda sorta supposed to resemble the Reactor design pattern.
Instance Method Summary collapse
-
#initialize(waker) ⇒ Reactor
constructor
A new instance of Reactor.
-
#monitor_zmq(socket, set, type) ⇒ Object
Monitor the given ZMQ socket with the given options.
-
#run_once(timeout = nil) ⇒ Object
Run the reactor, waiting for events, and calling the given block if the reactor is awoken by the waker.
-
#wait_readable(socket) ⇒ Object
Wait for the given ZMQ socket to become readable.
-
#wait_writeable(socket) ⇒ Object
Wait for the given ZMQ socket to become writeable.
Constructor Details
#initialize(waker) ⇒ Reactor
Returns a new instance of Reactor.
6 7 8 9 10 11 12 13 14 15 16 |
# File 'lib/celluloid/zmq/reactor.rb', line 6 def initialize(waker) @waker = waker @poller = ::ZMQ::Poller.new @readers = {} @writers = {} rc = @poller.register @waker.socket, ::ZMQ::POLLIN unless ::ZMQ::Util.resultcode_ok? rc raise "0MQ poll error: #{::ZMQ::Util.error_string}" end end |
Instance Method Details
#monitor_zmq(socket, set, type) ⇒ Object
Monitor the given ZMQ socket with the given options
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/celluloid/zmq/reactor.rb', line 29 def monitor_zmq(socket, set, type) if set.has_key? socket raise ArgumentError, "another method is already waiting on #{socket.inspect}" else set[socket] = Fiber.current end @poller.register socket, type Fiber.yield @poller.deregister socket, type socket end |
#run_once(timeout = nil) ⇒ Object
Run the reactor, waiting for events, and calling the given block if the reactor is awoken by the waker
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/celluloid/zmq/reactor.rb', line 45 def run_once(timeout = nil) timeout ||= :blocking rc = @poller.poll(timeout) unless ::ZMQ::Util.resultcode_ok? rc raise IOError, "0MQ poll error: #{::ZMQ::Util.error_string}" end @poller.readables.each do |sock| if sock == @waker.socket yield else fiber = @readers.delete sock fiber.resume if fiber end end @poller.writables.each do |sock| fiber = @writers.delete sock fiber.resume if fiber end end |
#wait_readable(socket) ⇒ Object
Wait for the given ZMQ socket to become readable
19 20 21 |
# File 'lib/celluloid/zmq/reactor.rb', line 19 def wait_readable(socket) monitor_zmq socket, @readers, ::ZMQ::POLLIN end |
#wait_writeable(socket) ⇒ Object
Wait for the given ZMQ socket to become writeable
24 25 26 |
# File 'lib/celluloid/zmq/reactor.rb', line 24 def wait_writeable(socket) monitor_zmq socket, @writers, ::ZMQ::POLLOUT end |