Class: ZMQ::Poller
- Inherits:
-
Object
- Object
- ZMQ::Poller
- Defined in:
- lib/zmq/poller.rb,
ext/rbczmq/poller.c
Instance Method Summary collapse
-
#poll(1) ⇒ Fixnum
Multiplexes input/output events in a level-triggered fashion over a set of registered sockets.
-
#poll_nonblock ⇒ Object
API sugar to poll non-blocking.
-
#readables ⇒ Array
All poll items in a readable state after the last poll.
-
#register(pollitem) ⇒ Boolean
Registers a poll item for a particular I/O event (ZMQ::POLLIN or ZMQ::POLLOUT) with this poller instance.
-
#register_readable(pollable) ⇒ Object
API sugar for registering a ZMQ::Socket or IO for readability.
-
#register_writable(pollable) ⇒ Object
API sugar for registering a ZMQ::Socket or IO for writability.
-
#remove(pollitem) ⇒ Boolean
Removes a poll item from this poller.
-
#verbose=(true) ⇒ nil
Logs poller activity to stdout - useful for debugging, but can be quite noisy with lots of activity.
-
#writables ⇒ Array
All poll items in a writable state after the last poll.
Instance Method Details
#poll(1) ⇒ Fixnum
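Multiplexes input/output events in a level-triggered fashion over a set of registered sockets. The timeout is given in seconds (Fixnum or Float); when it is omitted, it defaults to 0, i.e. a non-blocking poll. Returns the number of items in a ready state, 0 when no items are registered, or -1 when the poll was interrupted by a recoverable error (EINTR or EAGAIN).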
Examples
Supported timeout values :
-1 : block until any sockets are ready (no timeout)
0 : non-blocking poll
1 : block for up to 1 second (1000ms)
0.1 : block for up to 0.1 seconds (100ms)
poller = ZMQ::Poller.new => ZMQ::Poller
poller.register(req) => true
poller.poll(1) => Fixnum
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'ext/rbczmq/poller.c', line 135
/*
 * Polls the registered poll items for I/O readiness.
 *
 * Accepts one optional argument: the timeout in seconds, as a Fixnum or a
 * Float. A missing timeout means 0 (non-blocking); any negative timeout is
 * normalised to -1 (block without a timeout).
 *
 * Returns the poll result code as a Ruby Integer: the value reported by the
 * poll call on success, 0 when no items are registered, or -1 when the poll
 * failed with EINTR / EAGAIN.
 *
 * Raises TypeError for a timeout that is neither a Fixnum nor a Float, and
 * ZMQ::Error when the pollset cannot be rebuilt. Other poll failures are
 * handed to ZmqAssert.
 */
VALUE rb_czmq_poller_poll(int argc, VALUE *argv, VALUE obj)
{
    VALUE tmout;
    double seconds;
    long timeout;
    struct nogvl_poll_args args;
    int rc;
    int err;
    /* ZmqGetPoller is defined elsewhere; it makes `poller` available below. */
    ZmqGetPoller(obj);
    rb_scan_args(argc, argv, "01", &tmout);
    /* No timeout supplied: default to a non-blocking poll. */
    if (NIL_P(tmout)) tmout = INT2NUM(0);
    if (TYPE(tmout) != T_FIXNUM && TYPE(tmout) != T_FLOAT) rb_raise(rb_eTypeError, "wrong timeout type %s (expected Fixnum or Float)", RSTRING_PTR(rb_obj_as_string(tmout)));
    /* Nothing is registered, so nothing can become ready. */
    if (poller->poll_size == 0) return INT2NUM(0);
    /* Items were registered or removed since the pollset was last built. */
    if (poller->dirty == true) {
        rc = rb_czmq_poller_rebuild_pollset(poller);
        if (rc == -1) rb_raise(rb_eZmqError, "failed in rebuilding the pollset!");
    }
    /*
     * Convert seconds to milliseconds using signed arithmetic. The timeout
     * used to be held in a size_t, which made the "negative" check below
     * unreachable and converted negative Floats through an unsigned type.
     */
    seconds = (TYPE(tmout) == T_FIXNUM) ? (double)FIX2LONG(tmout) : RFLOAT_VALUE(tmout);
    timeout = (seconds < 0) ? -1 : (long)(seconds * 1000);
    args.items = poller->pollset;
    args.nitems = poller->poll_size;
    args.timeout = timeout;
    /* Results of the previous poll are discarded before polling again. */
    rb_ary_clear(poller->readables);
    rb_ary_clear(poller->writables);
    /* The poll itself runs inside a blocking region so other Ruby threads can proceed. */
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_poll, (void *)&args, RUBY_UBF_IO, 0);
    /* only call ZmqAssert if return code is less than zero since zmq_poll returns the number of pollers on success */
    if (rc < 0) {
        err = zmq_errno();
        /*
         * EINTR and EAGAIN are recoverable errors, so return flow to ruby so that retry / interrupt
         * handling can be done in ruby code. Ruby will see a -1 result. If it was an INT (or other)
         * signal, ruby's signal handler will raise the Interrupt exception.
         */
        if (err != EINTR && err != EAGAIN) {
            ZmqAssert(rc);
        }
    }
    /* At least one item is ready: refresh the readables / writables lists. */
    if (rc > 0) {
        rb_czmq_poller_rebuild_selectables(poller);
    }
    return INT2NUM(rc);
}
|
#poll_nonblock ⇒ Object
API sugar to poll non-blocking. Returns immediately if there are no items in a ready state.
7 8 9 |
# File 'lib/zmq/poller.rb', line 7
# API sugar for a non-blocking poll: delegates to #poll with a timeout of 0
# and returns whatever #poll returns.
def poll_nonblock
  poll(0)
end
#readables ⇒ Array
260 261 262 263 264 |
# File 'ext/rbczmq/poller.c', line 260
/*
 * Accessor for the poller's `readables` array. rb_czmq_poller_poll clears
 * this array at the start of every poll, so it reflects the most recent poll
 * only.
 */
VALUE rb_czmq_poller_readables(VALUE obj)
{
    VALUE ready;
    /* ZmqGetPoller is defined elsewhere; it makes `poller` available below. */
    ZmqGetPoller(obj);
    ready = poller->readables;
    return ready;
}
|
#register(pollitem) ⇒ Boolean
Registers a poll item for a particular I/O event (ZMQ::POLLIN or ZMQ::POLLOUT) with this poller instance. ZMQ::Socket or Ruby IO instances will automatically be coerced to ZMQ::Pollitem instances with the default events mask (ZMQ::POLLIN | ZMQ::POLLOUT). Note that the implementation returns the registered (coerced) poll item, which is truthy, rather than a literal true.
Examples
Supported events :
ZMQ::POLLIN : readable state
ZMQ::POLLOUT : writable state
poller = ZMQ::Poller.new => ZMQ::Poller
poller.register(ZMQ::Pollitem.new(req, ZMQ::POLLIN)) => true
poller.register(pub_socket) => true
poller.register(STDIN) => true
200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'ext/rbczmq/poller.c', line 200
/*
 * Adds a poll item to this poller.
 *
 * The argument is first passed through rb_czmq_pollitem_coerce, and the
 * coerced value is what gets stored and returned. Registering marks the
 * poller dirty; rb_czmq_poller_poll checks that flag and rebuilds the
 * pollset before polling.
 */
VALUE rb_czmq_poller_register(VALUE obj, VALUE pollable)
{
    VALUE coerced;
    /* ZmqGetPoller is defined elsewhere; it makes `poller` available below. */
    ZmqGetPoller(obj);
    coerced = rb_czmq_pollitem_coerce(pollable);
    /* NOTE(review): presumably validates / unwraps the poll item - confirm against the macro definition. */
    ZmqGetPollitem(coerced);
    /* A verbose poller passes its verbosity on to the items registered with it. */
    if (poller->verbose) {
        rb_czmq_pollitem_set_verbose(coerced, Qtrue);
    }
    rb_ary_push(poller->pollables, coerced);
    poller->poll_size += 1;
    poller->dirty = true;
    return coerced;
}
|
#register_readable(pollable) ⇒ Object
API sugar for registering a ZMQ::Socket or IO for readability
13 14 15 |
# File 'lib/zmq/poller.rb', line 13
# API sugar: wraps +pollable+ in a ZMQ::Pollitem with the ZMQ::POLLIN events
# mask and registers it. Returns whatever #register returns.
def register_readable(pollable)
  register(ZMQ::Pollitem.new(pollable, ZMQ::POLLIN))
end
#register_writable(pollable) ⇒ Object
API sugar for registering a ZMQ::Socket or IO for writability
19 20 21 |
# File 'lib/zmq/poller.rb', line 19
# API sugar: wraps +pollable+ in a ZMQ::Pollitem with the ZMQ::POLLOUT events
# mask and registers it. Returns whatever #register returns.
def register_writable(pollable)
  register(ZMQ::Pollitem.new(pollable, ZMQ::POLLOUT))
end
#remove(pollitem) ⇒ Boolean
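Removes a poll item from this poller. Deregisters the socket for any previously registered events. Note that we match on both poll items as well as pollable entities for all registered poll items. The implementation returns the removed poll item (which is truthy) when a match is found, and false otherwise.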
Examples
poller = ZMQ::Poller.new => ZMQ::Poller
poller.register(req) => true
poller.remove(req) => true
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 |
# File 'ext/rbczmq/poller.c', line 227
/*
 * Removes the first registered poll item that matches the argument.
 *
 * The argument is coerced with rb_czmq_pollitem_coerce. A registered item
 * matches when it is the same object as the coerced argument, or when both
 * refer to the same underlying pollable.
 *
 * Returns the removed poll item, or Qfalse when nothing matched.
 */
VALUE rb_czmq_poller_remove(VALUE obj, VALUE pollable)
{
    int pos;
    VALUE rpollable;
    /* ZmqGetPoller is defined elsewhere; it makes `poller` available below. */
    ZmqGetPoller(obj);
    pollable = rb_czmq_pollitem_coerce(pollable);
    /* NOTE(review): presumably validates / unwraps the poll item - confirm against the macro definition. */
    ZmqGetPollitem(pollable);
    for (pos = 0; pos < poller->poll_size; pos++) {
        rpollable = rb_ary_entry(poller->pollables, (long)pos);
        if (pollable == rpollable || rb_czmq_pollitem_pollable(pollable) == rb_czmq_pollitem_pollable(rpollable)) {
            /*
             * Delete exactly the matched slot. rb_ary_delete would drop every
             * element equal to rpollable while poll_size is only decremented
             * once, leaving the count out of step with the array when the
             * same item had been registered more than once.
             */
            rb_ary_delete_at(poller->pollables, (long)pos);
            poller->poll_size--;
            /* Flag the change so the next poll rebuilds the pollset. */
            poller->dirty = true;
            return rpollable;
        }
    }
    return Qfalse;
}
|
#verbose=(true) ⇒ nil
298 299 300 301 302 303 304 305 |
# File 'ext/rbczmq/poller.c', line 298
/*
 * Setter behind ZMQ::Poller#verbose=.
 *
 * Verbose mode is switched on only when the argument is exactly `true`;
 * every other value switches it off. Always returns nil.
 */
static VALUE rb_czmq_poller_set_verbose(VALUE obj, VALUE level)
{
    /* ZmqGetPoller is defined elsewhere; it makes `poller` available below. */
    ZmqGetPoller(obj);
    poller->verbose = (level == Qtrue);
    return Qnil;
}
|
#writables ⇒ Array
280 281 282 283 284 |
# File 'ext/rbczmq/poller.c', line 280
/*
 * Accessor for the poller's `writables` array. rb_czmq_poller_poll clears
 * this array at the start of every poll, so it reflects the most recent poll
 * only.
 */
VALUE rb_czmq_poller_writables(VALUE obj)
{
    VALUE ready;
    /* ZmqGetPoller is defined elsewhere; it makes `poller` available below. */
    ZmqGetPoller(obj);
    ready = poller->writables;
    return ready;
}
|