Class: ZMQ::Context

Inherits:
Object
  • Object
show all
Defined in:
lib/zmq/context.rb,
ext/rbczmq/context.c

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Context

Overload the libczmq handler installed for SIGINT and SIGTERM on context init. This ensures we fallback to the default Ruby signal handlers which is least likely to violate the principle of least surprise. As an alternative fallback, we expose ZMQ.interrupted! which reverts back to the libczmq default actions when called from a Ruby signal handler. The following restores the default libczmq behavior :

def initialize(*args)

super
trap(:INT){ ZMQ.interrupted! }
trap(:TERM){ ZMQ.interrupted! }

end



17
18
19
20
21
# File 'lib/zmq/context.rb', line 17

def initialize(*args)
  super
  trap(:INT, "DEFAULT")
  trap(:TERM, "DEFAULT")
end

Class Method Details

.ZMQ::Context.newZMQ::Context .ZMQ::Context.new(1) ⇒ ZMQ::Context

Returns a handle to a new ZMQ context. A single context per process is supported in order to guarantee stability across all Ruby implementations. A context should be passed as an argument to any Ruby threads. Optionally a context can be initialized with an I/O threads value (default: 1) - there should be no need to fiddle with this.

Examples

ZMQ::Context.new    =>  ZMQ::Context
ZMQ::Context.new(1)    =>  ZMQ::Context

Overloads:



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'ext/rbczmq/context.c', line 85

static VALUE rb_czmq_ctx_s_new(int argc, VALUE *argv, VALUE context)
{
    VALUE ctx_map;
    VALUE io_threads;
    zmq_ctx_wrapper *ctx = NULL;
    rb_scan_args(argc, argv, "01", &io_threads);
    ctx_map = rb_ivar_get(rb_mZmq, intern_zctx_process);
    if (!NIL_P(rb_hash_aref(ctx_map, get_pid()))) rb_raise(rb_eZmqError, "single ZMQ context per process allowed");
    context = Data_Make_Struct(rb_cZmqContext, zmq_ctx_wrapper, 0, rb_czmq_free_ctx_gc, ctx);
    ctx->ctx = (zctx_t*)rb_thread_blocking_region(rb_czmq_nogvl_zctx_new, NULL, RUBY_UBF_IO, 0);
    ZmqAssertObjOnAlloc(ctx->ctx, ctx);
    ctx->flags = 0;
    rb_obj_call_init(context, 0, NULL);
    rb_hash_aset(ctx_map, get_pid(), context);
    if (!NIL_P(io_threads)) rb_czmq_ctx_set_iothreads(context, io_threads);
    return context;
}

Instance Method Details

#bind(sock_type, endpoint) ⇒ Object

Sugaring for spawning a new socket and bind to a given endpoint

ctx.bind(:PUB, “tcp://127.0.0.1:5000”)



35
36
37
38
39
# File 'lib/zmq/context.rb', line 35

def bind(sock_type, endpoint)
  s = socket(sock_type)
  s.bind(endpoint)
  s
end

#connect(sock_type, endpoint) ⇒ Object

Sugaring for spawning a new socket and connect to a given endpoint

ctx.connect(:SUB, “tcp://127.0.0.1:5000”)



45
46
47
48
49
# File 'lib/zmq/context.rb', line 45

def connect(sock_type, endpoint)
  s = socket(sock_type)
  s.connect(endpoint)
  s
end

#destroynil

Destroy a ZMQ context and all sockets in it. Useful for manual memory management, otherwise the GC will take the same action if a context object is not reachable anymore on the next GC cycle. This is a lower level API.

Examples

ctx = ZMQ::Context.new
ctx.destroy     =>   nil

Returns:

  • (nil)


117
118
119
120
121
122
# File 'ext/rbczmq/context.c', line 117

static VALUE rb_czmq_ctx_destroy(VALUE obj)
{
    ZmqGetContext(obj);
    rb_czmq_free_ctx(ctx);
    return Qnil;
}

#hwmFixnum

Returns High Water Mark (HWM) option used for native thread creation (non-Ruby threads, ahead of time API addition)

Examples

ctx = ZMQ::Context.new
ctx.hwm    =>   1

Returns:

  • (Fixnum)


188
189
190
191
192
193
194
195
# File 'ext/rbczmq/context.c', line 188

static VALUE rb_czmq_ctx_hwm(VALUE obj)
{
    errno = 0;
    int wm;
    ZmqGetContext(obj);
    rb_warn("Deprecated method ZMQ::Context#hwm, does nothing - to be removed after 2013/05/14");
    return INT2FIX(zctx_hwm(ctx->ctx));
}

#hwm=(100) ⇒ nil

Sets the High Water Mark (HWM) option used for native thread creation (non-Ruby threads, ahead of time API addition)

Examples

ctx = ZMQ::Context.new
ctx.hwm = 100    =>   nil

Returns:

  • (nil)


209
210
211
212
213
214
215
216
217
218
219
220
# File 'ext/rbczmq/context.c', line 209

static VALUE rb_czmq_ctx_set_hwm(VALUE obj, VALUE hwm)
{
    errno = 0;
    int wm;
    ZmqGetContext(obj);
    Check_Type(hwm, T_FIXNUM);
    rb_warn("Deprecated method ZMQ::Context#hwm=, does nothing - to be removed after 2013/05/14");
    wm = FIX2INT(hwm);
    if (wm < 0) rb_raise(rb_eZmqError, "negative HWM values is not supported.");
    zctx_set_hwm(ctx->ctx, wm);
    return Qnil;
}

#iothreads=(2) ⇒ nil

Raises default I/O threads from 1 - there should be no need to fiddle with this.

Examples

ctx = ZMQ::Context.new
ctx.iothreads = 2    =>   nil

Returns:

  • (nil)


136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'ext/rbczmq/context.c', line 136

static VALUE rb_czmq_ctx_set_iothreads(VALUE obj, VALUE threads)
{
    int iothreads;
    errno = 0;
    ZmqGetContext(obj);
    Check_Type(threads, T_FIXNUM);
    iothreads = FIX2INT(threads);
    if (iothreads > 1) rb_warn("You probably don't want to spawn more than 1 I/O thread per ZMQ context.");
    if (iothreads < 0) rb_raise(rb_eZmqError, "negative I/O threads count is not supported.");
    zctx_set_iothreads(ctx->ctx, iothreads);
    if (zmq_errno() == EINVAL) ZmqRaiseSysError();
    return Qnil;
}

#linger=(100) ⇒ nil

Set msecs to flush sockets when closing them. A high value may block / pause the application on socket close. This binding defaults to a linger value of 1 msec set for all sockets, which is important for the reactor implementation in ZMQ::Loop to avoid stalling the event loop.

Examples

ctx = ZMQ::Context.new
ctx.linger = 100    =>   nil

Returns:

  • (nil)


164
165
166
167
168
169
170
171
172
173
174
# File 'ext/rbczmq/context.c', line 164

static VALUE rb_czmq_ctx_set_linger(VALUE obj, VALUE linger)
{
    errno = 0;
    int msecs;
    ZmqGetContext(obj);
    Check_Type(linger, T_FIXNUM);
    msecs = FIX2INT(linger);
    if (msecs < 0) rb_raise(rb_eZmqError, "negative linger / timeout values is not supported.");
    zctx_set_linger(ctx->ctx, msecs);
    return Qnil;
}

#socket(: PUSH) ⇒ ZMQ::Socket #socket(ZMQ: :PUSH) ⇒ ZMQ::Socket

Creates a socket within this ZMQ context. This is the only API exposed for creating sockets - they’re always spawned off a context. Sockets also track state of the current Ruby thread they’re created in to ensure they always only ever do work on the thread they were spawned on.

Examples

ctx = ZMQ::Context.new
ctx.socket(:PUSH)    =>   ZMQ::Socket
ctx.socket(ZMQ::PUSH)    =>  ZMQ::Socket

Overloads:



281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# File 'ext/rbczmq/context.c', line 281

static VALUE rb_czmq_ctx_socket(VALUE obj, VALUE type)
{
    VALUE socket;
    int socket_type;
    struct nogvl_socket_args args;
    zmq_sock_wrapper *sock = NULL;
    errno = 0;
    ZmqGetContext(obj);
    if (TYPE(type) != T_FIXNUM && TYPE(type) != T_SYMBOL) rb_raise(rb_eTypeError, "wrong socket type %s (expected Fixnum or Symbol)", RSTRING_PTR(rb_obj_as_string(type)));
    socket_type = FIX2INT((SYMBOL_P(type)) ? rb_const_get_at(rb_mZmq, rb_to_id(type)) : type);

    socket = Data_Make_Struct(rb_czmq_ctx_socket_klass(socket_type), zmq_sock_wrapper, rb_czmq_mark_sock, rb_czmq_free_sock_gc, sock);
    args.ctx = ctx->ctx;
    args.type = socket_type;
    sock->socket = (void*)rb_thread_blocking_region(rb_czmq_nogvl_socket_new, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssertObjOnAlloc(sock->socket, sock);
#ifndef HAVE_RB_THREAD_BLOCKING_REGION
    sock->str_buffer = zlist_new();
    sock->frame_buffer = zlist_new();
    sock->msg_buffer = zlist_new();
#endif
    sock->flags = 0;
    sock->ctx = ctx->ctx;
    sock->verbose = false;
    sock->state = ZMQ_SOCKET_PENDING;
    sock->endpoints = rb_ary_new();
    sock->thread = rb_thread_current();
    sock->context = obj;
    sock->monitor_endpoint = Qnil;
    sock->monitor_handler = Qnil;
    sock->monitor_thread = Qnil;
    rb_obj_call_init(socket, 0, NULL);
    return socket;
}