Class: RbZMQ::Poller

Inherits:
Object
  • Object
show all
Defined in:
lib/rbzmq/poller.rb

Overview

RbZMQ::Poller

The Poller allows to poll on one or more ZMQ sockets or file descriptors simultaneously.

Examples:

poller = RbZMQ::Poller.new
poller.register(socket, ZMQ::POLLIN)
poller.poll(10_000) do |socket|
  # Do something with socket
end

Instance Method Summary collapse

Constructor Details

#initializePoller

Create a new poller.


19
20
21
22
# File 'lib/rbzmq/poller.rb', line 19

def initialize
  @poll_items = ZMQ::PollItems.new
  @mutex      = Mutex.new
end

Instance Method Details

#delete(pollable) ⇒ Boolean

Remove socket or IO object from poller.

Parameters:

Returns:

  • (Boolean)

    True if pollable was successfully removed, false otherwise.


162
163
164
165
166
167
168
# File 'lib/rbzmq/poller.rb', line 162

def delete(pollable)
  mutex.synchronize do
    return false if @poll_items.empty?

    @poll_items.delete pollable
  end
end

#deregister(pollable, events = ZMQ::POLLIN | ZMQ::POLLOUT) ⇒ Boolean

Deregister events from pollable.

When no events are left or socket or IO object has been closed it will also be remove from watched objects.

Parameters:

  • pollable (RbZMQ::Socket, IO)

    Watchable socket or IO object.

  • events (Integer) (defaults to: ZMQ::POLLIN | ZMQ::POLLOUT)

    ZMQ events. Allowed values are ZMQ::POLLIN and ZMQ::POLLOUT.

Returns:

  • (Boolean)

    False if pollable was removed because all events where removed or it was closed, nil if pollable was not registered or an Integer with the leaving events.


135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/rbzmq/poller.rb', line 135

def deregister(pollable, events = ZMQ::POLLIN | ZMQ::POLLOUT)
  return unless pollable

  mutex.synchronize do
    item = @poll_items[pollable]
    if item && (item.events & events) > 0
      item.events ^= events

      if item.events.zero? || item.closed?
        @poll_items.delete pollable
        false
      else
        item.events
      end
    else
      nil
    end
  end
end

#poll(timeout) {|pollable| ... } ⇒ Enumerator, ...

Poll on all registered objects.

If a block is given it will be invoked for each ready pollable object. Without a block an enumerator of ready pollables will be returned.

If not selectable is registered #poll will return without blocking.

Examples:

Poll with block

poller.poll(10_000) do |io|
  io.readable? || io.writable? #=> true
end

Parameters:

  • timeout (Integer, Symbol)

    A timeout in milliseconds. The values `-1`, `:blocking` and `:infinity` will block indefinitely.

Yields:

  • (pollable)

    Yield each ready object.

Yield Parameters:

  • pollable (RbZMQ::Socket, IO, Object)

    Registered pollable object.

Returns:

  • (Enumerator, Boolean, Nil)

    The return value is determined by the following rules:

    1. Nil is returned when no objects are registered.

    2. An Enumerator will be returned when no block is given. The enumerator will have no elements if call timed out.

    3. If a block is given true will be returned when objects were ready, false if times out.


55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/rbzmq/poller.rb', line 55

def poll(timeout, &block)
  mutex.synchronize do
    if @poll_items.any?
      ready_items = do_poll(convert_timeout(timeout))

      if block_given?
        ready_items > 0 ? each_ready_item(&block) : false
      else
        if ready_items > 0
          to_enum(:each_ready_item)
        else
          Array.new.to_enum(:each)
        end
      end
    else
      nil
    end
  end
end

#register(pollable, events = ZMQ::POLLIN) ⇒ Integer

Register given socket or IO to be watched on given event list.

This method is idempotent.

Examples:

Watch socket to read

socket = RbZMQ::Socket.new(ZMQ::DEALER)
poller.register(socket, ZMQ::POLLIN)

Watch IO to write

reader, writer = IO.pipe
poller.register(writer, ZMQ::POLLOUT)

Parameters:

  • pollable (RbZMQ::Socket, IO)

    Watchable socket or IO object.

  • events (Integer) (defaults to: ZMQ::POLLIN)

    ZMQ events. Calling multiple times with different events will OR the events together. Allowed values are ZMQ::POLLIN and ZMQ::POLLOUT.

Returns:

  • (Integer)

    Registered events for pollable.


105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/rbzmq/poller.rb', line 105

def register(pollable, events = ZMQ::POLLIN)
  return if pollable.nil? || events.zero?

  mutex.synchronize do
    item = @poll_items[pollable]
    unless item
      item = ::ZMQ::PollItem.from_pollable(pollable)
      @poll_items << item
    end

    item.events |= events
  end
end

#sizeInteger

Return number of registered pollables.

Returns:

  • (Integer)

    Number of registered objects.


79
80
81
# File 'lib/rbzmq/poller.rb', line 79

def size
  mutex.synchronize { @poll_items.size }
end