Class: ZMQ::Context
- Inherits:
-
Object
- Object
- ZMQ::Context
- Defined in:
- lib/zmq/context.rb,
ext/rbczmq/context.c
Class Method Summary collapse
-
.new(*args) ⇒ Object
Returns a handle to a new ZMQ context.
Instance Method Summary collapse
-
#bind(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and binding it to a given endpoint.
-
#connect(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and connecting it to a given endpoint.
-
#destroy ⇒ nil
Destroy a ZMQ context and all sockets in it.
-
#initialize(*args) ⇒ Context
constructor
Overload the libczmq handler installed for SIGINT and SIGTERM on context init.
-
#iothreads=(2) ⇒ nil
Raises default I/O threads from 1 - there should be no need to fiddle with this.
-
#linger=(100) ⇒ nil
Set msecs to flush sockets when closing them.
-
#socket(type) ⇒ Object
Creates a socket within this ZMQ context.
Constructor Details
#initialize(*args) ⇒ Context
Overload the libczmq handler installed for SIGINT and SIGTERM on context init. This ensures we fall back to the default Ruby signal handlers, which are least likely to violate the principle of least surprise. As an alternative fallback, we expose ZMQ.interrupted!, which reverts to the libczmq default actions when called from a Ruby signal handler. The following restores the default libczmq behavior:
def initialize(*args)
super
trap(:INT){ ZMQ.interrupted! }
trap(:TERM){ ZMQ.interrupted! }
end
17 18 19 20 21 |
# File 'lib/zmq/context.rb', line 17
# Runs the inherited initializer, then points the SIGINT and SIGTERM
# handlers at Ruby's "DEFAULT" action.
def initialize(*args)
  super
  Signal.trap(:INT, "DEFAULT")
  Signal.trap(:TERM, "DEFAULT")
end
Class Method Details
.ZMQ::Context.new ⇒ ZMQ::Context .ZMQ::Context.new(1) ⇒ ZMQ::Context
Returns a handle to a new ZMQ context. A single context per process is supported in order to guarantee stability across all Ruby implementations. A context should be passed as an argument to any Ruby threads. Optionally a context can be initialized with an I/O threads value (default: 1) - there should be no need to fiddle with this.
Examples
ZMQ::Context.new => ZMQ::Context
ZMQ::Context.new(1) => ZMQ::Context
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'ext/rbczmq/context.c', line 179
/*
 *  Backs ZMQ::Context.new: allocates the context wrapper, creates the
 *  underlying zctx_t and registers the new object in the per-process map
 *  stored on rb_mZmq.
 *
 *  argc / argv - at most one optional argument: the I/O threads count.
 *  context     - receiver slot; overwritten with the newly wrapped object.
 *
 *  Raises rb_eZmqError when the map already holds an entry for the current
 *  pid. Returns the new context object.
 */
static VALUE rb_czmq_ctx_s_new(int argc, VALUE *argv, VALUE context)
{
    VALUE io_threads;
    VALUE ctx_map;
    zmq_ctx_wrapper *ctx = NULL;

    rb_scan_args(argc, argv, "01", &io_threads);

    /* Refuse a second context for this pid before anything is allocated. */
    ctx_map = rb_ivar_get(rb_mZmq, intern_zctx_process);
    if (!NIL_P(rb_hash_aref(ctx_map, get_pid()))) {
        rb_raise(rb_eZmqError, "single ZMQ context per process allowed");
    }

    context = Data_Make_Struct(rb_cZmqContext, zmq_ctx_wrapper, rb_czmq_mark_ctx_gc, rb_czmq_free_ctx_gc, ctx);

    /* The zctx_t itself is created through rb_thread_blocking_region. */
    ctx->ctx = (zctx_t*)rb_thread_blocking_region(rb_czmq_nogvl_zctx_new, NULL, RUBY_UBF_IO, 0);
    ZmqAssertObjOnAlloc(ctx->ctx, ctx);

    /* Populate the remaining wrapper fields. */
    ctx->flags = 0;
    ctx->pid = getpid();
    ctx->pidValue = get_pid();
    ctx->sockets = zlist_new();

    /* Run the Ruby-level #initialize (no arguments are forwarded), then register. */
    rb_obj_call_init(context, 0, NULL);
    rb_hash_aset(ctx_map, ctx->pidValue, context);

    /* Apply the optional I/O threads argument through the regular setter. */
    if (!NIL_P(io_threads)) {
        rb_czmq_ctx_set_iothreads(context, io_threads);
    }
    return context;
}
|
Instance Method Details
#bind(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and binding it to a given endpoint
ctx.bind(:PUB, "tcp://127.0.0.1:5000")
35 36 37 38 39 |
# File 'lib/zmq/context.rb', line 35
# Creates a socket of the given type, binds it to +endpoint+ and returns
# the socket.
def bind(sock_type, endpoint)
  socket(sock_type).tap { |sock| sock.bind(endpoint) }
end
#connect(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and connecting it to a given endpoint
ctx.connect(:SUB, "tcp://127.0.0.1:5000")
45 46 47 48 49 |
# File 'lib/zmq/context.rb', line 45
# Creates a socket of the given type, connects it to +endpoint+ and returns
# the socket.
def connect(sock_type, endpoint)
  socket(sock_type).tap { |sock| sock.connect(endpoint) }
end
#destroy ⇒ nil
214 215 216 217 218 219 220 |
# File 'ext/rbczmq/context.c', line 214
/*
 *  Backs ZMQ::Context#destroy, which the method summary describes as
 *  destroying a ZMQ context and all sockets in it.
 *
 *  obj - the context object to tear down.
 *
 *  Always returns Qnil.
 */
static VALUE rb_czmq_ctx_destroy(VALUE obj)
{
/* Project macro; it evidently brings the local `ctx` used below into scope. */
ZmqGetContext(obj);
/* NOTE(review): presumably rejects calls from a process other than the one that created the context - confirm against the macro definition. */
ZmqAssertContextPidMatches(ctx);
/* The actual teardown is delegated to rb_czmq_free_ctx, defined outside this listing. */
rb_czmq_free_ctx(ctx);
return Qnil;
}
|
#iothreads=(2) ⇒ nil
234 235 236 237 238 239 240 241 242 243 244 245 246 |
# File 'ext/rbczmq/context.c', line 234
/*
 *  Backs ZMQ::Context#iothreads=: stores the I/O threads count on the
 *  wrapped zctx_t.
 *
 *  obj     - the context object.
 *  threads - Fixnum; must not be negative.
 *
 *  Raises through Check_Type when threads is not a Fixnum, rb_eZmqError when
 *  it is negative, and a system error when zmq_errno() reports EINVAL after
 *  the update. Returns Qnil.
 */
static VALUE rb_czmq_ctx_set_iothreads(VALUE obj, VALUE threads)
{
    int count;

    errno = 0;
    ZmqGetContext(obj);
    ZmqAssertContextPidMatches(ctx);
    Check_Type(threads, T_FIXNUM);

    count = FIX2INT(threads);
    if (count < 0) {
        rb_raise(rb_eZmqError, "negative I/O threads count is not supported.");
    }

    zctx_set_iothreads(ctx->ctx, count);

    /* errno was zeroed on entry; EINVAL observed here is reported as a system error. */
    if (zmq_errno() == EINVAL) {
        ZmqRaiseSysError();
    }
    return Qnil;
}
|
#linger=(100) ⇒ nil
Set msecs to flush sockets when closing them. A high value may block / pause the application on socket close. This binding defaults to a linger value of 1 msec set for all sockets, which is important for the reactor implementation in ZMQ::Loop to avoid stalling the event loop.
Examples
ctx = ZMQ::Context.new
ctx.linger = 100 => nil
262 263 264 265 266 267 268 269 270 271 272 273 |
# File 'ext/rbczmq/context.c', line 262
/*
 *  Backs ZMQ::Context#linger=: stores the linger period, in msecs, on the
 *  wrapped zctx_t.
 *
 *  obj    - the context object.
 *  linger - Fixnum; must not be negative.
 *
 *  Raises through Check_Type when linger is not a Fixnum and rb_eZmqError
 *  when it is negative. Returns Qnil.
 */
static VALUE rb_czmq_ctx_set_linger(VALUE obj, VALUE linger)
{
    int linger_ms;

    errno = 0;
    ZmqGetContext(obj);
    ZmqAssertContextPidMatches(ctx);
    Check_Type(linger, T_FIXNUM);

    linger_ms = FIX2INT(linger);
    if (linger_ms < 0) {
        rb_raise(rb_eZmqError, "negative linger / timeout values is not supported.");
    }

    zctx_set_linger(ctx->ctx, linger_ms);
    return Qnil;
}
|
#socket(:PUSH) ⇒ ZMQ::Socket #socket(ZMQ::PUSH) ⇒ ZMQ::Socket
Creates a socket within this ZMQ context. This is the only API exposed for creating sockets - they’re always spawned off a context. Sockets also track state of the current Ruby thread they’re created in to ensure they always only ever do work on the thread they were spawned on.
Examples
ctx = ZMQ::Context.new
ctx.socket(:PUSH) => ZMQ::Socket
ctx.socket(ZMQ::PUSH) => ZMQ::Socket
370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 |
# File 'ext/rbczmq/context.c', line 370
/*
 *  Backs ZMQ::Context#socket: creates a socket of the requested type within
 *  this context and appends it to the context's socket list.
 *
 *  obj  - the context object.
 *  type - socket type, either a Fixnum or a Symbol naming a constant
 *         looked up on rb_mZmq (e.g. :PUSH).
 *
 *  Raises rb_eTypeError when type is neither a Fixnum nor a Symbol, and
 *  raises through Check_Type when a Symbol resolves to a constant that is
 *  not a Fixnum. Returns the new socket object.
 */
static VALUE rb_czmq_ctx_socket(VALUE obj, VALUE type)
{
    int socket_type;
    VALUE type_value;
    struct nogvl_socket_args args;
    errno = 0;
    ZmqGetContext(obj);
    ZmqAssertContextPidMatches(ctx);
    if (TYPE(type) != T_FIXNUM && TYPE(type) != T_SYMBOL) rb_raise(rb_eTypeError, "wrong socket type %s (expected Fixnum or Symbol)", RSTRING_PTR(rb_obj_as_string(type)));

    /*
     * A Symbol can name any constant on the module, not only socket types
     * (a class or a String constant, for instance). FIX2INT does no type
     * checking of its own, so validate the resolved value before converting.
     */
    type_value = SYMBOL_P(type) ? rb_const_get_at(rb_mZmq, rb_to_id(type)) : type;
    Check_Type(type_value, T_FIXNUM);
    socket_type = FIX2INT(type_value);

    args.ctx = ctx->ctx;
    args.type = socket_type;
    /* The native socket is created through rb_thread_blocking_region and handed to the allocator. */
    VALUE socket_object = rb_czmq_socket_alloc(obj, ctx->ctx, (void*)rb_thread_blocking_region(rb_czmq_nogvl_socket_new, (void *)&args, RUBY_UBF_IO, 0));
    /* GetZmqSocket is a project macro; presumably it fills `sock` from socket_object - confirm against its definition. */
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(socket_object);
    /* Record the wrapper in the context's socket list. */
    zlist_push(ctx->sockets, sock);
    return socket_object;
}
|