Module: ZMQ
- Defined in:
- lib/zmq.rb,
lib/zmq/logger.rb,
lib/zmq/version.rb,
ext/rbczmq/rbczmq_ext.c
Defined Under Namespace
Modules: DownstreamSocket, UpstreamSocket Classes: Beacon, Context, DefaultHandler, Error, Frame, Handler, Logger, Loop, Message, Monitor, Poller, Pollitem, Socket, Timer
Constant Summary collapse
- VERSION =
"1.7.4"
- POLLIN =
INT2NUM(ZMQ_POLLIN)
- POLLOUT =
INT2NUM(ZMQ_POLLOUT)
- POLLERR =
INT2NUM(ZMQ_POLLERR)
- PAIR =
INT2NUM(ZMQ_PAIR)
- SUB =
INT2NUM(ZMQ_SUB)
- PUB =
INT2NUM(ZMQ_PUB)
- REQ =
INT2NUM(ZMQ_REQ)
- REP =
INT2NUM(ZMQ_REP)
- DEALER =
INT2NUM(ZMQ_DEALER)
- ROUTER =
INT2NUM(ZMQ_ROUTER)
- PUSH =
INT2NUM(ZMQ_PUSH)
- PULL =
INT2NUM(ZMQ_PULL)
- XSUB =
INT2NUM(ZMQ_XSUB)
- XPUB =
INT2NUM(ZMQ_XPUB)
- EFSM =
INT2NUM(EFSM)
- ENOCOMPATPROTO =
INT2NUM(ENOCOMPATPROTO)
- ETERM =
INT2NUM(ETERM)
- EMTHREAD =
INT2NUM(EMTHREAD)
- EVENT_CONNECTED =
INT2NUM(ZMQ_EVENT_CONNECTED)
- EVENT_CONNECT_DELAYED =
INT2NUM(ZMQ_EVENT_CONNECT_DELAYED)
- EVENT_CONNECT_RETRIED =
INT2NUM(ZMQ_EVENT_CONNECT_RETRIED)
- EVENT_LISTENING =
INT2NUM(ZMQ_EVENT_LISTENING)
- EVENT_BIND_FAILED =
INT2NUM(ZMQ_EVENT_BIND_FAILED)
- EVENT_ACCEPTED =
INT2NUM(ZMQ_EVENT_ACCEPTED)
- EVENT_ACCEPT_FAILED =
INT2NUM(ZMQ_EVENT_ACCEPT_FAILED)
- EVENT_CLOSED =
INT2NUM(ZMQ_EVENT_CLOSED)
- EVENT_CLOSE_FAILED =
INT2NUM(ZMQ_EVENT_CLOSE_FAILED)
- EVENT_DISCONNECTED =
INT2NUM(ZMQ_EVENT_DISCONNECTED)
- EVENT_ALL =
INT2NUM(ZMQ_EVENT_ALL)
Class Method Summary collapse
-
.context ⇒ Object
Returns the ZMQ context for this process, if any.
-
.errno ⇒ Fixnum
Returns the last known ZMQ errno (if any) as a Fixnum.
-
.error ⇒ ZMQ::Error
Returns the last known ZMQ error (if any) as a ZMQ::Error instance.
-
.Frame(data = nil) ⇒ Object
Sugaring for creating new ZMQ frames.
-
.interrupted! ⇒ nil
Callback for Ruby signal handlers for terminating blocking functions and the reactor loop in libczmq.
-
.interrupted? ⇒ Boolean
Returns true if the process was interrupted by signal.
-
.log("msg") ⇒ nil
Logs a timestamped message to stdout.
-
.loop ⇒ Object
Higher level loop API.
-
.Message(*parts) ⇒ Object
Sugaring for creating new ZMQ messages.
-
.now ⇒ Fixnum
Returns a current timestamp as a Fixnum.
-
.Pollitem(pollable, events = nil) ⇒ Object
Sugaring for creating new poll items.
-
.proxy(frontend, backend, capture = nil) ⇒ nil
Run the ZMQ proxy method echoing messages received from front end socket to back end and vice versa, copying messages to the capture socket if provided.
- .resolver ⇒ Object
-
.select(read = [], write = [], error = [], timeout = nil) ⇒ Object
API sugaring: IO.select compatible API, but for ZMQ sockets.
-
.version ⇒ Array
Returns the libzmq version linked against.
Class Method Details
.context ⇒ Object
Returns the ZMQ context for this process, if any
44 45 46 |
# File 'lib/zmq.rb', line 44 def self.context @__zmq_ctx_process[Process.pid] end |
.errno ⇒ Fixnum
133 134 135 136 |
# File 'ext/rbczmq/rbczmq_ext.c', line 133
static VALUE rb_czmq_m_errno(ZMQ_UNUSED VALUE obj)
{
return INT2NUM(zmq_errno());
}
|
.error ⇒ ZMQ::Error
114 115 116 117 118 119 120 |
# File 'ext/rbczmq/rbczmq_ext.c', line 114
static VALUE rb_czmq_m_error(ZMQ_UNUSED VALUE obj)
{
int err;
err = zmq_errno();
if (err == 0) return Qnil;
return rb_exc_new2(rb_eZmqError, zmq_strerror(zmq_errno()));
}
|
.Frame(data = nil) ⇒ Object
Sugaring for creating new ZMQ frames
ZMQ::Frame(“frame”) => ZMQ::Frame
20 21 22 |
# File 'lib/zmq.rb', line 20 def self.Frame(data = nil) ZMQ::Frame.new(data) end |
.interrupted! ⇒ nil
Callback for Ruby signal handlers for terminating blocking functions and the reactor loop in libczmq.
Examples
ZMQ.interrupted! => nil
149 150 151 152 153 |
# File 'ext/rbczmq/rbczmq_ext.c', line 149
static VALUE rb_czmq_m_interrupted_bang(ZMQ_UNUSED VALUE obj)
{
zctx_interrupted = 1;
return Qnil;
}
|
.interrupted? ⇒ Boolean
46 47 48 49 |
# File 'ext/rbczmq/rbczmq_ext.c', line 46
static VALUE rb_czmq_m_interrupted_p(ZMQ_UNUSED VALUE obj)
{
return (zctx_interrupted == true) ? Qtrue : Qfalse;
}
|
.log("msg") ⇒ nil
96 97 98 99 100 101 |
# File 'ext/rbczmq/rbczmq_ext.c', line 96
static VALUE rb_czmq_m_log(ZMQ_UNUSED VALUE obj, VALUE msg)
{
Check_Type(msg, T_STRING);
zclock_log(StringValueCStr(msg));
return Qnil;
}
|
.loop ⇒ Object
Higher level loop API.
XXX: Handle cases where context is nil
52 53 54 |
# File 'lib/zmq.rb', line 52 def self.loop @loop ||= ZMQ::Loop.new(context) end |
.Message(*parts) ⇒ Object
Sugaring for creating new ZMQ messages
ZMQ::Message(“one”, “two”, “three”) => ZMQ::Message
28 29 30 31 32 |
# File 'lib/zmq.rb', line 28 def self.Message(*parts) m = ZMQ::Message.new parts.each{|p| m.addstr(p) } m end |
.now ⇒ Fixnum
80 81 82 83 |
# File 'ext/rbczmq/rbczmq_ext.c', line 80
static VALUE rb_czmq_m_now(ZMQ_UNUSED VALUE obj)
{
return INT2NUM(zclock_time());
}
|
.Pollitem(pollable, events = nil) ⇒ Object
Sugaring for creating new poll items
ZMQ::Pollitem(STDIN, ZMQ::POLLIN) => ZMQ::Pollitem
38 39 40 |
# File 'lib/zmq.rb', line 38 def self.Pollitem(pollable, events = nil) ZMQ::Pollitem.new(pollable, events) end |
.proxy(frontend, backend, capture = nil) ⇒ nil
Run the ZMQ proxy method echoing messages received from front end socket to back end and vice versa, copying messages to the capture socket if provided. This method does not return unless the application is interrupted with a signal.
Examples
context = ZMQ::Context.new
frontend = context.socket(ZMQ::ROUTER)
frontend.bind("tcp://127.0.0.1:5555")
backend = context.socket(ZMQ::DEALER)
backend.bind("tcp://127.0.0.1:5556")
ZMQ.proxy(frontend, backend) => -1 when interrupted
184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 |
# File 'ext/rbczmq/rbczmq_ext.c', line 184
static VALUE rb_czmq_m_proxy(int argc, VALUE *argv, ZMQ_UNUSED VALUE klass)
{
zmq_sock_wrapper *sock = NULL;
VALUE frontend, backend, capture;
void *sockets[3];
int rc;
rb_scan_args(argc, argv, "21", &frontend, &backend, &capture);
GetZmqSocket(frontend);
sockets[0] = sock->socket;
GetZmqSocket(backend);
sockets[1] = sock->socket;
if (!NIL_P(capture))
{
GetZmqSocket(capture);
sockets[2] = sock->socket;
}
else
{
sockets[2] = NULL;
}
rc = (int)rb_thread_blocking_region(rb_czmq_m_proxy_nogvl, (void *)sockets, RUBY_UBF_IO, 0);
// int result = zmq_proxy(frontend_socket, backend_socket, capture_socket);
return INT2NUM(rc);
}
|
.resolver ⇒ Object
66 67 68 69 70 71 |
# File 'lib/zmq.rb', line 66 def self.resolver @resolver ||= begin require 'resolv' Resolv::DNS.new end end |
.select(read = [], write = [], error = [], timeout = nil) ⇒ Object
API sugaring: IO.select compatible API, but for ZMQ sockets.
58 59 60 61 62 63 64 |
# File 'lib/zmq.rb', line 58 def self.select(read = [], write = [], error = [], timeout = nil) poller = ZMQ::Poller.new read.each{|s| poller.register_readable(s) } if read write.each{|s| poller.register_writable(s) } if write ready = poller.poll(timeout) [poller.readables, poller.writables, []] if ready end |
.version ⇒ Array
62 63 64 65 66 67 |
# File 'ext/rbczmq/rbczmq_ext.c', line 62
static VALUE rb_czmq_m_version(ZMQ_UNUSED VALUE obj)
{
int major, minor, patch;
zmq_version(&major, &minor, &patch);
return rb_ary_new3(3, INT2NUM(major), INT2NUM(minor), INT2NUM(patch));
}
|