Class: ZMQ::Context
- Inherits:
-
Object
- Object
- ZMQ::Context
- Defined in:
- lib/zmq/context.rb,
ext/rbczmq/context.c
Class Method Summary collapse
-
.new(*args) ⇒ Object
Returns a handle to a new ZMQ context.
Instance Method Summary collapse
-
#bind(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and bind to a given endpoint.
-
#connect(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and connect to a given endpoint.
-
#destroy ⇒ nil
Destroy a ZMQ context and all sockets in it.
-
#hwm ⇒ Fixnum
Returns High Water Mark (HWM) option used for native thread creation (non-Ruby threads, ahead of time API addition).
-
#hwm=(100) ⇒ nil
Sets the High Water Mark (HWM) option used for native thread creation (non-Ruby threads, ahead of time API addition).
-
#initialize(*args) ⇒ Context
constructor
Overload the libczmq handler installed for SIGINT and SIGTERM on context init.
-
#iothreads=(2) ⇒ nil
Raises default I/O threads from 1 - there should be no need to fiddle with this.
-
#linger=(100) ⇒ nil
Set msecs to flush sockets when closing them.
-
#socket(type) ⇒ Object
Creates a socket within this ZMQ context.
Constructor Details
#initialize(*args) ⇒ Context
Overload the libczmq handler installed for SIGINT and SIGTERM on context init. This ensures we fallback to the default Ruby signal handlers which is least likely to violate the principle of least surprise. As an alternative fallback, we expose ZMQ.interrupted! which reverts back to the libczmq default actions when called from a Ruby signal handler. The following restores the default libczmq behavior :
def initialize(*args)
super
trap(:INT){ ZMQ.interrupted! }
trap(:TERM){ ZMQ.interrupted! }
end
17 18 19 20 21 |
# File 'lib/zmq/context.rb', line 17 def initialize(*args) super trap(:INT, "DEFAULT") trap(:TERM, "DEFAULT") end |
Class Method Details
.ZMQ::Context.new ⇒ ZMQ::Context .ZMQ::Context.new(1) ⇒ ZMQ::Context
Returns a handle to a new ZMQ context. A single context per process is supported in order to guarantee stability across all Ruby implementations. A context should be passed as an argument to any Ruby threads. Optionally a context can be initialized with an I/O threads value (default: 1) - there should be no need to fiddle with this.
Examples
ZMQ::Context.new => ZMQ::Context
ZMQ::Context.new(1) => ZMQ::Context
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'ext/rbczmq/context.c', line 85
static VALUE rb_czmq_ctx_s_new(int argc, VALUE *argv, VALUE context)
{
VALUE ctx_map;
VALUE io_threads;
zmq_ctx_wrapper *ctx = NULL;
rb_scan_args(argc, argv, "01", &io_threads);
ctx_map = rb_ivar_get(rb_mZmq, intern_zctx_process);
if (!NIL_P(rb_hash_aref(ctx_map, get_pid()))) rb_raise(rb_eZmqError, "single ZMQ context per process allowed");
context = Data_Make_Struct(rb_cZmqContext, zmq_ctx_wrapper, 0, rb_czmq_free_ctx_gc, ctx);
ctx->ctx = (zctx_t*)rb_thread_blocking_region(rb_czmq_nogvl_zctx_new, NULL, RUBY_UBF_IO, 0);
ZmqAssertObjOnAlloc(ctx->ctx, ctx);
ctx->flags = 0;
rb_obj_call_init(context, 0, NULL);
rb_hash_aset(ctx_map, get_pid(), context);
if (!NIL_P(io_threads)) rb_czmq_ctx_set_iothreads(context, io_threads);
return context;
}
|
Instance Method Details
#bind(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and bind to a given endpoint
ctx.bind(:PUB, “tcp://127.0.0.1:5000”)
35 36 37 38 39 |
# File 'lib/zmq/context.rb', line 35 def bind(sock_type, endpoint) s = socket(sock_type) s.bind(endpoint) s end |
#connect(sock_type, endpoint) ⇒ Object
Sugaring for spawning a new socket and connect to a given endpoint
ctx.connect(:SUB, “tcp://127.0.0.1:5000”)
45 46 47 48 49 |
# File 'lib/zmq/context.rb', line 45 def connect(sock_type, endpoint) s = socket(sock_type) s.connect(endpoint) s end |
#destroy ⇒ nil
117 118 119 120 121 122 |
# File 'ext/rbczmq/context.c', line 117
static VALUE rb_czmq_ctx_destroy(VALUE obj)
{
ZmqGetContext(obj);
rb_czmq_free_ctx(ctx);
return Qnil;
}
|
#hwm ⇒ Fixnum
188 189 190 191 192 193 194 195 |
# File 'ext/rbczmq/context.c', line 188
static VALUE rb_czmq_ctx_hwm(VALUE obj)
{
errno = 0;
int wm;
ZmqGetContext(obj);
rb_warn("Deprecated method ZMQ::Context#hwm, does nothing - to be removed after 2013/05/14");
return INT2FIX(zctx_hwm(ctx->ctx));
}
|
#hwm=(100) ⇒ nil
209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'ext/rbczmq/context.c', line 209
static VALUE rb_czmq_ctx_set_hwm(VALUE obj, VALUE hwm)
{
errno = 0;
int wm;
ZmqGetContext(obj);
Check_Type(hwm, T_FIXNUM);
rb_warn("Deprecated method ZMQ::Context#hwm=, does nothing - to be removed after 2013/05/14");
wm = FIX2INT(hwm);
if (wm < 0) rb_raise(rb_eZmqError, "negative HWM values is not supported.");
zctx_set_hwm(ctx->ctx, wm);
return Qnil;
}
|
#iothreads=(2) ⇒ nil
136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'ext/rbczmq/context.c', line 136
static VALUE rb_czmq_ctx_set_iothreads(VALUE obj, VALUE threads)
{
int iothreads;
errno = 0;
ZmqGetContext(obj);
Check_Type(threads, T_FIXNUM);
iothreads = FIX2INT(threads);
if (iothreads > 1) rb_warn("You probably don't want to spawn more than 1 I/O thread per ZMQ context.");
if (iothreads < 0) rb_raise(rb_eZmqError, "negative I/O threads count is not supported.");
zctx_set_iothreads(ctx->ctx, iothreads);
if (zmq_errno() == EINVAL) ZmqRaiseSysError();
return Qnil;
}
|
#linger=(100) ⇒ nil
Set msecs to flush sockets when closing them. A high value may block / pause the application on socket close. This binding defaults to a linger value of 1 msec set for all sockets, which is important for the reactor implementation in ZMQ::Loop to avoid stalling the event loop.
Examples
ctx = ZMQ::Context.new
ctx.linger = 100 => nil
164 165 166 167 168 169 170 171 172 173 174 |
# File 'ext/rbczmq/context.c', line 164
static VALUE rb_czmq_ctx_set_linger(VALUE obj, VALUE linger)
{
errno = 0;
int msecs;
ZmqGetContext(obj);
Check_Type(linger, T_FIXNUM);
msecs = FIX2INT(linger);
if (msecs < 0) rb_raise(rb_eZmqError, "negative linger / timeout values is not supported.");
zctx_set_linger(ctx->ctx, msecs);
return Qnil;
}
|
#socket(: PUSH) ⇒ ZMQ::Socket #socket(ZMQ: :PUSH) ⇒ ZMQ::Socket
Creates a socket within this ZMQ context. This is the only API exposed for creating sockets - they’re always spawned off a context. Sockets also track state of the current Ruby thread they’re created in to ensure they always only ever do work on the thread they were spawned on.
Examples
ctx = ZMQ::Context.new
ctx.socket(:PUSH) => ZMQ::Socket
ctx.socket(ZMQ::PUSH) => ZMQ::Socket
281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 |
# File 'ext/rbczmq/context.c', line 281
static VALUE rb_czmq_ctx_socket(VALUE obj, VALUE type)
{
VALUE socket;
int socket_type;
struct nogvl_socket_args args;
zmq_sock_wrapper *sock = NULL;
errno = 0;
ZmqGetContext(obj);
if (TYPE(type) != T_FIXNUM && TYPE(type) != T_SYMBOL) rb_raise(rb_eTypeError, "wrong socket type %s (expected Fixnum or Symbol)", RSTRING_PTR(rb_obj_as_string(type)));
socket_type = FIX2INT((SYMBOL_P(type)) ? rb_const_get_at(rb_mZmq, rb_to_id(type)) : type);
socket = Data_Make_Struct(rb_czmq_ctx_socket_klass(socket_type), zmq_sock_wrapper, rb_czmq_mark_sock, rb_czmq_free_sock_gc, sock);
args.ctx = ctx->ctx;
args.type = socket_type;
sock->socket = (void*)rb_thread_blocking_region(rb_czmq_nogvl_socket_new, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssertObjOnAlloc(sock->socket, sock);
#ifndef HAVE_RB_THREAD_BLOCKING_REGION
sock->str_buffer = zlist_new();
sock->frame_buffer = zlist_new();
sock->msg_buffer = zlist_new();
#endif
sock->flags = 0;
sock->ctx = ctx->ctx;
sock->verbose = false;
sock->state = ZMQ_SOCKET_PENDING;
sock->endpoints = rb_ary_new();
sock->thread = rb_thread_current();
sock->context = obj;
sock->monitor_endpoint = Qnil;
sock->monitor_handler = Qnil;
sock->monitor_thread = Qnil;
rb_obj_call_init(socket, 0, NULL);
return socket;
}
|