Class: ZMQ::Socket
- Inherits:
-
Object
- Object
- ZMQ::Socket
- Defined in:
- lib/zmq/socket.rb,
ext/rbczmq/socket.c
Defined Under Namespace
Classes: Dealer, Pair, Pub, Pull, Push, Rep, Req, Router, Stream, Sub, XPub, XSub
Constant Summary collapse
- PROTO_REXP =
/^inproc|ipc|tcp|e?pgm:\/\//
- PENDING =
INT2NUM(ZMQ_SOCKET_PENDING)
- BOUND =
INT2NUM(ZMQ_SOCKET_BOUND)
- CONNECTED =
INT2NUM(ZMQ_SOCKET_CONNECTED)
- DISCONNECTED =
INT2NUM(ZMQ_SOCKET_DISCONNECTED)
Class Method Summary collapse
Instance Method Summary collapse
-
#affinity ⇒ Fixnum
Returns the socket AFFINITY value.
-
#affinity=(1) ⇒ nil
Sets the socket AFFINITY value.
-
#backlog ⇒ Fixnum
Returns the socket BACKLOG value.
-
#backlog=(200) ⇒ nil
Sets the socket BACKLOG value.
-
#bind(uri) ⇒ Object
Binds to a given endpoint.
-
#close ⇒ nil
Closes a socket.
-
#connect(uri) ⇒ Object
Connects to a given endpoint.
-
#connect_all(uri) ⇒ Object
Connects to all endpoints that are returned from a SRV record lookup.
-
#delay_attach_on_connect=(true) ⇒ nil
Sets the socket DELAY_ATTACH_ON_CONNECT value.
-
#disconnect("tcp: //localhost:3456") ⇒ Boolean
Attempts to disconnect from a given endpoint.
-
#endpoint ⇒ Object
Returns the endpoint this socket is currently connected to, if any.
-
#endpoints ⇒ Array of Strings
Returns the endpoints this socket is currently connected to, if any.
-
#events ⇒ Fixnum
Query if this socket is in a readable or writable state.
-
#fd ⇒ Fixnum
(also: #to_i)
Returns a file descriptor reference for integrating this socket with an externel event loop or multiplexor.
-
#identity=(value) ⇒ Object
Sets the socket IDENTITY value.
-
#ipv4only=(true) ⇒ nil
Sets the socket IPV4ONLY value.
-
#ipv4only ⇒ Boolean
Returns the socket IPV4ONLY value.
-
#last_endpoint ⇒ Object
Gets the last endpoint that this socket connected or bound to.
-
#linger ⇒ Fixnum
Returns the socket LINGER value.
-
#linger=(1000) ⇒ nil
Sets the socket LINGER value in ms.
-
#maxmsgsize ⇒ Fixnum
Returns the socket MAXMSGSIZE value.
-
#maxmsgsize=(20) ⇒ nil
Sets the socket MAXMSGSIZE value.
-
#monitor("inproc: //monitoring", callback, events) ⇒ nil
Registers a monitoring callback for this socket.
-
#multicast_hops ⇒ Fixnum
Returns the socket MULTICAST_HOPS value.
-
#multicast_hops=(20) ⇒ nil
Sets the socket MULTICAST_HOPS value.
-
#poll(100) ⇒ Boolean
Poll for input events on the socket.
-
#poll_readable? ⇒ Boolean
Poll all sockets for readbable states by default.
-
#poll_writable? ⇒ Boolean
Poll all sockets for writable states by default.
-
#rate ⇒ Fixnum
Returns the socket RATE value.
-
#rate=(50000) ⇒ nil
Sets the socket RATE value.
-
#rcvbuf ⇒ Fixnum
Returns the socket RCVBUF value.
-
#rcvbuf=(1000) ⇒ nil
Sets the socket RCVBUF value.
-
#rcvhwm ⇒ Fixnum
Returns the socket receive HWM (High Water Mark) value.
-
#rcvhwm=(100) ⇒ nil
Sets the socket receive HWM (High Water Mark() value.
-
#rcvmore ⇒ Boolean
Query if there’s more messages to receive.
-
#rcvtimeo ⇒ Fixnum
Returns the socket RCVTIMEO value.
-
#rcvtimeout=(200) ⇒ nil
Sets the socket RCVTIMEO value.
-
#readable? ⇒ Boolean
Determines if there are one or more messages to read from this socket.
-
#bind("inproc: //test") ⇒ Fixnum
Binds to a given endpoint.
-
#connect("tcp: //localhost:3456") ⇒ Boolean
Attempts to connect to a given endpoint.
-
#reconnect_ivl ⇒ Fixnum
Returns the socket RECONNECT_IVL value.
-
#reconnect_ivl=(200) ⇒ nil
Sets the socket RECONNECT_IVL value.
-
#reconnect_ivl_max ⇒ Fixnum
Returns the socket RECONNECT_IVL_MAX value.
-
#reconnect_ivl_max=(5) ⇒ nil
Sets the socket RECONNECT_IVL_MAX value.
-
#recovery_ivl ⇒ Fixnum
Returns the socket RECOVERY_IVL value.
-
#recovery_ivl=(20) ⇒ nil
Sets the socket RECOVERY_IVL value.
-
#recv ⇒ String?
Receive a string from this ZMQ socket.
-
#recv_frame ⇒ ZMQ::Frame?
Receives a ZMQ frame from this socket.
-
#recv_frame_nonblock ⇒ ZMQ::Frame?
Receives a ZMQ frame from this socket.
-
#recv_message ⇒ ZMQ::Message?
Receives a ZMQ message from this socket.
-
#recv_nonblock ⇒ String?
Receive a string from this ZMQ socket.
-
#router_mandatory=(true) ⇒ nil
Sets the socket ROUTER_MANDATORY value.
-
#router_raw=(true) ⇒ nil
Sets the socket ROUTER_RAW value.
-
#send("message") ⇒ Boolean
Sends a string to this ZMQ socket.
-
#send_frame(frame) ⇒ nil
Sends a ZMQ::Frame instance to this socket.
-
#send_message(msg) ⇒ nil
Sends a ZMQ::Message instance to this socket.
-
#sendm("message") ⇒ Boolean
Sends a string to this ZMQ socket, with a more flag set.
-
#sndbuf ⇒ Fixnum
Returns the socket SNDBUF value.
-
#sndbuf=(1000) ⇒ nil
Sets the socket SNDBUF value.
-
#sndhwm ⇒ Fixnum
Returns the socket send HWM (High Water Mark) value.
-
#sndhwm=(100) ⇒ nil
Sets the socket send HWM (High Water Mark() value.
-
#sndtimeo ⇒ Fixnum
Returns the socket SNDTIMEO value.
-
#sndtimeout=(200) ⇒ nil
Sets the socket SNDTIMEO value.
-
#state ⇒ String
Returns the current socket state, one of ZMQ::Socket::PENDING, ZMQ::Socket::BOUND or ZMQ::Socket::CONNECTED.
-
#subscribe(value) ⇒ Object
Subscribes this SUB socket to a topic.
-
#to_s ⇒ Object
Generates a string representation of the current socket state.
-
#type_str ⇒ Object
Generates a string representation of this socket type.
-
#disconnect("tcp: //localhost:3456") ⇒ Boolean
Attempts to disconnect from a given endpoint.
-
#unsubscribe(value) ⇒ Object
Unsubscribes this SUB socket from a topic.
-
#verbose=(true) ⇒ nil
Let this socket be verbose - dumps a lot of data to stdout for debugging.
-
#writable? ⇒ Boolean
Determines if this socket is in a writable state.
-
#xpub_verbose=(true) ⇒ nil
Sets the socket XPUB_VERBOSE value.
Class Method Details
.handle_fsm_errors(error, *methods) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/zmq/socket.rb', line 14 def self.handle_fsm_errors(error, *methods) methods.each do |m| class_eval <<-"evl", __FILE__, __LINE__ def #{m}(*args); super rescue SystemCallError => e raise(ZMQ::Error, "#{error} Please assert that you're not sending / receiving out of band data when using the REQ / REP socket pairs.") if e.errno == ZMQ::EFSM raise end evl end end |
.unsupported_api(*methods) ⇒ Object
6 7 8 9 10 11 12 |
# File 'lib/zmq/socket.rb', line 6 def self.unsupported_api(*methods) methods.each do |m| class_eval <<-"evl", __FILE__, __LINE__ def #{m}(*args); raise(ZMQ::Error, "API #{m} not supported for #{const_get(:TYPE_STR)} sockets!"); end evl end end |
Instance Method Details
#affinity ⇒ Fixnum
994 995 996 997 998 999 |
# File 'ext/rbczmq/socket.c', line 994
static VALUE rb_czmq_socket_opt_affinity(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_affinity(sock->socket));
}
|
#affinity=(1) ⇒ nil
1014 1015 1016 1017 1018 |
# File 'ext/rbczmq/socket.c', line 1014
static VALUE rb_czmq_socket_set_opt_affinity(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_affinity, "AFFINITY", value);
}
|
#backlog ⇒ Fixnum
1425 1426 1427 1428 1429 1430 |
# File 'ext/rbczmq/socket.c', line 1425
static VALUE rb_czmq_socket_opt_backlog(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_backlog(sock->socket));
}
|
#backlog=(200) ⇒ nil
1445 1446 1447 1448 1449 |
# File 'ext/rbczmq/socket.c', line 1445
static VALUE rb_czmq_socket_set_opt_backlog(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_backlog, "BACKLOG", value);
}
|
#bind(uri) ⇒ Object
Binds to a given endpoint. Attemps to resolve URIs without a protocol through DNS SRV records.
socket = ctx.socket(:PUB) socket.bind “tcp://127.0.0.1:9000”
socket.bind “collector.domain.com” # resolves 10.0.0.2:9000
99 100 101 102 |
# File 'lib/zmq/socket.rb', line 99 def bind(uri) uri = resolve(uri) if uri && uri !~ PROTO_REXP real_bind(uri) end |
#close ⇒ nil
Closes a socket. The GC will take the same action if a socket object is not reachable anymore on the next GC cycle. This is a lower level API.
Examples
sock.close => nil
80 81 82 83 84 85 86 87 88 89 90 |
# File 'ext/rbczmq/socket.c', line 80
static VALUE rb_czmq_socket_close(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
/* This is useless for production / real use cases as we can't query the state again OR assume
anything about the underlying connection. Merely doing the right thing. */
sock->state = ZMQ_SOCKET_DISCONNECTED;
rb_czmq_context_destroy_socket(sock);
return Qnil;
}
|
#connect(uri) ⇒ Object
Connects to a given endpoint. Attemps to resolve URIs without a protocol through DNS SRV records.
socket = ctx.socket(:PUB) socket.connect “tcp://127.0.0.1:9000”
socket.connect “collector.domain.com” # resolves 10.0.0.2:9000
111 112 113 114 |
# File 'lib/zmq/socket.rb', line 111 def connect(uri) uri = resolve(uri) if uri && uri !~ PROTO_REXP real_connect(uri) end |
#connect_all(uri) ⇒ Object
Connects to all endpoints that are returned from a SRV record lookup.
socket = ctx.socket(:PUB)
socket.connect “collector.domain.com” # resolves 10.0.0.2:9000 10.0.0.3:9000
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/zmq/socket.rb', line 122 def connect_all(uri) if uri =~ PROTO_REXP real_connect(uri) return end addresses = resolve_all(uri) if addresses.empty? real_connect(uri) else addresses.each do |address| host = Resolv.getaddress(address.target.to_s) real_connect("tcp://#{host}:#{address.port}") end end self end |
#delay_attach_on_connect=(true) ⇒ nil
1230 1231 1232 1233 1234 |
# File 'ext/rbczmq/socket.c', line 1230
static VALUE rb_czmq_socket_set_opt_delay_attach_on_connect(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_delay_attach_on_connect, "DELAY_ATTACH_ON_CONNECT", value);
}
|
#disconnect("tcp: //localhost:3456") ⇒ Boolean
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 |
# File 'ext/rbczmq/socket.c', line 299
static VALUE rb_czmq_socket_disconnect(VALUE obj, VALUE endpoint)
{
struct nogvl_conn_args args;
int rc;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
Check_Type(endpoint, T_STRING);
args.socket = sock;
args.endpoint = StringValueCStr(endpoint);
rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_socket_disconnect, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: disconnected \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
rb_ary_delete(sock->endpoints, endpoint);
long endpoint_count = RARRAY_LEN(sock->endpoints);
if (endpoint_count == 0) {
sock->state = ZMQ_SOCKET_DISCONNECTED;
}
return Qtrue;
}
|
#endpoint ⇒ Object
35 36 37 |
# File 'lib/zmq/socket.rb', line 35 def endpoint endpoints.first end |
#endpoints ⇒ Array of Strings
107 108 109 110 111 112 |
# File 'ext/rbczmq/socket.c', line 107
static VALUE rb_czmq_socket_endpoints(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return sock->endpoints;
}
|
#events ⇒ Fixnum
1633 1634 1635 1636 1637 1638 |
# File 'ext/rbczmq/socket.c', line 1633
static VALUE rb_czmq_socket_opt_events(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_events(sock->socket));
}
|
#fd ⇒ Fixnum Also known as: to_i
152 153 154 155 156 157 158 |
# File 'ext/rbczmq/socket.c', line 152
static VALUE rb_czmq_socket_fd(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
if (sock->state == ZMQ_SOCKET_PENDING || sock->state == ZMQ_SOCKET_DISCONNECTED) return INT2NUM(-1);
return INT2NUM(zsocket_fd(sock->socket));
}
|
#identity=(value) ⇒ Object
1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 |
# File 'ext/rbczmq/socket.c', line 1542
static VALUE rb_czmq_socket_set_opt_identity(VALUE obj, VALUE value)
{
char *val;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
Check_Type(value, T_STRING);
if (RSTRING_LEN(value) == 0) rb_raise(rb_eZmqError, "socket identity cannot be empty.");
if (RSTRING_LEN(value) > 255) rb_raise(rb_eZmqError, "maximum socket identity is 255 chars.");
val = StringValueCStr(value);
zsocket_set_identity(sock->socket, val);
if (sock->verbose)
zclock_log ("I: %s socket %p: set option \"IDENTITY\" \"%s\"", zsocket_type_str(sock->socket), obj, val);
return Qnil;
}
|
#ipv4only=(true) ⇒ nil
1211 1212 1213 1214 1215 |
# File 'ext/rbczmq/socket.c', line 1211
static VALUE rb_czmq_socket_set_opt_ipv4only(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_ipv4only, "IPV4ONLY", value);
}
|
#ipv4only ⇒ Boolean
1189 1190 1191 1192 1193 1194 1195 1196 |
# File 'ext/rbczmq/socket.c', line 1189
static VALUE rb_czmq_socket_opt_ipv4only(VALUE obj)
{
int ipv4only;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ipv4only = zsocket_ipv4only(sock->socket);
return (ipv4only == 0 ? Qfalse : Qtrue);
}
|
#port=(sock) ⇒ 41415 #last_endpoint ⇒ Object
Gets the last endpoint that this socket connected or bound to.
1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 |
# File 'ext/rbczmq/socket.c', line 1728
static VALUE rb_czmq_socket_opt_last_endpoint(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
char* endpoint_string = zsocket_last_endpoint(sock->socket);
VALUE result = rb_str_new_cstr(endpoint_string);
if (endpoint_string != NULL) {
free(endpoint_string);
}
return result;
}
|
#linger ⇒ Fixnum
1384 1385 1386 1387 1388 1389 1390 1391 |
# File 'ext/rbczmq/socket.c', line 1384
static VALUE rb_czmq_socket_opt_linger(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
// return INT2NUM(zsocket_linger(sock->socket));
// TODO: how to get the linger value in ZMQ4/CZMQ2?
return INT2NUM(-1);
}
|
#linger=(1000) ⇒ nil
1406 1407 1408 1409 1410 |
# File 'ext/rbczmq/socket.c', line 1406
static VALUE rb_czmq_socket_set_opt_linger(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_linger, "LINGER", value);
}
|
#maxmsgsize ⇒ Fixnum
1111 1112 1113 1114 1115 1116 |
# File 'ext/rbczmq/socket.c', line 1111
static VALUE rb_czmq_socket_opt_maxmsgsize(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_maxmsgsize(sock->socket));
}
|
#maxmsgsize=(20) ⇒ nil
1131 1132 1133 1134 1135 |
# File 'ext/rbczmq/socket.c', line 1131
static VALUE rb_czmq_socket_set_opt_maxmsgsize(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_maxmsgsize, "MAXMSGSIZE", value);
}
|
#monitor("inproc: //monitoring", callback, events) ⇒ nil
Registers a monitoring callback for this socket
Examples
ctx = ZMQ::Context.new
rep = ctx.socket(:REP)
rep.monitor("inproc://monitoring.rep", RepMonitor)
req = ctx.socket(:REQ)
req.monitor("inproc://monitoring.req", ReqMonitor, ZMQ_EVENT_DISCONNECTED)
rep.bind("tcp://127.0.0.1:5331")
rep.bind("tcp://127.0.0.1:5332")
1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 |
# File 'ext/rbczmq/socket.c', line 1853
static VALUE rb_czmq_socket_monitor(int argc, VALUE *argv, VALUE obj)
{
VALUE endpoint;
VALUE handler;
VALUE events;
int rc;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
rb_scan_args(argc, argv, "12", &endpoint, &handler, &events);
Check_Type(endpoint, T_STRING);
if (NIL_P(events))
events = rb_const_get_at(rb_mZmq, rb_intern("EVENT_ALL"));
if (NIL_P(handler)) {
handler = rb_class_new_instance(0, NULL, rb_const_get_at(rb_mZmq, rb_intern("Monitor")));
}
Check_Type(events, T_FIXNUM);
rc = zmq_socket_monitor(sock->socket, StringValueCStr(endpoint), NUM2INT(events));
if (rc == 0) {
sock->monitor_endpoint = endpoint;
sock->monitor_handler = handler;
sock->monitor_thread = rb_thread_create(rb_czmq_socket_monitor_thread, (void*)sock);
return Qtrue;
} else {
return Qfalse;
}
}
|
#multicast_hops ⇒ Fixnum
1150 1151 1152 1153 1154 1155 |
# File 'ext/rbczmq/socket.c', line 1150
static VALUE rb_czmq_socket_opt_multicast_hops(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_multicast_hops(sock->socket));
}
|
#multicast_hops=(20) ⇒ nil
1170 1171 1172 1173 1174 |
# File 'ext/rbczmq/socket.c', line 1170
static VALUE rb_czmq_socket_set_opt_multicast_hops(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_multicast_hops, "MULTICAST_HOPS", value);
}
|
#poll(100) ⇒ Boolean
887 888 889 890 891 892 893 894 895 896 897 898 899 |
# File 'ext/rbczmq/socket.c', line 887
static VALUE rb_czmq_socket_poll(VALUE obj, VALUE timeout)
{
bool readable;
struct nogvl_socket_poll_args args;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
Check_Type(timeout, T_FIXNUM);
ZmqSockGuardCrossThread(sock);
args.socket = sock;
args.timeout = FIX2INT(timeout);
readable = (bool)rb_thread_call_without_gvl(rb_czmq_nogvl_poll, (void *)&args, RUBY_UBF_IO, 0);
return (readable == true) ? Qtrue : Qfalse;
}
|
#poll_readable? ⇒ Boolean
Poll all sockets for readbable states by default
83 84 85 |
# File 'lib/zmq/socket.rb', line 83 def poll_readable? true end |
#poll_writable? ⇒ Boolean
Poll all sockets for writable states by default
88 89 90 |
# File 'lib/zmq/socket.rb', line 88 def poll_writable? true end |
#rate ⇒ Fixnum
1033 1034 1035 1036 1037 1038 |
# File 'ext/rbczmq/socket.c', line 1033
static VALUE rb_czmq_socket_opt_rate(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_rate(sock->socket));
}
|
#rate=(50000) ⇒ nil
1053 1054 1055 1056 1057 |
# File 'ext/rbczmq/socket.c', line 1053
static VALUE rb_czmq_socket_set_opt_rate(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rate, "RATE", value);
}
|
#rcvbuf ⇒ Fixnum
1345 1346 1347 1348 1349 1350 |
# File 'ext/rbczmq/socket.c', line 1345
static VALUE rb_czmq_socket_opt_rcvbuf(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_rcvbuf(sock->socket));
}
|
#rcvbuf=(1000) ⇒ nil
1365 1366 1367 1368 1369 |
# File 'ext/rbczmq/socket.c', line 1365
static VALUE rb_czmq_socket_set_opt_rcvbuf(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rcvbuf, "RCVBUF", value);
}
|
#rcvhwm ⇒ Fixnum
954 955 956 957 958 959 |
# File 'ext/rbczmq/socket.c', line 954
static VALUE rb_czmq_socket_opt_rcvhwm(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_rcvhwm(sock->socket));
}
|
#rcvhwm=(100) ⇒ nil
975 976 977 978 979 |
# File 'ext/rbczmq/socket.c', line 975
static VALUE rb_czmq_socket_set_opt_rcvhwm(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rcvhwm, "RCVHWM", value);
}
|
#rcvmore ⇒ Boolean
1613 1614 1615 1616 1617 1618 |
# File 'ext/rbczmq/socket.c', line 1613
static VALUE rb_czmq_socket_opt_rcvmore(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return (zsocket_rcvmore(sock->socket) == 1) ? Qtrue : Qfalse;
}
|
#rcvtimeo ⇒ Fixnum
1653 1654 1655 1656 1657 1658 |
# File 'ext/rbczmq/socket.c', line 1653
static VALUE rb_czmq_socket_opt_rcvtimeo(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_rcvtimeo(sock->socket));
}
|
#rcvtimeout=(200) ⇒ nil
1673 1674 1675 1676 1677 |
# File 'ext/rbczmq/socket.c', line 1673
static VALUE rb_czmq_socket_set_opt_rcvtimeo(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rcvtimeo, "RCVTIMEO", value);
}
|
#readable? ⇒ Boolean
Determines if there are one or more messages to read from this socket. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.
socket.readable? => true
44 45 46 |
# File 'lib/zmq/socket.rb', line 44 def readable? (events & ZMQ::POLLIN) == ZMQ::POLLIN end |
#bind("inproc: //test") ⇒ Fixnum
204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'ext/rbczmq/socket.c', line 204
static VALUE rb_czmq_socket_bind(VALUE obj, VALUE endpoint)
{
struct nogvl_conn_args args;
int rc;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
Check_Type(endpoint, T_STRING);
args.socket = sock;
args.endpoint = StringValueCStr(endpoint);
rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_socket_bind, (void *)&args, RUBY_UBF_IO, 0);
/* ZmqAssert will return false on any non-zero return code. Bind returns the port number */
if (rc < 0) {
ZmqAssert(rc);
}
/* get the endpoint name with any ephemeral ports filled in. */
char* endpoint_string = zsocket_last_endpoint(sock->socket);
ZmqAssert(endpoint_string != NULL);
if (sock->verbose)
zclock_log ("I: %s socket %p: bound \"%s\"", zsocket_type_str(sock->socket), obj, endpoint_string);
sock->state = ZMQ_SOCKET_BOUND;
rb_ary_push(sock->endpoints, rb_str_new_cstr(endpoint_string));
free(endpoint_string);
return INT2NUM(rc);
}
|
#connect("tcp: //localhost:3456") ⇒ Boolean
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 |
# File 'ext/rbczmq/socket.c', line 245
static VALUE rb_czmq_socket_connect(VALUE obj, VALUE endpoint)
{
struct nogvl_conn_args args;
int rc;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
Check_Type(endpoint, T_STRING);
args.socket = sock;
args.endpoint = StringValueCStr(endpoint);
rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_socket_connect, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
/* get the endpoint name with any ephemeral ports filled in. */
char* endpoint_string = zsocket_last_endpoint(sock->socket);
ZmqAssert(endpoint_string != NULL);
if (sock->verbose)
zclock_log ("I: %s socket %p: connected \"%s\"", zsocket_type_str(sock->socket), obj, endpoint_string);
sock->state = ZMQ_SOCKET_CONNECTED;
rb_ary_push(sock->endpoints, rb_str_new_cstr(endpoint_string));
free(endpoint_string);
return Qtrue;
}
|
#reconnect_ivl ⇒ Fixnum
1464 1465 1466 1467 1468 1469 |
# File 'ext/rbczmq/socket.c', line 1464
static VALUE rb_czmq_socket_opt_reconnect_ivl(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_reconnect_ivl(sock->socket));
}
|
#reconnect_ivl=(200) ⇒ nil
1484 1485 1486 1487 1488 |
# File 'ext/rbczmq/socket.c', line 1484
static VALUE rb_czmq_socket_set_opt_reconnect_ivl(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl, "RECONNECT_IVL", value);
}
|
#reconnect_ivl_max ⇒ Fixnum
1503 1504 1505 1506 1507 1508 |
# File 'ext/rbczmq/socket.c', line 1503
static VALUE rb_czmq_socket_opt_reconnect_ivl_max(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_reconnect_ivl_max(sock->socket));
}
|
#reconnect_ivl_max=(5) ⇒ nil
1523 1524 1525 1526 1527 |
# File 'ext/rbczmq/socket.c', line 1523
static VALUE rb_czmq_socket_set_opt_reconnect_ivl_max(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl_max, "RECONNECT_IVL_MAX", value);
}
|
#recovery_ivl ⇒ Fixnum
1072 1073 1074 1075 1076 1077 |
# File 'ext/rbczmq/socket.c', line 1072
static VALUE rb_czmq_socket_opt_recovery_ivl(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_recovery_ivl(sock->socket));
}
|
#recovery_ivl=(20) ⇒ nil
1092 1093 1094 1095 1096 |
# File 'ext/rbczmq/socket.c', line 1092
static VALUE rb_czmq_socket_set_opt_recovery_ivl(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_recovery_ivl, "RECOVERY_IVL", value);
}
|
#recv ⇒ String?
546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 |
# File 'ext/rbczmq/socket.c', line 546
static VALUE rb_czmq_socket_recv(VALUE obj)
{
char *str = NULL;
struct nogvl_recv_args args;
VALUE result = Qnil;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
zmq_msg_init(&args.message);
int rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_recv, (void *)&args, RUBY_UBF_IO, 0);
if (rc < 0) {
zmq_msg_close(&args.message);
return Qnil;
}
ZmqAssertSysError();
if (sock->verbose)
zclock_log ("I: %s socket %p: recv \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
result = rb_str_new(zmq_msg_data(&args.message), zmq_msg_size(&args.message));
zmq_msg_close(&args.message);
result = ZmqEncode(result);
return result;
}
|
#recv_frame ⇒ ZMQ::Frame?
766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 |
# File 'ext/rbczmq/socket.c', line 766
static VALUE rb_czmq_socket_recv_frame(VALUE obj)
{
zframe_t *frame = NULL;
struct nogvl_recv_args args;
char print_prefix[255];
char *cur_time = NULL;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
frame = (zframe_t *)rb_thread_call_without_gvl(rb_czmq_nogvl_recv_frame, (void *)&args, RUBY_UBF_IO, 0);
if (frame == NULL) return Qnil;
if (sock->verbose) {
cur_time = rb_czmq_formatted_current_time();
ZmqDumpFrame("recv_frame", frame);
}
return rb_czmq_alloc_frame(frame);
}
|
#recv_frame_nonblock ⇒ ZMQ::Frame?
800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 |
# File 'ext/rbczmq/socket.c', line 800
static VALUE rb_czmq_socket_recv_frame_nonblock(VALUE obj)
{
zframe_t *frame = NULL;
char print_prefix[255];
char *cur_time = NULL;
errno = 0;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
frame = zframe_recv_nowait(sock->socket);
if (frame == NULL) return Qnil;
if (sock->verbose) {
cur_time = rb_czmq_formatted_current_time();
ZmqDumpFrame("recv_frame_nonblock", frame);
}
return rb_czmq_alloc_frame(frame);
}
|
#recv_message ⇒ ZMQ::Message?
846 847 848 849 850 851 852 853 854 855 856 857 858 859 |
# File 'ext/rbczmq/socket.c', line 846
static VALUE rb_czmq_socket_recv_message(VALUE obj)
{
zmsg_t *message = NULL;
struct nogvl_recv_args args;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
message = (zmsg_t *)rb_thread_call_without_gvl(rb_czmq_nogvl_recv_message, (void *)&args, RUBY_UBF_IO, 0);
if (message == NULL) return Qnil;
if (sock->verbose) ZmqDumpMessage("recv_message", message);
return rb_czmq_alloc_message(message);
}
|
#recv_nonblock ⇒ String?
588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 |
# File 'ext/rbczmq/socket.c', line 588
static VALUE rb_czmq_socket_recv_nonblock(VALUE obj)
{
struct nogvl_recv_args args;
errno = 0;
VALUE result = Qnil;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
zmq_msg_init(&args.message);
int rc = zmq_recvmsg(sock->socket, &args.message, ZMQ_DONTWAIT);
if (rc < 0) {
zmq_msg_close(&args.message);
return Qnil;
}
ZmqAssertSysError();
result = rb_str_new(zmq_msg_data(&args.message), zmq_msg_size(&args.message));
zmq_msg_close(&args.message);
if (sock->verbose) {
zclock_log ("I: %s socket %p: recv \"%s\"", zsocket_type_str(sock->socket), sock->socket, StringValueCStr(result));
}
result = ZmqEncode(result);
return result;
}
|
#router_mandatory=(true) ⇒ nil
1249 1250 1251 1252 1253 |
# File 'ext/rbczmq/socket.c', line 1249
static VALUE rb_czmq_socket_set_opt_router_mandatory(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_router_mandatory, "ROUTER_MANDATORY", value);
}
|
#router_raw=(true) ⇒ nil
1268 1269 1270 1271 1272 |
# File 'ext/rbczmq/socket.c', line 1268
static VALUE rb_czmq_socket_set_opt_router_raw(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_router_raw, "ROUTER_RAW", value);
}
|
#send("message") ⇒ Boolean
458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 |
# File 'ext/rbczmq/socket.c', line 458
static VALUE rb_czmq_socket_send(VALUE obj, VALUE msg)
{
int rc;
struct nogvl_send_args args;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
StringValue(msg);
Check_Type(msg, T_STRING);
args.msg = RSTRING_PTR(msg);
args.length = RSTRING_LEN(msg);
rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_zstr_send, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: send \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(msg));
return Qtrue;
}
|
#send_frame(frame) ⇒ nil
648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 |
# File 'ext/rbczmq/socket.c', line 648
static VALUE rb_czmq_socket_send_frame(int argc, VALUE *argv, VALUE obj)
{
struct nogvl_send_frame_args args;
VALUE frame_obj;
VALUE flags;
char print_prefix[255];
char *cur_time = NULL;
zframe_t *print_frame = NULL;
int rc, flgs;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
rb_scan_args(argc, argv, "11", &frame_obj, &flags);
ZmqGetFrame(frame_obj);
ZmqAssertFrameOwnedNoMessage(frame);
if (NIL_P(flags)) {
flgs = 0;
} else {
if (SYMBOL_P(flags)) flags = rb_const_get_at(rb_cZmqFrame, rb_to_id(flags));
Check_Type(flags, T_FIXNUM);
flgs = FIX2INT(flags);
}
if (sock->verbose) {
cur_time = rb_czmq_formatted_current_time();
print_frame = (flgs & ZFRAME_REUSE) ? frame->frame : zframe_dup(frame->frame);
}
args.socket = sock;
args.frame = frame->frame;
args.flags = flgs;
rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_send_frame, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if ((flgs & ZFRAME_REUSE) == 0) {
/* frame has been destroyed, clear the owns flag */
frame->flags &= ~ZMQ_FRAME_OWNED;
}
if (sock->verbose) ZmqDumpFrame("send_frame", print_frame);
return Qtrue;
}
|
#send_message(msg) ⇒ nil
720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 |
# File 'ext/rbczmq/socket.c', line 720
static VALUE rb_czmq_socket_send_message(VALUE obj, VALUE message_obj)
{
struct nogvl_send_message_args args;
zmsg_t *print_message = NULL;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
ZmqGetMessage(message_obj);
ZmqAssertMessageOwned(message);
if (sock->verbose) print_message = zmsg_dup(message->message);
args.socket = sock;
args.message = message->message;
rb_thread_call_without_gvl(rb_czmq_nogvl_send_message, (void *)&args, RUBY_UBF_IO, 0);
message->flags &= ~ZMQ_MESSAGE_OWNED;
if (sock->verbose) ZmqDumpMessage("send_message", print_message);
return Qnil;
}
|
#sendm("message") ⇒ Boolean
493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 |
# File 'ext/rbczmq/socket.c', line 493
static VALUE rb_czmq_socket_sendm(VALUE obj, VALUE msg)
{
int rc;
struct nogvl_send_args args;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
StringValue(msg);
Check_Type(msg, T_STRING);
args.msg = RSTRING_PTR(msg);
args.length = RSTRING_LEN(msg);
rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_zstr_sendm, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: sendm \"%s\"", zsocket_type_str(sock->socket), sock->socket, StringValueCStr(msg));
return Qtrue;
}
|
#sndbuf ⇒ Fixnum
1306 1307 1308 1309 1310 1311 |
# File 'ext/rbczmq/socket.c', line 1306
static VALUE rb_czmq_socket_opt_sndbuf(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_sndbuf(sock->socket));
}
|
#sndbuf=(1000) ⇒ nil
1326 1327 1328 1329 1330 |
# File 'ext/rbczmq/socket.c', line 1326
static VALUE rb_czmq_socket_set_opt_sndbuf(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_sndbuf, "SNDBUF", value);
}
|
#sndhwm ⇒ Fixnum
914 915 916 917 918 919 |
# File 'ext/rbczmq/socket.c', line 914
static VALUE rb_czmq_socket_opt_sndhwm(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_sndhwm(sock->socket));
}
|
#sndhwm=(100) ⇒ nil
935 936 937 938 939 |
# File 'ext/rbczmq/socket.c', line 935
static VALUE rb_czmq_socket_set_opt_sndhwm(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_sndhwm, "SNDHWM", value);
}
|
#sndtimeo ⇒ Fixnum
1692 1693 1694 1695 1696 1697 |
# File 'ext/rbczmq/socket.c', line 1692
static VALUE rb_czmq_socket_opt_sndtimeo(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_sndtimeo(sock->socket));
}
|
#sndtimeout=(200) ⇒ nil
1712 1713 1714 1715 1716 |
# File 'ext/rbczmq/socket.c', line 1712
static VALUE rb_czmq_socket_set_opt_sndtimeo(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_sndtimeo, "SNDTIMEO", value);
}
|
#state ⇒ String
129 130 131 132 133 134 |
# File 'ext/rbczmq/socket.c', line 129
static VALUE rb_czmq_socket_state(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(sock->state);
}
|
#subscribe(value) ⇒ Object
1571 1572 1573 1574 1575 1576 1577 |
# File 'ext/rbczmq/socket.c', line 1571
static VALUE rb_czmq_socket_set_opt_subscribe(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetStringSockOpt(obj, zsocket_set_subscribe, "SUBSCRIBE", value, {
ZmqAssertSockOptFor(ZMQ_SUB)
});
}
|
#to_s ⇒ Object
Generates a string representation of the current socket state
socket = ctx.bind(:PUB, “tcp://127.0.0.1:5000”) socket.to_s => “PUB socket bound to tcp://127.0.0.1:5000”
71 72 73 74 75 76 77 78 79 80 |
# File 'lib/zmq/socket.rb', line 71 def to_s case state when BOUND "#{type_str} socket bound to #{endpoints.join(', ')}" when CONNECTED "#{type_str} socket connected to #{endpoints.join(', ')}" else "#{type_str} socket" end end |
#type_str ⇒ Object
Generates a string representation of this socket type
socket = ctx.socket(:PUB) socket.type_str => “PUB”
62 63 64 |
# File 'lib/zmq/socket.rb', line 62 def type_str self.class.const_get(:TYPE_STR) end |
#disconnect("tcp: //localhost:3456") ⇒ Boolean
353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 |
# File 'ext/rbczmq/socket.c', line 353
static VALUE rb_czmq_socket_unbind(VALUE obj, VALUE endpoint)
{
struct nogvl_conn_args args;
int rc;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
Check_Type(endpoint, T_STRING);
args.socket = sock;
args.endpoint = StringValueCStr(endpoint);
rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_socket_unbind, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: unbound \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
rb_ary_delete(sock->endpoints, endpoint);
long endpoint_count = RARRAY_LEN(sock->endpoints);
if (endpoint_count == 0) {
sock->state = ZMQ_SOCKET_DISCONNECTED;
}
return Qtrue;
}
|
#unsubscribe(value) ⇒ Object
1592 1593 1594 1595 1596 1597 1598 |
# File 'ext/rbczmq/socket.c', line 1592
static VALUE rb_czmq_socket_set_opt_unsubscribe(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetStringSockOpt(obj, zsocket_set_unsubscribe, "UNSUBSCRIBE", value, {
ZmqAssertSockOptFor(ZMQ_SUB)
});
}
|
#verbose=(true) ⇒ nil
389 390 391 392 393 394 395 396 397 |
# File 'ext/rbczmq/socket.c', line 389
static VALUE rb_czmq_socket_set_verbose(VALUE obj, VALUE level)
{
bool vlevel;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
vlevel = (level == Qtrue) ? true : false;
sock->verbose = vlevel;
return Qnil;
}
|
#writable? ⇒ Boolean
Determines if this socket is in a writable state. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.
socket.writable? => true
53 54 55 |
# File 'lib/zmq/socket.rb', line 53 def writable? (events & ZMQ::POLLOUT) == ZMQ::POLLOUT end |
#xpub_verbose=(true) ⇒ nil
1287 1288 1289 1290 1291 |
# File 'ext/rbczmq/socket.c', line 1287
static VALUE rb_czmq_socket_set_opt_xpub_verbose(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_xpub_verbose, "XPUB_VERBOSE", value);
}
|