Class: CZTop::Actor

Inherits:
Object
  • Object
show all
Extended by:
HasFFIDelegate::ClassMethods
Includes:
CZMQ::FFI, HasFFIDelegate, PolymorphicZsockMethods, SendReceiveMethods, ZsockOptions
Defined in:
lib/cztop/actor.rb

Overview

Represents a CZMQ::FFI::Zactor.

About Thread-Safety

The instance methods of this class are thread-safe. So it's safe to call #<<, #request or even #terminate from different threads. Caution: Use only these methods to communicate with the low-level zactor. Don't use Message#send_to directly to send itself to an Actor instance, as it wouldn't be thread-safe.

About termination

Actors should be terminated explicitly, either by calling #terminate from the current process or sending them the “$TERM” command (from outside). Not terminating them explicitly might make the process block at exit.

Examples:

Simple Actor with Ruby block

result = ""
a = CZTop::Actor.new do |msg, pipe|
  case msg[0]
  when "foo"
    pipe << "bar"
  when "append"
    result << msg[1].to_s
  when "result"
    pipe << result
  end
end
a.request("foo")[0] #=> "bar"
a.request("foo")[0] #=> "bar"
a << ["append", "baz"] << ["append", "baz"]
a.request("result")[0] #=> "bazbaz"

See Also:

Defined Under Namespace

Classes: DeadActorError

Constant Summary collapse

TERM =

the command which causes an actor handler to terminate

"$TERM"

Instance Attribute Summary collapse

Attributes included from HasFFIDelegate

#ffi_delegate

Instance Method Summary collapse

Methods included from HasFFIDelegate::ClassMethods

ffi_delegate, from_ffi_delegate

Methods included from PolymorphicZsockMethods

#set_unbounded, #signal

Methods included from ZsockOptions

#fd, #options, #readable?, #writable?

Methods included from HasFFIDelegate

#attach_ffi_delegate, #from_ffi_delegate, #raise_zmq_err, raise_zmq_err, #to_ptr

Constructor Details

#initialize(callback = nil, c_args = nil) {|message, pipe| ... } ⇒ Actor

Creates a new actor. Either pass a callback directly or a block. The block will be called for every received message.

In case the given callback is an FFI::Pointer (to a C function), it's used as-is. It is expected to do the handshake (signal) itself.

Parameters:

  • callback (FFI::Pointer, Proc, #call) (defaults to: nil)

    pointer to a C function or just anything callable

  • c_args (FFI::Pointer, nil) (defaults to: nil)

    args, only useful if callback is an FFI::Pointer

Yield Parameters:

See Also:


62
63
64
65
66
67
68
69
70
# File 'lib/cztop/actor.rb', line 62

def initialize(callback = nil, c_args = nil, &handler)
  @running = true
  @mtx = Mutex.new
  @callback = callback || handler
  @callback = shim(@callback) unless @callback.is_a? ::FFI::Pointer
  ffi_delegate = Zactor.new(@callback, c_args)
  attach_ffi_delegate(ffi_delegate)
  options.sndtimeo = 20#ms # see #<<
end

Instance Attribute Details

#exceptionException (readonly)

Returns the exception that crashed this actor, if any.

Returns:

  • (Exception)

    the exception that crashed this actor, if any


47
48
49
# File 'lib/cztop/actor.rb', line 47

def exception
  @exception
end

Instance Method Details

#<<(message) ⇒ self

Note:

Normally this method is asynchronous, but if the message is “$TERM”, it blocks until the actor is terminated.

Send a message to the actor.

Parameters:

  • message (Object)

    message to send to the actor, see Message.coerce

Returns:

  • (self)

    so it's chainable

Raises:


80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/cztop/actor.rb', line 80

def <<(message)
  message = Message.coerce(message)

  if TERM == message[0]
    # NOTE: can't just send this to the actor. The sender might call
    # #terminate immediately, which most likely causes a hang due to race
    # conditions.
    terminate
  else
    begin
      @mtx.synchronize do
        raise DeadActorError if not @running
        message.send_to(self)
      end
    rescue IO::EAGAINWaitWritable
      # The sndtimeo has been reached.
      #
      # This should fix the race condition (mainly on JRuby) between
      # @running not being set to false yet but the actor handler already
      # terminating and thus not able to receive messages anymore.
      #
      # This shouldn't result in an infinite loop, since it'll stop as
      # soon as @running is set to false by #signal_shimmed_handler_death,
      # at least when using a Ruby handler.
      #
      # In case of a C function handler, it MUST NOT crash and only
      # terminate when being sent the "$TERM" message using #terminate (so
      # #await_handler_death can set
      # @running to false).
      retry
    end
  end
  self
end

#await_handler_deathvoid (private)

This method returns an undefined value.

Waits for the C or Ruby handler to die.


291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
# File 'lib/cztop/actor.rb', line 291

def await_handler_death
  if handler_shimmed?
    # for Ruby block/Proc object handlers
    @handler_dead_signal.pop

  else
    # for handlers that are passed as C functions, we rely on normal death
    # signal

    # can't use #wait here because of recursive deadlock
    Zsock.wait(ffi_delegate)

    @running = false
  end
end

#crashed?Boolean

Returns whether this actor has crashed.

Returns:

  • (Boolean)

    whether this actor has crashed

See Also:


189
190
191
# File 'lib/cztop/actor.rb', line 189

def crashed?
  !!@exception # if set, it has crashed
end

#dead?Boolean

Returns whether this actor is dead (terminated or crashed).

Returns:

  • (Boolean)

    whether this actor is dead (terminated or crashed)


183
184
185
# File 'lib/cztop/actor.rb', line 183

def dead?
  !@running
end

#handler_shimmed?Boolean (private)

Returns whether the handler is a Ruby object, like a simple block (as opposed to a FFI::Pointer to a C function).

Returns:

  • (Boolean)

    whether the handler is a Ruby object, like a simple block (as opposed to a FFI::Pointer to a C function)


226
227
228
# File 'lib/cztop/actor.rb', line 226

def handler_shimmed?
  !!@handler_thread # if it exists, it's shimmed
end

#next_messageMessage (private)

Receives the next message even across any interrupts.

Returns:


261
262
263
# File 'lib/cztop/actor.rb', line 261

def next_message
  @pipe.receive
end

#process_messages(handler) {|message, pipe| ... } ⇒ Object (private)

Successively receive messages that were sent to the actor and yield them to the given handler to process them. The a pipe (a Socket::PAIR socket) is also passed to the handler so it can send back the result of a command, if needed.

When a message is “$TERM”, or when the waiting for a message is interrupted, execution is aborted and the actor will terminate.

Parameters:

  • handler (Proc, #call)

    the handler used to process messages

Yield Parameters:

  • message (Message)

    message (e.g. command) received

  • pipe (Socket::PAIR)

    pipe to write back something into the actor


245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/cztop/actor.rb', line 245

def process_messages(handler)
  while true
    begin
      message = next_message
    rescue Interrupt
      break
    else
      break if TERM == message[0]
    end

    handler.call(message, @pipe)
  end
end

#receiveMessage

Receive a message from the actor.

Returns:

Raises:


118
119
120
121
122
123
# File 'lib/cztop/actor.rb', line 118

def receive
  @mtx.synchronize do
    raise DeadActorError if not @running
    super
  end
end

#request(message) ⇒ Message

Same as #<<, but also waits for a response from the actor and returns it.

Parameters:

  • message (Message)

    the request to the actor

Returns:

  • (Message)

    the actor's response

Raises:

  • (ArgumentError)

    if the message is “$TERM” (use #terminate)


130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/cztop/actor.rb', line 130

def request(message)
  @mtx.synchronize do
    raise DeadActorError if not @running
    message = Message.coerce(message)
    raise ArgumentError, "use #terminate" if TERM == message[0]
    message.send_to(self)
    Message.receive_from(self)
  end
rescue IO::EAGAINWaitWritable
  # same as in #<<
  retry
end

#send_picture(picture, *args) ⇒ void

Note:

Mainly added for Beacon. If implemented there, it wouldn't be thread safe. And it's not that useful to be added to SendReceiveMethods.

This method returns an undefined value.

Sends a message according to a “picture”.

Parameters:

  • picture (String)

    message's part types

  • args (String, Integer, ...)

    values, in FFI style (each one preceeded with it's type, like :string, "foo")

See Also:

  • on http://api.zeromq.org/czmq3-0:zsock

152
153
154
155
156
157
# File 'lib/cztop/actor.rb', line 152

def send_picture(picture, *args)
  @mtx.synchronize do
    raise DeadActorError if not @running
    Zsock.send(ffi_delegate, picture, *args)
  end
end

#shim(handler) ⇒ FFI::Function (private)

Shims the given handler. The shim is used to do the handshake, to #process_messages, and ensure we're notified when the handler has terminated.

Parameters:

  • handler (Proc, #call)

    the handler used to process messages

Returns:

  • (FFI::Function)

    the callback function to be passed to the zactor

Raises:

  • (ArgumentError)

    if invalid handler given


202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/cztop/actor.rb', line 202

def shim(handler)
  raise ArgumentError, "invalid handler" if !handler.respond_to?(:call)

  @handler_thread = nil
  @handler_dead_signal = Queue.new # used for signaling

  Zactor.fn do |pipe_delegate, _args|
    begin
      @mtx.synchronize do
        @handler_thread = Thread.current
        @pipe = Socket::PAIR.from_ffi_delegate(pipe_delegate)
        @pipe.signal # handshake, so zactor_new() returns
      end
      process_messages(handler)
    rescue Exception
      @exception = $!
    ensure
      signal_shimmed_handler_death
    end
  end
end

#signal_shimmed_handler_deathvoid (private)

This method returns an undefined value.

Creates a new thread that will signal the definitive termination of the Ruby handler.

This is needed to avoid the race condition between zactor_destroy() which will wait for a signal from the handler in case it was able to send the “$TERM” command, and the @callback which might still haven't returned, but doesn't receive any messages anymore.


274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/cztop/actor.rb', line 274

def signal_shimmed_handler_death
  # NOTE: can't just use ConditionVariable, as the signaling code might be
  # run BEFORE the waiting code.

  Thread.new do
    @handler_thread.join

    # NOTE: we do this here and not in #terminate, so it also works when
    # actor isn't terminated using #terminate
    @running = false

    @handler_dead_signal.push(nil)
  end
end

#terminateBoolean

Tells the actor to terminate and waits for it. Idempotent.

Returns:

  • (Boolean)

    whether it died just now (false if it was dead already)


170
171
172
173
174
175
176
177
178
179
180
# File 'lib/cztop/actor.rb', line 170

def terminate
  @mtx.synchronize do
    return false if not @running
    Message.new(TERM).send_to(self)
    await_handler_death
    true
  end
rescue IO::EAGAINWaitWritable
  # same as in #<<
  retry
end

#waitInteger

Returns:

  • (Integer)

161
162
163
164
165
# File 'lib/cztop/actor.rb', line 161

def wait
  @mtx.synchronize do
    super
  end
end