Class: ZMQ::Context
- Inherits:
-
Object
- Object
- ZMQ::Context
- Defined in:
- lib/zmq/context.rb,
ext/rbczmq/context.c
Class Method Summary collapse
-
.new(*args) ⇒ Object
Returns a handle to a new ZMQ context.
Instance Method Summary collapse
-
#bind(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and bind to a given endpoint.
-
#connect(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and connect to a given endpoint.
-
#destroy ⇒ nil
Destroy a ZMQ context and all sockets in it.
-
#initialize(*args) ⇒ Context
constructor
Overload the libczmq handler installed for SIGINT and SIGTERM on context init.
-
#iothreads=(2) ⇒ nil
Raises default I/O threads from 1 - there should be no need to fiddle with this.
-
#linger=(100) ⇒ nil
Set msecs to flush sockets when closing them.
-
#socket(type) ⇒ Object
Creates a socket within this ZMQ context.
Constructor Details
#initialize(*args) ⇒ Context
Overload the libczmq handler installed for SIGINT and SIGTERM on context init. This ensures we fallback to the default Ruby signal handlers which is least likely to violate the principle of least surprise. As an alternative fallback, we expose ZMQ.interrupted! which reverts back to the libczmq default actions when called from a Ruby signal handler. The following restores the default libczmq behavior :
def initialize(*args)
super
trap(:INT){ ZMQ.interrupted! }
trap(:TERM){ ZMQ.interrupted! }
end
17 18 19 20 21 |
# File 'lib/zmq/context.rb', line 17 def initialize(*args) super trap(:INT, "DEFAULT") trap(:TERM, "DEFAULT") end |
Class Method Details
.ZMQ::Context.new ⇒ ZMQ::Context .ZMQ::Context.new(1) ⇒ ZMQ::Context
Returns a handle to a new ZMQ context. A single context per process is supported in order to guarantee stability across all Ruby implementations. A context should be passed as an argument to any Ruby threads. Optionally a context can be initialized with an I/O threads value (default: 1) - there should be no need to fiddle with this.
Examples
ZMQ::Context.new => ZMQ::Context
ZMQ::Context.new(1) => ZMQ::Context
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'ext/rbczmq/context.c', line 85
static VALUE rb_czmq_ctx_s_new(int argc, VALUE *argv, VALUE context)
{
VALUE ctx_map;
VALUE io_threads;
zmq_ctx_wrapper *ctx = NULL;
rb_scan_args(argc, argv, "01", &io_threads);
ctx_map = rb_ivar_get(rb_mZmq, intern_zctx_process);
if (!NIL_P(rb_hash_aref(ctx_map, get_pid()))) rb_raise(rb_eZmqError, "single ZMQ context per process allowed");
context = Data_Make_Struct(rb_cZmqContext, zmq_ctx_wrapper, 0, rb_czmq_free_ctx_gc, ctx);
ctx->ctx = (zctx_t*)rb_thread_blocking_region(rb_czmq_nogvl_zctx_new, NULL, RUBY_UBF_IO, 0);
ZmqAssertObjOnAlloc(ctx->ctx, ctx);
ctx->flags = 0;
rb_obj_call_init(context, 0, NULL);
rb_hash_aset(ctx_map, get_pid(), context);
if (!NIL_P(io_threads)) rb_czmq_ctx_set_iothreads(context, io_threads);
return context;
}
|
Instance Method Details
#bind(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and bind to a given endpoint
ctx.bind(:PUB, “tcp://127.0.0.1:5000”)
35 36 37 38 39 |
# File 'lib/zmq/context.rb', line 35 def bind(sock_type, endpoint) s = socket(sock_type) s.bind(endpoint) s end |
#connect(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and connect to a given endpoint
ctx.connect(:SUB, “tcp://127.0.0.1:5000”)
45 46 47 48 49 |
# File 'lib/zmq/context.rb', line 45 def connect(sock_type, endpoint) s = socket(sock_type) s.connect(endpoint) s end |
#destroy ⇒ nil
117 118 119 120 121 122 |
# File 'ext/rbczmq/context.c', line 117
static VALUE rb_czmq_ctx_destroy(VALUE obj)
{
ZmqGetContext(obj);
rb_czmq_free_ctx(ctx);
return Qnil;
}
|
#iothreads=(2) ⇒ nil
136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'ext/rbczmq/context.c', line 136
static VALUE rb_czmq_ctx_set_iothreads(VALUE obj, VALUE threads)
{
int iothreads;
errno = 0;
ZmqGetContext(obj);
Check_Type(threads, T_FIXNUM);
iothreads = FIX2INT(threads);
if (iothreads < 0) rb_raise(rb_eZmqError, "negative I/O threads count is not supported.");
zctx_set_iothreads(ctx->ctx, iothreads);
if (zmq_errno() == EINVAL) ZmqRaiseSysError();
return Qnil;
}
|
#linger=(100) ⇒ nil
Set msecs to flush sockets when closing them. A high value may block / pause the application on socket close. This binding defaults to a linger value of 1 msec set for all sockets, which is important for the reactor implementation in ZMQ::Loop to avoid stalling the event loop.
Examples
ctx = ZMQ::Context.new
ctx.linger = 100 => nil
163 164 165 166 167 168 169 170 171 172 173 |
# File 'ext/rbczmq/context.c', line 163
static VALUE rb_czmq_ctx_set_linger(VALUE obj, VALUE linger)
{
errno = 0;
int msecs;
ZmqGetContext(obj);
Check_Type(linger, T_FIXNUM);
msecs = FIX2INT(linger);
if (msecs < 0) rb_raise(rb_eZmqError, "negative linger / timeout values is not supported.");
zctx_set_linger(ctx->ctx, msecs);
return Qnil;
}
|
#socket(: PUSH) ⇒ ZMQ::Socket #socket(ZMQ: :PUSH) ⇒ ZMQ::Socket
Creates a socket within this ZMQ context. This is the only API exposed for creating sockets - they’re always spawned off a context. Sockets also track state of the current Ruby thread they’re created in to ensure they always only ever do work on the thread they were spawned on.
Examples
ctx = ZMQ::Context.new
ctx.socket(:PUSH) => ZMQ::Socket
ctx.socket(ZMQ::PUSH) => ZMQ::Socket
259 260 261 262 263 264 265 266 267 268 269 270 271 |
# File 'ext/rbczmq/context.c', line 259
static VALUE rb_czmq_ctx_socket(VALUE obj, VALUE type)
{
int socket_type;
struct nogvl_socket_args args;
errno = 0;
ZmqGetContext(obj);
if (TYPE(type) != T_FIXNUM && TYPE(type) != T_SYMBOL) rb_raise(rb_eTypeError, "wrong socket type %s (expected Fixnum or Symbol)", RSTRING_PTR(rb_obj_as_string(type)));
socket_type = FIX2INT((SYMBOL_P(type)) ? rb_const_get_at(rb_mZmq, rb_to_id(type)) : type);
args.ctx = ctx->ctx;
args.type = socket_type;
return rb_czmq_socket_alloc(obj, ctx->ctx, (void*)rb_thread_blocking_region(rb_czmq_nogvl_socket_new, (void *)&args, RUBY_UBF_IO, 0));
}
|