Class: ZMQ::Context

Inherits:
Object
  • Object
show all
Defined in:
lib/zmq/context.rb,
ext/rbczmq/context.c

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Context

Overrides the libczmq handlers installed for SIGINT and SIGTERM on context init. This ensures we fall back to the default Ruby signal handlers, which is least likely to violate the principle of least surprise. As an alternative, we expose ZMQ.interrupted!, which reverts to the libczmq default actions when called from a Ruby signal handler. The following restores the default libczmq behavior:

# Example override: route SIGINT and SIGTERM to ZMQ.interrupted! so that the
# libczmq default actions apply instead of Ruby's default signal handlers.
def initialize(*args)

super
trap(:INT){ ZMQ.interrupted! }
trap(:TERM){ ZMQ.interrupted! }

end



17
18
19
20
21
# File 'lib/zmq/context.rb', line 17

# Runs the superclass initializer, then points the SIGINT and SIGTERM handlers
# back at Ruby's "DEFAULT" action.
#
# @param args [Array] forwarded unchanged to the superclass initializer
def initialize(*args)
  super
  # Same handler for both signals, INT first and TERM second.
  [:INT, :TERM].each { |signal_name| trap(signal_name, "DEFAULT") }
end

Class Method Details

.ZMQ::Context.new ⇒ ZMQ::Context .ZMQ::Context.new(1) ⇒ ZMQ::Context

Returns a handle to a new ZMQ context. A single context per process is supported in order to guarantee stability across all Ruby implementations. A context should be passed as an argument to any Ruby threads. Optionally a context can be initialized with an I/O threads value (default: 1) - there should be no need to fiddle with this.

Examples

ZMQ::Context.new    =>  ZMQ::Context
ZMQ::Context.new(1)    =>  ZMQ::Context

Overloads:



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'ext/rbczmq/context.c', line 88

/*
 * Implements ZMQ::Context.new: allocates the context wrapper, creates the
 * underlying zctx_t, runs #initialize on the new object and records it in the
 * per-process context map.
 *
 * argc/argv - at most one optional argument, the I/O threads count.
 * context   - the receiver on entry; reused below to hold the new object.
 *
 * Returns the new context object. Raises rb_eZmqError when the map already
 * holds a context for the value returned by get_pid().
 */
static VALUE rb_czmq_ctx_s_new(int argc, VALUE *argv, VALUE context)
{
    VALUE ctx_map;
    VALUE io_threads;
    zmq_ctx_wrapper *ctx = NULL;
    /* "01": no required arguments, one optional argument (nil when omitted). */
    rb_scan_args(argc, argv, "01", &io_threads);
    ctx_map = rb_ivar_get(rb_mZmq, intern_zctx_process);
    /* Only one context per process: bail out if this pid already has an entry. */
    if (!NIL_P(rb_hash_aref(ctx_map, get_pid()))) rb_raise(rb_eZmqError, "single ZMQ context per process allowed");
    /* Wrap a fresh zmq_ctx_wrapper: no mark function, rb_czmq_free_ctx_gc as the free function. */
    context = Data_Make_Struct(rb_cZmqContext, zmq_ctx_wrapper, 0, rb_czmq_free_ctx_gc, ctx);
    /* Create the zctx_t via rb_thread_blocking_region so the call runs without the GVL. */
    ctx->ctx = (zctx_t*)rb_thread_blocking_region(rb_czmq_nogvl_zctx_new, NULL, RUBY_UBF_IO, 0);
    ZmqAssertObjOnAlloc(ctx->ctx, ctx);
    ctx->flags = 0;
    /* Remember the creating process both as a raw pid and as the value used to key the map. */
    ctx->pid = getpid();
    ctx->pidValue = get_pid();
    /* Invoke #initialize with no arguments. */
    rb_obj_call_init(context, 0, NULL);
    rb_hash_aset(ctx_map, ctx->pidValue, context);
    /* NOTE(review): the context is already registered at this point, so if the
     * call below raises, the map entry stays behind - confirm this is intended. */
    if (!NIL_P(io_threads)) rb_czmq_ctx_set_iothreads(context, io_threads);
    return context;
}

Instance Method Details

#bind(sock_type, endpoint) ⇒ Object

Sugaring for spawning a new socket and bind to a given endpoint

ctx.bind(:PUB, "tcp://127.0.0.1:5000")



35
36
37
38
39
# File 'lib/zmq/context.rb', line 35

# Convenience wrapper: creates a socket of the given type via #socket and binds
# it to +endpoint+.
#
# @param sock_type socket type, handed straight to #socket
# @param endpoint endpoint handed straight to the new socket's #bind
# @return the newly created socket (not the result of the bind call)
def bind(sock_type, endpoint)
  socket(sock_type).tap { |sock| sock.bind(endpoint) }
end

#connect(sock_type, endpoint) ⇒ Object

Sugaring for spawning a new socket and connect to a given endpoint

ctx.connect(:SUB, "tcp://127.0.0.1:5000")



45
46
47
48
49
# File 'lib/zmq/context.rb', line 45

# Convenience wrapper: creates a socket of the given type via #socket and
# connects it to +endpoint+.
#
# @param sock_type socket type, handed straight to #socket
# @param endpoint endpoint handed straight to the new socket's #connect
# @return the newly created socket (not the result of the connect call)
def connect(sock_type, endpoint)
  socket(sock_type).tap { |sock| sock.connect(endpoint) }
end

#destroy ⇒ nil

Destroy a ZMQ context and all sockets in it. Useful for manual memory management, otherwise the GC will take the same action if a context object is not reachable anymore on the next GC cycle. This is a lower level API.

Examples

ctx = ZMQ::Context.new
ctx.destroy     =>   nil

Returns:

  • (nil)


122
123
124
125
126
127
128
# File 'ext/rbczmq/context.c', line 122

/*
 * Implements ZMQ::Context#destroy: releases the context wrapped by obj through
 * rb_czmq_free_ctx.
 *
 * obj - the ZMQ::Context instance.
 *
 * Always returns nil.
 */
static VALUE rb_czmq_ctx_destroy(VALUE obj)
{
    /* Presumably unwraps obj into the local `ctx` used below (macro defined elsewhere). */
    ZmqGetContext(obj);
    /* Presumably rejects use from a process other than the creating one - confirm against the macro. */
    ZmqAssertContextPidMatches(ctx);
    rb_czmq_free_ctx(ctx);
    return Qnil;
}

#iothreads=(2) ⇒ nil

Raises default I/O threads from 1 - there should be no need to fiddle with this.

Examples

ctx = ZMQ::Context.new
ctx.iothreads = 2    =>   nil

Returns:

  • (nil)


142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'ext/rbczmq/context.c', line 142

/*
 * Implements ZMQ::Context#iothreads=: applies a new I/O threads count to the
 * context wrapped by obj.
 *
 * obj     - the ZMQ::Context instance.
 * threads - the requested count; must be a Fixnum (enforced by Check_Type).
 *
 * Returns nil. Raises rb_eZmqError for a negative count, and raises through
 * ZmqRaiseSysError when zmq_errno() reports EINVAL after the call.
 */
static VALUE rb_czmq_ctx_set_iothreads(VALUE obj, VALUE threads)
{
    int thread_count;
    errno = 0;
    ZmqGetContext(obj);
    ZmqAssertContextPidMatches(ctx);
    Check_Type(threads, T_FIXNUM);
    thread_count = FIX2INT(threads);
    if (thread_count < 0) {
        rb_raise(rb_eZmqError, "negative I/O threads count is not supported.");
    }
    zctx_set_iothreads(ctx->ctx, thread_count);
    /* The setter has no return value, so failure is detected through zmq_errno(). */
    if (EINVAL == zmq_errno()) ZmqRaiseSysError();
    return Qnil;
}

#linger=(100) ⇒ nil

Set msecs to flush sockets when closing them. A high value may block / pause the application on socket close. This binding defaults to a linger value of 1 msec set for all sockets, which is important for the reactor implementation in ZMQ::Loop to avoid stalling the event loop.

Examples

ctx = ZMQ::Context.new
ctx.linger = 100    =>   nil

Returns:

  • (nil)


170
171
172
173
174
175
176
177
178
179
180
181
# File 'ext/rbczmq/context.c', line 170

/*
 * Implements ZMQ::Context#linger=: applies a linger value to the context
 * wrapped by obj.
 *
 * obj    - the ZMQ::Context instance.
 * linger - the linger value; must be a Fixnum (enforced by Check_Type).
 *
 * Returns nil. Raises rb_eZmqError for a negative value.
 */
static VALUE rb_czmq_ctx_set_linger(VALUE obj, VALUE linger)
{
    int linger_value;
    errno = 0;
    ZmqGetContext(obj);
    ZmqAssertContextPidMatches(ctx);
    Check_Type(linger, T_FIXNUM);
    linger_value = FIX2INT(linger);
    if (linger_value < 0) {
        rb_raise(rb_eZmqError, "negative linger / timeout values is not supported.");
    }
    zctx_set_linger(ctx->ctx, linger_value);
    return Qnil;
}

#socket(:PUSH) ⇒ ZMQ::Socket #socket(ZMQ::PUSH) ⇒ ZMQ::Socket

Creates a socket within this ZMQ context. This is the only API exposed for creating sockets - they’re always spawned off a context. Sockets also track state of the current Ruby thread they’re created in to ensure they always only ever do work on the thread they were spawned on.

Examples

ctx = ZMQ::Context.new
ctx.socket(:PUSH)    =>   ZMQ::Socket
ctx.socket(ZMQ::PUSH)    =>  ZMQ::Socket

Overloads:



267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'ext/rbczmq/context.c', line 267

/*
 * Implements ZMQ::Context#socket: creates a socket of the requested type
 * within the context wrapped by obj.
 *
 * obj  - the ZMQ::Context instance.
 * type - the socket type, either a Fixnum or a Symbol. A Symbol is resolved
 *        as a constant looked up directly on rb_mZmq.
 *
 * Returns whatever rb_czmq_socket_alloc builds for the new socket. Raises
 * TypeError when type is neither a Fixnum nor a Symbol, or when a Symbol
 * resolves to a constant that is not numeric.
 */
static VALUE rb_czmq_ctx_socket(VALUE obj, VALUE type)
{
    int socket_type;
    struct nogvl_socket_args args;
    errno = 0;
    ZmqGetContext(obj);
    ZmqAssertContextPidMatches(ctx);
    if (TYPE(type) != T_FIXNUM && TYPE(type) != T_SYMBOL) rb_raise(rb_eTypeError, "wrong socket type %s (expected Fixnum or Symbol)", RSTRING_PTR(rb_obj_as_string(type)));
    /* A Symbol may name any constant on rb_mZmq, not only an integer one.
     * NUM2INT type-checks its argument on every platform; FIX2INT is an
     * unchecked shift where int and long have the same size, which would turn
     * a non-Fixnum constant into a garbage socket type. */
    socket_type = NUM2INT((SYMBOL_P(type)) ? rb_const_get_at(rb_mZmq, rb_to_id(type)) : type);

    args.ctx = ctx->ctx;
    args.type = socket_type;
    /* Create the socket via rb_thread_blocking_region so the call runs without the GVL. */
    return rb_czmq_socket_alloc(obj, ctx->ctx, (void*)rb_thread_blocking_region(rb_czmq_nogvl_socket_new, (void *)&args, RUBY_UBF_IO, 0));
}