Class: ZMQ::Poll

Inherits:
Object
Defined in:
lib/0mq/poll.rb

Overview

A mechanism for applications to multiplex input/output events in a level-triggered fashion over a set of sockets.

Direct Known Subclasses

PollInterruptible

Constructor Details

#initialize(*sockets) ⇒ Poll

Accepts a list of sockets to poll for events (ZMQ::POLLIN, ZMQ::POLLOUT, ZMQ::POLLERR). By default, each socket is polled for input (ZMQ::POLLIN). To poll for a different kind of event, pass the socket and event type as a key/value pair (my_socket => ZMQ::POLLOUT). Event flags can be combined with bitwise OR if necessary (my_socket => ZMQ::POLLIN | ZMQ::POLLOUT).

A timeout in seconds can be given as a keyword argument (timeout: 1). See the #timeout attribute.

Does not poll until #run is called.

Example: ZMQ::Poll.new socket1, socket2, socket3 => ZMQ::POLLOUT, timeout: 1
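
The argument handling described above can be sketched in plain Ruby, without the gem loaded. This is an illustration only: split_poll_args is a hypothetical helper, strings stand in for sockets, and the local flag constants assume libzmq's values (ZMQ_POLLIN = 1, ZMQ_POLLOUT = 2, ZMQ_POLLERR = 4).

```ruby
# Stand-in flag values, assumed to match libzmq's ZMQ_POLLIN / ZMQ_POLLOUT / ZMQ_POLLERR.
POLLIN  = 1
POLLOUT = 2
POLLERR = 4

# Hypothetical helper that mirrors how the constructor interprets its arguments.
# Returns [socket => events hash, timeout in seconds].
def split_poll_args(*sockets)
  # A trailing hash carries both socket => events pairs and options.
  opts = sockets.last.is_a?(Hash) ? sockets.pop : {}
  timeout = opts.fetch(:timeout, -1)

  # Bare sockets are polled for input by default.
  socks = {}
  sockets.each { |socket| socks[socket] = POLLIN }

  # Symbol keys are options; every other key is treated as a socket.
  socks.merge!(opts.reject { |key, _events| key.is_a?(Symbol) })

  [socks, timeout]
end

socks, timeout = split_poll_args(
  "socket1", "socket2", "socket3" => POLLIN | POLLOUT, timeout: 1
)
# socket1 and socket2 map to POLLIN (1); socket3 maps to POLLIN | POLLOUT (3).
socks.each { |socket, events| puts "#{socket}: #{events}" }
puts "timeout: #{timeout}"
```

Because only Symbol keys are treated as options, any non-Symbol object can serve as a socket key in the trailing hash.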



# File 'lib/0mq/poll.rb', line 43

def initialize(*sockets)
  opts = sockets.last.is_a?(Hash) ? sockets.pop : {} # For Ruby 1.9
  
  @timeout = opts.fetch :timeout, -1
  
  @poll_items = []
  @socks = {}

  sockets.each { |socket| @socks[socket] = ZMQ::POLLIN }
  
  # Pull remaining sockets out of options hash and package into poll items.
  # Skip any option symbols in the hash; they aren't sockets.
  # Rejecting symbols allows duck-typed sockets to be included.
  @socks.merge! opts.reject {|socket, events| socket.is_a? Symbol}
  
  # Build table to reference ZMQ::Socket to its pointer's address.
  # This is an easy way to reconnect PollItem to ZMQ::Socket without
  # having to store multiple dimensions in the socks hash.
  @socket_lookup = {}
  @socks.each { |socket, event| @socket_lookup[socket.to_ptr.address] = socket }
  
  # Allocate space for C PollItem (zmq_pollitem_t) structs.
  @poll_structs = FFI::MemoryPointer.new LibZMQ::PollItem, @socks.count, true
  
  # Create the PollItem objects.
  # Initializing them within the FFI::MemoryPointer prevents having to copy
  # the struct data to the MemoryPointer when polling, then back again to
  # retrieve the revents flags.
  i = 0
  @socks.each do |socket, events|
    @poll_items.push LibZMQ::PollItem.new(@poll_structs[i]).tap { |pi|
      pi.socket = socket
      pi.events = events
    }
    
    i += 1
  end
end

Instance Attribute Details

#timeout ⇒ Object

Timeout is specified in seconds. A value of 0 will return immediately (non-blocking), and a value of -1 will block indefinitely until an event has occurred. Fractions of a second are allowed. Timeout defaults to block indefinitely (-1).
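
As a rough sketch of how these values reach the underlying poll call, the conversion performed in #run can be reproduced on its own. The helper name poll_timeout_ms is hypothetical; the expression is the one used in the #run source.

```ruby
# Hypothetical helper: converts a timeout in seconds to the millisecond value
# handed to zmq_poll, using the same expression as #run.
def poll_timeout_ms(seconds)
  # Positive values are scaled and truncated; 0 and -1 pass through unchanged.
  seconds > 0 ? (seconds * 1000).to_i : seconds
end

poll_timeout_ms(1.5)    # => 1500
poll_timeout_ms(0.25)   # => 250
poll_timeout_ms(0)      # => 0  (return immediately)
poll_timeout_ms(-1)     # => -1 (block indefinitely)
poll_timeout_ms(0.0004) # => 0  (truncated, so the poll does not block)
```

Since the result is truncated with to_i, a positive timeout shorter than one millisecond becomes 0 and behaves like a non-blocking poll.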



# File 'lib/0mq/poll.rb', line 12

def timeout
  @timeout
end

Class Method Details

.poll(*sockets, &block) ⇒ Object

Construct a Poll object and start polling. See #initialize for parameters. See #run for block and return value.



# File 'lib/0mq/poll.rb', line 17

def self.poll(*sockets, &block)
  poller = new *sockets
  poller.run &block
end

.poll_nonblock(*sockets, &block) ⇒ Object

Non-blocking version of .poll. Polls with a timeout of 0, so it returns immediately.



# File 'lib/0mq/poll.rb', line 23

def self.poll_nonblock(*sockets, &block)
  self.poll *sockets, timeout: 0, &block
end

Instance Method Details

#run(&block) ⇒ Object

Start polling.

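Returns a hash of ZMQ::Socket => revents (triggered event flags), containing only the sockets on which an event was triggered. If a block is given, it is called with each socket and its revents. Returns an empty hash if there are no sockets to poll.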
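
Since revents is a bit field, callers usually test individual flags with bitwise AND. The following is a minimal sketch, not part of the library: EVENT_FLAGS and triggered_events are hypothetical names, the flag values assume libzmq's definitions (1, 2, 4), and the results hash is a stand-in for what #run returns, with strings in place of sockets.

```ruby
# Flag values assumed to match libzmq's ZMQ_POLLIN, ZMQ_POLLOUT and ZMQ_POLLERR.
EVENT_FLAGS = { pollin: 1, pollout: 2, pollerr: 4 }.freeze

# Hypothetical helper: lists the names of the flags set in a revents value.
def triggered_events(revents)
  EVENT_FLAGS.select { |_name, flag| (revents & flag) != 0 }.keys
end

# Stand-in for the socket => revents hash returned by #run.
results = { "frontend" => 1, "backend" => 3 }

results.each do |socket, revents|
  puts "#{socket}: #{triggered_events(revents).join(', ')}"
end
```

With these stand-in values, "frontend" reports pollin only, while "backend" reports both pollin and pollout.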



# File 'lib/0mq/poll.rb', line 86

def run(&block)
  return {} if @poll_items.empty?
  
  # Convert seconds to milliseconds.
  timeout = @timeout > 0 ? (@timeout * 1000).to_i : @timeout
  
  # Poll
  rc = LibZMQ::zmq_poll @poll_structs, @poll_items.count, timeout
  ZMQ.error_check true if rc == -1
  
  # Create a hash of the items with triggered events.
  # (ZMQ::Socket => revents)
  triggered_items = @poll_items.select { |pi| pi.revents > 0 }
    .map { |pi| [@socket_lookup[pi.socket.address], pi.revents] }
  
  triggered_items = Hash[triggered_items]
  
  # Pass triggered sockets to block.
  triggered_items.each { |socket, revents| block.call socket, revents } if block
  
  triggered_items
end

#run_nonblock(&block) ⇒ Object

Non-blocking version of #run. Sets the timeout to 0 and then polls.



# File 'lib/0mq/poll.rb', line 110

def run_nonblock(&block)
  @timeout = 0
  run &block
end
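
One consequence of the listing above is that #run_nonblock assigns @timeout rather than passing a temporary value, so the poller keeps a timeout of 0 afterwards. The sketch below shows this with a stand-in class; PollSketch is hypothetical, and its #run only reports the timeout it would use instead of polling.

```ruby
# Hypothetical stand-in that copies only the timeout handling of ZMQ::Poll.
class PollSketch
  attr_reader :timeout

  def initialize(timeout = -1)
    @timeout = timeout
  end

  # Reports the timeout (in seconds) that a real poll would use.
  def run
    @timeout
  end

  # Same shape as the #run_nonblock listing: overwrite the timeout, then run.
  def run_nonblock
    @timeout = 0
    run
  end
end

poller = PollSketch.new(5)
poller.run          # => 5
poller.run_nonblock # => 0
poller.run          # => 0, the earlier 5-second timeout is no longer in effect
```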