Module: ZMQ

Defined in:
lib/zmq.rb,
lib/zmq/logger.rb,
lib/zmq/version.rb,
ext/rbczmq/rbczmq_ext.c

Defined Under Namespace

Modules: DownstreamSocket, UpstreamSocket

Classes: Beacon, Context, DefaultHandler, Error, Frame, Handler, Logger, Loop, Message, Monitor, Poller, Pollitem, Socket, Timer

Constant Summary collapse

VERSION =
"1.7.8"
POLLIN =
INT2NUM(ZMQ_POLLIN)
POLLOUT =
INT2NUM(ZMQ_POLLOUT)
POLLERR =
INT2NUM(ZMQ_POLLERR)
PAIR =
INT2NUM(ZMQ_PAIR)
SUB =
INT2NUM(ZMQ_SUB)
PUB =
INT2NUM(ZMQ_PUB)
REQ =
INT2NUM(ZMQ_REQ)
REP =
INT2NUM(ZMQ_REP)
DEALER =
INT2NUM(ZMQ_DEALER)
ROUTER =
INT2NUM(ZMQ_ROUTER)
PUSH =
INT2NUM(ZMQ_PUSH)
PULL =
INT2NUM(ZMQ_PULL)
XSUB =
INT2NUM(ZMQ_XSUB)
XPUB =
INT2NUM(ZMQ_XPUB)
STREAM =
INT2NUM(ZMQ_STREAM)
EFSM =
INT2NUM(EFSM)
ENOCOMPATPROTO =
INT2NUM(ENOCOMPATPROTO)
ETERM =
INT2NUM(ETERM)
EMTHREAD =
INT2NUM(EMTHREAD)
EVENT_CONNECTED =
INT2NUM(ZMQ_EVENT_CONNECTED)
EVENT_CONNECT_DELAYED =
INT2NUM(ZMQ_EVENT_CONNECT_DELAYED)
EVENT_CONNECT_RETRIED =
INT2NUM(ZMQ_EVENT_CONNECT_RETRIED)
EVENT_LISTENING =
INT2NUM(ZMQ_EVENT_LISTENING)
EVENT_BIND_FAILED =
INT2NUM(ZMQ_EVENT_BIND_FAILED)
EVENT_ACCEPTED =
INT2NUM(ZMQ_EVENT_ACCEPTED)
EVENT_ACCEPT_FAILED =
INT2NUM(ZMQ_EVENT_ACCEPT_FAILED)
EVENT_CLOSED =
INT2NUM(ZMQ_EVENT_CLOSED)
EVENT_CLOSE_FAILED =
INT2NUM(ZMQ_EVENT_CLOSE_FAILED)
EVENT_DISCONNECTED =
INT2NUM(ZMQ_EVENT_DISCONNECTED)
EVENT_ALL =
INT2NUM(ZMQ_EVENT_ALL)

Class Method Summary collapse

Class Method Details

.context ⇒ Object

Returns the ZMQ context for this process, if any



44
45
46
# File 'lib/zmq.rb', line 44

# Looks up the entry stored for the current process in the
# @__zmq_ctx_process table, keyed by Process.pid.
#
# Returns whatever the table holds for this pid.
# NOTE(review): presumably a ZMQ::Context or nil; the code that populates
# @__zmq_ctx_process is not visible here - confirm.
def self.context
  current_pid = Process.pid
  @__zmq_ctx_process[current_pid]
end

.czmq_version ⇒ Array

Returns the czmq version linked against.

Examples

ZMQ.czmq_version    =>  [3,0,0]

Returns:

  • (Array)


81
82
83
84
# File 'ext/rbczmq/rbczmq_ext.c', line 81

/*
 * Builds a three-element Ruby array from the CZMQ_VERSION_MAJOR,
 * CZMQ_VERSION_MINOR and CZMQ_VERSION_PATCH constants, in that order.
 * The receiver argument is unused.
 */
static VALUE rb_czmq_m_czmq_version(ZMQ_UNUSED VALUE obj)
{
    VALUE major = INT2NUM(CZMQ_VERSION_MAJOR);
    VALUE minor = INT2NUM(CZMQ_VERSION_MINOR);
    VALUE patch = INT2NUM(CZMQ_VERSION_PATCH);

    return rb_ary_new3(3, major, minor, patch);
}

.errno ⇒ Fixnum

Returns the last known ZMQ errno (if any) as a Fixnum.

Examples

ZMQ.errno    =>  0

Returns:

  • (Fixnum)


150
151
152
153
# File 'ext/rbczmq/rbczmq_ext.c', line 150

/*
 * Wraps the value reported by zmq_errno() in a Ruby integer.
 * The receiver argument is unused.
 */
static VALUE rb_czmq_m_errno(ZMQ_UNUSED VALUE obj)
{
    const int last_errno = zmq_errno();

    return INT2NUM(last_errno);
}

.error ⇒ ZMQ::Error

Returns the last known ZMQ error (if any) as a ZMQ::Error instance.

Examples

ZMQ.error    =>  ZMQ::Error or nil

Returns:

  • (ZMQ::Error, nil)



131
132
133
134
135
136
137
# File 'ext/rbczmq/rbczmq_ext.c', line 131

/*
 * Reports the current zmq_errno() as a Ruby exception object.
 *
 * Returns nil when zmq_errno() is 0; otherwise returns (does not raise) a
 * new rb_eZmqError instance whose message is zmq_strerror() for that errno.
 * The receiver argument is unused.
 */
static VALUE rb_czmq_m_error(ZMQ_UNUSED VALUE obj)
{
    const int err = zmq_errno();

    if (err != 0) {
        return rb_exc_new2(rb_eZmqError, zmq_strerror(err));
    }
    return Qnil;
}

.Frame(data = nil) ⇒ Object

Sugaring for creating new ZMQ frames

ZMQ::Frame("frame") => ZMQ::Frame



20
21
22
# File 'lib/zmq.rb', line 20

# Sugar for building a frame: forwards +data+ to ZMQ::Frame.new.
#
# data - value handed to the ZMQ::Frame constructor (default: nil).
#
# Returns the object produced by ZMQ::Frame.new.
def self.Frame(data = nil)
  frame_class = ZMQ::Frame
  frame_class.new(data)
end

.interrupted! ⇒ nil

Callback for Ruby signal handlers for terminating blocking functions and the reactor loop in libczmq.

Examples

ZMQ.interrupted!    =>  nil

Returns:

  • (nil)


166
167
168
169
170
# File 'ext/rbczmq/rbczmq_ext.c', line 166

/*
 * Sets the zctx_interrupted flag to 1 and returns nil.
 * The receiver argument is unused.
 */
static VALUE rb_czmq_m_interrupted_bang(ZMQ_UNUSED VALUE obj)
{
    zctx_interrupted = 1;
    return Qnil;
}

.interrupted? ⇒ Boolean

Returns true if the process was interrupted by signal.

Examples

ZMQ.interrupted?    =>  boolean

Returns:

  • (Boolean)


47
48
49
50
# File 'ext/rbczmq/rbczmq_ext.c', line 47

/*
 * Reports whether the zctx_interrupted flag is set.
 *
 * Returns Qtrue only when zctx_interrupted compares equal to `true`;
 * any other value yields Qfalse. The receiver argument is unused.
 */
static VALUE rb_czmq_m_interrupted_p(ZMQ_UNUSED VALUE obj)
{
    if (zctx_interrupted == true) {
        return Qtrue;
    }
    return Qfalse;
}

.log("msg") ⇒ nil

Logs a timestamped message to stdout.

Examples

ZMQ.log("msg")    =>  nil # 11-12-06 21:20:55 msg

Returns:

  • (nil)


113
114
115
116
117
118
# File 'ext/rbczmq/rbczmq_ext.c', line 113

/*
 * Logs a message through zclock_log().
 *
 * msg - must be a Ruby String; Check_Type rejects any other type.
 *
 * Returns nil. The receiver argument is unused.
 */
static VALUE rb_czmq_m_log(ZMQ_UNUSED VALUE obj, VALUE msg)
{
    Check_Type(msg, T_STRING);
    /*
     * zclock_log() takes a printf-style format string. Pass the caller's
     * text as an argument to a fixed "%s" format so that '%' sequences in
     * the message are printed literally instead of being interpreted as
     * conversion specifiers (which would read arbitrary stack data or crash).
     */
    zclock_log("%s", StringValueCStr(msg));
    return Qnil;
}

.loop ⇒ Object

Higher level loop API.

XXX: Handle cases where context is nil



52
53
54
# File 'lib/zmq.rb', line 52

# Memoized accessor for a ZMQ::Loop built from the value returned by
# `context`.
#
# The loop is created on first use and cached in @loop; later calls return
# the cached object. `context` is passed to ZMQ::Loop.new as-is, so a nil
# context is not handled here.
#
# Returns the cached ZMQ::Loop.new(context) result.
def self.loop
  return @loop if @loop

  @loop = ZMQ::Loop.new(context)
end

.Message(*parts) ⇒ Object

Sugaring for creating new ZMQ messages

ZMQ::Message("one", "two", "three") => ZMQ::Message



28
29
30
31
32
# File 'lib/zmq.rb', line 28

# Sugar for building a message from several parts.
#
# parts - zero or more values; each is appended to a fresh ZMQ::Message
#         via #addstr, in the order given.
#
# Returns the populated ZMQ::Message.
def self.Message(*parts)
  parts.each_with_object(ZMQ::Message.new) do |part, message|
    message.addstr(part)
  end
end

.now ⇒ Fixnum

Returns a current timestamp as a Fixnum

Examples

ZMQ.now    =>  1323206405148

Returns:

  • (Fixnum)


97
98
99
100
# File 'ext/rbczmq/rbczmq_ext.c', line 97

/*
 * Returns the value of zclock_time() as a Ruby integer.
 * The receiver argument is unused.
 */
static VALUE rb_czmq_m_now(ZMQ_UNUSED VALUE obj)
{
    /*
     * zclock_time() yields a 64-bit timestamp (the documented example,
     * 1323206405148, does not fit in 32 bits). INT2NUM() converts through
     * a C int and would truncate it, so use the long long conversion.
     */
    return LL2NUM(zclock_time());
}

.Pollitem(pollable, events = nil) ⇒ Object

Sugaring for creating new poll items

ZMQ::Pollitem(STDIN, ZMQ::POLLIN) => ZMQ::Pollitem



38
39
40
# File 'lib/zmq.rb', line 38

# Sugar for building a poll item: forwards both arguments to
# ZMQ::Pollitem.new.
#
# pollable - first argument for the ZMQ::Pollitem constructor.
# events   - second argument for the constructor (default: nil).
#
# Returns the object produced by ZMQ::Pollitem.new.
def self.Pollitem(pollable, events = nil)
  pollitem_class = ZMQ::Pollitem
  pollitem_class.new(pollable, events)
end

.proxy(frontend, backend, capture = nil) ⇒ Fixnum

Run the ZMQ proxy method echoing messages received from front end socket to back end and vice versa, copying messages to the capture socket if provided. This method does not return unless the application is interrupted with a signal.

Examples

context = ZMQ::Context.new
frontend = context.socket(ZMQ::ROUTER)
frontend.bind("tcp://127.0.0.1:5555")
backend = context.socket(ZMQ::DEALER)
backend.bind("tcp://127.0.0.1:5556")
ZMQ.proxy(frontend, backend) => -1 when interrupted

Returns:

  • (Fixnum)

See Also:



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'ext/rbczmq/rbczmq_ext.c', line 201

/*
 * Runs rb_czmq_m_proxy_nogvl over a frontend, a backend and an optional
 * capture socket with the GVL released.
 *
 * Takes two required arguments and one optional argument (scan spec "21").
 * The raw socket handles are collected into a three-slot array; the third
 * slot stays NULL when no capture socket is given.
 *
 * Returns the helper's result, cast to int, as a Ruby integer.
 */
static VALUE rb_czmq_m_proxy(int argc, VALUE *argv, ZMQ_UNUSED VALUE klass)
{
    /* Presumably assigned by the GetZmqSocket macro (defined elsewhere);
     * it is read via sock->socket right after each invocation. */
    zmq_sock_wrapper *sock = NULL;
    VALUE frontend, backend, capture;
    void *sockets[3] = { NULL, NULL, NULL };
    int proxy_rc;

    rb_scan_args(argc, argv, "21", &frontend, &backend, &capture);

    GetZmqSocket(frontend);
    sockets[0] = sock->socket;

    GetZmqSocket(backend);
    sockets[1] = sock->socket;

    /* The capture socket is optional; slot 2 keeps its NULL initialiser otherwise. */
    if (!NIL_P(capture)) {
        GetZmqSocket(capture);
        sockets[2] = sock->socket;
    }

    /* Release the GVL while the proxy helper runs. */
    proxy_rc = (int)rb_thread_call_without_gvl(rb_czmq_m_proxy_nogvl, (void *)sockets, RUBY_UBF_IO, 0);

    return INT2NUM(proxy_rc);
}

.resolver ⇒ Object



66
67
68
69
70
71
# File 'lib/zmq.rb', line 66

# Memoized accessor for a Resolv::DNS instance.
#
# The 'resolv' library is required lazily, on first use only, and the
# resolver is cached in @resolver for later calls.
#
# Returns the cached Resolv::DNS instance.
def self.resolver
  return @resolver if @resolver

  require 'resolv'
  @resolver = Resolv::DNS.new
end

.select(read = [], write = [], error = [], timeout = nil) ⇒ Object

API sugaring: IO.select compatible API, but for ZMQ sockets.



58
59
60
61
62
63
64
# File 'lib/zmq.rb', line 58

# IO.select-style helper built on ZMQ::Poller.
#
# read    - collection registered as readable (skipped when nil/false).
# write   - collection registered as writable (skipped when nil/false).
# error   - accepted for signature compatibility; not used by this method.
# timeout - passed straight to Poller#poll (default: nil).
#
# Returns [readables, writables, []] when Poller#poll returns a truthy
# value, otherwise nil. The third element is always an empty array.
# NOTE(review): if Poller#poll returns an Integer count, 0 is truthy in
# Ruby and would not produce nil here - confirm against Poller#poll.
def self.select(read = [], write = [], error = [], timeout = nil)
  poller = ZMQ::Poller.new

  if read
    read.each { |sock| poller.register_readable(sock) }
  end
  if write
    write.each { |sock| poller.register_writable(sock) }
  end

  return unless poller.poll(timeout)

  [poller.readables, poller.writables, []]
end

.version ⇒ Array

Returns the libzmq version linked against.

Examples

ZMQ.version    =>  [2,1,11]

Returns:

  • (Array)


63
64
65
66
67
68
# File 'ext/rbczmq/rbczmq_ext.c', line 63

/*
 * Queries zmq_version() and returns its three components as a Ruby
 * array in the order [major, minor, patch].
 * The receiver argument is unused.
 */
static VALUE rb_czmq_m_version(ZMQ_UNUSED VALUE obj)
{
    int parts[3];

    /* parts[0] = major, parts[1] = minor, parts[2] = patch */
    zmq_version(&parts[0], &parts[1], &parts[2]);

    return rb_ary_new3(3, INT2NUM(parts[0]), INT2NUM(parts[1]), INT2NUM(parts[2]));
}