Class: ZMQMachine::Reactor

Inherits:
Object
Defined in:
lib/zm/reactor.rb

Constructor Details

#initialize(name, poll_interval = 10, opts = {}) ⇒ Reactor

name provides a name for this reactor instance. It is unused at present but may be used in the future to allow multiple reactors to communicate with each other.

poll_interval is the number of milliseconds to block while waiting for new 0mq socket events; the default is 10.

opts may contain the key :zeromq_context. When it is present, its value should be a 0mq context as created by ZMQ::Context.new. Providing a context lets multiple reactors share a single context, which allows sockets in different reactors to communicate with each other via an :inproc transport (:inproc is misnamed; it should be :incontext). When the key is absent, the reactor creates and uses its own 0mq context.

opts may also include a :log_transport key. Its value should be a transport string for an endpoint that a logger client can connect to for publishing log messages. When this key is defined, the client is automatically created and connected to the indicated endpoint.



# File 'lib/zm/reactor.rb', line 65

def initialize name, poll_interval = 10, opts = {}
  @name = name
  @running = false
  @thread = nil
  @poll_interval = determine_interval poll_interval
  @timers = ZMQMachine::Timers.new

  @proc_queue = []
  @proc_queue_mutex = Mutex.new

  # could raise if it fails
  @context = opts[:zeromq_context] || ZMQ::Context.new
  @poller = ZMQ::Poller.new
  @sockets = []
  @raw_to_socket = {}
  Thread.abort_on_exception = true

  if opts[:log_transport]
    @logger = LogClient.new self, opts[:log_transport]
    @logging_enabled = true
  end
end
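
The context sharing described for :zeromq_context can be illustrated without 0mq installed. The following is only a minimal sketch: StubContext and SketchReactor are hypothetical stand-ins for ZMQ::Context and the reactor, and only the fallback on opts[:zeromq_context] mirrors the constructor shown above.

```ruby
# Hypothetical stand-in for ZMQ::Context so the sketch runs without 0mq.
class StubContext; end

# Hypothetical, stripped-down reactor that keeps only the context handling.
class SketchReactor
  attr_reader :name, :context

  def initialize(name, poll_interval = 10, opts = {})
    @name = name
    @poll_interval = poll_interval
    # Reuse the caller's context when one is given, otherwise create a private one.
    @context = opts[:zeromq_context] || StubContext.new
  end
end

shared = StubContext.new
first  = SketchReactor.new(:first, 10, :zeromq_context => shared)
second = SketchReactor.new(:second, 10, :zeromq_context => shared)
loner  = SketchReactor.new(:loner)

puts first.context.equal?(second.context) # both reactors hold the same context object
puts loner.context.equal?(shared)         # the third reactor created its own context
```

Because the options hash is the third positional argument, poll_interval has to be passed explicitly whenever opts is used.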

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/zm/reactor.rb', line 40

def name
  @name
end

Instance Method Details

#cancel_timer(timer) ⇒ Object

Cancels an existing timer if it hasn’t already fired.

Returns true if the timer was cancelled, false otherwise.



# File 'lib/zm/reactor.rb', line 365

def cancel_timer timer
  @timers.cancel timer
end

#close_socket(sock) ⇒ Object

Removes the given sock socket from the reactor context. It is deregistered for new events and closed. Any queued messages are silently dropped.

Returns true for a successful close, false otherwise.



# File 'lib/zm/reactor.rb', line 183

def close_socket sock
  return false unless sock

  removed = delete_socket sock
  sock.raw_socket.close

  removed
end

#deregister_readable(sock) ⇒ Object

Deregisters the sock for POLLIN events. The handler will no longer receive calls to on_readable.



# File 'lib/zm/reactor.rb', line 334

def deregister_readable sock
  @poller.deregister_readable sock.raw_socket
end

#deregister_writable(sock) ⇒ Object

Deregisters the sock for POLLOUT events. The handler will no longer receive calls to on_writable.



# File 'lib/zm/reactor.rb', line 320

def deregister_writable sock
  @poller.deregister_writable sock.raw_socket
end

#join(delay = nil) ⇒ Object

Join on the thread running this reactor instance. Default behavior is to wait indefinitely for the thread to exit.

Pass an optional delay value measured in milliseconds to limit the wait; if the thread has not exited after delay milliseconds, #join returns and the thread keeps running.

Returns immediately when the thread has already exited.



# File 'lib/zm/reactor.rb', line 139

def join delay = nil
  # don't allow the thread to try and join itself and only worry about
  # joining for live threads
  if running? && @thread.alive? && @thread != Thread.current
    if delay
      # convert to seconds to meet the argument expectations of Thread#join
      seconds = delay / 1000.0
      @thread.join seconds
    else
      @thread.join
    end
  end
end
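
The millisecond-to-second conversion and the timeout behavior of Thread#join can be tried in isolation. This is a minimal sketch, not the reactor's code: join_with_delay is a hypothetical helper that takes the thread as an argument instead of reading @thread, and it omits the running? check.

```ruby
# Joins +thread+, waiting at most +delay_ms+ milliseconds when a delay is given.
# Returns the thread once it has exited, or nil if the wait timed out or was skipped.
def join_with_delay(thread, delay_ms = nil)
  # Never join the current thread and only bother with live threads.
  return nil unless thread.alive? && thread != Thread.current

  if delay_ms
    thread.join(delay_ms / 1000.0) # Thread#join expects seconds
  else
    thread.join
  end
end

worker    = Thread.new { sleep 1 }
timed_out = join_with_delay(worker, 50) # 50 ms is much shorter than the 1 s sleep
still_up  = worker.alive?               # a timed-out join does not stop the thread
finished  = join_with_delay(worker)     # waits until the worker exits
```

Dividing by 1000.0 rather than 1000 keeps the fractional part, so a 50 ms delay becomes 0.05 seconds instead of 0.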

#kill ⇒ Object

Kills the running reactor instance by terminating its thread.

After the thread exits, the reactor attempts to clean up after itself and kill any pending I/O.



# File 'lib/zm/reactor.rb', line 158

def kill
  if running?
    @stopping = true
    @thread.kill
    cleanup
  end
end

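#list_timers ⇒ Object

Prints the fire time and the method name of each timer returned by @timers.list to standard output.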



# File 'lib/zm/reactor.rb', line 378

def list_timers
  @timers.list.each do |timer|
    # check the proc itself, since that is the object asked for its name
    name = timer.timer_proc.respond_to?(:name) ? timer.timer_proc.name : timer.timer_proc.to_s
    puts "fire time [#{Time.at(timer.fire_time / 1000)}], method [#{name}]"
  end
end

#log(level, message) ⇒ Object

Publishes log messages to an existing transport passed in to the Reactor constructor using the :log_transport key.

Reactor.new name, poll_interval, :log_transport => 'inproc://reactor_log'

level is a key that indicates the severity level, e.g. :warn, :debug, level0, level9, etc.

message is a plain string that will be written out in its entirety.

If no :log_transport was given when the Reactor was created, calls to this method silently discard the message.



# File 'lib/zm/reactor.rb', line 408

def log level, message
  if @logging_enabled
    @logger.write [ZMQ::Message.new(level.to_s), ZMQ::Message.new(message.to_s)]
  end
end
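
The guard and the two-part message layout in #log can be sketched without a 0mq transport. In this illustration RecordingLogger and LoggingSketch are hypothetical stand-ins, and plain strings replace the ZMQ::Message frames used by the real method.

```ruby
# Hypothetical stand-in for the log client; it records frames instead of sending them.
class RecordingLogger
  attr_reader :written

  def initialize
    @written = []
  end

  def write(frames)
    @written << frames
  end
end

# Hypothetical object that mirrors only the logging guard of the reactor.
class LoggingSketch
  def initialize(logger = nil)
    @logger = logger
    @logging_enabled = !logger.nil?
  end

  def log(level, message)
    # Two parts per entry: the level first, then the message text.
    @logger.write([level.to_s, message.to_s]) if @logging_enabled
  end
end

recorder = RecordingLogger.new
LoggingSketch.new(recorder).log(:warn, 'queue is backing up')

# Without a logger the call does nothing and returns nil.
silent_result = LoggingSketch.new.log(:debug, 'dropped')
```

Both arguments pass through to_s, so symbols and other objects can be given for either one.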

#next_tick(blk = nil, &block) ⇒ Object

Schedules a proc or block to execute on the next trip through the reactor loop.

This method is thread-safe.



# File 'lib/zm/reactor.rb', line 171

def next_tick blk = nil, &block
  blk ||= block
  @proc_queue_mutex.synchronize do
    @proc_queue << blk
  end
end
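
The thread safety of #next_tick comes from guarding the queue with a mutex. The sketch below illustrates that pattern only: TickQueue is a hypothetical class, and run_pending is an assumed drain step, since the part of the reactor loop that empties the queue is not shown on this page.

```ruby
# Hypothetical queue that mirrors the mutex-guarded enqueue of #next_tick.
class TickQueue
  def initialize
    @proc_queue = []
    @proc_queue_mutex = Mutex.new
  end

  def next_tick(blk = nil, &block)
    blk ||= block
    @proc_queue_mutex.synchronize { @proc_queue << blk }
  end

  # Assumed drain step: swap the queue out under the lock, then run the procs
  # outside the lock so a proc may schedule further work without deadlocking.
  def run_pending
    work = @proc_queue_mutex.synchronize do
      pending = @proc_queue
      @proc_queue = []
      pending
    end
    work.each(&:call)
    work.size
  end
end

queue   = TickQueue.new
results = []

# Four threads enqueue 25 procs each; none of the procs run yet.
producers = Array.new(4) do |i|
  Thread.new { 25.times { |j| queue.next_tick { results << [i, j] } } }
end
producers.each(&:join)

executed = queue.run_pending # runs all queued procs on the calling thread
```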

#oneshot_timer(delay, timer_proc = nil, &blk) ⇒ Object

Creates a timer that fires a single time. Expects either a timer_proc proc or a block; otherwise no timer is created. When both are given, the block takes precedence.

delay is measured in milliseconds (1 second equals 1000 milliseconds).



# File 'lib/zm/reactor.rb', line 344

def oneshot_timer delay, timer_proc = nil, &blk
  blk ||= timer_proc
  @timers.add_oneshot delay, blk
end
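
The way #oneshot_timer chooses between its timer_proc argument and a block follows from the blk ||= timer_proc line. A minimal sketch of that selection, using a hypothetical helper named pick_callback:

```ruby
# Returns the callable that would be handed to the timer, or nil if none was given.
def pick_callback(timer_proc = nil, &blk)
  blk ||= timer_proc # the proc is only used when no block was passed
  blk
end

from_proc  = pick_callback(proc { :from_proc })
from_block = pick_callback { :from_block }
both       = pick_callback(proc { :from_proc }) { :from_block }
neither    = pick_callback
```

The same selection applies to #periodical_timer, which uses the identical line.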

#open_socket_count(kind = :all) ⇒ Object

Returns the number of open sockets whose kind matches kind; the default :all counts every socket.



# File 'lib/zm/reactor.rb', line 385

def open_socket_count kind = :all
  @sockets.inject(0) do |sum, socket|
    if :all == kind || (socket.kind == kind)
      sum + 1
    else
      sum
    end
  end
end
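
The counting logic can be exercised on its own. In this sketch StubSocket is a hypothetical stand-in for a reactor socket, the kind values :pub, :sub and :req are assumptions, and the socket list is passed in as an argument rather than read from @sockets. Array#count replaces the inject call but gives the same result.

```ruby
# Hypothetical stand-in for a reactor socket; only #kind matters here.
StubSocket = Struct.new(:kind)

# Counts the sockets of the given kind; :all matches every socket.
def open_socket_count(sockets, kind = :all)
  sockets.count { |socket| kind == :all || socket.kind == kind }
end

sockets = [StubSocket.new(:pub), StubSocket.new(:sub), StubSocket.new(:sub)]

total       = open_socket_count(sockets)
subscribers = open_socket_count(sockets, :sub)
requesters  = open_socket_count(sockets, :req)
```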

#pair_socket(handler_instance) ⇒ Object

Creates a PAIR socket and attaches handler_instance to the resulting socket. Works only with other #pair_socket instances in the same or another reactor instance.

handler_instance must implement the #on_readable and #on_readable_error methods. Each handler must also implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 270

def pair_socket handler_instance
  sock = ZMQMachine::Socket::Pair.new @context, handler_instance
  save_socket sock
  sock
end

#periodical_timer(delay, timer_proc = nil, &blk) ⇒ Object

Creates a timer that fires every delay milliseconds until it is explicitly cancelled. Expects either a timer_proc proc or a block; otherwise no timer is created. When both are given, the block takes precedence.

delay is measured in milliseconds (1 second equals 1000 milliseconds).



# File 'lib/zm/reactor.rb', line 356

def periodical_timer delay, timer_proc = nil, &blk
  blk ||= timer_proc
  @timers.add_periodical delay, blk
end
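
The difference between one-shot and periodical timers, and the true/false result of cancelling, can be illustrated with a toy timer list. This is a sketch under stated assumptions and not the ZMQMachine::Timers implementation: TimerSketch is hypothetical, it takes the current time in milliseconds as an explicit argument instead of reading a clock, and it re-arms periodical timers relative to the time at which they fired.

```ruby
# Hypothetical timer list driven by an explicit clock value in milliseconds.
class TimerSketch
  Timer = Struct.new(:fire_time, :delay, :periodical, :callback)

  def initialize
    @timers = []
  end

  def add_oneshot(now, delay, callback)
    add now, delay, false, callback
  end

  def add_periodical(now, delay, callback)
    add now, delay, true, callback
  end

  # Returns true when the timer was still pending, false otherwise.
  def cancel(timer)
    before = @timers.size
    @timers.reject! { |pending| pending.equal?(timer) }
    @timers.size < before
  end

  # Fires every timer that is due at +now+ and re-arms the periodical ones.
  def fire(now)
    due, @timers = @timers.partition { |timer| timer.fire_time <= now }
    due.each do |timer|
      timer.callback.call
      next unless timer.periodical

      timer.fire_time = now + timer.delay
      @timers << timer
    end
  end

  private

  def add(now, delay, periodical, callback)
    return nil unless callback # no proc or block means no timer

    timer = Timer.new(now + delay, delay, periodical, callback)
    @timers << timer
    timer
  end
end

ticks    = []
timers   = TimerSketch.new
once     = timers.add_oneshot(0, 100, proc { ticks << :once })
repeated = timers.add_periodical(0, 100, proc { ticks << :tick })

timers.fire(100) # both fire; only the periodical timer is re-armed
timers.fire(200) # the periodical timer fires again
cancelled_once     = timers.cancel(once)     # already fired, nothing to cancel
cancelled_repeated = timers.cancel(repeated) # still pending, so it is removed
timers.fire(300) # nothing is left to fire
```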

#pub_socket(handler_instance) ⇒ Object

Creates a PUB socket and attaches handler_instance to the resulting socket. Usually paired with one or more #sub_socket instances in the same or another reactor instance.

handler_instance must implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events. This socket type can only write; it can never receive/read messages.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 287

def pub_socket handler_instance
  sock = ZMQMachine::Socket::Pub.new @context, handler_instance
  save_socket sock
  sock
end

#register_readable(sock) ⇒ Object

Registers the sock for POLLIN events that will cause the reactor to call the handler’s on_readable method.



# File 'lib/zm/reactor.rb', line 327

def register_readable sock
  @poller.register_readable sock.raw_socket
end

#register_writable(sock) ⇒ Object

Registers the sock for POLLOUT events that will cause the reactor to call the handler’s on_writable method.



# File 'lib/zm/reactor.rb', line 313

def register_writable sock
  @poller.register_writable sock.raw_socket
end

#rep_socket(handler_instance) ⇒ Object

Creates a REP socket and attaches handler_instance to the resulting socket. Should only be paired with one other #req_socket instance.

handler_instance must implement the #on_readable and #on_readable_error methods. The reactor will call those methods based upon new events.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 218

def rep_socket handler_instance
  sock = ZMQMachine::Socket::Rep.new @context, handler_instance
  save_socket sock
  sock
end

#req_socket(handler_instance) ⇒ Object

Creates a REQ socket and attaches handler_instance to the resulting socket. Should only be paired with one other #rep_socket instance.

handler_instance must implement the #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 202

def req_socket handler_instance
  sock = ZMQMachine::Socket::Req.new @context, handler_instance
  save_socket sock
  sock
end

#reschedule_timers ⇒ Object

Asks all timers to reschedule themselves starting from Timers.now. Typically called when the underlying time source for the ZMQMachine::Timers class has been replaced; existing timers may not fire as expected, so they are asked to reset themselves.



# File 'lib/zm/reactor.rb', line 374

def reschedule_timers
  @timers.reschedule
end

#run(blk = nil, &block) ⇒ Object

The main entry point for all new reactor contexts. The proc or block given to this method is evaluated once before entering the reactor loop. This evaluation generally sets up the sockets and timers that do the real work once the loop is running.



# File 'lib/zm/reactor.rb', line 101

def run blk = nil, &block
  blk ||= block
  @running, @stopping = true, false

  @thread = Thread.new do
    blk.call self if blk

    while !@stopping && running? do
      run_once
    end

    cleanup
  end
  self
end

#running? ⇒ Boolean

Returns true when the reactor is running OR while it is in the midst of a shutdown request.

Returns false when the reactor thread does not exist.

Returns:

  • (Boolean)


# File 'lib/zm/reactor.rb', line 93

def running?() @running; end

#stop ⇒ Object

Marks the reactor as eligible for termination. Then waits for the reactor thread to exit via #join (no timeout).

The reactor is not forcibly terminated if it is currently blocked by some long-running operation. Use #kill to forcibly terminate the reactor.



# File 'lib/zm/reactor.rb', line 124

def stop
  # wait until the thread loops around again and exits on its own
  @stopping = true
  join
end
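
The cooperative shutdown performed by #run and #stop can be sketched with a plain thread and a flag. LoopSketch below is a hypothetical class: a short sleep stands in for one pass of the reactor loop, and resetting @running stands in for the cleanup step, whose details are not shown on this page.

```ruby
# Hypothetical loop object that mirrors only the run/stop flag handling.
class LoopSketch
  def initialize
    @running = false
    @stopping = false
  end

  def running?
    @running
  end

  def run(blk = nil, &block)
    blk ||= block
    @running, @stopping = true, false

    @thread = Thread.new do
      blk.call self if blk # evaluated once, before the loop starts

      until @stopping
        sleep 0.01 # stands in for one blocking pass through the loop
      end

      @running = false # stands in for the cleanup step
    end
    self
  end

  def stop
    @stopping = true
    @thread.join # no timeout: wait for the loop to notice the flag
  end
end

setup_calls = 0
reactor = LoopSketch.new.run { |_reactor| setup_calls += 1 }
was_running = reactor.running?

reactor.stop # returns only after the loop thread has exited
```

Because the flag is checked only between passes, a pass that blocks for a long time delays the shutdown by the same amount, which is the case #kill exists for.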

#sub_socket(handler_instance) ⇒ Object

Creates a SUB socket and attaches handler_instance to the resulting socket. Usually paired with one or more #pub_socket instances in the same or a different reactor context.

handler_instance must implement the #on_readable and #on_readable_error methods. The reactor will call those methods based upon new events. This socket type can only read; it can never write/send messages.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 304

def sub_socket handler_instance
  sock = ZMQMachine::Socket::Sub.new @context, handler_instance
  save_socket sock
  sock
end

#xrep_socket(handler_instance) ⇒ Object

Creates an XREP socket and attaches handler_instance to the resulting socket. Should only be paired with one other #req_socket instance.

handler_instance must implement the #on_readable, #on_readable_error, #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 252

def xrep_socket handler_instance
  sock = ZMQMachine::Socket::XRep.new @context, handler_instance
  save_socket sock
  sock
end

#xreq_socket(handler_instance) ⇒ Object

Creates an XREQ socket and attaches handler_instance to the resulting socket. Should only be paired with one other #rep_socket instance.

handler_instance must implement the #on_readable, #on_readable_error, #on_writable and #on_writable_error methods. The reactor will call those methods based upon new events.

All handlers must implement the #on_attach method.



# File 'lib/zm/reactor.rb', line 235

def xreq_socket handler_instance
  sock = ZMQMachine::Socket::XReq.new @context, handler_instance
  save_socket sock
  sock
end
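
The handler requirements stated for each socket method can be collected into one table and checked before a handler is attached. The sketch below is not part of the library: missing_callbacks, the kind symbols and PrintingSubscriber are hypothetical, and because the callback argument lists are not given on this page, the handler accepts any arguments.

```ruby
# Callbacks each socket type requires, as listed in the method descriptions.
READABLE = [:on_readable, :on_readable_error].freeze
WRITABLE = [:on_writable, :on_writable_error].freeze

REQUIRED_CALLBACKS = {
  :req  => WRITABLE,
  :rep  => READABLE,
  :xreq => READABLE + WRITABLE,
  :xrep => READABLE + WRITABLE,
  :pair => READABLE + WRITABLE,
  :pub  => WRITABLE,
  :sub  => READABLE
}.freeze

# Returns the callbacks +handler+ lacks for the given socket +kind+.
# Every handler also needs #on_attach.
def missing_callbacks(handler, kind)
  required = [:on_attach] + REQUIRED_CALLBACKS.fetch(kind)
  required.reject { |name| handler.respond_to?(name) }
end

# Hypothetical subscriber handler; the argument lists are assumptions.
class PrintingSubscriber
  def on_attach(*_args); end
  def on_readable(*_args); end
  def on_readable_error(*_args); end
end

handler = PrintingSubscriber.new
for_sub  = missing_callbacks(handler, :sub)  # complete for a SUB socket
for_pair = missing_callbacks(handler, :pair) # lacks the writable callbacks
```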