Class: CZTop::Poller

Inherits:
Object
  • Object
show all
Includes:
CZMQ::FFI
Defined in:
lib/cztop/poller.rb

Overview

A non-trivial socket poller.

It can poll for readability and writability, and supports thread-safe sockets (SERVER/CLIENT/RADIO/DISH).

This implementation is NOT based on CZMQ's zpoller. Reasons:

  • zpoller can only poll for readability

Defined Under Namespace

Modules: ZMQ Classes: Aggregated, Event, ZPoller

Instance Method Summary collapse

Constructor Details

#initialize(*readers) ⇒ Poller

Returns a new instance of Poller.

Parameters:

  • readers (Socket, Actor)

    sockets to poll for input


15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/cztop/poller.rb', line 15

def initialize(*readers)
  @sockets = {} # needed to return the same socket objects
  @events = {} # event masks for each socket
  @poller_ptr = ZMQ.poller_new
  ObjectSpace.define_finalizer(@poller_ptr,
    Proc.new do
      ptr_ptr = ::FFI::MemoryPointer.new :pointer
      ptr_ptr.write_pointer(@poller_ptr)
      ZMQ.poller_destroy(ptr_ptr)
    end)
  @event_ptr = FFI::MemoryPointer.new(ZMQ::PollerEvent)
  readers.each { |r| add_reader(r) }
end

Instance Method Details

#add(socket, events) ⇒ void

This method returns an undefined value.

Adds a socket to be polled for readability.

Parameters:

  • socket (Socket, Actor)

    the socket

  • events (Integer)

    bitwise-OR'd events you're interested in (see POLLIN and POLLOUT constants in ZMQ

Raises:

  • (ArgumentError)

    if it's not a socket


35
36
37
38
39
40
# File 'lib/cztop/poller.rb', line 35

def add(socket, events)
  ptr = ptr_for_socket(socket)
  rc = ZMQ.poller_add(@poller_ptr, ptr, nil, events)
  HasFFIDelegate.raise_zmq_err if rc == -1
  remember_socket(socket, events)
end

#add_reader(socket) ⇒ void

This method returns an undefined value.

Convenience method to register a socket for readability. See #add.

Parameters:


45
46
47
# File 'lib/cztop/poller.rb', line 45

def add_reader(socket)
  add(socket, ZMQ::POLLIN)
end

#add_writer(socket) ⇒ void

This method returns an undefined value.

Convenience method to register a socket for writability. See #add.

Parameters:


52
53
54
# File 'lib/cztop/poller.rb', line 52

def add_writer(socket)
  add(socket, ZMQ::POLLOUT)
end

#event_mask_for_socket(socket) ⇒ Integer

Returns the event mask for the given, registered socket.

Parameters:

  • socket (Socket, Actor)

    which socket's events to return

Returns:

  • (Integer)

    event mask for the given socket

Raises:

  • (ArgumentError)

    if socket is not registered


161
162
163
164
# File 'lib/cztop/poller.rb', line 161

def event_mask_for_socket(socket)
  @events[socket] or
    raise ArgumentError, "no event mask known for socket %p" % socket
end

#forget_socket(socket) ⇒ Object (private)

Discards the referencel to the given socket, and forgets its event mask.

Parameters:


186
187
188
189
# File 'lib/cztop/poller.rb', line 186

def forget_socket(socket)
  @sockets.delete(ptr_for_socket(socket).to_i)
  @events.delete(socket)
end

#modify(socket, events) ⇒ void

This method returns an undefined value.

Modifies the events of interest for the given socket.

Parameters:

  • socket (Socket, Actor)

    the socket

  • events (Integer)

    events you're interested in (see constants in ZMQ

Raises:

  • (ArgumentError)

    if it's not a socket


62
63
64
65
66
67
# File 'lib/cztop/poller.rb', line 62

def modify(socket, events)
  ptr = ptr_for_socket(socket)
  rc = ZMQ.poller_modify(@poller_ptr, ptr, events)
  HasFFIDelegate.raise_zmq_err if rc == -1
  remember_socket(socket, events)
end

#ptr_for_socket(socket) ⇒ FFI::Pointer (private)

Returns low-level handle.

Parameters:

Returns:

  • (FFI::Pointer)

    low-level handle

Raises:

  • (ArgumentError)

    if argument is not a socket


171
172
173
174
# File 'lib/cztop/poller.rb', line 171

def ptr_for_socket(socket)
  raise ArgumentError unless socket.is_a?(Socket) || socket.is_a?(Actor)
  Zsock.resolve(socket)
end

#remember_socket(socket, events) ⇒ Object (private)

Keeps a reference to the given socket, and remembers its event mask.

Parameters:

  • socket (Socket, Actor)

    the socket

  • events (Integer)

    the event mask


179
180
181
182
# File 'lib/cztop/poller.rb', line 179

def remember_socket(socket, events)
  @sockets[ptr_for_socket(socket).to_i] = socket
  @events[socket] = events
end

#remove(socket) ⇒ void

This method returns an undefined value.

Removes a previously registered socket. Won't raise if you're trying to remove a socket that's not registered.

Parameters:

Raises:

  • (ArgumentError)

    if it's not a socket


74
75
76
77
78
79
# File 'lib/cztop/poller.rb', line 74

def remove(socket)
  ptr = ptr_for_socket(socket)
  rc = ZMQ.poller_remove(@poller_ptr, ptr)
  HasFFIDelegate.raise_zmq_err if rc == -1
  forget_socket(socket)
end

#remove_reader(socket) ⇒ Object

Removes a reader socket that was registered for readability only.

Parameters:

Raises:

  • (ArgumentError)

    if it's not registered, not registered for readability, or registered for more than just readability


86
87
88
89
90
91
92
# File 'lib/cztop/poller.rb', line 86

def remove_reader(socket)
  if event_mask_for_socket(socket) == ZMQ::POLLIN
    remove(socket)
    return
  end
  raise ArgumentError, "not registered for readability only: %p" % socket
end

#remove_writer(socket) ⇒ Object

Removes a reader socket that was registered for writability only.

Parameters:

Raises:

  • (ArgumentError)

    if it's not registered, not registered for writability, or registered for more than just writability


99
100
101
102
103
104
105
# File 'lib/cztop/poller.rb', line 99

def remove_writer(socket)
  if event_mask_for_socket(socket) == ZMQ::POLLOUT
    remove(socket)
    return
  end
  raise ArgumentError, "not registered for writability only: %p" % socket
end

#simple_wait(timeout = -1)) ⇒ Socket, ...

Simpler version of #wait, which just returns the first socket of interest, if any. This is useful if you either have only reader sockets, or only have writer sockets.

Parameters:

  • timeout (Integer) (defaults to: -1))

    how long to wait in ms, or 0 to avoid blocking, or -1 to wait indefinitely

Returns:

  • (Socket, Actor)

    first socket of interest

  • (nil)

    if timeout expired

Raises:

  • (SystemCallError)

    if this failed


138
139
140
141
# File 'lib/cztop/poller.rb', line 138

def simple_wait(timeout = -1)
  event = wait(timeout)
  return event.socket if event
end

#socket_for_ptr(ptr) ⇒ Socket, Actor

Returns socket corresponding to given pointer.

Parameters:

  • ptr (FFI::Pointer)

    pointer to the socket

Returns:

  • (Socket, Actor)

    socket corresponding to given pointer

Raises:

  • (ArgumentError)

    if pointer is not known


146
147
148
149
# File 'lib/cztop/poller.rb', line 146

def socket_for_ptr(ptr)
  @sockets[ptr.to_i] or
    raise ArgumentError, "no socket known for pointer %p" % ptr
end

#socketsArray<CZTop::Socket>

Note:

The actual events registered for each sockets don't matter.

Returns all sockets registered with this poller.

Returns:

  • (Array<CZTop::Socket>)

    all sockets registered with this poller


153
154
155
# File 'lib/cztop/poller.rb', line 153

def sockets
  @sockets.values
end

#wait(timeout = -1)) ⇒ Event?

Waits for registered sockets to become readable or writable, depending on what you're interested in.

Parameters:

  • timeout (Integer) (defaults to: -1))

    how long to wait in ms, or 0 to avoid blocking, or -1 to wait indefinitely

Returns:

  • (Event)

    the first event of interest

  • (nil)

    if the timeout expired or

Raises:

  • (SystemCallError)

    if this failed


115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/cztop/poller.rb', line 115

def wait(timeout = -1)
  rc = ZMQ.poller_wait(@poller_ptr, @event_ptr, timeout)
  if rc == -1
    case CZMQ::FFI::Errors.errno
    # NOTE: ETIMEDOUT for backwards compatibility, although this API is
      # still DRAFT.
    when Errno::EAGAIN::Errno, Errno::ETIMEDOUT::Errno
      return nil
    else
      HasFFIDelegate.raise_zmq_err
    end
  end
  return Event.new(self, @event_ptr)
end