Class: ZMQ::Poller

Inherits:
Object
  • Object
show all
Defined in:
lib/zmq/poller.rb,
ext/rbczmq/poller.c

Instance Method Summary collapse

Instance Method Details

#poll(1) ⇒ Fixnum

Multiplexes input/output events in a level-triggered fashion over a set of registered sockets.

Examples

Supported timeout values :

-1 : block until any sockets are ready (no timeout)

0  : non-blocking poll
1  : block for up to 1 second (1000ms)

0.1 : block for up to 0.1 seconds (100ms)

poller = ZMQ::Poller.new             =>  ZMQ::Poller
poller.register(req)                 =>  true
poller.poll(1)                       =>  Fixnum

Returns:

  • (Fixnum)


135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'ext/rbczmq/poller.c', line 135

/*
 * Polls every registered poll item for readiness (Ruby: ZMQ::Poller#poll).
 *
 * Accepts one optional argument: a timeout in seconds given as a Fixnum or a
 * Float. An omitted / nil timeout is treated as 0. The timeout is converted to
 * milliseconds before being passed to rb_czmq_nogvl_poll, and any negative
 * millisecond value is normalized to -1 (documented for this method as "block
 * until any sockets are ready").
 *
 * Returns the poll return code as a Ruby Integer:
 *   - 0 immediately when no items are registered,
 *   - a negative value when the poll failed with EINTR or EAGAIN,
 *   - otherwise whatever rb_czmq_nogvl_poll reported.
 *
 * Raises TypeError when the timeout is neither a Fixnum nor a Float, and
 * ZmqError when the pollset cannot be rebuilt. Other negative return codes are
 * handed to ZmqAssert (presumably raising - confirm against its definition).
 */
VALUE rb_czmq_poller_poll(int argc, VALUE *argv, VALUE obj)
{
    VALUE tmout;
    double seconds;
    /* Must be signed: a negative timeout is meaningful and is checked for below. */
    long timeout;
    struct nogvl_poll_args args;
    int rc;
    ZmqGetPoller(obj);
    rb_scan_args(argc, argv, "01", &tmout);
    if (NIL_P(tmout)) tmout = INT2NUM(0);
    if (TYPE(tmout) != T_FIXNUM && TYPE(tmout) != T_FLOAT) rb_raise(rb_eTypeError, "wrong timeout type %s (expected Fixnum or Float)", RSTRING_PTR(rb_obj_as_string(tmout)));
    if (poller->poll_size == 0) return INT2NUM(0);
    if (poller->dirty == true) {
        rc = rb_czmq_poller_rebuild_pollset(poller);
        if (rc == -1) rb_raise(rb_eZmqError, "failed in rebuilding the pollset!");
    }

    /* Seconds -> milliseconds. The conversion goes through a signed type so that
       negative inputs survive and can be collapsed to the single value -1. */
    seconds = (TYPE(tmout) == T_FIXNUM) ? (double)FIX2LONG(tmout) : RFLOAT_VALUE(tmout);
    timeout = (long)(seconds * 1000);
    if (timeout < 0) timeout = -1;

    args.items = poller->pollset;
    args.nitems = poller->poll_size;
    args.timeout = timeout;

    /* Results of the previous poll are discarded before polling again. */
    rb_ary_clear(poller->readables);
    rb_ary_clear(poller->writables);

    rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_poll, (void *)&args, RUBY_UBF_IO, 0);

    /* only call ZmqAssert if return code is less than zero since zmq_poll returns the number of pollers on success */
    if (rc < 0) {
        if (zmq_errno() == EINTR || zmq_errno() == EAGAIN) {
            // these are recoverable errors, so return flow to ruby so that retry / interrupt handling can be
            // done in ruby code. Ruby will see a -1 result. If it was an INT (or other) signal, ruby's signal
            // handler will raise the Interrupt exception.
        } else {
            ZmqAssert(rc);
        }
    }
    if (rc > 0) {
        /* At least one item is ready: repopulate readables / writables. */
        rb_czmq_poller_rebuild_selectables(poller);
    }
    return INT2NUM(rc);
}

#poll_nonblock ⇒ Object

API sugar to poll non-blocking. Returns immediately if there are no items in a ready state.



7
8
9
# File 'lib/zmq/poller.rb', line 7

# Convenience wrapper around #poll that never waits: it polls with a timeout
# of zero and hands back whatever #poll returns.
#
# @return [Object] the result of `poll(0)`
def poll_nonblock
  no_wait = 0
  poll(no_wait)
end

#readables ⇒ Array

All poll items in a readable state after the last poll.

Examples

poller = ZMQ::Poller.new                          =>  ZMQ::Poller
poller.register(ZMQ::Pollitem(req, ZMQ::POLLIN))  =>  true
poller.poll(1)                                    =>  1
poller.readables                                  =>  [req]

Returns:

  • (Array)


260
261
262
263
264
# File 'ext/rbczmq/poller.c', line 260

/*
 * Accessor for the poller's `readables` array (Ruby: ZMQ::Poller#readables).
 *
 * Returns the stored array itself, not a copy.
 */
VALUE rb_czmq_poller_readables(VALUE obj)
{
    VALUE ready;
    ZmqGetPoller(obj);
    ready = poller->readables;
    return ready;
}

#register(pollitem) ⇒ Boolean

Registers a poll item for a particular I/O event (ZMQ::POLLIN or ZMQ::POLLOUT) with this poller instance. ZMQ::Socket or Ruby IO instances will automatically be coerced to ZMQ::Pollitem instances with the default events mask (ZMQ::POLLIN | ZMQ::POLLOUT)

Examples

Supported events :

ZMQ::POLLIN  : readable state
ZMQ::POLLOUT : writable state

poller = ZMQ::Poller.new                              =>  ZMQ::Poller
poller.register(ZMQ::Pollitem.new(req, ZMQ::POLLIN))  =>  true

poller.register(pub_socket)                           =>  true
poller.register(STDIN)                                =>  true

Returns:

  • (Boolean)


200
201
202
203
204
205
206
207
208
209
210
211
# File 'ext/rbczmq/poller.c', line 200

/*
 * Adds a poll item to this poller (Ruby: ZMQ::Poller#register).
 *
 * The argument is first passed through rb_czmq_pollitem_coerce, so callers may
 * hand in anything that helper accepts. The coerced item is appended to the
 * poller's `pollables` array, the item count is bumped and the poller is
 * flagged dirty.
 *
 * Returns the (coerced) poll item that was registered.
 */
VALUE rb_czmq_poller_register(VALUE obj, VALUE pollable)
{
    ZmqGetPoller(obj);
    pollable = rb_czmq_pollitem_coerce(pollable);
    ZmqGetPollitem(pollable);

    if (poller->verbose == true) {
        /* A verbose poller makes the items registered with it verbose too. */
        rb_czmq_pollitem_set_verbose(pollable, Qtrue);
    }

    rb_ary_push(poller->pollables, pollable);
    poller->dirty = true;
    poller->poll_size += 1;

    return pollable;
}

#register_readable(pollable) ⇒ Object

API sugar for registering a ZMQ::Socket or IO for readability



13
14
15
# File 'lib/zmq/poller.rb', line 13

# Registers +pollable+ for read-readiness only by wrapping it in a
# ZMQ::Pollitem with the ZMQ::POLLIN events mask.
#
# @param pollable [ZMQ::Socket, IO] the object to watch
# @return [Object] whatever #register returns for the new poll item
def register_readable(pollable)
  read_item = ZMQ::Pollitem.new(pollable, ZMQ::POLLIN)
  register(read_item)
end

#register_writable(pollable) ⇒ Object

API sugar for registering a ZMQ::Socket or IO for writability



19
20
21
# File 'lib/zmq/poller.rb', line 19

# Registers +pollable+ for write-readiness only by wrapping it in a
# ZMQ::Pollitem with the ZMQ::POLLOUT events mask.
#
# @param pollable [ZMQ::Socket, IO] the object to watch
# @return [Object] whatever #register returns for the new poll item
def register_writable(pollable)
  write_item = ZMQ::Pollitem.new(pollable, ZMQ::POLLOUT)
  register(write_item)
end

#remove(pollitem) ⇒ Boolean

Removes a poll item from this poller. Deregisters the socket for any previously registered events. Note that we match on both poll items as well as pollable entities for all registered poll items.

Examples

poller = ZMQ::Poller.new             =>  ZMQ::Poller
poller.register(req)                 =>  true
poller.remove(req)                   =>  true

Returns:

  • (Boolean)


227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'ext/rbczmq/poller.c', line 227

/*
 * Removes a poll item from this poller (Ruby: ZMQ::Poller#remove).
 *
 * The argument is coerced with rb_czmq_pollitem_coerce and compared against
 * each registered item in order. An entry matches when it is the very same
 * poll item, or when both items wrap the same pollable entity (as reported by
 * rb_czmq_pollitem_pollable). Only the first match is removed.
 *
 * Returns the removed poll item, or Qfalse when nothing matched.
 */
VALUE rb_czmq_poller_remove(VALUE obj, VALUE pollable)
{
    int pos;
    VALUE rpollable;
    ZmqGetPoller(obj);
    pollable = rb_czmq_pollitem_coerce(pollable);
    ZmqGetPollitem(pollable);
    for (pos = 0; pos < poller->poll_size; pos++) {
        rpollable = rb_ary_entry(poller->pollables, (long)pos);
        if (pollable == rpollable || rb_czmq_pollitem_pollable(pollable) == rb_czmq_pollitem_pollable(rpollable)) {
            /* Delete by index, not by value: rb_ary_delete would drop every equal
               element (e.g. an item registered twice) while poll_size is only
               decremented once, leaving the counter out of step with the array. */
            rb_ary_delete_at(poller->pollables, (long)pos);
            poller->poll_size--;
            /* Flag the poller dirty so the next poll rebuilds its pollset. */
            poller->dirty = true;
            return rpollable;
        }
    }
    return Qfalse;
}

#verbose=(true) ⇒ nil

Logs poller activity to stdout - useful for debugging, but can be quite noisy with lots of activity.

Examples

poller = ZMQ::Poller.new    =>   ZMQ::Poller
poller.verbose = true   =>    nil

Returns:

  • (nil)


298
299
300
301
302
303
304
305
# File 'ext/rbczmq/poller.c', line 298

/*
 * Switches verbose mode on or off for this poller (Ruby: ZMQ::Poller#verbose=).
 *
 * Only the Ruby value `true` enables verbosity; every other value (including
 * other truthy objects) disables it.
 *
 * Always returns nil.
 */
static VALUE rb_czmq_poller_set_verbose(VALUE obj, VALUE level)
{
    ZmqGetPoller(obj);
    if (level == Qtrue) {
        poller->verbose = true;
    } else {
        poller->verbose = false;
    }
    return Qnil;
}

#writables ⇒ Array

All poll items in a writable state after the last poll.

Examples

poller = ZMQ::Poller.new                           =>  ZMQ::Poller
poller.register(ZMQ::Pollitem(req, ZMQ::POLLOUT))  =>  true
poller.poll(1)                                     =>  1
poller.writables                                   =>  [req]

Returns:

  • (Array)


280
281
282
283
284
# File 'ext/rbczmq/poller.c', line 280

/*
 * Accessor for the poller's `writables` array (Ruby: ZMQ::Poller#writables).
 *
 * Returns the stored array itself, not a copy.
 */
VALUE rb_czmq_poller_writables(VALUE obj)
{
    VALUE ready;
    ZmqGetPoller(obj);
    ready = poller->writables;
    return ready;
}