Class: ZMQ::Socket
- Inherits:
-
Object
- Object
- ZMQ::Socket
- Defined in:
- lib/zmq/socket.rb,
ext/rbczmq/socket.c
Defined Under Namespace
Classes: Dealer, Pair, Pub, Pull, Push, Rep, Req, Router, Sub
Constant Summary collapse
# Matches endpoints that already carry a supported transport prefix
# (inproc://, ipc://, tcp://, pgm://, epgm://). The alternation is grouped and
# anchored with \A so that bare host names which merely contain "ipc" or "tcp"
# (e.g. "tcp.collector.domain.com") are NOT mistaken for full endpoints.
PROTO_REXP =
/\A(?:inproc|ipc|tcp|e?pgm):\/\//
- PENDING =
INT2NUM(ZMQ_SOCKET_PENDING)
- BOUND =
INT2NUM(ZMQ_SOCKET_BOUND)
- CONNECTED =
INT2NUM(ZMQ_SOCKET_CONNECTED)
Class Method Summary collapse
Instance Method Summary collapse
-
#affinity ⇒ Fixnum
Returns the socket AFFINITY value.
-
#affinity=(1) ⇒ nil
Sets the socket AFFINITY value.
-
#backlog ⇒ Fixnum
Returns the socket BACKLOG value.
-
#backlog=(200) ⇒ nil
Sets the socket BACKLOG value.
-
#bind(uri) ⇒ Object
Binds to a given endpoint.
-
#close ⇒ nil
Closes a socket.
-
#connect(uri) ⇒ Object
Connects to a given endpoint.
-
#connect_all(uri) ⇒ Object
Connects to all endpoints that are returned from a SRV record lookup.
-
#endpoint ⇒ Object
Returns the endpoint this socket is currently connected to, if any.
-
#endpoints ⇒ Array of Strings
Returns the endpoints this socket is currently connected to, if any.
-
#events ⇒ Fixnum
Query if this socket is in a readable or writable state.
-
#fd ⇒ Fixnum
(also: #to_i)
Returns a file descriptor reference for integrating this socket with an external event loop or multiplexer.
-
#hwm ⇒ Fixnum
Returns the socket HWM (High Water Mark) value.
-
#hwm=(100) ⇒ nil
Sets the socket HWM (High Water Mark) value.
-
#identity=(value) ⇒ Object
Sets the socket IDENTITY value.
-
#linger ⇒ Fixnum
Returns the socket LINGER value.
-
#linger=(1000) ⇒ nil
Sets the socket LINGER value in ms.
-
#mcast_loop=(false) ⇒ nil
Sets the socket MCAST_LOOP value.
-
#mcast_loop? ⇒ Boolean
Returns the socket MCAST_LOOP status.
-
#monitor("inproc://monitoring", callback, events) ⇒ Boolean
Registers a monitoring callback for this socket.
-
#poll_readable? ⇒ Boolean
Poll all sockets for readable states by default.
-
#poll_writable? ⇒ Boolean
Poll all sockets for writable states by default.
-
#rate ⇒ Fixnum
Returns the socket RATE value.
-
#rate=(50000) ⇒ nil
Sets the socket RATE value.
-
#raw=(true) ⇒ nil
Define this as a RAW socket - applicable to ROUTER sockets only.
-
#rcvbuf ⇒ Fixnum
Returns the socket RCVBUF value.
-
#rcvbuf=(1000) ⇒ nil
Sets the socket RCVBUF value.
-
#rcvhwm ⇒ Fixnum
Returns the socket receive HWM (High Water Mark) value.
-
#rcvhwm=(100) ⇒ nil
Sets the socket receive HWM (High Water Mark) value.
-
#rcvmore ⇒ Boolean
Query if there are more messages to receive.
-
#rcvtimeo ⇒ Fixnum
Returns the socket RCVTIMEO value.
-
#rcvtimeout=(200) ⇒ nil
Sets the socket RCVTIMEO value.
-
#readable? ⇒ Boolean
Determines if there are one or more messages to read from this socket.
-
#bind("inproc: //test") ⇒ Fixnum
Binds to a given endpoint.
-
#connect("tcp: //localhost:3456") ⇒ Boolean
Attempts to connect to a given endpoint.
-
#reconnect_ivl ⇒ Fixnum
Returns the socket RECONNECT_IVL value.
-
#reconnect_ivl=(200) ⇒ nil
Sets the socket RECONNECT_IVL value.
-
#reconnect_ivl_max ⇒ Fixnum
Returns the socket RECONNECT_IVL_MAX value.
-
#reconnect_ivl_max=(5) ⇒ nil
Sets the socket RECONNECT_IVL_MAX value.
-
#recovery_ivl ⇒ Fixnum
Returns the socket RECOVERY_IVL value.
-
#recovery_ivl=(20) ⇒ nil
Sets the socket RECOVERY_IVL value.
-
#recovery_ivl_msec ⇒ Fixnum
Returns the socket RECOVERY_IVL_MSEC value.
-
#recovery_ivl_msec=(20) ⇒ nil
Sets the socket RECOVERY_IVL_MSEC value.
-
#recv ⇒ String?
Receive a string from this ZMQ socket.
-
#recv_frame ⇒ ZMQ::Frame?
Receives a ZMQ frame from this socket.
-
#recv_frame_nonblock ⇒ ZMQ::Frame?
Receives a ZMQ frame from this socket.
-
#recv_message ⇒ ZMQ::Message?
Receives a ZMQ message from this socket.
-
#recv_nonblock ⇒ String?
Receive a string from this ZMQ socket.
-
#send("message") ⇒ Boolean
Sends a string to this ZMQ socket.
-
#send_frame(frame) ⇒ Boolean
Sends a ZMQ::Frame instance to this socket.
-
#send_message(msg) ⇒ nil
Sends a ZMQ::Message instance to this socket.
-
#sendm("message") ⇒ Boolean
Sends a string to this ZMQ socket, with a more flag set.
-
#sndbuf ⇒ Fixnum
Returns the socket SNDBUF value.
-
#sndbuf=(1000) ⇒ nil
Sets the socket SNDBUF value.
-
#sndhwm ⇒ Fixnum
Returns the socket send HWM (High Water Mark) value.
-
#sndhwm=(100) ⇒ nil
Sets the socket send HWM (High Water Mark) value.
-
#sndtimeo ⇒ Fixnum
Returns the socket SNDTIMEO value.
-
#sndtimeout=(200) ⇒ nil
Sets the socket SNDTIMEO value.
-
#state ⇒ Fixnum
Returns the current socket state, one of ZMQ::Socket::PENDING, ZMQ::Socket::BOUND or ZMQ::Socket::CONNECTED.
-
#subscribe(value) ⇒ Object
Subscribes this SUB socket to a topic.
-
#swap ⇒ Fixnum
Returns the socket SWAP value.
-
#swap=(100) ⇒ nil
Sets the socket SWAP value.
-
#to_s ⇒ Object
Generates a string representation of the current socket state.
-
#type_str ⇒ Object
Generates a string representation of this socket type.
-
#unsubscribe(value) ⇒ Object
Unsubscribes this SUB socket from a topic.
-
#verbose=(true) ⇒ nil
Let this socket be verbose - dumps a lot of data to stdout for debugging.
-
#writable? ⇒ Boolean
Determines if this socket is in a writable state.
Class Method Details
.handle_fsm_errors(error, *methods) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/zmq/socket.rb', line 14
# Redefines each method named in +methods+ so that it calls +super+ and
# translates a SystemCallError whose errno is ZMQ::EFSM into a ZMQ::Error
# carrying +error+ plus a hint about REQ / REP ordering. Any other
# SystemCallError is re-raised untouched.
#
# error   - message prefix interpolated into the generated method body.
# methods - names of the instance methods to wrap.
def self.handle_fsm_errors(error, *methods)
  methods.each do |m|
    class_eval <<-"evl", __FILE__, __LINE__
      def #{m}(*args);
        super
      rescue SystemCallError => e
        raise(ZMQ::Error, "#{error} Please assert that you're not sending / receiving out of band data when using the REQ / REP socket pairs.") if e.errno == ZMQ::EFSM
        raise
      end
    evl
  end
end
.unsupported_api(*methods) ⇒ Object
6 7 8 9 10 11 12 |
# File 'lib/zmq/socket.rb', line 6
# Defines each method named in +methods+ as a stub that always raises
# ZMQ::Error. The message is built when this class method runs: both the
# method name and const_get(:TYPE_STR) are interpolated into the heredoc
# before class_eval sees it.
#
# methods - names of the instance methods to stub out.
def self.unsupported_api(*methods)
  methods.each do |m|
    class_eval <<-"evl", __FILE__, __LINE__
      def #{m}(*args); raise(ZMQ::Error, "API #{m} not supported for #{const_get(:TYPE_STR)} sockets!"); end
    evl
  end
end
Instance Method Details
#affinity ⇒ Fixnum
983 984 985 986 987 988 |
# File 'ext/rbczmq/socket.c', line 983
/*
 * Getter for the AFFINITY socket option: reads it with zsocket_affinity()
 * and hands the result back to Ruby through INT2NUM.
 */
static VALUE rb_czmq_socket_opt_affinity(VALUE obj)
{
    int affinity;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    affinity = zsocket_affinity(sock->socket);
    return INT2NUM(affinity);
}
|
#affinity=(1) ⇒ nil
1003 1004 1005 1006 1007 |
# File 'ext/rbczmq/socket.c', line 1003
/*
 * Setter for the AFFINITY socket option. The whole body is the ZmqSetSockOpt
 * macro (defined outside this view), invoked with the zsocket_set_affinity
 * setter, the "AFFINITY" label and the Ruby value.
 * NOTE(review): there is no explicit return here, so the macro presumably
 * resolves `sock` from obj and returns on our behalf - confirm in its definition.
 */
static VALUE rb_czmq_socket_set_opt_affinity(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_affinity, "AFFINITY", value);
}
|
#backlog ⇒ Fixnum
1297 1298 1299 1300 1301 1302 |
# File 'ext/rbczmq/socket.c', line 1297
/*
 * Getter for the BACKLOG socket option: reads it with zsocket_backlog()
 * and hands the result back to Ruby through INT2NUM.
 */
static VALUE rb_czmq_socket_opt_backlog(VALUE obj)
{
    int backlog;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    backlog = zsocket_backlog(sock->socket);
    return INT2NUM(backlog);
}
|
#backlog=(200) ⇒ nil
1317 1318 1319 1320 1321 |
# File 'ext/rbczmq/socket.c', line 1317
/*
 * Setter for the BACKLOG socket option. The whole body is the ZmqSetSockOpt
 * macro (defined outside this view), invoked with the zsocket_set_backlog
 * setter, the "BACKLOG" label and the Ruby value.
 * NOTE(review): there is no explicit return here, so the macro presumably
 * resolves `sock` from obj and returns on our behalf - confirm in its definition.
 */
static VALUE rb_czmq_socket_set_opt_backlog(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_backlog, "BACKLOG", value);
}
|
#bind(uri) ⇒ Object
Binds to a given endpoint. Attempts to resolve URIs without a protocol through DNS SRV records.
socket = ctx.socket(:PUB) socket.bind “tcp://127.0.0.1:9000”
socket.bind “collector.domain.com” # resolves 10.0.0.2:9000
99 100 101 102 |
# File 'lib/zmq/socket.rb', line 99
# Binds to +uri+. A non-nil uri that does not match PROTO_REXP is first passed
# through #resolve; the (possibly rewritten) uri is then handed to #real_bind,
# whose result is returned.
def bind(uri)
  uri = resolve(uri) if uri && uri !~ PROTO_REXP
  real_bind(uri)
end
#close ⇒ nil
Closes a socket. The GC will take the same action if a socket object is not reachable anymore on the next GC cycle. This is a lower level API.
Examples
sock.close => nil
100 101 102 103 104 105 106 107 108 109 110 |
# File 'ext/rbczmq/socket.c', line 100
/*
 * Closes the socket from Ruby: marks the wrapper as ZMQ_SOCKET_PENDING,
 * releases it through rb_czmq_free_sock() and returns nil.
 */
static VALUE rb_czmq_socket_close(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
/* NOTE(review): presumably rejects use from a thread other than the socket's owner - confirm in the macro. */
ZmqSockGuardCrossThread(sock);
/* This is useless for production / real use cases as we can't query the state again OR assume
anything about the underlying connection. Merely doing the right thing. */
sock->state = ZMQ_SOCKET_PENDING;
rb_czmq_free_sock(sock);
return Qnil;
}
|
#connect(uri) ⇒ Object
Connects to a given endpoint. Attempts to resolve URIs without a protocol through DNS SRV records.
socket = ctx.socket(:PUB) socket.connect “tcp://127.0.0.1:9000”
socket.connect “collector.domain.com” # resolves 10.0.0.2:9000
111 112 113 114 |
# File 'lib/zmq/socket.rb', line 111
# Connects to +uri+. A non-nil uri that does not match PROTO_REXP is first
# passed through #resolve; the (possibly rewritten) uri is then handed to
# #real_connect, whose result is returned.
def connect(uri)
  uri = resolve(uri) if uri && uri !~ PROTO_REXP
  real_connect(uri)
end
#connect_all(uri) ⇒ Object
Connects to all endpoints that are returned from a SRV record lookup.
socket = ctx.socket(:PUB)
socket.connect_all "collector.domain.com" # resolves 10.0.0.2:9000 10.0.0.3:9000
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/zmq/socket.rb', line 122
# Connects to every endpoint that #resolve_all returns for +uri+.
#
# * A uri matching PROTO_REXP is connected to directly.
# * Otherwise each resolved address is turned into "tcp://host:port", with the
#   host looked up through Resolv.getaddress.
# * If nothing resolves, the uri is passed to #real_connect unchanged.
#
# Always returns self. Previously the direct-connect branch returned nil,
# unlike every other path.
def connect_all(uri)
  if uri =~ PROTO_REXP
    real_connect(uri)
    return self
  end

  addresses = resolve_all(uri)
  if addresses.empty?
    real_connect(uri)
  else
    addresses.each do |address|
      host = Resolv.getaddress(address.target.to_s)
      real_connect("tcp://#{host}:#{address.port}")
    end
  end
  self
end
#endpoint ⇒ Object
35 36 37 |
# File 'lib/zmq/socket.rb', line 35
# Returns the first entry of #endpoints.
def endpoint
  endpoints.first
end
#endpoints ⇒ Array of Strings
127 128 129 130 131 132 |
# File 'ext/rbczmq/socket.c', line 127
/*
 * Returns the `endpoints` value stored on the socket wrapper as-is
 * (the same object, not a copy).
 */
static VALUE rb_czmq_socket_endpoints(VALUE obj)
{
    VALUE endpoints;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    endpoints = sock->endpoints;
    return endpoints;
}
|
#events ⇒ Fixnum
1505 1506 1507 1508 1509 1510 |
# File 'ext/rbczmq/socket.c', line 1505
/*
 * Getter for the socket's event state: reads it with zsocket_events()
 * and hands the result back to Ruby through INT2NUM.
 */
static VALUE rb_czmq_socket_opt_events(VALUE obj)
{
    int events;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    events = zsocket_events(sock->socket);
    return INT2NUM(events);
}
|
#fd ⇒ Fixnum Also known as: to_i
172 173 174 175 176 177 178 |
# File 'ext/rbczmq/socket.c', line 172
/*
 * Returns the socket's file descriptor (zsocket_fd) as a Ruby Integer, or -1
 * while the wrapper is still in the ZMQ_SOCKET_PENDING state.
 */
static VALUE rb_czmq_socket_fd(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    if (sock->state != ZMQ_SOCKET_PENDING) {
        return INT2NUM(zsocket_fd(sock->socket));
    }
    return INT2NUM(-1);
}
|
#hwm ⇒ Fixnum
821 822 823 824 825 826 |
# File 'ext/rbczmq/socket.c', line 821
/*
 * Getter for the HWM socket option: reads it with zsocket_hwm()
 * and hands the result back to Ruby through INT2NUM.
 */
static VALUE rb_czmq_socket_opt_hwm(VALUE obj)
{
    int hwm;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    hwm = zsocket_hwm(sock->socket);
    return INT2NUM(hwm);
}
|
#hwm=(100) ⇒ nil
842 843 844 845 846 |
# File 'ext/rbczmq/socket.c', line 842
/*
 * Setter for the HWM socket option. The whole body is the ZmqSetSockOpt
 * macro (defined outside this view), invoked with the zsocket_set_hwm
 * setter, the "HWM" label and the Ruby value.
 * NOTE(review): there is no explicit return here, so the macro presumably
 * resolves `sock` from obj and returns on our behalf - confirm in its definition.
 */
static VALUE rb_czmq_socket_set_opt_hwm(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_hwm, "HWM", value);
}
|
#identity=(value) ⇒ Object
1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 |
# File 'ext/rbczmq/socket.c', line 1414
/*
 * Sets the socket IDENTITY option.
 *
 * value must be a Ruby String (Check_Type); an empty string or one longer
 * than 255 bytes (RSTRING_LEN) raises rb_eZmqError. The C string is passed to
 * zsocket_set_identity() and, when the wrapper is verbose, logged.
 * Returns nil.
 */
static VALUE rb_czmq_socket_set_opt_identity(VALUE obj, VALUE value)
{
    char *val;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(value, T_STRING);
    if (RSTRING_LEN(value) == 0) rb_raise(rb_eZmqError, "socket identity cannot be empty.");
    if (RSTRING_LEN(value) > 255) rb_raise(rb_eZmqError, "maximum socket identity is 255 chars.");
    val = StringValueCStr(value);
    zsocket_set_identity(sock->socket, val);
    if (sock->verbose) {
        /* obj is a VALUE (an integer type); %p requires a void pointer argument. */
        zclock_log ("I: %s socket %p: set option \"IDENTITY\" \"%s\"", zsocket_type_str(sock->socket), (void *)obj, val);
    }
    return Qnil;
}
|
#linger ⇒ Fixnum
1258 1259 1260 1261 1262 1263 |
# File 'ext/rbczmq/socket.c', line 1258
/*
 * Getter for the LINGER socket option: reads it with zsocket_linger()
 * and hands the result back to Ruby through INT2NUM.
 */
static VALUE rb_czmq_socket_opt_linger(VALUE obj)
{
    int linger;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    linger = zsocket_linger(sock->socket);
    return INT2NUM(linger);
}
|
#linger=(1000) ⇒ nil
1278 1279 1280 1281 1282 |
# File 'ext/rbczmq/socket.c', line 1278
/*
 * Setter for the LINGER socket option. The whole body is the ZmqSetSockOpt
 * macro (defined outside this view), invoked with the zsocket_set_linger
 * setter, the "LINGER" label and the Ruby value.
 * NOTE(review): there is no explicit return here, so the macro presumably
 * resolves `sock` from obj and returns on our behalf - confirm in its definition.
 */
static VALUE rb_czmq_socket_set_opt_linger(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_linger, "LINGER", value);
}
|
#mcast_loop=(false) ⇒ nil
1160 1161 1162 1163 1164 |
# File 'ext/rbczmq/socket.c', line 1160
/*
 * Setter for the MCAST_LOOP socket option. The whole body is the
 * ZmqSetBooleanSockOpt macro (defined outside this view), invoked with the
 * zsocket_set_mcast_loop setter, the "MCAST_LOOP" label and the Ruby value.
 * NOTE(review): there is no explicit return here, so the macro presumably
 * resolves `sock` from obj, converts the boolean and returns - confirm.
 */
static VALUE rb_czmq_socket_set_opt_mcast_loop(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_mcast_loop, "MCAST_LOOP", value);
}
|
#mcast_loop? ⇒ Boolean
1140 1141 1142 1143 1144 1145 |
# File 'ext/rbczmq/socket.c', line 1140
/*
 * Reports the MCAST_LOOP option as a Ruby boolean: true only when
 * zsocket_mcast_loop() returns exactly 1, false for any other value.
 */
static VALUE rb_czmq_socket_opt_mcast_loop(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    if (zsocket_mcast_loop(sock->socket) == 1) {
        return Qtrue;
    }
    return Qfalse;
}
|
#monitor("inproc://monitoring", callback, events) ⇒ Boolean
Registers a monitoring callback for this socket
Examples
ctx = ZMQ::Context.new
rep = ctx.socket(:REP)
rep.monitor("inproc://monitoring.rep", RepMonitor)
req = ctx.socket(:REQ)
req.monitor("inproc://monitoring.req", ReqMonitor, ZMQ_EVENT_DISCONNECTED)
rep.bind("tcp://127.0.0.1:5331")
rep.bind("tcp://127.0.0.1:5332")
1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 |
# File 'ext/rbczmq/socket.c', line 1710
/*
 * Registers a monitor for this socket.
 *
 * Arguments (rb_scan_args "12"): a required endpoint String, an optional
 * handler and an optional events mask. A nil mask falls back to the
 * ZMQ::EVENT_ALL constant; a nil handler falls back to a fresh ZMQ::Monitor
 * instance. The mask must be a Fixnum.
 *
 * Returns true when zmq_socket_monitor() reports 0, after storing the
 * endpoint/handler on the wrapper and starting the monitor thread;
 * returns false otherwise.
 */
static VALUE rb_czmq_socket_monitor(int argc, VALUE *argv, VALUE obj)
{
    VALUE endpoint, handler, events;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    rb_scan_args(argc, argv, "12", &endpoint, &handler, &events);
    Check_Type(endpoint, T_STRING);
    if (NIL_P(events)) {
        events = rb_const_get_at(rb_mZmq, rb_intern("EVENT_ALL"));
    }
    if (NIL_P(handler)) {
        handler = rb_class_new_instance(0, NULL, rb_const_get_at(rb_mZmq, rb_intern("Monitor")));
    }
    Check_Type(events, T_FIXNUM);
    rc = zmq_socket_monitor(sock->socket, StringValueCStr(endpoint), NUM2INT(events));
    if (rc != 0) {
        return Qfalse;
    }
    /* Record what is being monitored before the thread that serves it starts. */
    sock->monitor_endpoint = endpoint;
    sock->monitor_handler = handler;
    sock->monitor_thread = rb_thread_create(rb_czmq_socket_monitor_thread, (void*)sock);
    rb_thread_run(sock->monitor_thread);
    return Qtrue;
}
|
#poll_readable? ⇒ Boolean
Poll all sockets for readable states by default
83 84 85 |
# File 'lib/zmq/socket.rb', line 83
# Base-class default: always returns true.
def poll_readable?
  true
end
#poll_writable? ⇒ Boolean
Poll all sockets for writable states by default
88 89 90 |
# File 'lib/zmq/socket.rb', line 88
# Base-class default: always returns true.
def poll_writable?
  true
end
#rate ⇒ Fixnum
1022 1023 1024 1025 1026 1027 |
# File 'ext/rbczmq/socket.c', line 1022
/*
 * Getter for the RATE socket option: reads it with zsocket_rate()
 * and hands the result back to Ruby through INT2NUM.
 */
static VALUE rb_czmq_socket_opt_rate(VALUE obj)
{
    int rate;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    rate = zsocket_rate(sock->socket);
    return INT2NUM(rate);
}
|
#rate=(50000) ⇒ nil
1042 1043 1044 1045 1046 |
# File 'ext/rbczmq/socket.c', line 1042
/*
 * Setter for the RATE socket option. The whole body is the ZmqSetSockOpt
 * macro (defined outside this view), invoked with the zsocket_set_rate
 * setter, the "RATE" label and the Ruby value.
 * NOTE(review): there is no explicit return here, so the macro presumably
 * resolves `sock` from obj and returns on our behalf - confirm in its definition.
 */
static VALUE rb_czmq_socket_set_opt_rate(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rate, "RATE", value);
}
|
#raw=(true) ⇒ nil
1604 1605 1606 1607 1608 |
# File 'ext/rbczmq/socket.c', line 1604
/*
 * Setter for the ROUTER_RAW socket option. The whole body is the
 * ZmqSetBooleanSockOpt macro (defined outside this view), invoked with the
 * zsocket_set_router_raw setter, the "ROUTER_RAW" label and the Ruby value.
 * NOTE(review): there is no explicit return here, so the macro presumably
 * resolves `sock` from obj, converts the boolean and returns - confirm.
 */
static VALUE rb_czmq_socket_set_opt_raw(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_router_raw, "ROUTER_RAW", value);
}
|
#rcvbuf ⇒ Fixnum
1219 1220 1221 1222 1223 1224 |
# File 'ext/rbczmq/socket.c', line 1219
/*
 * Getter for the RCVBUF socket option: reads it with zsocket_rcvbuf()
 * and hands the result back to Ruby through INT2NUM.
 */
static VALUE rb_czmq_socket_opt_rcvbuf(VALUE obj)
{
    int rcvbuf;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    rcvbuf = zsocket_rcvbuf(sock->socket);
    return INT2NUM(rcvbuf);
}
|
#rcvbuf=(1000) ⇒ nil
1239 1240 1241 1242 1243 |
# File 'ext/rbczmq/socket.c', line 1239
/*
 * Setter for the RCVBUF socket option. The whole body is the ZmqSetSockOpt
 * macro (defined outside this view), invoked with the zsocket_set_rcvbuf
 * setter, the "RCVBUF" label and the Ruby value.
 * NOTE(review): there is no explicit return here, so the macro presumably
 * resolves `sock` from obj and returns on our behalf - confirm in its definition.
 */
static VALUE rb_czmq_socket_set_opt_rcvbuf(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rcvbuf, "RCVBUF", value);
}
|
#rcvhwm ⇒ Fixnum
942 943 944 945 946 947 |
# File 'ext/rbczmq/socket.c', line 942
/*
 * Getter for the receive HWM socket option: reads it with zsocket_rcvhwm()
 * and hands the result back to Ruby through INT2NUM.
 */
static VALUE rb_czmq_socket_opt_rcvhwm(VALUE obj)
{
    int rcvhwm;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    rcvhwm = zsocket_rcvhwm(sock->socket);
    return INT2NUM(rcvhwm);
}
|
#rcvhwm=(100) ⇒ nil
963 964 965 966 967 |
# File 'ext/rbczmq/socket.c', line 963
/*
 * Setter for the RCVHWM socket option. The whole body is the ZmqSetSockOpt
 * macro (defined outside this view), invoked with the zsocket_set_rcvhwm
 * setter, the "RCVHWM" label and the Ruby value.
 * NOTE(review): there is no explicit return here, so the macro presumably
 * resolves `sock` from obj and returns on our behalf - confirm in its definition.
 */
static VALUE rb_czmq_socket_set_opt_rcvhwm(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rcvhwm, "RCVHWM", value);
}
|
#rcvmore ⇒ Boolean
1485 1486 1487 1488 1489 1490 |
# File 'ext/rbczmq/socket.c', line 1485
/*
 * Reports the RCVMORE flag as a Ruby boolean: true only when
 * zsocket_rcvmore() returns exactly 1, false for any other value.
 */
static VALUE rb_czmq_socket_opt_rcvmore(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    if (zsocket_rcvmore(sock->socket) == 1) {
        return Qtrue;
    }
    return Qfalse;
}
|
#rcvtimeo ⇒ Fixnum
1525 1526 1527 1528 1529 1530 |
# File 'ext/rbczmq/socket.c', line 1525
/*
 * Getter for the RCVTIMEO socket option: reads it with zsocket_rcvtimeo()
 * and hands the result back to Ruby through INT2NUM.
 */
static VALUE rb_czmq_socket_opt_rcvtimeo(VALUE obj)
{
    int rcvtimeo;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    rcvtimeo = zsocket_rcvtimeo(sock->socket);
    return INT2NUM(rcvtimeo);
}
|
#rcvtimeout=(200) ⇒ nil
1545 1546 1547 1548 1549 |
# File 'ext/rbczmq/socket.c', line 1545
/*
 * Setter for the RCVTIMEO socket option. The whole body is the ZmqSetSockOpt
 * macro (defined outside this view), invoked with the zsocket_set_rcvtimeo
 * setter, the "RCVTIMEO" label and the Ruby value.
 * NOTE(review): there is no explicit return here, so the macro presumably
 * resolves `sock` from obj and returns on our behalf - confirm in its definition.
 */
static VALUE rb_czmq_socket_set_opt_rcvtimeo(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rcvtimeo, "RCVTIMEO", value);
}
|
#readable? ⇒ Boolean
Determines if there are one or more messages to read from this socket. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.
socket.readable? => true
44 45 46 |
# File 'lib/zmq/socket.rb', line 44
# True when the ZMQ::POLLIN bit is set in the value returned by #events.
def readable?
  (events & ZMQ::POLLIN) == ZMQ::POLLIN
end
#bind("inproc: //test") ⇒ Fixnum
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 |
# File 'ext/rbczmq/socket.c', line 224
/*
 * Binds the socket to the given endpoint.
 *
 * endpoint must be a Ruby String. The bind itself runs through
 * rb_thread_blocking_region() with rb_czmq_nogvl_socket_bind as the callback,
 * and its result is passed to ZmqAssert (defined elsewhere; presumably raises
 * on failure - confirm). Afterwards the wrapper is marked ZMQ_SOCKET_BOUND and
 * a copy of the endpoint string is appended to sock->endpoints.
 *
 * Returns the bind result code as a Ruby Integer.
 */
static VALUE rb_czmq_socket_bind(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_bind, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose) {
        /* obj is a VALUE (an integer type); %p requires a void pointer argument. */
        zclock_log ("I: %s socket %p: bound \"%s\"", zsocket_type_str(sock->socket), (void *)obj, StringValueCStr(endpoint));
    }
    sock->state = ZMQ_SOCKET_BOUND;
    rb_ary_push(sock->endpoints, rb_str_new4(endpoint));
    return INT2NUM(rc);
}
|
#connect("tcp: //localhost:3456") ⇒ Boolean
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
# File 'ext/rbczmq/socket.c', line 258
/*
 * Connects the socket to the given endpoint.
 *
 * endpoint must be a Ruby String. The connect itself runs through
 * rb_thread_blocking_region() with rb_czmq_nogvl_socket_connect as the
 * callback, and its result is passed to ZmqAssert (defined elsewhere;
 * presumably raises on failure - confirm). Afterwards the wrapper is marked
 * ZMQ_SOCKET_CONNECTED and a copy of the endpoint string is appended to
 * sock->endpoints.
 *
 * Returns true.
 */
static VALUE rb_czmq_socket_connect(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_connect, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose) {
        /* obj is a VALUE (an integer type); %p requires a void pointer argument. */
        zclock_log ("I: %s socket %p: connected \"%s\"", zsocket_type_str(sock->socket), (void *)obj, StringValueCStr(endpoint));
    }
    sock->state = ZMQ_SOCKET_CONNECTED;
    rb_ary_push(sock->endpoints, rb_str_new4(endpoint));
    return Qtrue;
}
|
#reconnect_ivl ⇒ Fixnum
1336 1337 1338 1339 1340 1341 |
# File 'ext/rbczmq/socket.c', line 1336
/*
 * Getter for the RECONNECT_IVL socket option: reads it with
 * zsocket_reconnect_ivl() and hands the result back to Ruby through INT2NUM.
 */
static VALUE rb_czmq_socket_opt_reconnect_ivl(VALUE obj)
{
    int reconnect_ivl;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    reconnect_ivl = zsocket_reconnect_ivl(sock->socket);
    return INT2NUM(reconnect_ivl);
}
|
#reconnect_ivl=(200) ⇒ nil
1356 1357 1358 1359 1360 |
# File 'ext/rbczmq/socket.c', line 1356
/*
 * Setter for the RECONNECT_IVL socket option. The whole body is the
 * ZmqSetSockOpt macro (defined outside this view), invoked with the
 * zsocket_set_reconnect_ivl setter, the "RECONNECT_IVL" label and the Ruby value.
 * NOTE(review): there is no explicit return here, so the macro presumably
 * resolves `sock` from obj and returns on our behalf - confirm in its definition.
 */
static VALUE rb_czmq_socket_set_opt_reconnect_ivl(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl, "RECONNECT_IVL", value);
}
|
#reconnect_ivl_max ⇒ Fixnum
1375 1376 1377 1378 1379 1380 |
# File 'ext/rbczmq/socket.c', line 1375
/*
 * Getter for the RECONNECT_IVL_MAX socket option: reads it with
 * zsocket_reconnect_ivl_max() and hands the result back to Ruby through INT2NUM.
 */
static VALUE rb_czmq_socket_opt_reconnect_ivl_max(VALUE obj)
{
    int reconnect_ivl_max;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    reconnect_ivl_max = zsocket_reconnect_ivl_max(sock->socket);
    return INT2NUM(reconnect_ivl_max);
}
|
#reconnect_ivl_max=(5) ⇒ nil
1395 1396 1397 1398 1399 |
# File 'ext/rbczmq/socket.c', line 1395
/*
 * Setter for the RECONNECT_IVL_MAX socket option. The whole body is the
 * ZmqSetSockOpt macro (defined outside this view), invoked with the
 * zsocket_set_reconnect_ivl_max setter, the "RECONNECT_IVL_MAX" label and the
 * Ruby value.
 * NOTE(review): there is no explicit return here, so the macro presumably
 * resolves `sock` from obj and returns on our behalf - confirm in its definition.
 */
static VALUE rb_czmq_socket_set_opt_reconnect_ivl_max(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl_max, "RECONNECT_IVL_MAX", value);
}
|
#recovery_ivl ⇒ Fixnum
1061 1062 1063 1064 1065 1066 |
# File 'ext/rbczmq/socket.c', line 1061
/*
 * Getter for the RECOVERY_IVL socket option: reads it with
 * zsocket_recovery_ivl() and hands the result back to Ruby through INT2NUM.
 */
static VALUE rb_czmq_socket_opt_recovery_ivl(VALUE obj)
{
    int recovery_ivl;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    recovery_ivl = zsocket_recovery_ivl(sock->socket);
    return INT2NUM(recovery_ivl);
}
|
#recovery_ivl=(20) ⇒ nil
1081 1082 1083 1084 1085 |
# File 'ext/rbczmq/socket.c', line 1081
/*
 * Setter for the RECOVERY_IVL socket option. The whole body is the
 * ZmqSetSockOpt macro (defined outside this view), invoked with the
 * zsocket_set_recovery_ivl setter, the "RECOVERY_IVL" label and the Ruby value.
 * NOTE(review): there is no explicit return here, so the macro presumably
 * resolves `sock` from obj and returns on our behalf - confirm in its definition.
 */
static VALUE rb_czmq_socket_set_opt_recovery_ivl(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_recovery_ivl, "RECOVERY_IVL", value);
}
|
#recovery_ivl_msec ⇒ Fixnum
1101 1102 1103 1104 1105 1106 |
# File 'ext/rbczmq/socket.c', line 1101
/*
 * Getter for the RECOVERY_IVL_MSEC socket option: reads it with
 * zsocket_recovery_ivl_msec() and hands the result back to Ruby through INT2NUM.
 */
static VALUE rb_czmq_socket_opt_recovery_ivl_msec(VALUE obj)
{
    int recovery_ivl_msec;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    recovery_ivl_msec = zsocket_recovery_ivl_msec(sock->socket);
    return INT2NUM(recovery_ivl_msec);
}
|
#recovery_ivl_msec=(20) ⇒ nil
1121 1122 1123 1124 1125 |
# File 'ext/rbczmq/socket.c', line 1121
/*
 * Setter for the RECOVERY_IVL_MSEC socket option. The whole body is the
 * ZmqSetSockOpt macro (defined outside this view), invoked with the
 * zsocket_set_recovery_ivl_msec setter, the "RECOVERY_IVL_MSEC" label and the
 * Ruby value.
 * NOTE(review): there is no explicit return here, so the macro presumably
 * resolves `sock` from obj and returns on our behalf - confirm in its definition.
 */
static VALUE rb_czmq_socket_set_opt_recovery_ivl_msec(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_recovery_ivl_msec, "RECOVERY_IVL_MSEC", value);
}
|
#recv ⇒ String?
461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 |
# File 'ext/rbczmq/socket.c', line 461
/*
 * Receives one string from the socket.
 *
 * The receive runs through rb_thread_blocking_region() with
 * rb_czmq_nogvl_recv as the callback. A NULL result yields nil. Otherwise the
 * C string is turned into a Ruby String, passed through ZmqEncode, and the C
 * buffer is released with free().
 */
static VALUE rb_czmq_socket_recv(VALUE obj)
{
    struct nogvl_recv_args args;
    char *str = NULL;
    VALUE result;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    str = (char *)rb_thread_blocking_region(rb_czmq_nogvl_recv, (void *)&args, RUBY_UBF_IO, 0);
    if (!str) {
        return Qnil;
    }
    ZmqAssertSysError();
    if (sock->verbose) {
        zclock_log ("I: %s socket %p: recv \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
    }
    result = ZmqEncode(rb_str_new2(str));
    free(str);
    return result;
}
|
#recv_frame ⇒ ZMQ::Frame?
696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 |
# File 'ext/rbczmq/socket.c', line 696
/*
 * Receives one frame from the socket. The receive runs through
 * rb_thread_blocking_region() with rb_czmq_nogvl_recv_frame as the callback.
 * Returns nil when no frame came back, otherwise the frame wrapped by
 * rb_czmq_alloc_frame().
 */
static VALUE rb_czmq_socket_recv_frame(VALUE obj)
{
zframe_t *frame = NULL;
struct nogvl_recv_args args;
/* NOTE(review): print_prefix is never referenced in the visible text and cur_time is only assigned;
   presumably ZmqDumpFrame uses both by name - confirm before renaming or removing them. */
char print_prefix[255];
char *cur_time = NULL;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
frame = (zframe_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_frame, (void *)&args, RUBY_UBF_IO, 0);
if (frame == NULL) return Qnil;
/* Verbose sockets dump the received frame before it is handed to Ruby. */
if (sock->verbose) {
cur_time = rb_czmq_formatted_current_time();
ZmqDumpFrame("recv_frame", frame);
}
return rb_czmq_alloc_frame(frame);
}
|
#recv_frame_nonblock ⇒ ZMQ::Frame?
730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 |
# File 'ext/rbczmq/socket.c', line 730
/*
 * Non-blocking variant of recv_frame: calls zframe_recv_nowait() directly.
 * Returns nil when no frame came back, otherwise the frame wrapped by
 * rb_czmq_alloc_frame().
 */
static VALUE rb_czmq_socket_recv_frame_nonblock(VALUE obj)
{
zframe_t *frame = NULL;
/* NOTE(review): print_prefix is never referenced in the visible text and cur_time is only assigned;
   presumably ZmqDumpFrame uses both by name - confirm before renaming or removing them. */
char print_prefix[255];
char *cur_time = NULL;
/* Clear errno up front, before any of the macros or the receive call run. */
errno = 0;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
frame = zframe_recv_nowait(sock->socket);
if (frame == NULL) return Qnil;
/* Verbose sockets dump the received frame before it is handed to Ruby. */
if (sock->verbose) {
cur_time = rb_czmq_formatted_current_time();
ZmqDumpFrame("recv_frame_nonblock", frame);
}
return rb_czmq_alloc_frame(frame);
}
|
#recv_message ⇒ ZMQ::Message?
791 792 793 794 795 796 797 798 799 800 801 802 803 804 |
# File 'ext/rbczmq/socket.c', line 791
/*
 * Receives one multi-part message from the socket. The receive runs through
 * rb_thread_blocking_region() with rb_czmq_nogvl_recv_message as the callback.
 * Returns nil when no message came back, otherwise the message wrapped by
 * rb_czmq_alloc_message().
 */
static VALUE rb_czmq_socket_recv_message(VALUE obj)
{
    struct nogvl_recv_args args;
    zmsg_t *message = NULL;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    message = (zmsg_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_message, (void *)&args, RUBY_UBF_IO, 0);
    if (!message) {
        return Qnil;
    }
    if (sock->verbose) {
        ZmqDumpMessage("recv_message", message);
    }
    return rb_czmq_alloc_message(message);
}
|
#recv_nonblock ⇒ String?
495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 |
# File 'ext/rbczmq/socket.c', line 495
/*
 * Non-blocking variant of recv: calls zstr_recv_nowait() directly.
 * A NULL result yields nil. Otherwise the C string is turned into a Ruby
 * String, passed through ZmqEncode, and the C buffer is released with free().
 */
static VALUE rb_czmq_socket_recv_nonblock(VALUE obj)
{
    char *str = NULL;
    VALUE result;
    zmq_sock_wrapper *sock = NULL;
    /* errno is cleared before any of the macros or the receive call run. */
    errno = 0;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    str = zstr_recv_nowait(sock->socket);
    if (!str) {
        return Qnil;
    }
    ZmqAssertSysError();
    if (sock->verbose) {
        zclock_log ("I: %s socket %p: recv_nonblock \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
    }
    result = ZmqEncode(rb_str_new2(str));
    free(str);
    return result;
}
|
#send("message") ⇒ Boolean
366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 |
# File 'ext/rbczmq/socket.c', line 366
/*
 * Sends a string on the socket.
 *
 * msg must be a Ruby String. The send runs through
 * rb_thread_blocking_region() with rb_czmq_nogvl_zstr_send as the callback,
 * and its result is passed to ZmqAssert (defined elsewhere; presumably raises
 * on failure - confirm).
 *
 * Returns true.
 */
static VALUE rb_czmq_socket_send(VALUE obj, VALUE msg)
{
    int rc;
    struct nogvl_send_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    Check_Type(msg, T_STRING);
    args.socket = sock;
    args.msg = StringValueCStr(msg);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_send, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose) {
        /* obj is a VALUE (an integer type); %p requires a void pointer argument. */
        zclock_log ("I: %s socket %p: send \"%s\"", zsocket_type_str(sock->socket), (void *)obj, StringValueCStr(msg));
    }
    return Qtrue;
}
|
#send_frame(frame) ⇒ Boolean
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 |
# File 'ext/rbczmq/socket.c', line 555
/*
 * Sends a frame on the socket.
 *
 * Arguments (rb_scan_args "11"): a required frame object and optional flags.
 * Flags may be nil (treated as 0), a Symbol (looked up as a constant on
 * rb_cZmqFrame) or a Fixnum. The send runs through
 * rb_thread_blocking_region() with rb_czmq_nogvl_send_frame as the callback,
 * and its result is passed to ZmqAssert.
 *
 * Returns true.
 */
static VALUE rb_czmq_socket_send_frame(int argc, VALUE *argv, VALUE obj)
{
struct nogvl_send_frame_args args;
VALUE frame_obj;
VALUE flags;
/* NOTE(review): print_prefix is never referenced in the visible text and cur_time is only assigned;
   presumably ZmqDumpFrame uses both by name - confirm before renaming or removing them. */
char print_prefix[255];
char *cur_time = NULL;
zframe_t *print_frame = NULL;
int rc, flgs;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
rb_scan_args(argc, argv, "11", &frame_obj, &flags);
/* NOTE(review): `frame` is used below but not declared in this function; presumably this macro introduces it - confirm. */
ZmqGetFrame(frame_obj);
if (NIL_P(flags)) {
flgs = 0;
} else {
if (SYMBOL_P(flags)) flags = rb_const_get_at(rb_cZmqFrame, rb_to_id(flags));
Check_Type(flags, T_FIXNUM);
flgs = FIX2INT(flags);
}
if (sock->verbose) {
cur_time = rb_czmq_formatted_current_time();
/* With ZFRAME_REUSE the frame itself is dumped after the send; otherwise a duplicate is taken first.
   NOTE(review): nothing visible here destroys that duplicate - check whether ZmqDumpFrame does. */
print_frame = (flgs & ZFRAME_REUSE) ? frame : zframe_dup(frame);
}
args.socket = sock;
args.frame = frame;
args.flags = flgs;
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_send_frame, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose) ZmqDumpFrame("send_frame", print_frame);
return Qtrue;
}
|
#send_message(msg) ⇒ nil
636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 |
# File 'ext/rbczmq/socket.c', line 636
/*
 * Sends a message object on the socket. The send runs through
 * rb_thread_blocking_region() with rb_czmq_nogvl_send_message as the callback;
 * afterwards the Ruby-side message is flagged ZMQ_MESSAGE_DESTROYED.
 * Returns nil.
 */
static VALUE rb_czmq_socket_send_message(VALUE obj, VALUE message_obj)
{
struct nogvl_send_message_args args;
zmsg_t *print_message = NULL;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
/* NOTE(review): `message` is used below but not declared in this function; presumably this macro introduces it - confirm. */
ZmqGetMessage(message_obj);
/* A duplicate is taken up front so a verbose socket can still dump the message after the send. */
if (sock->verbose) print_message = zmsg_dup(message->message);
args.socket = sock;
args.message = message->message;
/* NOTE(review): the send's return value is discarded, so the message is flagged destroyed even if the send failed - confirm this is intended. */
rb_thread_blocking_region(rb_czmq_nogvl_send_message, (void *)&args, RUBY_UBF_IO, 0);
message->flags |= ZMQ_MESSAGE_DESTROYED;
if (sock->verbose) ZmqDumpMessage("send_message", print_message);
return Qnil;
}
|
#sendm("message") ⇒ Boolean
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 |
# File 'ext/rbczmq/socket.c', line 399
/*
 * Sends a string on the socket through the rb_czmq_nogvl_zstr_sendm callback
 * (the "more" variant of send). msg must be a Ruby String; the send result is
 * passed to ZmqAssert. Returns true.
 */
static VALUE rb_czmq_socket_sendm(VALUE obj, VALUE msg)
{
    struct nogvl_send_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    Check_Type(msg, T_STRING);
    args.socket = sock;
    args.msg = StringValueCStr(msg);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_sendm, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose) {
        zclock_log ("I: %s socket %p: sendm \"%s\"", zsocket_type_str(sock->socket), sock->socket, StringValueCStr(msg));
    }
    return Qtrue;
}
|
#sndbuf ⇒ Fixnum
1180 1181 1182 1183 1184 1185 |
# File 'ext/rbczmq/socket.c', line 1180
/*
 * Getter for the SNDBUF socket option: reads it with zsocket_sndbuf()
 * and hands the result back to Ruby through INT2NUM.
 */
static VALUE rb_czmq_socket_opt_sndbuf(VALUE obj)
{
    int sndbuf;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    sndbuf = zsocket_sndbuf(sock->socket);
    return INT2NUM(sndbuf);
}
|
#sndbuf=(1000) ⇒ nil
1200 1201 1202 1203 1204 |
# File 'ext/rbczmq/socket.c', line 1200
/*
 * Setter for the SNDBUF socket option. The whole body is the ZmqSetSockOpt
 * macro (defined outside this view), invoked with the zsocket_set_sndbuf
 * setter, the "SNDBUF" label and the Ruby value.
 * NOTE(review): there is no explicit return here, so the macro presumably
 * resolves `sock` from obj and returns on our behalf - confirm in its definition.
 */
static VALUE rb_czmq_socket_set_opt_sndbuf(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_sndbuf, "SNDBUF", value);
}
|
#sndhwm ⇒ Fixnum
902 903 904 905 906 907 |
# File 'ext/rbczmq/socket.c', line 902
/*
 * Getter for the send HWM socket option: reads it with zsocket_sndhwm()
 * and hands the result back to Ruby through INT2NUM.
 */
static VALUE rb_czmq_socket_opt_sndhwm(VALUE obj)
{
    int sndhwm;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    sndhwm = zsocket_sndhwm(sock->socket);
    return INT2NUM(sndhwm);
}
|
#sndhwm=(100) ⇒ nil
923 924 925 926 927 |
# File 'ext/rbczmq/socket.c', line 923
/*
 * Setter for the SNDHWM socket option. The whole body is the ZmqSetSockOpt
 * macro (defined outside this view), invoked with the zsocket_set_sndhwm
 * setter, the "SNDHWM" label and the Ruby value.
 * NOTE(review): there is no explicit return here, so the macro presumably
 * resolves `sock` from obj and returns on our behalf - confirm in its definition.
 */
static VALUE rb_czmq_socket_set_opt_sndhwm(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_sndhwm, "SNDHWM", value);
}
|
#sndtimeo ⇒ Fixnum
1564 1565 1566 1567 1568 1569 |
# File 'ext/rbczmq/socket.c', line 1564
/*
 * Getter for the SNDTIMEO socket option: reads it with zsocket_sndtimeo()
 * and hands the result back to Ruby through INT2NUM.
 */
static VALUE rb_czmq_socket_opt_sndtimeo(VALUE obj)
{
    int sndtimeo;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    sndtimeo = zsocket_sndtimeo(sock->socket);
    return INT2NUM(sndtimeo);
}
|
#sndtimeout=(200) ⇒ nil
1584 1585 1586 1587 1588 |
# File 'ext/rbczmq/socket.c', line 1584
/*
 * Setter for the SNDTIMEO socket option. The whole body is the ZmqSetSockOpt
 * macro (defined outside this view), invoked with the zsocket_set_sndtimeo
 * setter, the "SNDTIMEO" label and the Ruby value.
 * NOTE(review): there is no explicit return here, so the macro presumably
 * resolves `sock` from obj and returns on our behalf - confirm in its definition.
 */
static VALUE rb_czmq_socket_set_opt_sndtimeo(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_sndtimeo, "SNDTIMEO", value);
}
|
#state ⇒ Fixnum
149 150 151 152 153 154 |
# File 'ext/rbczmq/socket.c', line 149
/*
 * Returns the wrapper's `state` field converted to a Ruby Integer with
 * INT2NUM (an integer, not a string).
 */
static VALUE rb_czmq_socket_state(VALUE obj)
{
    VALUE state;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    state = INT2NUM(sock->state);
    return state;
}
|
#subscribe(value) ⇒ Object
1443 1444 1445 1446 1447 1448 1449 |
# File 'ext/rbczmq/socket.c', line 1443
/*
 * Setter for the SUBSCRIBE socket option. The whole body is the
 * ZmqSetStringSockOpt macro (defined outside this view), invoked with the
 * zsocket_set_subscribe setter, the "SUBSCRIBE" label, the Ruby value and a
 * block containing ZmqAssertSockOptFor(ZMQ_SUB).
 * NOTE(review): presumably the block is a pre-check restricting the option to
 * SUB sockets and the macro supplies the return value - confirm.
 */
static VALUE rb_czmq_socket_set_opt_subscribe(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetStringSockOpt(obj, zsocket_set_subscribe, "SUBSCRIBE", value, {
ZmqAssertSockOptFor(ZMQ_SUB)
});
}
|
#swap ⇒ Fixnum
861 862 863 864 865 866 |
# File 'ext/rbczmq/socket.c', line 861
/*
 * Getter for the SWAP socket option: reads it with zsocket_swap()
 * and hands the result back to Ruby through INT2NUM.
 */
static VALUE rb_czmq_socket_opt_swap(VALUE obj)
{
    int swap;
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    swap = zsocket_swap(sock->socket);
    return INT2NUM(swap);
}
|
#swap=(100) ⇒ nil
881 882 883 884 885 |
# File 'ext/rbczmq/socket.c', line 881
/*
 * Setter for the SWAP socket option. The whole body is the ZmqSetSockOpt
 * macro (defined outside this view), invoked with the zsocket_set_swap
 * setter, the "SWAP" label and the Ruby value.
 * NOTE(review): there is no explicit return here, so the macro presumably
 * resolves `sock` from obj and returns on our behalf - confirm in its definition.
 */
static VALUE rb_czmq_socket_set_opt_swap(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_swap, "SWAP", value);
}
|
#to_s ⇒ Object
Generates a string representation of the current socket state
socket = ctx.bind(:PUB, “tcp://127.0.0.1:5000”) socket.to_s => “PUB socket bound to tcp://127.0.0.1:5000”
71 72 73 74 75 76 77 78 79 80 |
# File 'lib/zmq/socket.rb', line 71
# Describes the socket as "<type> socket", extended with "bound to ..." or
# "connected to ..." and the comma-joined #endpoints when #state is BOUND or
# CONNECTED respectively.
def to_s
  case state
  when BOUND
    "#{type_str} socket bound to #{endpoints.join(', ')}"
  when CONNECTED
    "#{type_str} socket connected to #{endpoints.join(', ')}"
  else
    "#{type_str} socket"
  end
end
#type_str ⇒ Object
Generates a string representation of this socket type
socket = ctx.socket(:PUB) socket.type_str => “PUB”
62 63 64 |
# File 'lib/zmq/socket.rb', line 62
# Returns the TYPE_STR constant looked up on the receiver's class.
def type_str
  self.class.const_get(:TYPE_STR)
end
#unsubscribe(value) ⇒ Object
1464 1465 1466 1467 1468 1469 1470 |
# File 'ext/rbczmq/socket.c', line 1464
/*
 * Setter for the UNSUBSCRIBE socket option. The whole body is the
 * ZmqSetStringSockOpt macro (defined outside this view), invoked with the
 * zsocket_set_unsubscribe setter, the "UNSUBSCRIBE" label, the Ruby value and
 * a block containing ZmqAssertSockOptFor(ZMQ_SUB).
 * NOTE(review): presumably the block is a pre-check restricting the option to
 * SUB sockets and the macro supplies the return value - confirm.
 */
static VALUE rb_czmq_socket_set_opt_unsubscribe(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetStringSockOpt(obj, zsocket_set_unsubscribe, "UNSUBSCRIBE", value, {
ZmqAssertSockOptFor(ZMQ_SUB)
});
}
|
#verbose=(true) ⇒ nil
290 291 292 293 294 295 296 297 298 |
# File 'ext/rbczmq/socket.c', line 290
/*
 * Toggles verbose logging on the socket wrapper. Only the Ruby `true` object
 * enables it; every other value (including other truthy objects) disables it.
 * Returns nil.
 */
static VALUE rb_czmq_socket_set_verbose(VALUE obj, VALUE level)
{
    zmq_sock_wrapper *sock = NULL;
    /* Defined elsewhere; expected to point `sock` at the wrapper behind obj. */
    GetZmqSocket(obj);
    sock->verbose = (level == Qtrue);
    return Qnil;
}
|