Class: ZMQ::Socket
- Inherits:
-
Object
- Object
- ZMQ::Socket
- Defined in:
- lib/zmq/socket.rb,
ext/rbczmq/socket.c
Defined Under Namespace
Classes: Dealer, Pair, Pub, Pull, Push, Rep, Req, Router, Sub, XPub, XSub
Constant Summary collapse
- PROTO_REXP =
/\A(?:inproc|ipc|tcp|e?pgm):\/\//
- PENDING =
INT2NUM(ZMQ_SOCKET_PENDING)
- BOUND =
INT2NUM(ZMQ_SOCKET_BOUND)
- CONNECTED =
INT2NUM(ZMQ_SOCKET_CONNECTED)
- DISCONNECTED =
INT2NUM(ZMQ_SOCKET_DISCONNECTED)
Class Method Summary collapse
Instance Method Summary collapse
-
#affinity ⇒ Fixnum
Returns the socket AFFINITY value.
-
#affinity=(1) ⇒ nil
Sets the socket AFFINITY value.
-
#backlog ⇒ Fixnum
Returns the socket BACKLOG value.
-
#backlog=(200) ⇒ nil
Sets the socket BACKLOG value.
-
#bind(uri) ⇒ Object
Binds to a given endpoint.
-
#close ⇒ nil
Closes a socket.
-
#connect(uri) ⇒ Object
Connects to a given endpoint.
-
#connect_all(uri) ⇒ Object
Connects to all endpoints that are returned from a SRV record lookup.
-
#delay_attach_on_connect=(true) ⇒ nil
Sets the socket DELAY_ATTACH_ON_CONNECT value.
-
#disconnect("tcp://localhost:3456") ⇒ Boolean
Attempts to disconnect from a given endpoint.
-
#endpoint ⇒ Object
Returns the endpoint this socket is currently connected to, if any.
-
#endpoints ⇒ Array of Strings
Returns the endpoints this socket is currently connected to, if any.
-
#events ⇒ Fixnum
Query if this socket is in a readable or writable state.
-
#fd ⇒ Fixnum
(also: #to_i)
Returns a file descriptor reference for integrating this socket with an external event loop or multiplexer.
-
#identity=(value) ⇒ Object
Sets the socket IDENTITY value.
-
#ipv4only=(true) ⇒ nil
Sets the socket IPV4ONLY value.
-
#ipv4only ⇒ Boolean
Returns the socket IPV4ONLY value.
-
#linger ⇒ Fixnum
Returns the socket LINGER value.
-
#linger=(1000) ⇒ nil
Sets the socket LINGER value in ms.
-
#maxmsgsize ⇒ Fixnum
Returns the socket MAXMSGSIZE value.
-
#maxmsgsize=(20) ⇒ nil
Sets the socket MAXMSGSIZE value.
-
#monitor("inproc://monitoring", callback, events) ⇒ Boolean
Registers a monitoring callback for this socket.
-
#multicast_hops ⇒ Fixnum
Returns the socket MULTICAST_HOPS value.
-
#multicast_hops=(20) ⇒ nil
Sets the socket MULTICAST_HOPS value.
-
#poll(100) ⇒ Boolean
Poll for input events on the socket.
-
#poll_readable? ⇒ Boolean
Poll all sockets for readable states by default.
-
#poll_writable? ⇒ Boolean
Poll all sockets for writable states by default.
-
#rate ⇒ Fixnum
Returns the socket RATE value.
-
#rate=(50000) ⇒ nil
Sets the socket RATE value.
-
#rcvbuf ⇒ Fixnum
Returns the socket RCVBUF value.
-
#rcvbuf=(1000) ⇒ nil
Sets the socket RCVBUF value.
-
#rcvhwm ⇒ Fixnum
Returns the socket receive HWM (High Water Mark) value.
-
#rcvhwm=(100) ⇒ nil
Sets the socket receive HWM (High Water Mark) value.
-
#rcvmore ⇒ Boolean
Query if there are more messages to receive.
-
#rcvtimeo ⇒ Fixnum
Returns the socket RCVTIMEO value.
-
#rcvtimeout=(200) ⇒ nil
Sets the socket RCVTIMEO value.
-
#readable? ⇒ Boolean
Determines if there are one or more messages to read from this socket.
-
#bind("inproc://test") ⇒ Fixnum
Binds to a given endpoint.
-
#connect("tcp://localhost:3456") ⇒ Boolean
Attempts to connect to a given endpoint.
-
#reconnect_ivl ⇒ Fixnum
Returns the socket RECONNECT_IVL value.
-
#reconnect_ivl=(200) ⇒ nil
Sets the socket RECONNECT_IVL value.
-
#reconnect_ivl_max ⇒ Fixnum
Returns the socket RECONNECT_IVL_MAX value.
-
#reconnect_ivl_max=(5) ⇒ nil
Sets the socket RECONNECT_IVL_MAX value.
-
#recovery_ivl ⇒ Fixnum
Returns the socket RECOVERY_IVL value.
-
#recovery_ivl=(20) ⇒ nil
Sets the socket RECOVERY_IVL value.
-
#recv ⇒ String?
Receive a string from this ZMQ socket.
-
#recv_frame ⇒ ZMQ::Frame?
Receives a ZMQ frame from this socket.
-
#recv_frame_nonblock ⇒ ZMQ::Frame?
Receives a ZMQ frame from this socket.
-
#recv_message ⇒ ZMQ::Message?
Receives a ZMQ message from this socket.
-
#recv_nonblock ⇒ String?
Receive a string from this ZMQ socket.
-
#router_mandatory=(true) ⇒ nil
Sets the socket ROUTER_MANDATORY value.
-
#router_raw=(true) ⇒ nil
Sets the socket ROUTER_RAW value.
-
#send("message") ⇒ Boolean
Sends a string to this ZMQ socket.
-
#send_frame(frame) ⇒ Boolean
Sends a ZMQ::Frame instance to this socket.
-
#send_message(msg) ⇒ nil
Sends a ZMQ::Message instance to this socket.
-
#sendm("message") ⇒ Boolean
Sends a string to this ZMQ socket, with a more flag set.
-
#sndbuf ⇒ Fixnum
Returns the socket SNDBUF value.
-
#sndbuf=(1000) ⇒ nil
Sets the socket SNDBUF value.
-
#sndhwm ⇒ Fixnum
Returns the socket send HWM (High Water Mark) value.
-
#sndhwm=(100) ⇒ nil
Sets the socket send HWM (High Water Mark) value.
-
#sndtimeo ⇒ Fixnum
Returns the socket SNDTIMEO value.
-
#sndtimeout=(200) ⇒ nil
Sets the socket SNDTIMEO value.
-
#state ⇒ Fixnum
Returns the current socket state, one of ZMQ::Socket::PENDING, ZMQ::Socket::BOUND, ZMQ::Socket::CONNECTED or ZMQ::Socket::DISCONNECTED.
-
#subscribe(value) ⇒ Object
Subscribes this SUB socket to a topic.
-
#to_s ⇒ Object
Generates a string representation of the current socket state.
-
#type_str ⇒ Object
Generates a string representation of this socket type.
-
#unsubscribe(value) ⇒ Object
Unsubscribes this SUB socket from a topic.
-
#verbose=(true) ⇒ nil
Let this socket be verbose - dumps a lot of data to stdout for debugging.
-
#writable? ⇒ Boolean
Determines if this socket is in a writable state.
-
#xpub_verbose=(true) ⇒ nil
Sets the socket XPUB_VERBOSE value.
Class Method Details
.handle_fsm_errors(error, *methods) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/zmq/socket.rb', line 14
# Wraps each named instance method so that a SystemCallError whose errno equals
# ZMQ::EFSM is re-raised as ZMQ::Error with +error+ prepended to a fixed hint.
# Any other SystemCallError is re-raised unchanged.
#
# error   - message prefix interpolated into the generated code at definition time.
# methods - names of the methods to wrap; each generated method calls +super+.
def self.handle_fsm_errors(error, *methods)
  methods.each do |m|
    # __LINE__ + 1 makes backtraces point at the first line of the generated
    # code rather than at the class_eval call itself.
    class_eval <<-"evl", __FILE__, __LINE__ + 1
      def #{m}(*args); super
      rescue SystemCallError => e
        raise(ZMQ::Error, "#{error} Please assert that you're not sending / receiving out of band data when using the REQ / REP socket pairs.") if e.errno == ZMQ::EFSM
        raise
      end
    evl
  end
end
.unsupported_api(*methods) ⇒ Object
6 7 8 9 10 11 12 |
# File 'lib/zmq/socket.rb', line 6
# Defines, for each name in +methods+, an instance method that always raises
# ZMQ::Error. The TYPE_STR constant is looked up on the receiver when this
# macro runs (the heredoc is interpolated at definition time), so it must
# already be defined by then.
def self.unsupported_api(*methods)
  methods.each do |m|
    # __LINE__ + 1 makes backtraces point at the generated method body.
    class_eval <<-"evl", __FILE__, __LINE__ + 1
      def #{m}(*args); raise(ZMQ::Error, "API #{m} not supported for #{const_get(:TYPE_STR)} sockets!"); end
    evl
  end
end
Instance Method Details
#affinity ⇒ Fixnum
943 944 945 946 947 948 |
# File 'ext/rbczmq/socket.c', line 943
/* Getter for the AFFINITY option: returns zsocket_affinity() of the wrapped socket via INT2NUM.
 * `sock` starts as NULL and is dereferenced right after GetZmqSocket, so that project macro
 * (defined elsewhere) is presumably what populates it from `obj`. */
static VALUE rb_czmq_socket_opt_affinity(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_affinity(sock->socket));
}
|
#affinity=(1) ⇒ nil
963 964 965 966 967 |
# File 'ext/rbczmq/socket.c', line 963
/* Setter for the AFFINITY option. The whole body is one ZmqSetSockOpt macro call (defined elsewhere),
 * given the zsocket_set_affinity setter, the option name and the new value.
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_affinity(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_affinity, "AFFINITY", value);
}
|
#backlog ⇒ Fixnum
1374 1375 1376 1377 1378 1379 |
# File 'ext/rbczmq/socket.c', line 1374
/* Getter for the BACKLOG option: returns zsocket_backlog() of the wrapped socket via INT2NUM.
 * GetZmqSocket is a project macro (defined elsewhere) that presumably populates `sock` from `obj`. */
static VALUE rb_czmq_socket_opt_backlog(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_backlog(sock->socket));
}
|
#backlog=(200) ⇒ nil
1394 1395 1396 1397 1398 |
# File 'ext/rbczmq/socket.c', line 1394
/* Setter for the BACKLOG option, implemented entirely by the ZmqSetSockOpt macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_backlog(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_backlog, "BACKLOG", value);
}
|
#bind(uri) ⇒ Object
Binds to a given endpoint. Attempts to resolve URIs without a protocol through DNS SRV records.
socket = ctx.socket(:PUB)
socket.bind "tcp://127.0.0.1:9000"
socket.bind "collector.domain.com" # resolves 10.0.0.2:9000
99 100 101 102 |
# File 'lib/zmq/socket.rb', line 99
# Binds to +uri+. A non-nil URI that does not match PROTO_REXP is first passed
# through +resolve+; the (possibly rewritten) URI is then handed to +real_bind+,
# whose result is returned.
def bind(uri)
  uri = resolve(uri) if uri && uri !~ PROTO_REXP
  real_bind(uri)
end
#close ⇒ nil
Closes a socket. The GC will take the same action if a socket object is not reachable anymore on the next GC cycle. This is a lower level API.
Examples
sock.close => nil
96 97 98 99 100 101 102 103 104 105 106 |
# File 'ext/rbczmq/socket.c', line 96
/* Closes the socket wrapped by `obj`: resets the wrapper state to ZMQ_SOCKET_PENDING,
 * releases it through rb_czmq_free_sock() and returns nil. */
static VALUE rb_czmq_socket_close(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
/* NOTE(review): presumably rejects calls from a thread other than the socket's owner - confirm in the macro. */
ZmqSockGuardCrossThread(sock);
/* This is useless for production / real use cases as we can't query the state again OR assume
anything about the underlying connection. Merely doing the right thing. */
sock->state = ZMQ_SOCKET_PENDING;
rb_czmq_free_sock(sock);
return Qnil;
}
|
#connect(uri) ⇒ Object
Connects to a given endpoint. Attempts to resolve URIs without a protocol through DNS SRV records.
socket = ctx.socket(:PUB)
socket.connect "tcp://127.0.0.1:9000"
socket.connect "collector.domain.com" # resolves 10.0.0.2:9000
111 112 113 114 |
# File 'lib/zmq/socket.rb', line 111
# Connects to +uri+. A non-nil URI that does not match PROTO_REXP is first passed
# through +resolve+; the (possibly rewritten) URI is then handed to +real_connect+,
# whose result is returned.
def connect(uri)
  uri = resolve(uri) if uri && uri !~ PROTO_REXP
  real_connect(uri)
end
#connect_all(uri) ⇒ Object
Connects to all endpoints that are returned from a SRV record lookup.
socket = ctx.socket(:PUB)
socket.connect_all "collector.domain.com" # resolves 10.0.0.2:9000 10.0.0.3:9000
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/zmq/socket.rb', line 122
# Connects to every endpoint returned by +resolve_all+ for +uri+.
#
# A URI that already matches PROTO_REXP is connected to directly. Otherwise each
# resolved record's target is turned into an address with Resolv.getaddress and
# connected to as "tcp://host:port". When nothing resolves, +uri+ itself is
# passed to +real_connect+.
#
# Always returns self. The protocol branch used to return nil through a bare
# +return+, which was inconsistent with the other paths.
def connect_all(uri)
  if uri =~ PROTO_REXP
    real_connect(uri)
    return self
  end

  addresses = resolve_all(uri)
  if addresses.empty?
    real_connect(uri)
  else
    addresses.each do |address|
      host = Resolv.getaddress(address.target.to_s)
      real_connect("tcp://#{host}:#{address.port}")
    end
  end
  self
end
#delay_attach_on_connect=(true) ⇒ nil
1179 1180 1181 1182 1183 |
# File 'ext/rbczmq/socket.c', line 1179
/* Setter for the DELAY_ATTACH_ON_CONNECT option, implemented entirely by the ZmqSetBooleanSockOpt
 * macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_delay_attach_on_connect(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_delay_attach_on_connect, "DELAY_ATTACH_ON_CONNECT", value);
}
|
#disconnect("tcp://localhost:3456") ⇒ Boolean
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 |
# File 'ext/rbczmq/socket.c', line 307
/* Disconnects the socket from `endpoint` (must be a Ruby String) and returns Qtrue.
 * The disconnect itself runs in rb_czmq_nogvl_socket_disconnect via rb_thread_blocking_region;
 * its return code is handed to ZmqAssert (project macro, presumably raises on failure - confirm). */
static VALUE rb_czmq_socket_disconnect(VALUE obj, VALUE endpoint)
{
struct nogvl_conn_args args;
int rc;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
Check_Type(endpoint, T_STRING);
args.socket = sock;
args.endpoint = StringValueCStr(endpoint);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_disconnect, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: disconnected \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
/* NOTE(review): the state becomes DISCONNECTED even if other endpoints remain in sock->endpoints. */
sock->state = ZMQ_SOCKET_DISCONNECTED;
rb_ary_delete(sock->endpoints, endpoint);
return Qtrue;
}
|
#endpoint ⇒ Object
35 36 37 |
# File 'lib/zmq/socket.rb', line 35
# Returns the first entry of +endpoints+, or nil when there are none.
def endpoint
  endpoints.first
end
#endpoints ⇒ Array of Strings
123 124 125 126 127 128 |
# File 'ext/rbczmq/socket.c', line 123
/* Returns the wrapper's `endpoints` value as-is (no copy is made here).
 * The bind and connect listings push Ruby strings onto it with rb_ary_push. */
static VALUE rb_czmq_socket_endpoints(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return sock->endpoints;
}
|
#events ⇒ Fixnum
1582 1583 1584 1585 1586 1587 |
# File 'ext/rbczmq/socket.c', line 1582
/* Getter for the EVENTS option: returns zsocket_events() of the wrapped socket via INT2NUM.
 * GetZmqSocket is a project macro (defined elsewhere) that presumably populates `sock` from `obj`. */
static VALUE rb_czmq_socket_opt_events(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_events(sock->socket));
}
|
#fd ⇒ Fixnum Also known as: to_i
168 169 170 171 172 173 174 |
# File 'ext/rbczmq/socket.c', line 168
/* Returns zsocket_fd() of the wrapped socket via INT2NUM, or -1 while the wrapper state is
 * ZMQ_SOCKET_PENDING or ZMQ_SOCKET_DISCONNECTED. */
static VALUE rb_czmq_socket_fd(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
if (sock->state == ZMQ_SOCKET_PENDING || sock->state == ZMQ_SOCKET_DISCONNECTED) return INT2NUM(-1);
return INT2NUM(zsocket_fd(sock->socket));
}
|
#identity=(value) ⇒ Object
1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 |
# File 'ext/rbczmq/socket.c', line 1491
/* Sets the IDENTITY option from a Ruby String and returns nil.
 * Raises rb_eZmqError when the value is empty or longer than 255; Check_Type rejects non-strings. */
static VALUE rb_czmq_socket_set_opt_identity(VALUE obj, VALUE value)
{
char *val;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
Check_Type(value, T_STRING);
if (RSTRING_LEN(value) == 0) rb_raise(rb_eZmqError, "socket identity cannot be empty.");
/* RSTRING_LEN is a byte length, although the message below says "chars". */
if (RSTRING_LEN(value) > 255) rb_raise(rb_eZmqError, "maximum socket identity is 255 chars.");
/* StringValueCStr raises if the string contains a NUL byte. */
val = StringValueCStr(value);
zsocket_set_identity(sock->socket, val);
if (sock->verbose)
zclock_log ("I: %s socket %p: set option \"IDENTITY\" \"%s\"", zsocket_type_str(sock->socket), obj, val);
return Qnil;
}
|
#ipv4only=(true) ⇒ nil
1160 1161 1162 1163 1164 |
# File 'ext/rbczmq/socket.c', line 1160
/* Setter for the IPV4ONLY option, implemented entirely by the ZmqSetBooleanSockOpt macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_ipv4only(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_ipv4only, "IPV4ONLY", value);
}
|
#ipv4only ⇒ Boolean
1138 1139 1140 1141 1142 1143 1144 1145 |
# File 'ext/rbczmq/socket.c', line 1138
/* Getter for the IPV4ONLY option: Qfalse when zsocket_ipv4only() returns 0, Qtrue for any other value. */
static VALUE rb_czmq_socket_opt_ipv4only(VALUE obj)
{
int ipv4only;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ipv4only = zsocket_ipv4only(sock->socket);
return (ipv4only == 0 ? Qfalse : Qtrue);
}
|
#linger ⇒ Fixnum
1333 1334 1335 1336 1337 1338 1339 1340 |
# File 'ext/rbczmq/socket.c', line 1333
/* Getter for the LINGER option. As written it always returns -1: the real lookup through
 * zsocket_linger() is commented out pending the TODO below, so the value is never read from the socket. */
static VALUE rb_czmq_socket_opt_linger(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
// return INT2NUM(zsocket_linger(sock->socket));
// TODO: how to get the linger value in ZMQ4/CZMQ2?
return INT2NUM(-1);
}
|
#linger=(1000) ⇒ nil
1355 1356 1357 1358 1359 |
# File 'ext/rbczmq/socket.c', line 1355
/* Setter for the LINGER option, implemented entirely by the ZmqSetSockOpt macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_linger(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_linger, "LINGER", value);
}
|
#maxmsgsize ⇒ Fixnum
1060 1061 1062 1063 1064 1065 |
# File 'ext/rbczmq/socket.c', line 1060
/* Getter for the MAXMSGSIZE option: returns zsocket_maxmsgsize() of the wrapped socket via INT2NUM.
 * GetZmqSocket is a project macro (defined elsewhere) that presumably populates `sock` from `obj`. */
static VALUE rb_czmq_socket_opt_maxmsgsize(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_maxmsgsize(sock->socket));
}
|
#maxmsgsize=(20) ⇒ nil
1080 1081 1082 1083 1084 |
# File 'ext/rbczmq/socket.c', line 1080
/* Setter for the MAXMSGSIZE option, implemented entirely by the ZmqSetSockOpt macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_maxmsgsize(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_maxmsgsize, "MAXMSGSIZE", value);
}
|
#monitor("inproc://monitoring", callback, events) ⇒ Boolean
Registers a monitoring callback for this socket
Examples
ctx = ZMQ::Context.new
rep = ctx.socket(:REP)
rep.monitor("inproc://monitoring.rep", RepMonitor)
req = ctx.socket(:REQ)
req.monitor("inproc://monitoring.req", ReqMonitor, ZMQ_EVENT_DISCONNECTED)
rep.bind("tcp://127.0.0.1:5331")
rep.bind("tcp://127.0.0.1:5332")
1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 |
# File 'ext/rbczmq/socket.c', line 1780
/* Registers a monitor on this socket.
 * Arguments (Ruby side): endpoint (required String), handler (optional), events (optional Fixnum).
 * Returns Qtrue when zmq_socket_monitor() returns 0, otherwise Qfalse. */
static VALUE rb_czmq_socket_monitor(int argc, VALUE *argv, VALUE obj)
{
VALUE endpoint;
VALUE handler;
VALUE events;
int rc;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
/* "12": one required argument followed by two optional ones (unset optionals are nil). */
rb_scan_args(argc, argv, "12", &endpoint, &handler, &events);
Check_Type(endpoint, T_STRING);
/* Default event mask: the EVENT_ALL constant looked up under rb_mZmq. */
if (NIL_P(events))
events = rb_const_get_at(rb_mZmq, rb_intern("EVENT_ALL"));
/* Default handler: a fresh instance of the Monitor class looked up under rb_mZmq. */
if (NIL_P(handler)) {
handler = rb_class_new_instance(0, NULL, rb_const_get_at(rb_mZmq, rb_intern("Monitor")));
}
Check_Type(events, T_FIXNUM);
rc = zmq_socket_monitor(sock->socket, StringValueCStr(endpoint), NUM2INT(events));
if (rc == 0) {
/* Keep the endpoint and handler on the wrapper and start the monitoring thread. */
sock->monitor_endpoint = endpoint;
sock->monitor_handler = handler;
sock->monitor_thread = rb_thread_create(rb_czmq_socket_monitor_thread, (void*)sock);
return Qtrue;
} else {
return Qfalse;
}
}
|
#multicast_hops ⇒ Fixnum
1099 1100 1101 1102 1103 1104 |
# File 'ext/rbczmq/socket.c', line 1099
/* Getter for the MULTICAST_HOPS option: returns zsocket_multicast_hops() of the wrapped socket via INT2NUM.
 * GetZmqSocket is a project macro (defined elsewhere) that presumably populates `sock` from `obj`. */
static VALUE rb_czmq_socket_opt_multicast_hops(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_multicast_hops(sock->socket));
}
|
#multicast_hops=(20) ⇒ nil
1119 1120 1121 1122 1123 |
# File 'ext/rbczmq/socket.c', line 1119
/* Setter for the MULTICAST_HOPS option, implemented entirely by the ZmqSetSockOpt macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_multicast_hops(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_multicast_hops, "MULTICAST_HOPS", value);
}
|
#poll(100) ⇒ Boolean
836 837 838 839 840 841 842 843 844 845 846 847 848 |
# File 'ext/rbczmq/socket.c', line 836
/* Polls this socket with the given timeout (must be a Fixnum; Check_Type rejects anything else).
 * The poll runs in rb_czmq_nogvl_poll via rb_thread_blocking_region; its result, cast to bool,
 * is returned as Qtrue or Qfalse.
 * NOTE(review): the timeout unit is not visible in this listing - confirm against rb_czmq_nogvl_poll. */
static VALUE rb_czmq_socket_poll(VALUE obj, VALUE timeout)
{
bool readable;
struct nogvl_socket_poll_args args;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
Check_Type(timeout, T_FIXNUM);
ZmqSockGuardCrossThread(sock);
args.socket = sock;
args.timeout = FIX2INT(timeout);
readable = (bool)rb_thread_blocking_region(rb_czmq_nogvl_poll, (void *)&args, RUBY_UBF_IO, 0);
return (readable == true) ? Qtrue : Qfalse;
}
|
#poll_readable? ⇒ Boolean
Poll all sockets for readable states by default
83 84 85 |
# File 'lib/zmq/socket.rb', line 83
# Always true: this socket takes part in polling for readable state.
def poll_readable?
  true
end
#poll_writable? ⇒ Boolean
Poll all sockets for writable states by default
88 89 90 |
# File 'lib/zmq/socket.rb', line 88
# Always true: this socket takes part in polling for writable state.
def poll_writable?
  true
end
#rate ⇒ Fixnum
982 983 984 985 986 987 |
# File 'ext/rbczmq/socket.c', line 982
/* Getter for the RATE option: returns zsocket_rate() of the wrapped socket via INT2NUM.
 * GetZmqSocket is a project macro (defined elsewhere) that presumably populates `sock` from `obj`. */
static VALUE rb_czmq_socket_opt_rate(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_rate(sock->socket));
}
|
#rate=(50000) ⇒ nil
1002 1003 1004 1005 1006 |
# File 'ext/rbczmq/socket.c', line 1002
/* Setter for the RATE option, implemented entirely by the ZmqSetSockOpt macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_rate(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rate, "RATE", value);
}
|
#rcvbuf ⇒ Fixnum
1294 1295 1296 1297 1298 1299 |
# File 'ext/rbczmq/socket.c', line 1294
/* Getter for the RCVBUF option: returns zsocket_rcvbuf() of the wrapped socket via INT2NUM.
 * GetZmqSocket is a project macro (defined elsewhere) that presumably populates `sock` from `obj`. */
static VALUE rb_czmq_socket_opt_rcvbuf(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_rcvbuf(sock->socket));
}
|
#rcvbuf=(1000) ⇒ nil
1314 1315 1316 1317 1318 |
# File 'ext/rbczmq/socket.c', line 1314
/* Setter for the RCVBUF option, implemented entirely by the ZmqSetSockOpt macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_rcvbuf(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rcvbuf, "RCVBUF", value);
}
|
#rcvhwm ⇒ Fixnum
903 904 905 906 907 908 |
# File 'ext/rbczmq/socket.c', line 903
/* Getter for the receive high water mark: returns zsocket_rcvhwm() of the wrapped socket via INT2NUM.
 * GetZmqSocket is a project macro (defined elsewhere) that presumably populates `sock` from `obj`. */
static VALUE rb_czmq_socket_opt_rcvhwm(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_rcvhwm(sock->socket));
}
|
#rcvhwm=(100) ⇒ nil
924 925 926 927 928 |
# File 'ext/rbczmq/socket.c', line 924
/* Setter for the receive high water mark, implemented entirely by the ZmqSetSockOpt macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_rcvhwm(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rcvhwm, "RCVHWM", value);
}
|
#rcvmore ⇒ Boolean
1562 1563 1564 1565 1566 1567 |
# File 'ext/rbczmq/socket.c', line 1562
/* Getter for the RCVMORE flag: Qtrue only when zsocket_rcvmore() returns exactly 1, otherwise Qfalse. */
static VALUE rb_czmq_socket_opt_rcvmore(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return (zsocket_rcvmore(sock->socket) == 1) ? Qtrue : Qfalse;
}
|
#rcvtimeo ⇒ Fixnum
1602 1603 1604 1605 1606 1607 |
# File 'ext/rbczmq/socket.c', line 1602
/* Getter for the RCVTIMEO option: returns zsocket_rcvtimeo() of the wrapped socket via INT2NUM.
 * GetZmqSocket is a project macro (defined elsewhere) that presumably populates `sock` from `obj`. */
static VALUE rb_czmq_socket_opt_rcvtimeo(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_rcvtimeo(sock->socket));
}
|
#rcvtimeout=(200) ⇒ nil
1622 1623 1624 1625 1626 |
# File 'ext/rbczmq/socket.c', line 1622
/* Setter for the RCVTIMEO option, implemented entirely by the ZmqSetSockOpt macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_rcvtimeo(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rcvtimeo, "RCVTIMEO", value);
}
|
#readable? ⇒ Boolean
Determines if there are one or more messages to read from this socket. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.
socket.readable? => true
44 45 46 |
# File 'lib/zmq/socket.rb', line 44
# True when the ZMQ::POLLIN bit is set in the value returned by +events+.
def readable?
  (events & ZMQ::POLLIN) == ZMQ::POLLIN
end
#bind("inproc://test") ⇒ Fixnum
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 |
# File 'ext/rbczmq/socket.c', line 220
/* Binds the socket to `endpoint` (must be a Ruby String) and returns the helper's return code via INT2NUM.
 * The bind runs in rb_czmq_nogvl_socket_bind via rb_thread_blocking_region. On success the wrapper
 * state becomes ZMQ_SOCKET_BOUND and a copy of the endpoint string is appended to sock->endpoints. */
static VALUE rb_czmq_socket_bind(VALUE obj, VALUE endpoint)
{
struct nogvl_conn_args args;
int rc;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
Check_Type(endpoint, T_STRING);
args.socket = sock;
args.endpoint = StringValueCStr(endpoint);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_bind, (void *)&args, RUBY_UBF_IO, 0);
/* ZmqAssert will return false on any non-zero return code. Bind returns the port number */
/* Hence ZmqAssert is only consulted for negative codes; positive codes are treated as success. */
if (rc < 0) {
ZmqAssert(rc);
}
if (sock->verbose)
zclock_log ("I: %s socket %p: bound \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
sock->state = ZMQ_SOCKET_BOUND;
rb_ary_push(sock->endpoints, rb_str_new4(endpoint));
return INT2NUM(rc);
}
|
#connect("tcp://localhost:3456") ⇒ Boolean
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
# File 'ext/rbczmq/socket.c', line 257
/* Connects the socket to `endpoint` (must be a Ruby String) and returns Qtrue.
 * The connect runs in rb_czmq_nogvl_socket_connect via rb_thread_blocking_region; its return code
 * is handed to ZmqAssert (project macro, presumably raises on failure - confirm). Afterwards the
 * wrapper state becomes ZMQ_SOCKET_CONNECTED and a copy of the endpoint is appended to sock->endpoints. */
static VALUE rb_czmq_socket_connect(VALUE obj, VALUE endpoint)
{
struct nogvl_conn_args args;
int rc;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
Check_Type(endpoint, T_STRING);
args.socket = sock;
args.endpoint = StringValueCStr(endpoint);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_connect, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: connected \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
sock->state = ZMQ_SOCKET_CONNECTED;
rb_ary_push(sock->endpoints, rb_str_new4(endpoint));
return Qtrue;
}
|
#reconnect_ivl ⇒ Fixnum
1413 1414 1415 1416 1417 1418 |
# File 'ext/rbczmq/socket.c', line 1413
/* Getter for the RECONNECT_IVL option: returns zsocket_reconnect_ivl() of the wrapped socket via INT2NUM.
 * GetZmqSocket is a project macro (defined elsewhere) that presumably populates `sock` from `obj`. */
static VALUE rb_czmq_socket_opt_reconnect_ivl(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_reconnect_ivl(sock->socket));
}
|
#reconnect_ivl=(200) ⇒ nil
1433 1434 1435 1436 1437 |
# File 'ext/rbczmq/socket.c', line 1433
/* Setter for the RECONNECT_IVL option, implemented entirely by the ZmqSetSockOpt macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_reconnect_ivl(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl, "RECONNECT_IVL", value);
}
|
#reconnect_ivl_max ⇒ Fixnum
1452 1453 1454 1455 1456 1457 |
# File 'ext/rbczmq/socket.c', line 1452
/* Getter for the RECONNECT_IVL_MAX option: returns zsocket_reconnect_ivl_max() of the wrapped socket via INT2NUM.
 * GetZmqSocket is a project macro (defined elsewhere) that presumably populates `sock` from `obj`. */
static VALUE rb_czmq_socket_opt_reconnect_ivl_max(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_reconnect_ivl_max(sock->socket));
}
|
#reconnect_ivl_max=(5) ⇒ nil
1472 1473 1474 1475 1476 |
# File 'ext/rbczmq/socket.c', line 1472
/* Setter for the RECONNECT_IVL_MAX option, implemented entirely by the ZmqSetSockOpt macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_reconnect_ivl_max(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl_max, "RECONNECT_IVL_MAX", value);
}
|
#recovery_ivl ⇒ Fixnum
1021 1022 1023 1024 1025 1026 |
# File 'ext/rbczmq/socket.c', line 1021
/* Getter for the RECOVERY_IVL option: returns zsocket_recovery_ivl() of the wrapped socket via INT2NUM.
 * GetZmqSocket is a project macro (defined elsewhere) that presumably populates `sock` from `obj`. */
static VALUE rb_czmq_socket_opt_recovery_ivl(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_recovery_ivl(sock->socket));
}
|
#recovery_ivl=(20) ⇒ nil
1041 1042 1043 1044 1045 |
# File 'ext/rbczmq/socket.c', line 1041
/* Setter for the RECOVERY_IVL option, implemented entirely by the ZmqSetSockOpt macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_recovery_ivl(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_recovery_ivl, "RECOVERY_IVL", value);
}
|
#recv ⇒ String?
496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 |
# File 'ext/rbczmq/socket.c', line 496
/*
 * Blocking receive of one message, returned as a Ruby String.
 *
 * The receive runs in rb_czmq_nogvl_recv via rb_thread_blocking_region. A
 * negative return code closes the message and yields nil. Otherwise the
 * message bytes are copied into a new Ruby String, the message is closed, and
 * the string is passed through ZmqEncode (project macro) before being returned.
 *
 * Verbose logging used to pass an always-NULL `char *str` to "%s", which is
 * undefined behaviour. It now logs the received bytes, length-bounded with
 * "%.*s" because the payload is not guaranteed to be NUL-terminated.
 */
static VALUE rb_czmq_socket_recv(VALUE obj)
{
    struct nogvl_recv_args args;
    VALUE result = Qnil;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    zmq_msg_init(&args.message);
    int rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_recv, (void *)&args, RUBY_UBF_IO, 0);
    if (rc < 0) {
        zmq_msg_close(&args.message);
        return Qnil;
    }
    ZmqAssertSysError();
    if (sock->verbose)
        zclock_log ("I: %s socket %p: recv \"%.*s\"", zsocket_type_str(sock->socket), sock->socket,
                    (int)zmq_msg_size(&args.message), (char *)zmq_msg_data(&args.message));
    result = rb_str_new(zmq_msg_data(&args.message), zmq_msg_size(&args.message));
    zmq_msg_close(&args.message);
    result = ZmqEncode(result);
    return result;
}
|
#recv_frame ⇒ ZMQ::Frame?
715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 |
# File 'ext/rbczmq/socket.c', line 715
/* Blocking receive of one frame. The receive runs in rb_czmq_nogvl_recv_frame via
 * rb_thread_blocking_region. Returns nil when no frame comes back, otherwise the frame
 * wrapped by rb_czmq_alloc_frame(). */
static VALUE rb_czmq_socket_recv_frame(VALUE obj)
{
zframe_t *frame = NULL;
struct nogvl_recv_args args;
/* NOTE(review): print_prefix and cur_time are not read directly in this function;
   presumably the ZmqDumpFrame macro uses them - confirm. */
char print_prefix[255];
char *cur_time = NULL;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
frame = (zframe_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_frame, (void *)&args, RUBY_UBF_IO, 0);
if (frame == NULL) return Qnil;
if (sock->verbose) {
cur_time = rb_czmq_formatted_current_time();
ZmqDumpFrame("recv_frame", frame);
}
return rb_czmq_alloc_frame(frame);
}
|
#recv_frame_nonblock ⇒ ZMQ::Frame?
749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 |
# File 'ext/rbczmq/socket.c', line 749
/* Non-blocking receive of one frame using zframe_recv_nowait(). Returns nil when no frame is
 * returned, otherwise the frame wrapped by rb_czmq_alloc_frame(). errno is reset to 0 on entry. */
static VALUE rb_czmq_socket_recv_frame_nonblock(VALUE obj)
{
zframe_t *frame = NULL;
/* NOTE(review): print_prefix and cur_time are not read directly in this function;
   presumably the ZmqDumpFrame macro uses them - confirm. */
char print_prefix[255];
char *cur_time = NULL;
errno = 0;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
frame = zframe_recv_nowait(sock->socket);
if (frame == NULL) return Qnil;
if (sock->verbose) {
cur_time = rb_czmq_formatted_current_time();
ZmqDumpFrame("recv_frame_nonblock", frame);
}
return rb_czmq_alloc_frame(frame);
}
|
#recv_message ⇒ ZMQ::Message?
795 796 797 798 799 800 801 802 803 804 805 806 807 808 |
# File 'ext/rbczmq/socket.c', line 795
/* Blocking receive of one multi-frame message. The receive runs in rb_czmq_nogvl_recv_message
 * via rb_thread_blocking_region. Returns nil when no message comes back, otherwise the message
 * wrapped by rb_czmq_alloc_message(). In verbose mode the message is dumped first. */
static VALUE rb_czmq_socket_recv_message(VALUE obj)
{
zmsg_t *message = NULL;
struct nogvl_recv_args args;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
message = (zmsg_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_message, (void *)&args, RUBY_UBF_IO, 0);
if (message == NULL) return Qnil;
if (sock->verbose) ZmqDumpMessage("recv_message", message);
return rb_czmq_alloc_message(message);
}
|
#recv_nonblock ⇒ String?
538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 |
# File 'ext/rbczmq/socket.c', line 538
/*
 * Non-blocking receive of one message, returned as a Ruby String.
 *
 * Calls zmq_recvmsg() with ZMQ_DONTWAIT. A negative return code closes the
 * message and yields nil. Otherwise the message bytes are copied into a new
 * Ruby String, the message is closed, and the string is passed through
 * ZmqEncode (project macro) before being returned. errno is reset on entry.
 *
 * Verbose logging used to pass an always-NULL `char *str` to "%s", which is
 * undefined behaviour. It now logs the received bytes, length-bounded with
 * "%.*s" because the payload is not guaranteed to be NUL-terminated.
 */
static VALUE rb_czmq_socket_recv_nonblock(VALUE obj)
{
    /* Only the `message` member of args is used on this path. */
    struct nogvl_recv_args args;
    errno = 0;
    VALUE result = Qnil;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    zmq_msg_init(&args.message);
    int rc = zmq_recvmsg(sock->socket, &args.message, ZMQ_DONTWAIT);
    if (rc < 0) {
        zmq_msg_close(&args.message);
        return Qnil;
    }
    ZmqAssertSysError();
    if (sock->verbose)
        zclock_log ("I: %s socket %p: recv \"%.*s\"", zsocket_type_str(sock->socket), sock->socket,
                    (int)zmq_msg_size(&args.message), (char *)zmq_msg_data(&args.message));
    result = rb_str_new(zmq_msg_data(&args.message), zmq_msg_size(&args.message));
    zmq_msg_close(&args.message);
    result = ZmqEncode(result);
    return result;
}
|
#router_mandatory=(true) ⇒ nil
1198 1199 1200 1201 1202 |
# File 'ext/rbczmq/socket.c', line 1198
/* Setter for the ROUTER_MANDATORY option, implemented entirely by the ZmqSetBooleanSockOpt macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_router_mandatory(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_router_mandatory, "ROUTER_MANDATORY", value);
}
|
#router_raw=(true) ⇒ nil
1217 1218 1219 1220 1221 |
# File 'ext/rbczmq/socket.c', line 1217
/* Setter for the ROUTER_RAW option, implemented entirely by the ZmqSetBooleanSockOpt macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_router_raw(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_router_raw, "ROUTER_RAW", value);
}
|
#send("message") ⇒ Boolean
408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 |
# File 'ext/rbczmq/socket.c', line 408
/*
 * Sends `msg` (converted with StringValue) on this socket and returns Qtrue.
 *
 * The send runs in rb_czmq_nogvl_zstr_send via rb_thread_blocking_region with
 * the string's pointer and byte length; the return code is handed to ZmqAssert
 * (project macro, presumably raises on failure - confirm).
 *
 * Verbose logging used StringValueCStr(msg), which raises ArgumentError for
 * payloads containing a NUL byte - after the message had already been sent.
 * It now logs with a length-bounded "%.*s" so binary payloads cannot raise.
 */
static VALUE rb_czmq_socket_send(VALUE obj, VALUE msg)
{
    int rc;
    struct nogvl_send_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    StringValue(msg);
    Check_Type(msg, T_STRING);
    args.msg = RSTRING_PTR(msg);
    args.length = RSTRING_LEN(msg);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_send, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: send \"%.*s\"", zsocket_type_str(sock->socket), obj,
                    (int)RSTRING_LEN(msg), RSTRING_PTR(msg));
    return Qtrue;
}
|
#send_frame(frame) ⇒ Boolean
597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 |
# File 'ext/rbczmq/socket.c', line 597
/* Sends a ZMQ::Frame on this socket and returns Qtrue.
 * Arguments (Ruby side): frame (required), flags (optional; a Fixnum, or a Symbol naming a
 * constant on rb_cZmqFrame). The send runs in rb_czmq_nogvl_send_frame via rb_thread_blocking_region. */
static VALUE rb_czmq_socket_send_frame(int argc, VALUE *argv, VALUE obj)
{
struct nogvl_send_frame_args args;
VALUE frame_obj;
VALUE flags;
/* NOTE(review): print_prefix and cur_time are not read directly in this function;
   presumably the ZmqDumpFrame macro uses them - confirm. */
char print_prefix[255];
char *cur_time = NULL;
zframe_t *print_frame = NULL;
int rc, flgs;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
/* "11": one required argument followed by one optional one. */
rb_scan_args(argc, argv, "11", &frame_obj, &flags);
/* `frame` is used below without a local declaration, so ZmqGetFrame presumably declares it - confirm. */
ZmqGetFrame(frame_obj);
ZmqAssertFrameOwnedNoMessage(frame);
if (NIL_P(flags)) {
flgs = 0;
} else {
if (SYMBOL_P(flags)) flags = rb_const_get_at(rb_cZmqFrame, rb_to_id(flags));
Check_Type(flags, T_FIXNUM);
flgs = FIX2INT(flags);
}
if (sock->verbose) {
cur_time = rb_czmq_formatted_current_time();
/* Without ZFRAME_REUSE the frame is destroyed by the send (see below), so dump a duplicate. */
print_frame = (flgs & ZFRAME_REUSE) ? frame->frame : zframe_dup(frame->frame);
}
args.socket = sock;
args.frame = frame->frame;
args.flags = flgs;
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_send_frame, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if ((flgs & ZFRAME_REUSE) == 0) {
/* frame has been destroyed, clear the owns flag */
frame->flags &= ~ZMQ_FRAME_OWNED;
}
/* NOTE(review): whether the duplicate made above is released depends on ZmqDumpFrame - confirm. */
if (sock->verbose) ZmqDumpFrame("send_frame", print_frame);
return Qtrue;
}
|
#send_message(msg) ⇒ nil
669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 |
# File 'ext/rbczmq/socket.c', line 669
/* Sends a ZMQ::Message on this socket and returns nil. The send runs in rb_czmq_nogvl_send_message
 * via rb_thread_blocking_region. Afterwards the ZMQ_MESSAGE_OWNED flag is cleared on the wrapper. */
static VALUE rb_czmq_socket_send_message(VALUE obj, VALUE message_obj)
{
struct nogvl_send_message_args args;
zmsg_t *print_message = NULL;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
/* `message` is used below without a local declaration, so ZmqGetMessage presumably declares it - confirm. */
ZmqGetMessage(message_obj);
ZmqAssertMessageOwned(message);
/* Duplicate before sending so there is still something to dump afterwards. */
if (sock->verbose) print_message = zmsg_dup(message->message);
args.socket = sock;
args.message = message->message;
/* NOTE(review): the helper's result is discarded, so a failed send is not reported and
   ownership is cleared regardless - confirm this is intended. */
rb_thread_blocking_region(rb_czmq_nogvl_send_message, (void *)&args, RUBY_UBF_IO, 0);
message->flags &= ~ZMQ_MESSAGE_OWNED;
if (sock->verbose) ZmqDumpMessage("send_message", print_message);
return Qnil;
}
|
#sendm("message") ⇒ Boolean
443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 |
# File 'ext/rbczmq/socket.c', line 443
/*
 * Sends `msg` (converted with StringValue) on this socket through the
 * rb_czmq_nogvl_zstr_sendm helper and returns Qtrue.
 *
 * The helper runs via rb_thread_blocking_region with the string's pointer and
 * byte length; the return code is handed to ZmqAssert (project macro,
 * presumably raises on failure - confirm).
 *
 * Verbose logging used StringValueCStr(msg), which raises ArgumentError for
 * payloads containing a NUL byte - after the message had already been sent.
 * It now logs with a length-bounded "%.*s" so binary payloads cannot raise.
 */
static VALUE rb_czmq_socket_sendm(VALUE obj, VALUE msg)
{
    int rc;
    struct nogvl_send_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    StringValue(msg);
    Check_Type(msg, T_STRING);
    args.msg = RSTRING_PTR(msg);
    args.length = RSTRING_LEN(msg);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_sendm, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: sendm \"%.*s\"", zsocket_type_str(sock->socket), sock->socket,
                    (int)RSTRING_LEN(msg), RSTRING_PTR(msg));
    return Qtrue;
}
|
#sndbuf ⇒ Fixnum
1255 1256 1257 1258 1259 1260 |
# File 'ext/rbczmq/socket.c', line 1255
/* Getter for the SNDBUF option: returns zsocket_sndbuf() of the wrapped socket via INT2NUM.
 * GetZmqSocket is a project macro (defined elsewhere) that presumably populates `sock` from `obj`. */
static VALUE rb_czmq_socket_opt_sndbuf(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_sndbuf(sock->socket));
}
|
#sndbuf=(1000) ⇒ nil
1275 1276 1277 1278 1279 |
# File 'ext/rbczmq/socket.c', line 1275
/* Setter for the SNDBUF option, implemented entirely by the ZmqSetSockOpt macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_sndbuf(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_sndbuf, "SNDBUF", value);
}
|
#sndhwm ⇒ Fixnum
863 864 865 866 867 868 |
# File 'ext/rbczmq/socket.c', line 863
/* Getter for the send high water mark: returns zsocket_sndhwm() of the wrapped socket via INT2NUM.
 * GetZmqSocket is a project macro (defined elsewhere) that presumably populates `sock` from `obj`. */
static VALUE rb_czmq_socket_opt_sndhwm(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_sndhwm(sock->socket));
}
|
#sndhwm=(100) ⇒ nil
884 885 886 887 888 |
# File 'ext/rbczmq/socket.c', line 884
/* Setter for the send high water mark, implemented entirely by the ZmqSetSockOpt macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_sndhwm(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_sndhwm, "SNDHWM", value);
}
|
#sndtimeo ⇒ Fixnum
1641 1642 1643 1644 1645 1646 |
# File 'ext/rbczmq/socket.c', line 1641
/* Getter for the SNDTIMEO option: returns zsocket_sndtimeo() of the wrapped socket via INT2NUM.
 * GetZmqSocket is a project macro (defined elsewhere) that presumably populates `sock` from `obj`. */
static VALUE rb_czmq_socket_opt_sndtimeo(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_sndtimeo(sock->socket));
}
|
#sndtimeout=(200) ⇒ nil
1661 1662 1663 1664 1665 |
# File 'ext/rbczmq/socket.c', line 1661
/* Setter for the SNDTIMEO option, implemented entirely by the ZmqSetSockOpt macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_sndtimeo(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_sndtimeo, "SNDTIMEO", value);
}
|
#state ⇒ Fixnum
145 146 147 148 149 150 |
# File 'ext/rbczmq/socket.c', line 145
/* Returns the wrapper's `state` field as a Ruby Integer (INT2NUM), not a String.
 * The other listings assign ZMQ_SOCKET_PENDING, _BOUND, _CONNECTED and _DISCONNECTED to this field. */
static VALUE rb_czmq_socket_state(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(sock->state);
}
|
#subscribe(value) ⇒ Object
1520 1521 1522 1523 1524 1525 1526 |
# File 'ext/rbczmq/socket.c', line 1520
/* Sets the SUBSCRIBE option through the ZmqSetStringSockOpt macro (defined elsewhere).
 * The trailing block passes ZmqAssertSockOptFor(ZMQ_SUB) to the macro; presumably this restricts
 * the option to SUB sockets - confirm in the macro definitions.
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_subscribe(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetStringSockOpt(obj, zsocket_set_subscribe, "SUBSCRIBE", value, {
ZmqAssertSockOptFor(ZMQ_SUB)
});
}
|
#to_s ⇒ Object
Generates a string representation of the current socket state
socket = ctx.bind(:PUB, "tcp://127.0.0.1:5000")
socket.to_s => "PUB socket bound to tcp://127.0.0.1:5000"
71 72 73 74 75 76 77 78 79 80 |
# File 'lib/zmq/socket.rb', line 71
# Describes the socket: its type string, plus the comma-joined endpoints when
# +state+ is BOUND or CONNECTED. Any other state yields just the type string.
def to_s
  case state
  when BOUND
    "#{type_str} socket bound to #{endpoints.join(', ')}"
  when CONNECTED
    "#{type_str} socket connected to #{endpoints.join(', ')}"
  else
    "#{type_str} socket"
  end
end
#type_str ⇒ Object
Generates a string representation of this socket type
socket = ctx.socket(:PUB)
socket.type_str => "PUB"
62 63 64 |
# File 'lib/zmq/socket.rb', line 62
# Returns the TYPE_STR constant looked up on the receiver's class.
def type_str
  self.class.const_get(:TYPE_STR)
end
#unsubscribe(value) ⇒ Object
1541 1542 1543 1544 1545 1546 1547 |
# File 'ext/rbczmq/socket.c', line 1541
/* Sets the UNSUBSCRIBE option through the ZmqSetStringSockOpt macro (defined elsewhere).
 * The trailing block passes ZmqAssertSockOptFor(ZMQ_SUB) to the macro; presumably this restricts
 * the option to SUB sockets - confirm in the macro definitions.
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_unsubscribe(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetStringSockOpt(obj, zsocket_set_unsubscribe, "UNSUBSCRIBE", value, {
ZmqAssertSockOptFor(ZMQ_SUB)
});
}
|
#verbose=(true) ⇒ nil
339 340 341 342 343 344 345 346 347 |
# File 'ext/rbczmq/socket.c', line 339
/* Sets the wrapper's `verbose` flag and returns nil.
 * Only the exact value Qtrue enables it; every other Ruby value (including other truthy ones) disables it. */
static VALUE rb_czmq_socket_set_verbose(VALUE obj, VALUE level)
{
bool vlevel;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
vlevel = (level == Qtrue) ? true : false;
sock->verbose = vlevel;
return Qnil;
}
|
#writable? ⇒ Boolean
Determines if this socket is in a writable state. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.
socket.writable? => true
53 54 55 |
# File 'lib/zmq/socket.rb', line 53
# True when the ZMQ::POLLOUT bit is set in the value returned by +events+.
def writable?
  (events & ZMQ::POLLOUT) == ZMQ::POLLOUT
end
#xpub_verbose=(true) ⇒ nil
1236 1237 1238 1239 1240 |
# File 'ext/rbczmq/socket.c', line 1236
/* Setter for the XPUB_VERBOSE option, implemented entirely by the ZmqSetBooleanSockOpt macro (defined elsewhere).
 * NOTE(review): no explicit return here, so the macro presumably returns and uses `sock` - confirm. */
static VALUE rb_czmq_socket_set_opt_xpub_verbose(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_xpub_verbose, "XPUB_VERBOSE", value);
}
|