Module: ZMQ

Defined in:
lib/zmq.rb,
lib/zmq/version.rb,
ext/rbczmq/rbczmq_ext.c

Defined Under Namespace

Modules: DownstreamSocket, UpstreamSocket Classes: Context, DefaultHandler, Error, Frame, Handler, Loop, Message, Monitor, Poller, Pollitem, Socket, Timer

Constant Summary collapse

VERSION =
"1.6"
POLLIN =
INT2NUM(ZMQ_POLLIN)
POLLOUT =
INT2NUM(ZMQ_POLLOUT)
POLLERR =
INT2NUM(ZMQ_POLLERR)
PAIR =
INT2NUM(ZMQ_PAIR)
SUB =
INT2NUM(ZMQ_SUB)
PUB =
INT2NUM(ZMQ_PUB)
REQ =
INT2NUM(ZMQ_REQ)
REP =
INT2NUM(ZMQ_REP)
DEALER =
INT2NUM(ZMQ_DEALER)
ROUTER =
INT2NUM(ZMQ_ROUTER)
PUSH =
INT2NUM(ZMQ_PUSH)
PULL =
INT2NUM(ZMQ_PULL)
XSUB =
INT2NUM(ZMQ_XSUB)
XPUB =
INT2NUM(ZMQ_XPUB)
EFSM =
INT2NUM(EFSM)
ENOCOMPATPROTO =
INT2NUM(ENOCOMPATPROTO)
ETERM =
INT2NUM(ETERM)
EMTHREAD =
INT2NUM(EMTHREAD)
EVENT_CONNECTED =
INT2NUM(ZMQ_EVENT_CONNECTED)
EVENT_CONNECT_DELAYED =
INT2NUM(ZMQ_EVENT_CONNECT_DELAYED)
EVENT_CONNECT_RETRIED =
INT2NUM(ZMQ_EVENT_CONNECT_RETRIED)
EVENT_LISTENING =
INT2NUM(ZMQ_EVENT_LISTENING)
EVENT_BIND_FAILED =
INT2NUM(ZMQ_EVENT_BIND_FAILED)
EVENT_ACCEPTED =
INT2NUM(ZMQ_EVENT_ACCEPTED)
EVENT_ACCEPT_FAILED =
INT2NUM(ZMQ_EVENT_ACCEPT_FAILED)
EVENT_CLOSED =
INT2NUM(ZMQ_EVENT_CLOSED)
EVENT_CLOSE_FAILED =
INT2NUM(ZMQ_EVENT_CLOSE_FAILED)
EVENT_DISCONNECTED =
INT2NUM(ZMQ_EVENT_DISCONNECTED)
EVENT_ALL =
INT2NUM(ZMQ_EVENT_ALL)

Class Method Summary collapse

Class Method Details

.context ⇒ Object

Returns the ZMQ context for this process, if any



58
59
60
# File 'lib/zmq.rb', line 58

# Looks up the ZMQ context registered for the current process.
#
# Returns the value stored in @__zmq_ctx_process under the current
# Process.pid.
def self.context
  current_pid = Process.pid
  @__zmq_ctx_process[current_pid]
end

.errno ⇒ Fixnum

Returns the last known ZMQ errno (if any) as a Fixnum.

Examples

ZMQ.errno    =>  0

Returns:

  • (Fixnum)


137
138
139
140
# File 'ext/rbczmq/rbczmq_ext.c', line 137

/*
 * Backs ZMQ.errno: reads zmq_errno() and returns the value to Ruby as an
 * Integer.
 */
static VALUE rb_czmq_m_errno(ZMQ_UNUSED VALUE obj)
{
    int last_errno = zmq_errno();

    return INT2NUM(last_errno);
}

.error ⇒ ZMQ::Error

Returns the last known ZMQ error (if any) as a ZMQ::Error instance.

Examples

ZMQ.error    =>  ZMQ::Error or nil

Returns:

  • (ZMQ::Error, nil)

118
119
120
121
122
123
124
# File 'ext/rbczmq/rbczmq_ext.c', line 118

/*
 * Backs ZMQ.error: returns nil when zmq_errno() is 0, otherwise a new
 * rb_eZmqError exception object whose message is the zmq_strerror() text
 * for that error code.
 */
static VALUE rb_czmq_m_error(ZMQ_UNUSED VALUE obj)
{
    const int err = zmq_errno();

    if (err != 0) {
        return rb_exc_new2(rb_eZmqError, zmq_strerror(err));
    }
    return Qnil;
}

.Frame(data = nil) ⇒ Object

Sugaring for creating new ZMQ frames

ZMQ::Frame("frame") => ZMQ::Frame



34
35
36
# File 'lib/zmq.rb', line 34

# Conversion-style shortcut for building a frame.
#
# data - payload handed unchanged to ZMQ::Frame.new (defaults to nil).
#
# Returns the object produced by ZMQ::Frame.new.
def self.Frame(data = nil)
  frame_class = ZMQ::Frame
  frame_class.new(data)
end

.interrupted! ⇒ nil

Callback for Ruby signal handlers for terminating blocking functions and the reactor loop in libczmq.

Examples

ZMQ.interrupted!    =>  nil

Returns:

  • (nil)


153
154
155
156
157
# File 'ext/rbczmq/rbczmq_ext.c', line 153

/*
 * Backs ZMQ.interrupted!: sets the zctx_interrupted flag to 1 and returns
 * nil. The receiver argument is unused.
 */
static VALUE rb_czmq_m_interrupted_bang(ZMQ_UNUSED VALUE obj)
{
    zctx_interrupted = 1;
    return Qnil;
}

.interrupted? ⇒ Boolean

Returns true if the process was interrupted by signal.

Examples

ZMQ.interrupted?    =>  boolean

Returns:

  • (Boolean)


50
51
52
53
# File 'ext/rbczmq/rbczmq_ext.c', line 50

/*
 * Backs ZMQ.interrupted?: reports whether the zctx_interrupted flag is set.
 *
 * The flag is tested for truthiness rather than compared against `true`,
 * so any nonzero value counts as "interrupted", not only the value 1.
 *
 * Returns Qtrue when the flag is nonzero, Qfalse otherwise.
 */
static VALUE rb_czmq_m_interrupted_p(ZMQ_UNUSED VALUE obj)
{
    return zctx_interrupted ? Qtrue : Qfalse;
}

.log("msg") ⇒ nil

Logs a timestamped message to stdout.

Examples

ZMQ.log("msg")    =>  nil # 11-12-06 21:20:55 msg

Returns:

  • (nil)


100
101
102
103
104
105
# File 'ext/rbczmq/rbczmq_ext.c', line 100

/*
 * Backs ZMQ.log: writes a message through zclock_log() and returns nil.
 *
 * msg - must be a Ruby String (Check_Type raises TypeError otherwise);
 *       StringValueCStr additionally rejects strings with embedded NULs.
 *
 * zclock_log() takes a printf-style format string, so the message is passed
 * as an argument to a fixed "%s" format. Passing caller-supplied text as the
 * format itself would make any '%' directive in the message read
 * non-existent varargs.
 */
static VALUE rb_czmq_m_log(ZMQ_UNUSED VALUE obj, VALUE msg)
{
    Check_Type(msg, T_STRING);
    zclock_log("%s", StringValueCStr(msg));
    return Qnil;
}

.loop ⇒ Object

Higher level loop API.

XXX: Handle cases where context is nil



66
67
68
# File 'lib/zmq.rb', line 66

# Memoized accessor for the module-level loop.
#
# The first call builds a ZMQ::Loop from the current `context` and caches it
# in @loop; later calls return the cached object.
def self.loop
  return @loop if @loop

  @loop = ZMQ::Loop.new(context)
end

.Message(*parts) ⇒ Object

Sugaring for creating new ZMQ messages

ZMQ::Message("one", "two", "three") => ZMQ::Message



42
43
44
45
46
# File 'lib/zmq.rb', line 42

# Conversion-style shortcut for building a message from several parts.
#
# parts - values appended, in order, to a fresh ZMQ::Message via #addstr.
#
# Returns the populated ZMQ::Message.
def self.Message(*parts)
  parts.each_with_object(ZMQ::Message.new) do |part, message|
    message.addstr(part)
  end
end

.now ⇒ Fixnum

Returns a current timestamp as a Fixnum

Examples

ZMQ.now    =>  1323206405148

Returns:

  • (Fixnum)


84
85
86
87
# File 'ext/rbczmq/rbczmq_ext.c', line 84

/*
 * Backs ZMQ.now: returns the current zclock_time() value as a Ruby Integer.
 *
 * zclock_time() yields a 64-bit millisecond timestamp (the documented
 * example, 1323206405148, does not fit in a 32-bit int), so the conversion
 * uses LL2NUM. INT2NUM would narrow the value to `int` and truncate it.
 */
static VALUE rb_czmq_m_now(ZMQ_UNUSED VALUE obj)
{
    return LL2NUM(zclock_time());
}

.Pollitem(pollable, events = nil) ⇒ Object

Sugaring for creating new poll items

ZMQ::Pollitem(STDIN, ZMQ::POLLIN) => ZMQ::Pollitem



52
53
54
# File 'lib/zmq.rb', line 52

# Conversion-style shortcut for building a poll item.
#
# pollable - object to poll; passed through to ZMQ::Pollitem.new.
# events   - event mask passed through unchanged (defaults to nil).
#
# Returns the object produced by ZMQ::Pollitem.new.
def self.Pollitem(pollable, events = nil)
  item_args = [pollable, events]
  ZMQ::Pollitem.new(*item_args)
end

.proxy(frontend, backend, capture = nil) ⇒ Fixnum

Run the ZMQ proxy method echoing messages received from front end socket to back end and vice versa, copying messages to the capture socket if provided. This method does not return unless the application is interrupted with a signal.

Examples

context = ZMQ::Context.new
frontend = context.socket(ZMQ::ROUTER)
frontend.bind("tcp://*:5555")
backend = context.socket(ZMQ::DEALER)
backend.bind("tcp://*:5556")
ZMQ.proxy(frontend, backend) => -1 when interrupted

Returns:

  • (Fixnum) — the return code of the proxy call (e.g. -1 when interrupted)

See Also:



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'ext/rbczmq/rbczmq_ext.c', line 188

/*
 * Backs ZMQ.proxy(frontend, backend, capture = nil).
 *
 * Collects the raw socket handles of the given socket objects into a
 * three-slot array and hands that array to rb_czmq_m_proxy_nogvl through
 * rb_thread_blocking_region. The callback's result is returned to Ruby as
 * an Integer.
 *
 * argc/argv - Ruby call arguments: two required, one optional ("21").
 * klass     - receiver, unused.
 */
static VALUE rb_czmq_m_proxy(int argc, VALUE *argv, ZMQ_UNUSED VALUE klass)
{
    zmq_sock_wrapper *sock = NULL;
    VALUE frontend, backend, capture;
    void *sockets[3];
    int rc;

    rb_scan_args(argc, argv, "21", &frontend, &backend, &capture);

    /* NOTE(review): GetZmqSocket is presumably a macro that unwraps the given
     * object into the local `sock`; each use must precede the matching
     * sock->socket read. Confirm against its definition. */
    GetZmqSocket(frontend);
    sockets[0] = sock->socket;

    GetZmqSocket(backend);
    sockets[1] = sock->socket;

    /* The capture socket is optional; slot 2 is NULL when it was not given. */
    if (!NIL_P(capture))
    {
        GetZmqSocket(capture);
        sockets[2] = sock->socket;
    }
    else
    {
        sockets[2] = NULL;
    }

    /* The callback's VALUE result is narrowed to int here. */
    rc = (int)rb_thread_blocking_region(rb_czmq_m_proxy_nogvl, (void *)sockets, RUBY_UBF_IO, 0);

    // int result = zmq_proxy(frontend_socket, backend_socket, capture_socket);
    return INT2NUM(rc);
}

.resolver ⇒ Object



80
81
82
83
84
85
# File 'lib/zmq.rb', line 80

# Memoized DNS resolver.
#
# The 'resolv' library is loaded lazily on first use; the resulting
# Resolv::DNS instance is cached in @resolver and reused afterwards.
def self.resolver
  return @resolver if @resolver

  require 'resolv'
  @resolver = Resolv::DNS.new
end

.select(read = [], write = [], error = [], timeout = nil) ⇒ Object

API sugaring: IO.select compatible API, but for ZMQ sockets.



72
73
74
75
76
77
78
# File 'lib/zmq.rb', line 72

# IO.select-shaped convenience wrapper around ZMQ::Poller.
#
# read    - sockets to register as readable (nil/false is skipped).
# write   - sockets to register as writable (nil/false is skipped).
# error   - accepted for signature compatibility; never referenced here.
# timeout - passed straight to ZMQ::Poller#poll.
#
# Returns [readables, writables, []] when the poll result is truthy,
# nil otherwise. The third element is always an empty array.
def self.select(read = [], write = [], error = [], timeout = nil)
  poller = ZMQ::Poller.new
  (read || []).each { |socket| poller.register_readable(socket) }
  (write || []).each { |socket| poller.register_writable(socket) }
  return unless poller.poll(timeout)

  [poller.readables, poller.writables, []]
end

.stable_version? ⇒ Boolean

Returns:

  • (Boolean)


20
21
22
23
24
# File 'lib/zmq.rb', line 20

# True only when the linked libzmq version is exactly 3.2.2.
#
# Returns a Boolean.
def self.stable_version?
  linked = version
  return false unless linked[0] == 3
  return false unless linked[1] == 2

  linked[2] == 2
end

.version ⇒ Array

Returns the libzmq version linked against.

Examples

ZMQ.version    =>  [2,1,11]

Returns:

  • (Array)


66
67
68
69
70
71
# File 'ext/rbczmq/rbczmq_ext.c', line 66

/*
 * Backs ZMQ.version: queries zmq_version() and returns the three components
 * as a Ruby Array of Integers in [major, minor, patch] order.
 */
static VALUE rb_czmq_m_version(ZMQ_UNUSED VALUE obj)
{
    int ver_major, ver_minor, ver_patch;
    VALUE rb_major, rb_minor, rb_patch;

    zmq_version(&ver_major, &ver_minor, &ver_patch);

    rb_major = INT2NUM(ver_major);
    rb_minor = INT2NUM(ver_minor);
    rb_patch = INT2NUM(ver_patch);

    return rb_ary_new3(3, rb_major, rb_minor, rb_patch);
}

.version2? ⇒ Boolean

Returns:

  • (Boolean)


26
27
28
# File 'lib/zmq.rb', line 26

# Logical negation of version3?: true whenever the linked libzmq is not
# reported as a 3.x release.
#
# Returns a Boolean.
def self.version2?
  version3? ? false : true
end

.version3? ⇒ Boolean

Returns:

  • (Boolean)


16
17
18
# File 'lib/zmq.rb', line 16

# True when the major component of the linked libzmq version is 3.
#
# Returns a Boolean.
def self.version3?
  major = version[0]
  major == 3
end