Class: Celluloid::ZMQ::Reactor

Inherits:
Object
  • Object
show all
Defined in:
lib/celluloid/zmq/reactor.rb

Overview

React to incoming 0MQ and Celluloid events. This is kinda sorta supposed to resemble the Reactor design pattern.

Instance Method Summary collapse

Constructor Details

#initialize(waker) ⇒ Reactor

Returns a new instance of Reactor.



6
7
8
9
10
11
12
13
14
15
16
# File 'lib/celluloid/zmq/reactor.rb', line 6

def initialize(waker)
  @waker = waker
  @poller = ::ZMQ::Poller.new
  @readers = {}
  @writers = {}

  rc = @poller.register @waker.socket, ::ZMQ::POLLIN
  unless ::ZMQ::Util.resultcode_ok? rc
    raise "0MQ poll error: #{::ZMQ::Util.error_string}"
  end
end

Instance Method Details

#monitor_zmq(socket, set, type) ⇒ Object

Monitor the given ZMQ socket with the given options



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/celluloid/zmq/reactor.rb', line 29

def monitor_zmq(socket, set, type)
  if set.has_key? socket
    raise ArgumentError, "another method is already waiting on #{socket.inspect}"
  else
    set[socket] = Fiber.current
  end

  @poller.register socket, type
  Fiber.yield

  @poller.deregister socket, type
  socket
end

#run_once(timeout = nil) ⇒ Object

Run the reactor, waiting for events, and calling the given block if the reactor is awoken by the waker



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/celluloid/zmq/reactor.rb', line 45

def run_once(timeout = nil)
  timeout ||= :blocking
  rc = @poller.poll(timeout)

  unless ::ZMQ::Util.resultcode_ok? rc
    raise IOError, "0MQ poll error: #{::ZMQ::Util.error_string}"
  end

  @poller.readables.each do |sock|
    if sock == @waker.socket
      yield
    else
      fiber = @readers.delete sock
      fiber.resume if fiber
    end
  end

  @poller.writables.each do |sock|
    fiber = @writers.delete sock
    fiber.resume if fiber
  end
end

#wait_readable(socket) ⇒ Object

Wait for the given ZMQ socket to become readable



19
20
21
# File 'lib/celluloid/zmq/reactor.rb', line 19

def wait_readable(socket)
  monitor_zmq socket, @readers, ::ZMQ::POLLIN
end

#wait_writeable(socket) ⇒ Object

Wait for the given ZMQ socket to become writeable



24
25
26
# File 'lib/celluloid/zmq/reactor.rb', line 24

def wait_writeable(socket)
  monitor_zmq socket, @writers, ::ZMQ::POLLOUT
end