Class: ZMQ::Context
- Inherits:
-
Object
- Object
- ZMQ::Context
- Defined in:
- lib/zmq/context.rb,
ext/rbczmq/context.c
Class Method Summary collapse
-
.new(*args) ⇒ Object
Returns a handle to a new ZMQ context.
Instance Method Summary collapse
-
#bind(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and bind to a given endpoint.
-
#connect(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and connect to a given endpoint.
-
#destroy ⇒ nil
Destroy a ZMQ context and all sockets in it.
-
#initialize(*args) ⇒ Context
constructor
Overload the libczmq handler installed for SIGINT and SIGTERM on context init.
-
#iothreads=(2) ⇒ nil
Raises default I/O threads from 1 - there should be no need to fiddle with this.
-
#linger=(100) ⇒ nil
Set msecs to flush sockets when closing them.
-
#socket(type) ⇒ Object
Creates a socket within this ZMQ context.
Constructor Details
#initialize(*args) ⇒ Context
Overload the libczmq handler installed for SIGINT and SIGTERM on context init. This ensures we fall back to the default Ruby signal handlers, which is least likely to violate the principle of least surprise. As an alternative, we expose ZMQ.interrupted!, which reverts to the libczmq default actions when called from a Ruby signal handler. The following restores the default libczmq behavior:
# Example override: after the inherited initializer has run, route SIGINT and
# SIGTERM to ZMQ.interrupted! so the libczmq default actions apply again.
def initialize(*args)
super
# Each handler simply delegates to ZMQ.interrupted!.
trap(:INT){ ZMQ.interrupted! }
trap(:TERM){ ZMQ.interrupted! }
end
17 18 19 20 21 |
# File 'lib/zmq/context.rb', line 17
# Runs the inherited initializer with the caller's arguments, then puts the
# SIGINT and SIGTERM handlers back to Ruby's "DEFAULT" action.
def initialize(*args)
  super(*args)
  trap(:INT, "DEFAULT")
  trap(:TERM, "DEFAULT")
end
Class Method Details
.ZMQ::Context.new ⇒ ZMQ::Context .ZMQ::Context.new(1) ⇒ ZMQ::Context
Returns a handle to a new ZMQ context. A single context per process is supported in order to guarantee stability across all Ruby implementations. A context should be passed as an argument to any Ruby threads. Optionally a context can be initialized with an I/O threads value (default: 1) - there should be no need to fiddle with this.
Examples
ZMQ::Context.new => ZMQ::Context
ZMQ::Context.new(1) => ZMQ::Context
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'ext/rbczmq/context.c', line 88
/*
 *  call-seq:
 *     ZMQ::Context.new    =>  ZMQ::Context
 *     ZMQ::Context.new(1)    =>  ZMQ::Context
 *
 *  Returns a handle to a new ZMQ context, optionally configured with an I/O
 *  threads value.
 *
 *  Raises rb_eZmqError ("single ZMQ context per process allowed") when the
 *  per-process context map already holds an entry for the current pid.
 *
 *  The new context is stored in the per-process map only after the optional
 *  I/O threads value has been applied. rb_czmq_ctx_set_iothreads can raise
 *  (wrong argument type, negative count, EINVAL); registering first would
 *  leave a context in the map that the caller never received, and every later
 *  ZMQ::Context.new in this process would then be rejected.
 */
static VALUE rb_czmq_ctx_s_new(int argc, VALUE *argv, VALUE context)
{
    VALUE ctx_map;
    VALUE io_threads;
    zmq_ctx_wrapper *ctx = NULL;

    /* Zero or one positional argument: the optional I/O thread count. */
    rb_scan_args(argc, argv, "01", &io_threads);

    ctx_map = rb_ivar_get(rb_mZmq, intern_zctx_process);
    if (!NIL_P(rb_hash_aref(ctx_map, get_pid()))) {
        rb_raise(rb_eZmqError, "single ZMQ context per process allowed");
    }

    /* NOTE: the incoming receiver argument is overwritten with the new wrapper object. */
    context = Data_Make_Struct(rb_cZmqContext, zmq_ctx_wrapper, 0, rb_czmq_free_ctx_gc, ctx);

    /* The underlying zctx is created through rb_thread_blocking_region. */
    ctx->ctx = (zctx_t*)rb_thread_blocking_region(rb_czmq_nogvl_zctx_new, NULL, RUBY_UBF_IO, 0);
    ZmqAssertObjOnAlloc(ctx->ctx, ctx);

    ctx->flags = 0;
    ctx->pid = getpid();
    ctx->pidValue = get_pid();

    /* Invoke the Ruby-level #initialize with no arguments. */
    rb_obj_call_init(context, 0, NULL);

    /* Configure first; this call may raise. */
    if (!NIL_P(io_threads)) {
        rb_czmq_ctx_set_iothreads(context, io_threads);
    }

    /* Only a fully configured context is recorded for this process. */
    rb_hash_aset(ctx_map, ctx->pidValue, context);
    return context;
}
|
Instance Method Details
#bind(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and bind to a given endpoint
ctx.bind(:PUB, "tcp://127.0.0.1:5000")
35 36 37 38 39 |
# File 'lib/zmq/context.rb', line 35
# Spawns a socket of the given type, binds it to +endpoint+ and returns the
# socket.
def bind(sock_type, endpoint)
  socket(sock_type).tap { |sock| sock.bind(endpoint) }
end
#connect(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and connect to a given endpoint
ctx.connect(:SUB, "tcp://127.0.0.1:5000")
45 46 47 48 49 |
# File 'lib/zmq/context.rb', line 45
# Spawns a socket of the given type, connects it to +endpoint+ and returns
# the socket.
def connect(sock_type, endpoint)
  socket(sock_type).tap { |sock| sock.connect(endpoint) }
end
#destroy ⇒ nil
122 123 124 125 126 127 128 |
# File 'ext/rbczmq/context.c', line 122
/*
 *  call-seq:
 *     ctx.destroy    =>  nil
 *
 *  Destroy a ZMQ context and all sockets in it. The teardown itself is
 *  delegated to rb_czmq_free_ctx. Always returns nil.
 */
static VALUE rb_czmq_ctx_destroy(VALUE obj)
{
/* Presumably unwraps obj into the local `ctx` used below (macro defined elsewhere). */
ZmqGetContext(obj);
/* Guard macro defined elsewhere; by its name it rejects use from another process - TODO confirm. */
ZmqAssertContextPidMatches(ctx);
rb_czmq_free_ctx(ctx);
return Qnil;
}
|
#iothreads=(2) ⇒ nil
142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'ext/rbczmq/context.c', line 142
/*
 *  call-seq:
 *     ctx.iothreads = 2    =>  nil
 *
 *  Sets the number of I/O threads on the wrapped context via
 *  zctx_set_iothreads.
 *
 *  The argument must be a Fixnum (enforced with Check_Type) and must not be
 *  negative, otherwise rb_eZmqError is raised. If zmq_errno() reports EINVAL
 *  after the setter ran, ZmqRaiseSysError() is invoked. Returns nil.
 */
static VALUE rb_czmq_ctx_set_iothreads(VALUE obj, VALUE threads)
{
    int thread_count;

    /* Start from a clean errno so the EINVAL check below reflects this call only. */
    errno = 0;
    ZmqGetContext(obj);
    ZmqAssertContextPidMatches(ctx);
    Check_Type(threads, T_FIXNUM);

    thread_count = FIX2INT(threads);
    if (thread_count < 0) {
        rb_raise(rb_eZmqError, "negative I/O threads count is not supported.");
    }

    zctx_set_iothreads(ctx->ctx, thread_count);
    if (EINVAL == zmq_errno()) {
        ZmqRaiseSysError();
    }

    return Qnil;
}
|
#linger=(100) ⇒ nil
Set msecs to flush sockets when closing them. A high value may block / pause the application on socket close. This binding defaults to a linger value of 1 msec set for all sockets, which is important for the reactor implementation in ZMQ::Loop to avoid stalling the event loop.
Examples
ctx = ZMQ::Context.new
ctx.linger = 100 => nil
170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'ext/rbczmq/context.c', line 170
/*
 *  call-seq:
 *     ctx.linger = 100    =>  nil
 *
 *  Sets the linger period, in milliseconds, on the wrapped context via
 *  zctx_set_linger.
 *
 *  The argument must be a Fixnum (enforced with Check_Type) and must not be
 *  negative, otherwise rb_eZmqError is raised. Returns nil.
 */
static VALUE rb_czmq_ctx_set_linger(VALUE obj, VALUE linger)
{
    int linger_msecs;

    errno = 0;
    ZmqGetContext(obj);
    ZmqAssertContextPidMatches(ctx);
    Check_Type(linger, T_FIXNUM);

    linger_msecs = FIX2INT(linger);
    if (linger_msecs < 0) {
        rb_raise(rb_eZmqError, "negative linger / timeout values is not supported.");
    }

    zctx_set_linger(ctx->ctx, linger_msecs);
    return Qnil;
}
|
#socket(:PUSH) ⇒ ZMQ::Socket #socket(ZMQ::PUSH) ⇒ ZMQ::Socket
Creates a socket within this ZMQ context. This is the only API exposed for creating sockets - they’re always spawned off a context. Sockets also track state of the current Ruby thread they’re created in to ensure they always only ever do work on the thread they were spawned on.
Examples
ctx = ZMQ::Context.new
ctx.socket(:PUSH) => ZMQ::Socket
ctx.socket(ZMQ::PUSH) => ZMQ::Socket
267 268 269 270 271 272 273 274 275 276 277 278 279 280 |
# File 'ext/rbczmq/context.c', line 267
/*
 *  call-seq:
 *     ctx.socket(:PUSH)    =>  ZMQ::Socket
 *     ctx.socket(ZMQ::PUSH)    =>  ZMQ::Socket
 *
 *  Creates a socket within this ZMQ context.
 *
 *  +type+ is either a Fixnum socket type or a Symbol naming a constant
 *  looked up on the ZMQ module with rb_const_get_at. Any other argument type
 *  raises TypeError. A Symbol whose constant is not a Fixnum also raises
 *  TypeError: previously such a value went straight into FIX2INT, which does
 *  no type checking and would produce a meaningless socket type.
 *
 *  The socket is created through rb_thread_blocking_region and wrapped by
 *  rb_czmq_socket_alloc, whose result is returned.
 */
static VALUE rb_czmq_ctx_socket(VALUE obj, VALUE type)
{
    int socket_type;
    VALUE resolved_type;
    struct nogvl_socket_args args;

    errno = 0;
    ZmqGetContext(obj);
    ZmqAssertContextPidMatches(ctx);

    if (TYPE(type) != T_FIXNUM && TYPE(type) != T_SYMBOL) {
        rb_raise(rb_eTypeError, "wrong socket type %s (expected Fixnum or Symbol)", RSTRING_PTR(rb_obj_as_string(type)));
    }

    resolved_type = type;
    if (SYMBOL_P(type)) {
        /* Map e.g. :PUSH onto the ZMQ::PUSH constant. */
        resolved_type = rb_const_get_at(rb_mZmq, rb_to_id(type));
        /* The constant may be anything (a class, a string, ...); only a Fixnum is a socket type. */
        if (TYPE(resolved_type) != T_FIXNUM) {
            rb_raise(rb_eTypeError, "socket type constant %s is not a Fixnum", RSTRING_PTR(rb_obj_as_string(type)));
        }
    }
    socket_type = FIX2INT(resolved_type);

    args.ctx = ctx->ctx;
    args.type = socket_type;
    return rb_czmq_socket_alloc(obj, ctx->ctx, (void*)rb_thread_blocking_region(rb_czmq_nogvl_socket_new, (void *)&args, RUBY_UBF_IO, 0));
}
|