Class: ZMQMachine::Reactor
- Inherits: Object
  - Object
  - ZMQMachine::Reactor
- Defined in: lib/zm/reactor.rb
Instance Attribute Summary
- #name ⇒ Object (readonly)
  Returns the value of attribute name.
Instance Method Summary
- #cancel_timer(timer) ⇒ Object
  Cancels an existing timer if it hasn’t already fired.
- #close_socket(sock) ⇒ Object
  Removes the given sock socket from the reactor context.
- #deregister_readable(sock) ⇒ Object
  Deregisters the sock for POLLIN events.
- #deregister_writable(sock) ⇒ Object
  Deregisters the sock for POLLOUT.
- #initialize(name, poll_interval = 10, opts = {}) ⇒ Reactor (constructor)
  name provides a name for this reactor instance.
- #join(delay = nil) ⇒ Object
  Join on the thread running this reactor instance.
- #kill ⇒ Object
  Kills the running reactor instance by terminating its thread.
- #list_timers ⇒ Object
- #log(level, message) ⇒ Object
  Publishes log messages to an existing transport passed in to the Reactor constructor using the :log_transport key.
- #next_tick(blk = nil, &block) ⇒ Object
  Schedules a proc or block to execute on the next trip through the reactor loop.
- #oneshot_timer(delay, timer_proc = nil, &blk) ⇒ Object
  Creates a timer that will fire a single time.
- #open_socket_count(kind = :all) ⇒ Object
- #pair_socket(handler_instance) ⇒ Object
  Creates a PAIR socket and attaches handler_instance to the resulting socket.
- #periodical_timer(delay, timer_proc = nil, &blk) ⇒ Object
  Creates a timer that will fire every delay milliseconds until it is explicitly cancelled.
- #pub_socket(handler_instance) ⇒ Object
  Creates a PUB socket and attaches handler_instance to the resulting socket.
- #register_readable(sock) ⇒ Object
  Registers the sock for POLLIN events that will cause the reactor to call the handler’s on_readable method.
- #register_writable(sock) ⇒ Object
  Registers the sock for POLLOUT events that will cause the reactor to call the handler’s on_writable method.
- #rep_socket(handler_instance) ⇒ Object
  Creates a REP socket and attaches handler_instance to the resulting socket.
- #req_socket(handler_instance) ⇒ Object
  Creates a REQ socket and attaches handler_instance to the resulting socket.
- #reschedule_timers ⇒ Object
  Asks all timers to reschedule themselves starting from Timers.now.
- #run(blk = nil, &block) ⇒ Object
  The main entry point for all new reactor contexts.
- #running? ⇒ Boolean
  Returns true when the reactor is running OR while it is in the midst of a shutdown request.
- #stop ⇒ Object
  Marks the reactor as eligible for termination.
- #sub_socket(handler_instance) ⇒ Object
  Creates a SUB socket and attaches handler_instance to the resulting socket.
- #xrep_socket(handler_instance) ⇒ Object
  Creates an XREP socket and attaches handler_instance to the resulting socket.
- #xreq_socket(handler_instance) ⇒ Object
  Creates an XREQ socket and attaches handler_instance to the resulting socket.
Constructor Details
#initialize(name, poll_interval = 10, opts = {}) ⇒ Reactor
name provides a name for this reactor instance. It’s unused at present but may be used in the future for allowing multiple reactors to communicate with each other.
poll_interval is the number of milliseconds to block while waiting for new 0mq socket events; the default is 10.
opts may contain a key :zeromq_context. When this key is provided, its value should be a 0mq context as created by ZMQ::Context.new. The purpose of providing a context to the reactor is so that multiple reactors can share a single context. Doing so allows sockets within each reactor to communicate with each other via an :inproc transport (:inproc is misnamed, it should be :incontext). When no context is supplied, the reactor creates and uses its own 0mq context.
opts may also include a :log_transport key. This should be a transport string for an endpoint that a logger client may connect to for publishing log messages. When this key is defined, the client is automatically created and connected to the indicated endpoint.
    # File 'lib/zm/reactor.rb', line 65
    def initialize name, poll_interval = 10, opts = {}
      @name = name
      @running = false
      @thread = nil
      @poll_interval = determine_interval poll_interval

      @timers = ZMQMachine::Timers.new

      @proc_queue = []
      @proc_queue_mutex = Mutex.new

      # could raise if it fails
      @context = opts[:zeromq_context] || ZMQ::Context.new
      @poller = ZMQ::Poller.new
      @sockets = []
      @raw_to_socket = {}
      Thread.abort_on_exception = true

      if opts[:log_transport]
        @logger = LogClient.new self, opts[:log_transport]
        @logging_enabled = true
      end
    end
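The :zeromq_context fallback can be illustrated without 0mq installed. The following is only a sketch: FakeContext and SketchReactor are hypothetical stand-ins, not part of ZMQMachine, and only the `opts[:zeromq_context] || ...` line mirrors the constructor shown above.

```ruby
# Stand-in for ZMQ::Context; hypothetical, for illustration only.
class FakeContext; end

# Hypothetical miniature of the constructor's option handling.
class SketchReactor
  attr_reader :name, :context

  def initialize(name, poll_interval = 10, opts = {})
    @name = name
    @poll_interval = poll_interval
    # Reuse the caller's context when one is given, otherwise create a private one.
    @context = opts[:zeromq_context] || FakeContext.new
  end
end

shared = FakeContext.new
first  = SketchReactor.new(:first, 10, :zeromq_context => shared)
second = SketchReactor.new(:second, 10, :zeromq_context => shared)
alone  = SketchReactor.new(:alone)

first.context.equal?(second.context)  # both reactors hold the same context object
alone.context.equal?(shared)          # no option given, so a separate context
```

With the real classes, two reactors built this way would be able to reach each other over an :inproc endpoint because they share one context.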
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
    # File 'lib/zm/reactor.rb', line 40
    def name
      @name
    end
Instance Method Details
#cancel_timer(timer) ⇒ Object
Cancels an existing timer if it hasn’t already fired.
Returns true if cancelled, false if otherwise.
    # File 'lib/zm/reactor.rb', line 365
    def cancel_timer timer
      @timers.cancel timer
    end
#close_socket(sock) ⇒ Object
Removes the given sock socket from the reactor context. It is deregistered for new events and closed. Any queued messages are silently dropped.
Returns true for a successful close, false otherwise.
    # File 'lib/zm/reactor.rb', line 183
    def close_socket sock
      return false unless sock

      removed = delete_socket sock
      sock.raw_socket.close

      removed
    end
#deregister_readable(sock) ⇒ Object
Deregisters the sock for POLLIN events. The handler will no longer receive calls to on_readable.
    # File 'lib/zm/reactor.rb', line 334
    def deregister_readable sock
      @poller.deregister_readable sock.raw_socket
    end
#deregister_writable(sock) ⇒ Object
Deregisters the sock for POLLOUT. The handler will no longer receive calls to on_writable.
    # File 'lib/zm/reactor.rb', line 320
    def deregister_writable sock
      @poller.deregister_writable sock.raw_socket
    end
#join(delay = nil) ⇒ Object
Join on the thread running this reactor instance. Default behavior is to wait indefinitely for the thread to exit.
Pass an optional delay value measured in milliseconds; the call returns after at most delay milliseconds, whether or not the thread has exited. The thread itself is not terminated when the delay expires.
Returns immediately when the thread has already exited.
    # File 'lib/zm/reactor.rb', line 139
    def join delay = nil
      # don't allow the thread to try and join itself and only worry about
      # joining for live threads
      if running? && @thread.alive? && @thread != Thread.current
        if delay
          # convert to seconds to meet the argument expectations of Thread#join
          seconds = delay / 1000.0
          @thread.join seconds
        else
          @thread.join
        end
      end
    end
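The millisecond-to-second conversion can be tried on a plain thread. This is a minimal sketch: join_with_ms is a hypothetical helper, not a ZMQMachine method. It relies on the standard behavior of Thread#join, which takes its limit in seconds, returns nil when the limit expires, and leaves the thread running.

```ruby
# Hypothetical helper mirroring the conversion performed in #join.
def join_with_ms(thread, delay_ms = nil)
  if delay_ms
    thread.join(delay_ms / 1000.0)  # Thread#join expects seconds
  else
    thread.join                     # wait indefinitely
  end
end

worker = Thread.new { sleep 0.2 }
join_with_ms(worker, 20)   # returns nil: the 20 ms limit passes first
join_with_ms(worker)       # blocks until the thread exits, then returns it
```

Dividing by 1000.0 rather than 1000 matters: integer division would turn any delay under one second into a zero-second limit.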
#kill ⇒ Object
Kills the running reactor instance by terminating its thread.
After the thread exits, the reactor attempts to clean up after itself and kill any pending I/O.
    # File 'lib/zm/reactor.rb', line 158
    def kill
      if running?
        @stopping = true
        @thread.kill
        cleanup
      end
    end
#list_timers ⇒ Object
    # File 'lib/zm/reactor.rb', line 378
    def list_timers
      @timers.list.each do |timer|
        name = timer.respond_to?(:name) ? timer.timer_proc.name : timer.timer_proc.to_s
        puts "fire time [#{Time.at(timer.fire_time / 1000)}], method [#{name}]"
      end
    end
#log(level, message) ⇒ Object
Publishes log messages to an existing transport passed in to the Reactor constructor using the :log_transport key.
    Reactor.new :log_transport => 'inproc://reactor_log'
level parameter refers to a key to indicate severity level, e.g. :warn, :debug, level0, level9, etc.
message is a plain string that will be written out in its entirety.
When no :log_transport was defined when creating the Reactor, all calls just discard the messages.
    # File 'lib/zm/reactor.rb', line 408
    def log level, message
      if @logging_enabled
        @logger.write [ZMQ::Message.new(level.to_s), ZMQ::Message.new(message.to_s)]
      end
    end
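The two-frame shape of a log entry, and the silent discard when no transport was configured, can be sketched with stand-ins. RecordingLogger and SketchLogging are hypothetical names; plain strings take the place of ZMQ::Message frames so the example runs without 0mq.

```ruby
# Hypothetical stand-in for the log client; it only records what it is given.
class RecordingLogger
  attr_reader :entries

  def initialize
    @entries = []
  end

  def write(frames)
    @entries << frames
  end
end

# Hypothetical miniature of the logging guard.
class SketchLogging
  def initialize(logger = nil)
    @logger = logger
    @logging_enabled = !logger.nil?
  end

  def log(level, message)
    return unless @logging_enabled  # no transport configured: drop the message
    # Frame 1 carries the severity key, frame 2 the message text.
    @logger.write [level.to_s, message.to_s]
  end
end

sink = RecordingLogger.new
SketchLogging.new(sink).log(:warn, "queue is full")
SketchLogging.new.log(:warn, "never delivered")
```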
#next_tick(blk = nil, &block) ⇒ Object
Schedules a proc or block to execute on the next trip through the reactor loop.
This method is thread-safe.
    # File 'lib/zm/reactor.rb', line 171
    def next_tick blk = nil, &block
      blk ||= block
      @proc_queue_mutex.synchronize do
        @proc_queue << blk
      end
    end
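A mutex-guarded queue of this kind can be sketched as follows. TickQueue is a hypothetical class; in particular the drain method is an assumption about how a loop might consume the queue, since the reactor's own loop body is not shown here.

```ruby
# Hypothetical miniature of a next_tick queue; not the ZMQMachine implementation.
class TickQueue
  def initialize
    @procs = []
    @mutex = Mutex.new
  end

  # Safe to call from any thread.
  def next_tick(blk = nil, &block)
    blk ||= block
    @mutex.synchronize { @procs << blk }
  end

  # Assumed consumer: swap the queue out under the lock, then run the procs
  # outside the lock so that a proc may itself call next_tick.
  def drain
    work = @mutex.synchronize do
      pending = @procs
      @procs = []
      pending
    end
    work.each { |queued| queued.call }
    work.size
  end
end

queue = TickQueue.new
queue.next_tick { :from_block }
queue.next_tick(lambda { :from_proc })
queue.drain  # runs both procs in the order they were queued
```

Running the procs outside the lock means work queued from inside a proc is picked up on the following drain, which matches the "next trip through the loop" wording.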
#oneshot_timer(delay, timer_proc = nil, &blk) ⇒ Object
Creates a timer that will fire a single time. Expects either a timer_proc proc or a block, otherwise no timer is created.
delay is measured in milliseconds (1 second equals 1000 milliseconds).
    # File 'lib/zm/reactor.rb', line 344
    def oneshot_timer delay, timer_proc = nil, &blk
      blk ||= timer_proc
      @timers.add_oneshot delay, blk
    end
#open_socket_count(kind = :all) ⇒ Object
    # File 'lib/zm/reactor.rb', line 385
    def open_socket_count kind = :all
      @sockets.inject(0) do |sum, socket|
        if :all == kind || (socket.kind == kind)
          sum + 1
        else
          sum
        end
      end
    end
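The counting rule can be tried on stand-in objects. In this sketch FakeSocket is hypothetical, and the :pub and :sub symbols are assumed values for #kind; the page does not list the kinds the real sockets report. Enumerable#count with a block gives the same result as the inject form above.

```ruby
# Stand-in for a reactor socket; only #kind matters here (hypothetical).
FakeSocket = Struct.new(:kind)

# Same rule as the method above, written with Enumerable#count.
def count_open_sockets(sockets, kind = :all)
  sockets.count { |socket| kind == :all || socket.kind == kind }
end

sockets = [FakeSocket.new(:pub), FakeSocket.new(:sub), FakeSocket.new(:sub)]
count_open_sockets(sockets)        # counts every socket
count_open_sockets(sockets, :sub)  # counts only the :sub sockets
```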
#pair_socket(handler_instance) ⇒ Object
Creates a PAIR socket and attaches handler_instance to the resulting socket. Works only with other #pair_socket instances in the same or other reactor instance.
handler_instance must implement the #on_readable and #on_readable_error methods. Each handler must also implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.
All handlers must implement the #on_attach method.
    # File 'lib/zm/reactor.rb', line 270
    def pair_socket handler_instance
      sock = ZMQMachine::Socket::Pair.new @context, handler_instance
      save_socket sock
      sock
    end
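A handler that satisfies the five methods named above might look like the sketch below. SketchPairHandler is a hypothetical class, and the argument lists of the callbacks are assumptions; this page names the methods but does not show their signatures.

```ruby
# Hypothetical handler for a PAIR socket. Callback arguments are assumed.
class SketchPairHandler
  attr_reader :socket, :inbox, :errors

  def initialize
    @inbox = []
    @errors = []
  end

  # Called when the handler is attached to its socket.
  def on_attach(socket)
    @socket = socket
  end

  # Called when messages are available to read.
  def on_readable(socket, messages)
    @inbox.concat messages
  end

  def on_readable_error(socket, return_code)
    @errors << [:read, return_code]
  end

  # Called when the socket can accept a write.
  def on_writable(socket)
    # a real handler would send a queued message here
  end

  def on_writable_error(socket, return_code)
    @errors << [:write, return_code]
  end
end
```

An instance of such a class is what would be passed as handler_instance, for example `reactor.pair_socket SketchPairHandler.new`.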
#periodical_timer(delay, timer_proc = nil, &blk) ⇒ Object
Creates a timer that will fire every delay milliseconds until it is explicitly cancelled. Expects either a timer_proc proc or a block, otherwise no timer is created.
delay is measured in milliseconds (1 second equals 1000 milliseconds).
    # File 'lib/zm/reactor.rb', line 356
    def periodical_timer delay, timer_proc = nil, &blk
      blk ||= timer_proc
      @timers.add_periodical delay, blk
    end
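The difference between one-shot and periodical timers, and the true/false result of cancelling, can be sketched with an explicit clock. SketchTimers is a hypothetical class, not ZMQMachine::Timers: it takes the current time in milliseconds as an argument so the behavior is deterministic, which the real class may not do.

```ruby
# Hypothetical timer list driven by an explicit millisecond clock.
class SketchTimers
  Timer = Struct.new(:fire_time, :delay, :periodical, :timer_proc)

  def initialize
    @timers = []
  end

  def add_oneshot(now, delay, blk)
    add now, delay, false, blk
  end

  def add_periodical(now, delay, blk)
    add now, delay, true, blk
  end

  # Returns true if the timer was still pending, false otherwise.
  def cancel(timer)
    index = @timers.index { |t| t.equal?(timer) }
    return false unless index
    @timers.delete_at index
    true
  end

  # Fire every timer that is due at +now+; periodical timers are re-armed.
  def fire_expired(now)
    due, @timers = @timers.partition { |t| t.fire_time <= now }
    due.each do |t|
      t.timer_proc.call
      if t.periodical
        t.fire_time += t.delay
        @timers << t
      end
    end
  end

  private

  def add(now, delay, periodical, blk)
    return nil unless blk  # no proc or block, so no timer is created
    timer = Timer.new(now + delay, delay, periodical, blk)
    @timers << timer
    timer
  end
end
```

A one-shot timer leaves the list once it fires, so cancelling it afterwards reports false; a periodical timer stays in the list until it is cancelled.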
#pub_socket(handler_instance) ⇒ Object
Creates a PUB socket and attaches handler_instance to the resulting socket. Usually paired with one or more #sub_socket instances in the same or other reactor instance.
handler_instance must implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events. This socket type can only write; it can never receive/read messages.
All handlers must implement the #on_attach method.
    # File 'lib/zm/reactor.rb', line 287
    def pub_socket handler_instance
      sock = ZMQMachine::Socket::Pub.new @context, handler_instance
      save_socket sock
      sock
    end
#register_readable(sock) ⇒ Object
Registers the sock for POLLIN events that will cause the reactor to call the handler’s on_readable method.
    # File 'lib/zm/reactor.rb', line 327
    def register_readable sock
      @poller.register_readable sock.raw_socket
    end
#register_writable(sock) ⇒ Object
Registers the sock for POLLOUT events that will cause the reactor to call the handler’s on_writable method.
    # File 'lib/zm/reactor.rb', line 313
    def register_writable sock
      @poller.register_writable sock.raw_socket
    end
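One way to picture the four register and deregister calls is as bit operations on a per-socket event mask. The sketch below is an assumption about the bookkeeping, not the ZMQ::Poller implementation; SketchPoller is hypothetical, and the two constants simply reuse the numeric values that zmq.h assigns to ZMQ_POLLIN and ZMQ_POLLOUT.

```ruby
POLLIN  = 1  # numeric value of ZMQ_POLLIN
POLLOUT = 2  # numeric value of ZMQ_POLLOUT

# Hypothetical registration table: raw socket => bitmask of wanted events.
class SketchPoller
  def initialize
    @events = Hash.new(0)
  end

  def register_readable(raw_socket)
    @events[raw_socket] |= POLLIN
  end

  def register_writable(raw_socket)
    @events[raw_socket] |= POLLOUT
  end

  def deregister_readable(raw_socket)
    clear raw_socket, POLLIN
  end

  def deregister_writable(raw_socket)
    clear raw_socket, POLLOUT
  end

  def registered_readable?(raw_socket)
    (@events[raw_socket] & POLLIN) != 0
  end

  def registered_writable?(raw_socket)
    (@events[raw_socket] & POLLOUT) != 0
  end

  def size
    @events.size
  end

  private

  # Drop one event flag and forget the socket once no flags remain.
  def clear(raw_socket, flag)
    return unless @events.key?(raw_socket)
    @events[raw_socket] &= ~flag
    @events.delete(raw_socket) if @events[raw_socket].zero?
  end
end
```

Keeping the two flags independent is what lets a handler stop receiving on_writable calls while it continues to receive on_readable calls.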
#rep_socket(handler_instance) ⇒ Object
Creates a REP socket and attaches handler_instance to the resulting socket. Should only be paired with one other #req_socket instance.
handler_instance must implement the #on_readable and #on_readable_error methods. The reactor will call those methods based upon new events.
All handlers must implement the #on_attach method.
    # File 'lib/zm/reactor.rb', line 218
    def rep_socket handler_instance
      sock = ZMQMachine::Socket::Rep.new @context, handler_instance
      save_socket sock
      sock
    end
#req_socket(handler_instance) ⇒ Object
Creates a REQ socket and attaches handler_instance to the resulting socket. Should only be paired with one other #rep_socket instance.
handler_instance must implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.
All handlers must implement the #on_attach method.
    # File 'lib/zm/reactor.rb', line 202
    def req_socket handler_instance
      sock = ZMQMachine::Socket::Req.new @context, handler_instance
      save_socket sock
      sock
    end
#reschedule_timers ⇒ Object
Asks all timers to reschedule themselves starting from Timers.now. Typically called when the underlying time source for the ZM::Timers class has been replaced; existing timers may not fire as expected, so we ask them to reset themselves.
    # File 'lib/zm/reactor.rb', line 374
    def reschedule_timers
      @timers.reschedule
    end
#run(blk = nil, &block) ⇒ Object
The main entry point for all new reactor contexts. The proc or block given to this method is evaluated once before entering the reactor loop. This evaluation generally sets up sockets and timers that will do the real work once the loop is executed.
    # File 'lib/zm/reactor.rb', line 101
    def run blk = nil, &block
      blk ||= block
      @running, @stopping = true, false

      @thread = Thread.new do
        blk.call self if blk

        while !@stopping && running? do
          run_once
        end

        cleanup
      end
      self
    end
#running? ⇒ Boolean
Returns true when the reactor is running OR while it is in the midst of a shutdown request.
Returns false when the reactor thread does not exist.
    # File 'lib/zm/reactor.rb', line 93
    def running?() @running; end
#stop ⇒ Object
Marks the reactor as eligible for termination. Then waits for the reactor thread to exit via #join (no timeout).
The reactor is not forcibly terminated if it is currently blocked by some long-running operation. Use #kill to forcibly terminate the reactor.
    # File 'lib/zm/reactor.rb', line 124
    def stop
      # wait until the thread loops around again and exits on its own
      @stopping = true
      join
    end
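The run and stop life cycle (setup block evaluated once, loop until a flag is set, cooperative shutdown via join) can be sketched with a plain thread. SketchLoop is a hypothetical class; a short sleep stands in for one pass of the reactor loop, and clearing the running flag on exit is an assumption about what cleanup does.

```ruby
# Hypothetical miniature of the run/stop life cycle.
class SketchLoop
  attr_reader :ticks

  def initialize
    @running = false
    @stopping = false
    @ticks = 0
  end

  def run(blk = nil, &block)
    blk ||= block
    @running, @stopping = true, false
    @thread = Thread.new do
      blk.call self if blk   # setup is evaluated once, before the loop starts
      until @stopping
        @ticks += 1
        sleep 0.001          # stands in for one poll interval
      end
      @running = false       # assumed effect of cleanup
    end
    self
  end

  def running?
    @running
  end

  # Cooperative shutdown: set the flag, then wait for the loop to notice it.
  def stop
    @stopping = true
    @thread.join if @thread && @thread != Thread.current
  end
end

looper = SketchLoop.new.run { |reactor| reactor.ticks }  # setup block receives the loop
sleep 0.02
looper.stop
```

Because stop only sets a flag, a loop iteration that blocks for a long time delays shutdown by the same amount, which is the case the description above reserves for #kill.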
#sub_socket(handler_instance) ⇒ Object
Creates a SUB socket and attaches handler_instance to the resulting socket. Usually paired with one or more #pub_socket in the same or different reactor context.
handler_instance must implement the #on_readable and #on_readable_error methods. The reactor will call those methods based upon new events. This socket type can only read; it can never write/send messages.
All handlers must implement the #on_attach method.
    # File 'lib/zm/reactor.rb', line 304
    def sub_socket handler_instance
      sock = ZMQMachine::Socket::Sub.new @context, handler_instance
      save_socket sock
      sock
    end
#xrep_socket(handler_instance) ⇒ Object
Creates an XREP socket and attaches handler_instance to the resulting socket. Should only be paired with one other #req_socket instance.
handler_instance must implement the #on_readable, #on_readable_error, #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.
All handlers must implement the #on_attach method.
    # File 'lib/zm/reactor.rb', line 252
    def xrep_socket handler_instance
      sock = ZMQMachine::Socket::XRep.new @context, handler_instance
      save_socket sock
      sock
    end
#xreq_socket(handler_instance) ⇒ Object
Creates an XREQ socket and attaches handler_instance to the resulting socket. Should only be paired with one other #rep_socket instance.
handler_instance must implement the #on_readable, #on_readable_error, #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.
All handlers must implement the #on_attach method.
    # File 'lib/zm/reactor.rb', line 235
    def xreq_socket handler_instance
      sock = ZMQMachine::Socket::XReq.new @context, handler_instance
      save_socket sock
      sock
    end
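The callback requirements stated for each socket factory on this page can be collected into one table and checked before a handler is attached. This is only a sketch: missing_callbacks, the constants, and the symbol keys are hypothetical and not part of ZMQMachine, and SubOnlyHandler uses assumed callback argument lists.

```ruby
READ_CALLBACKS  = [:on_readable, :on_readable_error]
WRITE_CALLBACKS = [:on_writable, :on_writable_error]

# Requirements as documented for each factory method on this page.
REQUIRED_CALLBACKS = {
  :req  => WRITE_CALLBACKS,
  :rep  => READ_CALLBACKS,
  :pub  => WRITE_CALLBACKS,
  :sub  => READ_CALLBACKS,
  :pair => READ_CALLBACKS + WRITE_CALLBACKS,
  :xreq => READ_CALLBACKS + WRITE_CALLBACKS,
  :xrep => READ_CALLBACKS + WRITE_CALLBACKS
}

# Hypothetical helper: list the callbacks a handler still lacks for a socket kind.
def missing_callbacks(kind, handler)
  needed = [:on_attach] + REQUIRED_CALLBACKS.fetch(kind)
  needed.reject { |name| handler.respond_to?(name) }
end

# Hypothetical read-only handler, sufficient for SUB or REP sockets.
class SubOnlyHandler
  def on_attach(socket); end
  def on_readable(socket, messages); end
  def on_readable_error(socket, return_code); end
end

missing_callbacks(:sub, SubOnlyHandler.new)   # nothing missing
missing_callbacks(:pair, SubOnlyHandler.new)  # the two write callbacks are missing
```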