Class: ZMQ::Context

Inherits:
Object
  • Object
show all
Defined in:
lib/zmq/context.rb,
ext/rbczmq/context.c

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Context

Override the libczmq handlers installed for SIGINT and SIGTERM on context init. This ensures we fall back to the default Ruby signal handlers, which is least likely to violate the principle of least surprise. As an alternative, we expose ZMQ.interrupted!, which reverts to the libczmq default actions when called from a Ruby signal handler. The following restores the default libczmq behavior:

# Example override: after the superclass initializer has run, route SIGINT
# and SIGTERM to ZMQ.interrupted! so the libczmq default actions apply.
def initialize(*args)
  super
  [:INT, :TERM].each do |signal|
    trap(signal) { ZMQ.interrupted! }
  end
end


17
18
19
20
21
# File 'lib/zmq/context.rb', line 17

# Calls the superclass initializer, then puts Ruby's default handlers back in
# place for SIGINT and SIGTERM.
def initialize(*args)
  super
  [:INT, :TERM].each { |signal| trap(signal, "DEFAULT") }
end

Class Method Details

.ZMQ::Context.new ⇒ ZMQ::Context .ZMQ::Context.new(1) ⇒ ZMQ::Context

Returns a handle to a new ZMQ context. Only a single context per process is supported, in order to guarantee stability across all Ruby implementations. The context should be passed as an argument to any Ruby threads that need it. Optionally, a context can be initialized with an I/O threads value (default: 1) - there should be no need to fiddle with this.

Examples

ZMQ::Context.new    =>  ZMQ::Context
ZMQ::Context.new(1)    =>  ZMQ::Context

Overloads:


179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'ext/rbczmq/context.c', line 179

/*
 * call-seq:
 *   ZMQ::Context.new       =>  ZMQ::Context
 *   ZMQ::Context.new(1)    =>  ZMQ::Context
 *
 * Creates the ZMQ context for the current process and registers it in the
 * per-process context map stored on the ZMQ module.
 *
 * argc/argv - zero or one argument: the optional I/O threads count.
 * context   - the receiver on entry; reused below to hold the new object.
 *
 * Raises rb_eZmqError when a context is already registered for this process
 * or when the I/O threads count is negative, and TypeError when the I/O
 * threads count is not a Fixnum.
 */
static VALUE rb_czmq_ctx_s_new(int argc, VALUE *argv, VALUE context)
{
    VALUE process_ctx;
    VALUE ctx_map;
    VALUE io_threads;
    zmq_ctx_wrapper *ctx = NULL;
    rb_scan_args(argc, argv, "01", &io_threads);
    /* Look up any context already registered under this process' key. */
    ctx_map = rb_ivar_get(rb_mZmq, intern_zctx_process);
    process_ctx = rb_hash_aref(ctx_map, get_pid());
    if (!NIL_P(process_ctx)){
      Data_Get_Struct(process_ctx, zmq_ctx_wrapper, ctx);
      rb_raise(rb_eZmqError, "single ZMQ context per process allowed (previous context created at %s:%d)", ctx->file, ctx->line);
    }
    /*
     * Validate the optional I/O threads argument BEFORE anything is allocated
     * or registered. Previously a bad value only raised after the context had
     * been stored in ctx_map, leaving behind a registered context the caller
     * never received and making every later Context.new fail.
     */
    if (!NIL_P(io_threads)) {
        Check_Type(io_threads, T_FIXNUM);
        if (FIX2INT(io_threads) < 0) rb_raise(rb_eZmqError, "negative I/O threads count is not supported.");
    }
    context = Data_Make_Struct(rb_cZmqContext, zmq_ctx_wrapper, rb_czmq_mark_ctx_gc, rb_czmq_free_ctx_gc, ctx);
    /* The underlying zctx is created with the GVL released. */
    ctx->ctx = (zctx_t*)rb_thread_call_without_gvl(rb_czmq_nogvl_zctx_new, NULL, RUBY_UBF_IO, 0);
    /* NOTE(review): presumably raises when the allocation failed - macro is defined elsewhere. */
    ZmqAssertObjOnAlloc(ctx->ctx, ctx);
    ctx->flags = 0;
    ctx->pid = getpid();
    ctx->pidValue = get_pid();
    ctx->sockets = zlist_new();
    /* Remember where the context was created; reported by the error above. */
    ctx->file = rb_sourcefile();
    ctx->line = rb_sourceline();
    /* Run the Ruby-level #initialize with no arguments. */
    rb_obj_call_init(context, 0, NULL);
    rb_hash_aset(ctx_map, ctx->pidValue, context);
    if (!NIL_P(io_threads)) rb_czmq_ctx_set_iothreads(context, io_threads);
    return context;
}

Instance Method Details

#bind(sock_type, endpoint) ⇒ Object

Sugar for spawning a new socket and binding it to a given endpoint

ctx.bind(:PUB, "tcp://127.0.0.1:5000")


35
36
37
38
39
# File 'lib/zmq/context.rb', line 35

# Spawns a socket of +sock_type+ via #socket, binds it to +endpoint+ and
# returns the socket itself (not the result of the bind call).
def bind(sock_type, endpoint)
  socket(sock_type).tap { |sock| sock.bind(endpoint) }
end

#connect(sock_type, endpoint) ⇒ Object

Sugar for spawning a new socket and connecting it to a given endpoint

ctx.connect(:SUB, "tcp://127.0.0.1:5000")


45
46
47
48
49
# File 'lib/zmq/context.rb', line 45

# Spawns a socket of +sock_type+ via #socket, connects it to +endpoint+ and
# returns the socket itself (not the result of the connect call).
def connect(sock_type, endpoint)
  socket(sock_type).tap { |sock| sock.connect(endpoint) }
end

#destroy ⇒ nil

Destroy a ZMQ context and all sockets in it. Useful for manual memory management, otherwise the GC will take the same action if a context object is not reachable anymore on the next GC cycle. This is a lower level API.

Examples

ctx = ZMQ::Context.new
ctx.destroy     =>   nil

Returns:

  • (nil)

221
222
223
224
225
226
227
# File 'ext/rbczmq/context.c', line 221

/*
 * call-seq:
 *   ctx.destroy    =>  nil
 *
 * Explicitly releases the context wrapped by obj by handing it to
 * rb_czmq_free_ctx. Always returns nil.
 */
static VALUE rb_czmq_ctx_destroy(VALUE obj)
{
    /* Brings the `ctx` wrapper for obj into scope (macro defined elsewhere). */
    ZmqGetContext(obj);
    /* NOTE(review): presumably raises when called from a process other than
     * the one that created the context - confirm against the macro. */
    ZmqAssertContextPidMatches(ctx);
    rb_czmq_free_ctx(ctx);
    return Qnil;
}

#iothreads=(2) ⇒ nil

Raises the number of I/O threads from its default of 1 - there should be no need to fiddle with this.

Examples

ctx = ZMQ::Context.new
ctx.iothreads = 2    =>   nil

Returns:

  • (nil)

241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'ext/rbczmq/context.c', line 241

/*
 * call-seq:
 *   ctx.iothreads = 2    =>  nil
 *
 * Sets the I/O threads count on the context wrapped by obj.
 *
 * threads - must be a non-negative Fixnum; otherwise TypeError or
 *           rb_eZmqError is raised.
 *
 * Raises a system error when zmq reports EINVAL after the update.
 * Always returns nil on success.
 */
static VALUE rb_czmq_ctx_set_iothreads(VALUE obj, VALUE threads)
{
    int count;
    /* Cleared up front so the EINVAL check below reflects this call. */
    errno = 0;
    ZmqGetContext(obj);
    ZmqAssertContextPidMatches(ctx);
    Check_Type(threads, T_FIXNUM);
    count = FIX2INT(threads);
    if (count < 0) {
        rb_raise(rb_eZmqError, "negative I/O threads count is not supported.");
    }
    zctx_set_iothreads(ctx->ctx, count);
    if (zmq_errno() == EINVAL) {
        ZmqRaiseSysError();
    }
    return Qnil;
}

#linger=(100) ⇒ nil

Set msecs to flush sockets when closing them. A high value may block / pause the application on socket close. This binding defaults to a linger value of 1 msec set for all sockets, which is important for the reactor implementation in ZMQ::Loop to avoid stalling the event loop.

Examples

ctx = ZMQ::Context.new
ctx.linger = 100    =>   nil

Returns:

  • (nil)

269
270
271
272
273
274
275
276
277
278
279
280
# File 'ext/rbczmq/context.c', line 269

/*
 * call-seq:
 *   ctx.linger = 100    =>  nil
 *
 * Sets the linger value, in msecs, on the context wrapped by obj.
 *
 * linger - must be a non-negative Fixnum; otherwise TypeError or
 *          rb_eZmqError is raised.
 *
 * Always returns nil on success.
 */
static VALUE rb_czmq_ctx_set_linger(VALUE obj, VALUE linger)
{
    int timeout_ms;
    /* errno is reset on entry; it is not consulted again in this function. */
    errno = 0;
    ZmqGetContext(obj);
    ZmqAssertContextPidMatches(ctx);
    Check_Type(linger, T_FIXNUM);
    timeout_ms = FIX2INT(linger);
    if (timeout_ms < 0) {
        rb_raise(rb_eZmqError, "negative linger / timeout values is not supported.");
    }
    zctx_set_linger(ctx->ctx, timeout_ms);
    return Qnil;
}

#socket(:PUSH) ⇒ ZMQ::Socket #socket(ZMQ::PUSH) ⇒ ZMQ::Socket

Creates a socket within this ZMQ context. This is the only API exposed for creating sockets - they're always spawned off a context. Sockets also track state of the current Ruby thread they're created in to ensure they always only ever do work on the thread they were spawned on.

Examples

ctx = ZMQ::Context.new
ctx.socket(:PUSH)    =>   ZMQ::Socket
ctx.socket(ZMQ::PUSH)    =>  ZMQ::Socket

Overloads:


377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
# File 'ext/rbczmq/context.c', line 377

/*
 * call-seq:
 *   ctx.socket(:PUSH)        =>  ZMQ::Socket
 *   ctx.socket(ZMQ::PUSH)    =>  ZMQ::Socket
 *
 * Creates a socket inside the context wrapped by obj and records it in the
 * context's socket list.
 *
 * type - a Fixnum socket type, or a Symbol naming a constant defined
 *        directly on the ZMQ module that holds the socket type.
 *
 * Raises TypeError when type is neither a Fixnum nor a Symbol, or when the
 * constant named by a Symbol is not an integer.
 * Returns the new socket object.
 */
static VALUE rb_czmq_ctx_socket(VALUE obj, VALUE type)
{
    int socket_type;
    struct nogvl_socket_args args;
    void *socket;
    VALUE socket_object;
    zmq_sock_wrapper *sock = NULL;
    errno = 0;
    ZmqGetContext(obj);
    ZmqAssertContextPidMatches(ctx);
    if (TYPE(type) != T_FIXNUM && TYPE(type) != T_SYMBOL) rb_raise(rb_eTypeError, "wrong socket type %s (expected Fixnum or Symbol)", RSTRING_PTR(rb_obj_as_string(type)));
    /*
     * A Symbol is resolved to the constant of the same name on the ZMQ module.
     * That constant can be any object (e.g. :Context names a class), so use
     * the checked NUM2INT conversion instead of FIX2INT, which assumes its
     * argument already is a Fixnum.
     */
    socket_type = NUM2INT((SYMBOL_P(type)) ? rb_const_get_at(rb_mZmq, rb_to_id(type)) : type);

    args.ctx = ctx->ctx;
    args.type = socket_type;
    /* The socket is created with the GVL released. */
    socket = rb_thread_call_without_gvl(rb_czmq_nogvl_socket_new, (void *)&args, RUBY_UBF_IO, 0);
    /* NOTE(review): socket is not checked for NULL here - confirm that
     * rb_czmq_socket_alloc copes with a failed socket creation. */
    socket_object = rb_czmq_socket_alloc(obj, ctx->ctx, socket);
    /* GetZmqSocket fills in `sock` for socket_object (macro defined elsewhere). */
    GetZmqSocket(socket_object);
    zlist_push(ctx->sockets, sock);
    return socket_object;
}