Class: Celluloid::ZMQ::Reactor

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/celluloid/zmq/reactor.rb

Overview

Reacts to incoming 0MQ and Celluloid events. It is loosely modeled on the Reactor design pattern.

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Reactor

Returns a new instance of Reactor.



11
12
13
14
15
16
17
18
19
# File 'lib/celluloid/zmq/reactor.rb', line 11

# Build a reactor: create the waker and the 0MQ poller, start with empty
# reader/writer tables, and register the waker's socket with the poller for
# POLLIN events.
#
# @raise [RuntimeError] if `result_ok?` rejects the result of registering
#   the waker's socket
def initialize
  @waker = Waker.new
  @poller = ::ZMQ::Poller.new

  # Tables handed to #monitor_zmq, which stores the waiting task per socket.
  @readers = {}
  @writers = {}

  status = @poller.register(@waker.socket, ::ZMQ::POLLIN)
  return if result_ok?(status)

  fail "0MQ poll error: #{::ZMQ::Util.error_string}"
end

Instance Method Details

#monitor_zmq(socket, set, type) ⇒ Object

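Register the given ZMQ socket with the poller for the given event type, record the current task in `set` as the socket's waiter, and suspend that task. Raises ArgumentError if `set` already has an entry for the socket.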



32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/celluloid/zmq/reactor.rb', line 32

# Park the current task until the reactor reports an event on +socket+.
#
# @param socket [Object] the socket to watch
# @param set [Hash] table mapping sockets to their waiting task
#   (callers in this file pass @readers or @writers)
# @param type [Object] poll event flag forwarded to the poller's `register`
# @return [Object] the same +socket+, once `Task.suspend` returns
# @raise [ArgumentError] if +set+ already holds an entry for +socket+
def monitor_zmq(socket, set, type)
  # Only one waiter per socket per table is allowed.
  fail ArgumentError, "another method is already waiting on #{socket.inspect}" if set.key?(socket)

  set[socket] = Task.current
  @poller.register(socket, type)
  Task.suspend(:zmqwait)

  socket
end

#run_once(timeout = nil) ⇒ Object

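Run the reactor once: poll for events (optionally with a timeout given in seconds), then resume the task waiting on each socket that became readable or writable. If the waker's socket is readable, `@waker.wait` is called instead of resuming a task.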



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/celluloid/zmq/reactor.rb', line 47

# Run one iteration of the reactor: poll once, then resume the task
# registered for every socket the poller reports as readable or writable.
#
# @param timeout [Numeric, nil] maximum time to wait, in seconds; when nil
#   (or false) the poller is asked to block
# @return [Object] whatever `@poller.writables.each` returns
# @raise [IOError] if `result_ok?` rejects the poll result code
def run_once(timeout = nil)
  # Poller uses millisecond increments; with no timeout, block instead.
  timeout = timeout ? timeout * 1000 : :blocking

  rc = @poller.poll(timeout)
  fail IOError, "0MQ poll error: #{::ZMQ::Util.error_string}" unless result_ok? rc

  @poller.readables.each do |sock|
    # Readiness on the waker's own socket is handled by the waker, not a task.
    if sock == @waker.socket
      @waker.wait
      next
    end

    # Each wait is one-shot: forget the waiter and stop polling for reads.
    task = @readers.delete sock
    @poller.deregister sock, ::ZMQ::POLLIN

    if task
      task.resume
    else
      Celluloid::Logger.debug "ZMQ error: got read event without associated reader"
    end
  end

  @poller.writables.each do |sock|
    # Same one-shot handling for write waits.
    task = @writers.delete sock
    @poller.deregister sock, ::ZMQ::POLLOUT

    if task
      task.resume
    else
      Celluloid::Logger.debug "ZMQ error: got write event without associated writer"
    end
  end
end

#wait_readable(socket) ⇒ Object

Wait for the given ZMQ socket to become readable



22
23
24
# File 'lib/celluloid/zmq/reactor.rb', line 22

# Suspend the current task until +socket+ is reported readable.
# Delegates to #monitor_zmq with the readers table and the POLLIN flag.
#
# @param socket [Object] the socket to wait on
# @return [Object] the result of #monitor_zmq
def wait_readable(socket)
  monitor_zmq(socket, @readers, ::ZMQ::POLLIN)
end

#wait_writable(socket) ⇒ Object

Wait for the given ZMQ socket to become writable



27
28
29
# File 'lib/celluloid/zmq/reactor.rb', line 27

# Suspend the current task until +socket+ is reported writable.
# Delegates to #monitor_zmq with the writers table and the POLLOUT flag.
#
# @param socket [Object] the socket to wait on
# @return [Object] the result of #monitor_zmq
def wait_writable(socket)
  monitor_zmq(socket, @writers, ::ZMQ::POLLOUT)
end