Class: CZTop::Poller::Aggregated

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/cztop/poller/aggregated.rb

Overview

This is a poller which is able to provide a list of readable and a list of writable sockets. This is useful for when you need to process socket events in batch, rather than one per event loop iteration.

In particular, this is needed in Celluloid::ZMQ, where in a call to Celluloid::ZMQ::Reactor#run_once all readable/writable sockets need to be processed.

Implementation

It wraps a CZTop::Poller and just does the following to support getting an array of readable/writable sockets:

  • in #wait, poll with given timeout

  • in case there was an event:

    • deregister the corresponding event(s) on the registered socket

    • poll again with zero timeout until no more sockets

    • repeat and accumulate results into two lists

Forwarded Methods

The following methods are defined on this class too, and calls are forwarded directly to the actual CZTop::Poller instance:

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(poller = CZTop::Poller.new) ⇒ Aggregated

Initializes the aggregated poller.

Parameters:

  • poller (CZTop::Poller) (defaults to: CZTop::Poller.new)

    the wrapped poller



66
67
68
69
70
# File 'lib/cztop/poller/aggregated.rb', line 66

def initialize(poller = CZTop::Poller.new)
  @readables = []
  @writables = []
  @poller    = poller
end

Instance Attribute Details

#pollerCZTop::Poller.new (readonly)

Returns the associated (regular) poller.

Returns:



40
41
42
# File 'lib/cztop/poller/aggregated.rb', line 40

def poller
  @poller
end

#readablesArray<CZTop::Socket> (readonly)

Returns readable sockets.

Returns:



44
45
46
# File 'lib/cztop/poller/aggregated.rb', line 44

def readables
  @readables
end

#writablesArray<CZTop::Socket> (readonly)

Returns writable sockets.

Returns:



48
49
50
# File 'lib/cztop/poller/aggregated.rb', line 48

def writables
  @writables
end

Instance Method Details

#extract(event) ⇒ void (private)

This method returns an undefined value.

Extracts the event information, adds the socket to the correct list(s) and modifies the socket’s event mask for the socket to not turn up again during the next call(s) to CZTop::Poller#wait within #wait.

Parameters:



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/cztop/poller/aggregated.rb', line 116

def extract(event)
  event_mask                 = poller.event_mask_for_socket(event.socket)
  @event_masks[event.socket] = event_mask

  if event.readable?
    @readables << event.socket
    event_mask &= 0xFFFF ^ CZTop::Poller::ZMQ::POLLIN
  end

  if event.writable?
    @writables << event.socket
    event_mask &= 0xFFFF ^ CZTop::Poller::ZMQ::POLLOUT
  end

  poller.modify(event.socket, event_mask)
end

#restore_event_masksvoid (private)

This method returns an undefined value.

Restores the event mask for all registered sockets to the state they were before the call to #wait.



137
138
139
# File 'lib/cztop/poller/aggregated.rb', line 137

def restore_event_masks
  @event_masks.each { |socket, mask| poller.modify(socket, mask) }
end

#wait(timeout = -1)) ⇒ Boolean

Forgets all previous event information (which sockets are readable/writable) and waits for events anew. After getting the first event, CZTop::Poller#wait is called again with a zero-timeout to get all pending events to extract them into the aggregated lists of readable and writable sockets.

For every event, the corresponding event mask flag is disabled for the associated socket, so it won’t turn up again. Finally, all event masks are restored to what they were before the call to this method.

Parameters:

  • timeout (Integer) (defaults to: -1))

    how long to wait in ms, or 0 to avoid blocking, or -1 to wait indefinitely

Returns:

  • (Boolean)

    whether there have been any events



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/cztop/poller/aggregated.rb', line 86

def wait(timeout = -1)
  @readables   = []
  @writables   = []
  @event_masks = {}

  if (event = @poller.wait(timeout))
    extract(event)

    # get all other pending events, if any, but no more blocking
    while (event = @poller.wait 0)
      extract(event)
    end

    restore_event_masks
    return true
  end

  false
end