Class: CZTop::Poller::ZPoller

Inherits:
Object
Extended by:
HasFFIDelegate::ClassMethods
Includes:
CZMQ::FFI, HasFFIDelegate
Defined in:
lib/cztop/poller/zpoller.rb

Overview

This is a trivial poller based on zpoller. It only supports polling for readability, but it can do so on CLIENT/SERVER sockets as well, which makes it useful for CZTop::Poller.

Instance Attribute Summary

Attributes included from HasFFIDelegate

#ffi_delegate

Instance Method Summary

Methods included from HasFFIDelegate::ClassMethods

ffi_delegate, from_ffi_delegate

Methods included from HasFFIDelegate

#attach_ffi_delegate, #from_ffi_delegate, raise_zmq_err, #raise_zmq_err, #to_ptr

Constructor Details

#initialize(reader, *readers) ⇒ ZPoller

Initializes the Poller. At least one reader has to be given.

Parameters:

  • reader (Socket, Actor)

    socket to poll for input

  • readers (Socket, Actor)

    any additional sockets to poll for input



# File 'lib/cztop/poller/zpoller.rb', line 15

def initialize(reader, *readers)
  @sockets = {} # to keep references and return same instances
  ptr = Zpoller.new(reader,
                    *readers.flat_map {|r| [ :pointer, r ] },
                    :pointer, nil)
  attach_ffi_delegate(ptr)
  remember_socket(reader)
  readers.each { |r| remember_socket(r) }
end
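
The variadic argument list passed to Zpoller.new above can be illustrated in isolation. This is a minimal sketch in plain Ruby, using symbols as hypothetical stand-ins for real sockets. It only shows how flat_map interleaves the :pointer type tags with the readers and how the trailing :pointer, nil pair terminates the list.

```ruby
# Symbols stand in for real Socket/Actor instances (assumption for illustration).
readers = %i[sock_b sock_c]

# Each additional reader becomes a [type, value] pair, flattened into one list.
varargs = readers.flat_map { |r| [:pointer, r] }

# The list is terminated by a NULL pointer, as in the constructor above.
varargs += [:pointer, nil]

p varargs
```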

Instance Method Details

#add(reader) ⇒ void

This method returns an undefined value.

Adds another reader socket to the poller.

Parameters:

  • reader (Socket, Actor)

    socket to poll for input

Raises:

  • (SystemCallError)

    if this fails



# File 'lib/cztop/poller/zpoller.rb', line 29

def add(reader)
  rc = ffi_delegate.add(reader)
  raise_zmq_err("unable to add socket %p" % reader) if rc == -1
  remember_socket(reader)
end

#forget_socket(socket) ⇒ void (private)

This method returns an undefined value.

Forgets the socket because it has been removed from the poller.

Parameters:

  • socket (Socket, Actor)

    the socket instance to forget



# File 'lib/cztop/poller/zpoller.rb', line 97

def forget_socket(socket)
  @sockets.delete(socket.to_ptr.to_i)
end

#ignore_interrupts ⇒ void

This method returns an undefined value.

Tells the zpoller to ignore interrupts. By default, #wait will return immediately if it detects an interrupt (when zsys_interrupted is set to something other than zero). Calling this method will suppress this behavior.



# File 'lib/cztop/poller/zpoller.rb', line 67

def ignore_interrupts
  ffi_delegate.ignore_interrupts
end

#nonstop=(flag) ⇒ Object

By default, the poller stops if the process receives a SIGINT or SIGTERM signal. This makes it impossible to shut down message-based architectures like zactors. This method lets you switch off break handling. The default nonstop setting is off (false).

Setting this will cause #wait to never raise.

Parameters:

  • flag (Boolean)

    whether the poller should run nonstop



# File 'lib/cztop/poller/zpoller.rb', line 79

def nonstop=(flag)
  ffi_delegate.set_nonstop(flag)
end

#remember_socket(socket) ⇒ void (private)

This method returns an undefined value.

Remembers the socket so a call to #wait can return with the exact same instance of Socket, and it also makes sure the socket won’t get GC’d.

Parameters:

  • socket (Socket, Actor)

    the socket instance to remember



# File 'lib/cztop/poller/zpoller.rb', line 90

def remember_socket(socket)
  @sockets[socket.to_ptr.to_i] = socket
end
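
The bookkeeping behind remember_socket, forget_socket and socket_by_ptr can be sketched without any native library. The example below is an illustration only: FakePointer, FakeSocket and SocketRegistry are hypothetical stand-ins, not part of CZTop. It shows why keying a Hash by the pointer's integer address lets a lookup return the exact same Ruby object, while the Hash itself keeps that object referenced.

```ruby
# Hypothetical stand-in for FFI::Pointer: only #to_i (the address) is needed.
FakePointer = Struct.new(:address) do
  def to_i
    address
  end
end

# Hypothetical stand-in for a Socket/Actor exposing #to_ptr.
FakeSocket = Struct.new(:name, :address) do
  def to_ptr
    FakePointer.new(address)
  end
end

# Registry keyed by pointer address, mirroring the @sockets Hash above.
class SocketRegistry
  def initialize
    @sockets = {}
  end

  def remember(socket)
    @sockets[socket.to_ptr.to_i] = socket
  end

  def forget(socket)
    @sockets.delete(socket.to_ptr.to_i)
  end

  # Fails early if the address is unknown.
  def lookup(ptr)
    @sockets[ptr.to_i] or raise KeyError, "no socket known for address #{ptr.to_i}"
  end
end

registry = SocketRegistry.new
sock = FakeSocket.new("reader", 0x1000)
registry.remember(sock)

# A fresh pointer object with the same address resolves to the same instance.
found = registry.lookup(FakePointer.new(0x1000))
same_instance = found.equal?(sock)

registry.forget(sock)
```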

#remove(reader) ⇒ void

This method returns an undefined value.

Removes a reader socket from the poller.

Parameters:

  • reader (Socket, Actor)

    the reader socket to remove

Raises:

  • (ArgumentError)

    if socket was invalid, e.g. it wasn’t registered in this poller

  • (SystemCallError)

    if this fails for another reason



# File 'lib/cztop/poller/zpoller.rb', line 41

def remove(reader)
  rc = ffi_delegate.remove(reader)
  raise_zmq_err("unable to remove socket %p" % reader) if rc == -1
  forget_socket(reader)
end
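
Both #add and #remove follow the same pattern: call the native function, then raise if it returned -1. The sketch below illustrates that pattern in plain Ruby. It assumes, based only on the Raises section above, that an invalid-argument errno surfaces as ArgumentError and anything else as SystemCallError. The helpers raise_for_errno and check_rc are hypothetical; the real raise_zmq_err in HasFFIDelegate may behave differently.

```ruby
# Hypothetical error helper (assumption): maps EINVAL to ArgumentError and
# every other errno to the matching SystemCallError subclass.
def raise_for_errno(msg, errno)
  raise ArgumentError, msg if errno == Errno::EINVAL::Errno

  raise SystemCallError.new(msg, errno)
end

# Hypothetical wrapper: treats -1 as failure, like the rc checks above.
def check_rc(rc, msg, errno)
  raise_for_errno(msg, errno) if rc == -1
  rc
end

# A successful call passes its return code through unchanged.
ok = check_rc(0, "unable to remove socket", 0)

# A failed call with EINVAL is reported as ArgumentError.
invalid =
  begin
    check_rc(-1, "unable to remove socket", Errno::EINVAL::Errno)
  rescue ArgumentError => e
    e
  end

# Any other errno is reported as a SystemCallError subclass.
other =
  begin
    check_rc(-1, "unable to remove socket", Errno::ENOMEM::Errno)
  rescue SystemCallError => e
    e
  end
```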

#socket_by_ptr(ptr) ⇒ Socket, Actor (private)

Gets the previously remembered socket associated to the given pointer.

Parameters:

  • ptr (FFI::Pointer)

    the pointer to a socket

Returns:

  • (Socket, Actor)

    the socket associated to the given pointer

Raises:

  • (SystemCallError)

    if no socket is registered under given pointer



# File 'lib/cztop/poller/zpoller.rb', line 105

def socket_by_ptr(ptr)
  @sockets[ptr.to_i] or
    # NOTE: This should never happen, since #wait will return nil if
    # +zpoller_wait+ returned NULL. But it's better to fail early in case
    # it ever returns a wrong pointer.
    raise_zmq_err("no socket known for pointer #{ptr.inspect}")
end

#wait(timeout = -1) ⇒ Socket, ...

Waits and returns the first socket that becomes readable.

Parameters:

  • timeout (Integer) (defaults to: -1)

    how long to wait in ms, or 0 to avoid blocking, or -1 to wait indefinitely

Returns:

  • (Socket, Actor)

    first socket of interest

  • (nil)

    if the timeout expired before any socket became readable

Raises:

  • (Interrupt)

    if the wait ended because the poller was terminated (for example by an interrupt) rather than because the timeout expired



# File 'lib/cztop/poller/zpoller.rb', line 53

def wait(timeout = -1)
  ptr = ffi_delegate.wait(timeout)
  if ptr.null?
    raise Interrupt if ffi_delegate.terminated
    return nil
  end
  return socket_by_ptr(ptr)
end
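
A caller of #wait has three outcomes to handle: a readable socket, nil on timeout, and an Interrupt when the poller was terminated. The loop below is a minimal sketch of that handling. StubPoller and drain are hypothetical: the stub merely replays a scripted list of outcomes in place of a real ZPoller, so the example runs without libczmq.

```ruby
# Hypothetical stand-in that replays scripted outcomes of #wait.
class StubPoller
  def initialize(outcomes)
    @outcomes = outcomes
  end

  # Returns a "socket", nil for a timeout, or raises Interrupt when the
  # scripted outcome is :terminated.
  def wait(_timeout = -1)
    outcome = @outcomes.shift
    raise Interrupt if outcome == :terminated

    outcome
  end
end

# Polls until interrupted and records what happened on each iteration.
def drain(poller, timeout_ms)
  seen = []
  loop do
    socket = poller.wait(timeout_ms)
    if socket.nil?
      seen << :timeout # nothing became readable in time; poll again
      next
    end
    seen << socket # a real caller would read from the socket here
  end
rescue Interrupt
  seen << :interrupted
end

result = drain(StubPoller.new([:sock_a, nil, :sock_b, :terminated]), 100)
p result
```

With a real poller, calling #nonstop= with true would, per the description above, keep #wait from raising, so the rescue branch would not be reached.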