Class: Celluloid::ZMQ::Reactor

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/celluloid/zmq/reactor.rb

Overview

React to incoming 0MQ and Celluloid events. This is loosely modelled on the Reactor design pattern.

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Reactor

Returns a new instance of Reactor.



12
13
14
15
16
17
18
19
20
21
22
# File 'lib/celluloid/zmq/reactor.rb', line 12

# Build a reactor: create the waker and the 0MQ poller, start with no waiting
# readers or writers, and register the waker's socket for read events.
#
# Raises RuntimeError when result_ok? rejects the registration result.
def initialize
  @waker  = Waker.new
  @poller = ::ZMQ::Poller.new
  @readers, @writers = {}, {}

  status = @poller.register(@waker.socket, ::ZMQ::POLLIN)
  return if result_ok?(status)

  raise "0MQ poll error: #{::ZMQ::Util.error_string}"
end

Instance Method Details

#monitor_zmq(socket, set, type) ⇒ Object

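Monitor the given ZMQ socket for the given event type: record the current task in `set` as the socket's waiter, register the socket with the poller, and suspend the task until it is resumed. Raises ArgumentError if `set` already has a waiter for the socket.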



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/celluloid/zmq/reactor.rb', line 35

# Park the current task until the reactor reports an event on +socket+.
#
# socket - the 0MQ socket to watch
# set    - the waiter table (socket => task) this wait is recorded in
# type   - the poll event flag passed on to the poller
#
# Raises ArgumentError if +set+ already holds a waiter for +socket+.
# Returns +socket+ once Task.suspend returns.
def monitor_zmq(socket, set, type)
  # Only one task may wait on a given socket per waiter table.
  raise ArgumentError, "another method is already waiting on #{socket.inspect}" if set.has_key?(socket)

  set[socket] = Task.current
  @poller.register(socket, type)

  Task.suspend(:zmqwait)
  socket
end

#run_once(timeout = nil) ⇒ Object

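Run the reactor once: poll for 0MQ events, waiting up to `timeout` seconds (or blocking when no timeout is given), then resume the tasks waiting on any sockets that became readable or writable. A wake-up on the waker's socket is handled by calling the waker's `wait`; this method does not take or yield to a block.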



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/celluloid/zmq/reactor.rb', line 50

# Run one iteration of the reactor: poll for 0MQ events and resume the tasks
# waiting on every socket that became ready.
#
# timeout - seconds to wait for events (converted to milliseconds for the
#           poller); nil (the default) polls with :blocking instead.
#
# A readable waker socket is handled by calling @waker.wait. Every other ready
# socket is deregistered from the poller and its waiting task, if any, is
# resumed; a ready socket with no waiting task is only logged at debug level.
#
# Raises IOError when result_ok? rejects the poll result.
def run_once(timeout = nil)
  if timeout
    timeout *= 1000 # Poller uses millisecond increments
  else
    timeout = :blocking
  end

  rc = @poller.poll(timeout)

  unless result_ok? rc
    raise IOError, "0MQ poll error: #{::ZMQ::Util.error_string}"
  end

  @poller.readables.each do |sock|
    if sock == @waker.socket
      @waker.wait
    else
      # Waits are one-shot: drop the waiter and stop polling the socket
      # before handing control back to the task.
      task = @readers.delete sock
      @poller.deregister sock, ::ZMQ::POLLIN

      if task
        task.resume
      else
        Celluloid::Logger.debug "ZMQ error: got read event without associated reader"
      end
    end
  end

  @poller.writables.each do |sock|
    task = @writers.delete sock
    @poller.deregister sock, ::ZMQ::POLLOUT

    if task
      task.resume
    else
      Celluloid::Logger.debug "ZMQ error: got write event without associated writer"
    end
  end
end

#wait_readable(socket) ⇒ Object

Wait for the given ZMQ socket to become readable



25
26
27
# File 'lib/celluloid/zmq/reactor.rb', line 25

# Suspend the current task until +socket+ is reported readable.
# Delegates to monitor_zmq with the readers table and the POLLIN flag,
# and returns whatever monitor_zmq returns.
def wait_readable(socket)
  monitor_zmq(socket, @readers, ::ZMQ::POLLIN)
end

#wait_writable(socket) ⇒ Object

Wait for the given ZMQ socket to become writable



30
31
32
# File 'lib/celluloid/zmq/reactor.rb', line 30

# Suspend the current task until +socket+ is reported writable.
# Delegates to monitor_zmq with the writers table and the POLLOUT flag,
# and returns whatever monitor_zmq returns.
def wait_writable(socket)
  monitor_zmq(socket, @writers, ::ZMQ::POLLOUT)
end