Class: ZMQ::Poller
- Inherits:
-
Object
- Object
- ZMQ::Poller
- Defined in:
- lib/zmq/poller.rb,
ext/rbczmq/poller.c
Instance Method Summary collapse
-
#poll(1) ⇒ Fixnum
Multiplexes input/output events in a level-triggered fashion over a set of registered sockets.
-
#poll_nonblock ⇒ Object
API sugar to poll non-blocking.
-
#readables ⇒ Array
All poll items in a readable state after the last poll.
-
#register(pollitem) ⇒ Boolean
Registers a poll item for a particular I/O event (ZMQ::POLLIN or ZMQ::POLLOUT) with this poller instance.
-
#register_readable(pollable) ⇒ Object
API sugar for registering a ZMQ::Socket or IO for readability.
-
#register_writable(pollable) ⇒ Object
API sugar for registering a ZMQ::Socket or IO for writability.
-
#remove(pollitem) ⇒ Boolean
Removes a poll item from this poller.
-
#verbose=(true) ⇒ nil
Logs poller activity to stdout - useful for debugging, but can be quite noisy with lots of activity.
-
#writables ⇒ Array
All poll items in a writable state after the last poll.
Instance Method Details
#poll(1) ⇒ Fixnum
Multiplexes input/output events in a level-triggered fashion over a set of registered sockets.
Examples
Supported timeout values :
-1 : block until any sockets are ready (no timeout)
0 : non-blocking poll
1 : block for up to 1 second (1000ms)
0.1 : block for up to 0.1 seconds (100ms)
poller = ZMQ::Poller.new => ZMQ::Poller
poller.register(req) => true
poller.poll(1) => Fixnum
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'ext/rbczmq/poller.c', line 135
/*
 * Polls the registered poll items for I/O readiness.
 *
 * Accepts one optional argument: a timeout in seconds given as a Fixnum or
 * Float (defaults to 0 when omitted or nil). The value is converted to
 * milliseconds; any negative value is normalized to -1.
 *
 * Returns the poll return code as a Ruby Integer, or 0 immediately when no
 * items are registered.
 *
 * Raises TypeError when the timeout is neither Fixnum nor Float, and ZMQ::Error
 * when the pollset cannot be rebuilt.
 */
VALUE rb_czmq_poller_poll(int argc, VALUE *argv, VALUE obj)
{
    VALUE tmout;
    /* Must be a signed type: a negative timeout is a legal input and has to
       survive the conversion so the "< 0" normalization below can fire. */
    long timeout;
    struct nogvl_poll_args args;
    int rc;
    ZmqGetPoller(obj);
    rb_scan_args(argc, argv, "01", &tmout);
    if (NIL_P(tmout)) tmout = INT2NUM(0);
    if (TYPE(tmout) != T_FIXNUM && TYPE(tmout) != T_FLOAT) rb_raise(rb_eTypeError, "wrong timeout type %s (expected Fixnum or Float)", RSTRING_PTR(rb_obj_as_string(tmout)));
    /* Nothing registered - nothing to wait on. */
    if (poller->poll_size == 0) return INT2NUM(0);
    /* Registrations changed since the last poll: refresh the native pollset. */
    if (poller->dirty == true) {
        rc = rb_czmq_poller_rebuild_pollset(poller);
        if (rc == -1) rb_raise(rb_eZmqError, "failed in rebuilding the pollset!");
    }
    /* Seconds -> milliseconds. */
    timeout = (long)(((TYPE(tmout) == T_FIXNUM) ? FIX2LONG(tmout) : RFLOAT_VALUE(tmout)) * 1000);
    if (timeout < 0) timeout = -1;
    args.items = poller->pollset;
    args.nitems = poller->poll_size;
    args.timeout = timeout;
    /* Run the poll through rb_thread_blocking_region so other Ruby threads can run while waiting. */
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_poll, (void *)&args, RUBY_UBF_IO, 0);
    /* only call ZmqAssert if return code is less than zero since zmq_poll returns the number of pollers on success */
    if (rc < 0) {
        ZmqAssert(rc);
    }
    if (rc == 0) {
        /* Nothing became ready: drop results left over from the previous poll. */
        rb_ary_clear(poller->readables);
        rb_ary_clear(poller->writables);
    } else {
        rb_czmq_poller_rebuild_selectables(poller);
    }
    return INT2NUM(rc);
}
|
#poll_nonblock ⇒ Object
API sugar to poll non-blocking. Returns immediately if there are no items in a ready state.
7 8 9 |
# File 'lib/zmq/poller.rb', line 7
# API sugar: performs a poll with a timeout of 0.
def poll_nonblock
  timeout = 0
  poll(timeout)
end
#readables ⇒ Array
253 254 255 256 257 |
# File 'ext/rbczmq/poller.c', line 253
/*
 * Returns the poller's `readables` array as currently stored on the poller.
 */
VALUE rb_czmq_poller_readables(VALUE obj)
{
    VALUE readables;
    ZmqGetPoller(obj);
    readables = poller->readables;
    return readables;
}
|
#register(pollitem) ⇒ Boolean
Registers a poll item for a particular I/O event (ZMQ::POLLIN or ZMQ::POLLOUT) with this poller instance. ZMQ::Socket or Ruby IO instances will automatically be coerced to ZMQ::Pollitem instances with the default events mask (ZMQ::POLLIN | ZMQ::POLLOUT)
Examples
Supported events :
ZMQ::POLLIN : readable state
ZMQ::POLLOUT : writable state
poller = ZMQ::Poller.new => ZMQ::Poller
poller.register(ZMQ::Pollitem.new(req, ZMQ::POLLIN)) => true
poller.register(pub_socket) => true
poller.register(STDIN) => true
193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'ext/rbczmq/poller.c', line 193
/*
 * Registers a pollable with this poller.
 *
 * The argument is first passed through rb_czmq_pollitem_coerce, then appended
 * to the poller's pollables array. The poll size is incremented and the poller
 * is flagged dirty. Returns the coerced poll item.
 */
VALUE rb_czmq_poller_register(VALUE obj, VALUE pollable)
{
    ZmqGetPoller(obj);
    pollable = rb_czmq_pollitem_coerce(pollable);
    ZmqGetPollitem(pollable);

    /* A verbose poller makes the items it owns verbose as well. */
    if (poller->verbose) {
        rb_czmq_pollitem_set_verbose(pollable, Qtrue);
    }

    rb_ary_push(poller->pollables, pollable);
    poller->poll_size += 1;
    poller->dirty = true;

    return pollable;
}
|
#register_readable(pollable) ⇒ Object
API sugar for registering a ZMQ::Socket or IO for readability
13 14 15 |
# File 'lib/zmq/poller.rb', line 13
# API sugar: wraps the pollable in a ZMQ::Pollitem with the ZMQ::POLLIN
# events mask and registers it with this poller.
def register_readable(pollable)
  pollitem = ZMQ::Pollitem.new(pollable, ZMQ::POLLIN)
  register(pollitem)
end
#register_writable(pollable) ⇒ Object
API sugar for registering a ZMQ::Socket or IO for writability
19 20 21 |
# File 'lib/zmq/poller.rb', line 19
# API sugar: wraps the pollable in a ZMQ::Pollitem with the ZMQ::POLLOUT
# events mask and registers it with this poller.
def register_writable(pollable)
  pollitem = ZMQ::Pollitem.new(pollable, ZMQ::POLLOUT)
  register(pollitem)
end
#remove(pollitem) ⇒ Boolean
Removes a poll item from this poller. Deregisters the socket for any previously registered events. Note that a registered poll item matches either when it is the given poll item itself or when it wraps the same pollable entity.
Examples
poller = ZMQ::Poller.new => ZMQ::Poller
poller.register(req) => true
poller.remove(req) => true
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
# File 'ext/rbczmq/poller.c', line 220
/*
 * Removes the first registered poll item that matches the argument.
 *
 * The argument is coerced via rb_czmq_pollitem_coerce. A registered item
 * matches when it is the very same poll item, or when both wrap the same
 * underlying pollable. On a match the entry is removed, the poll size is
 * decremented and the poller is flagged dirty.
 *
 * Returns the removed poll item, or false when nothing matched.
 */
VALUE rb_czmq_poller_remove(VALUE obj, VALUE pollable)
{
    int pos;
    VALUE rpollable;
    ZmqGetPoller(obj);
    pollable = rb_czmq_pollitem_coerce(pollable);
    ZmqGetPollitem(pollable);
    for (pos = 0; pos < poller->poll_size; pos++) {
        rpollable = rb_ary_entry(poller->pollables, (long)pos);
        if (pollable == rpollable || rb_czmq_pollitem_pollable(pollable) == rb_czmq_pollitem_pollable(rpollable)) {
            /* Delete by index so exactly one entry goes away. Deleting by
               value would drop every equal entry while poll_size is only
               decremented once, leaving the counter out of sync with the
               array if an item was registered more than once. */
            rb_ary_delete_at(poller->pollables, (long)pos);
            poller->poll_size--;
            poller->dirty = true;
            return rpollable;
        }
    }
    return Qfalse;
}
|
#verbose=(true) ⇒ nil
291 292 293 294 295 296 297 298 |
# File 'ext/rbczmq/poller.c', line 291
/*
 * Sets the poller's verbose flag: on only when `level` is exactly Qtrue,
 * off for every other value. Always returns nil.
 */
static VALUE rb_czmq_poller_set_verbose(VALUE obj, VALUE level)
{
    ZmqGetPoller(obj);
    poller->verbose = (level == Qtrue);
    return Qnil;
}
|
#writables ⇒ Array
273 274 275 276 277 |
# File 'ext/rbczmq/poller.c', line 273
/*
 * Returns the poller's `writables` array as currently stored on the poller.
 */
VALUE rb_czmq_poller_writables(VALUE obj)
{
    VALUE writables;
    ZmqGetPoller(obj);
    writables = poller->writables;
    return writables;
}
|