Class: ZMQ::Socket
- Inherits:
-
Object
- Object
- ZMQ::Socket
- Defined in:
- lib/zmq/socket.rb,
ext/rbczmq/socket.c
Defined Under Namespace
Classes: Dealer, Pair, Pub, Pull, Push, Rep, Req, Router, Sub, XPub, XSub
Constant Summary collapse
- PROTO_REXP =
/^inproc|ipc|tcp|e?pgm:\/\//
- PENDING =
INT2NUM(ZMQ_SOCKET_PENDING)
- BOUND =
INT2NUM(ZMQ_SOCKET_BOUND)
- CONNECTED =
INT2NUM(ZMQ_SOCKET_CONNECTED)
- DISCONNECTED =
INT2NUM(ZMQ_SOCKET_DISCONNECTED)
Class Method Summary collapse
Instance Method Summary collapse
-
#affinity ⇒ Fixnum
Returns the socket AFFINITY value.
-
#affinity=(1) ⇒ nil
Sets the socket AFFINITY value.
-
#backlog ⇒ Fixnum
Returns the socket BACKLOG value.
-
#backlog=(200) ⇒ nil
Sets the socket BACKLOG value.
-
#bind(uri) ⇒ Object
Binds to a given endpoint.
-
#close ⇒ nil
Closes a socket.
-
#connect(uri) ⇒ Object
Connects to a given endpoint.
-
#connect_all(uri) ⇒ Object
Connects to all endpoints that are returned from a SRV record lookup.
-
#delay_attach_on_connect=(true) ⇒ nil
Sets the socket DELAY_ATTACH_ON_CONNECT value.
-
#disconnect("tcp: //localhost:3456") ⇒ Boolean
Attempts to disconnect from a given endpoint.
-
#endpoint ⇒ Object
Returns the endpoint this socket is currently connected to, if any.
-
#endpoints ⇒ Array of Strings
Returns the endpoints this socket is currently connected to, if any.
-
#events ⇒ Fixnum
Query if this socket is in a readable or writable state.
-
#fd ⇒ Fixnum
(also: #to_i)
Returns a file descriptor reference for integrating this socket with an externel event loop or multiplexor.
-
#identity=(value) ⇒ Object
Sets the socket IDENTITY value.
-
#ipv4only=(true) ⇒ nil
Sets the socket IPV4ONLY value.
-
#ipv4only ⇒ Boolean
Returns the socket IPV4ONLY value.
-
#linger ⇒ Fixnum
Returns the socket LINGER value.
-
#linger=(1000) ⇒ nil
Sets the socket LINGER value in ms.
-
#maxmsgsize ⇒ Fixnum
Returns the socket MAXMSGSIZE value.
-
#maxmsgsize=(20) ⇒ nil
Sets the socket MAXMSGSIZE value.
-
#monitor("inproc: //monitoring", callback, events) ⇒ nil
Registers a monitoring callback for this socket.
-
#multicast_hops ⇒ Fixnum
Returns the socket MULTICAST_HOPS value.
-
#multicast_hops=(20) ⇒ nil
Sets the socket MULTICAST_HOPS value.
-
#poll(100) ⇒ Boolean
Poll for input events on the socket.
-
#poll_readable? ⇒ Boolean
Poll all sockets for readbable states by default.
-
#poll_writable? ⇒ Boolean
Poll all sockets for writable states by default.
-
#rate ⇒ Fixnum
Returns the socket RATE value.
-
#rate=(50000) ⇒ nil
Sets the socket RATE value.
-
#rcvbuf ⇒ Fixnum
Returns the socket RCVBUF value.
-
#rcvbuf=(1000) ⇒ nil
Sets the socket RCVBUF value.
-
#rcvhwm ⇒ Fixnum
Returns the socket receive HWM (High Water Mark) value.
-
#rcvhwm=(100) ⇒ nil
Sets the socket receive HWM (High Water Mark() value.
-
#rcvmore ⇒ Boolean
Query if there’s more messages to receive.
-
#rcvtimeo ⇒ Fixnum
Returns the socket RCVTIMEO value.
-
#rcvtimeout=(200) ⇒ nil
Sets the socket RCVTIMEO value.
-
#readable? ⇒ Boolean
Determines if there are one or more messages to read from this socket.
-
#bind("inproc: //test") ⇒ Fixnum
Binds to a given endpoint.
-
#connect("tcp: //localhost:3456") ⇒ Boolean
Attempts to connect to a given endpoint.
-
#reconnect_ivl ⇒ Fixnum
Returns the socket RECONNECT_IVL value.
-
#reconnect_ivl=(200) ⇒ nil
Sets the socket RECONNECT_IVL value.
-
#reconnect_ivl_max ⇒ Fixnum
Returns the socket RECONNECT_IVL_MAX value.
-
#reconnect_ivl_max=(5) ⇒ nil
Sets the socket RECONNECT_IVL_MAX value.
-
#recovery_ivl ⇒ Fixnum
Returns the socket RECOVERY_IVL value.
-
#recovery_ivl=(20) ⇒ nil
Sets the socket RECOVERY_IVL value.
-
#recv ⇒ String?
Receive a string from this ZMQ socket.
-
#recv_frame ⇒ ZMQ::Frame?
Receives a ZMQ frame from this socket.
-
#recv_frame_nonblock ⇒ ZMQ::Frame?
Receives a ZMQ frame from this socket.
-
#recv_message ⇒ ZMQ::Message?
Receives a ZMQ message from this socket.
-
#recv_nonblock ⇒ String?
Receive a string from this ZMQ socket.
-
#router_mandatory=(true) ⇒ nil
Sets the socket ROUTER_MANDATORY value.
-
#router_raw=(true) ⇒ nil
Sets the socket ROUTER_RAW value.
-
#send("message") ⇒ Boolean
Sends a string to this ZMQ socket.
-
#send_frame(frame) ⇒ nil
Sends a ZMQ::Frame instance to this socket.
-
#send_message(msg) ⇒ nil
Sends a ZMQ::Message instance to this socket.
-
#sendm("message") ⇒ Boolean
Sends a string to this ZMQ socket, with a more flag set.
-
#sndbuf ⇒ Fixnum
Returns the socket SNDBUF value.
-
#sndbuf=(1000) ⇒ nil
Sets the socket SNDBUF value.
-
#sndhwm ⇒ Fixnum
Returns the socket send HWM (High Water Mark) value.
-
#sndhwm=(100) ⇒ nil
Sets the socket send HWM (High Water Mark() value.
-
#sndtimeo ⇒ Fixnum
Returns the socket SNDTIMEO value.
-
#sndtimeout=(200) ⇒ nil
Sets the socket SNDTIMEO value.
-
#state ⇒ String
Returns the current socket state, one of ZMQ::Socket::PENDING, ZMQ::Socket::BOUND or ZMQ::Socket::CONNECTED.
-
#subscribe(value) ⇒ Object
Subscribes this SUB socket to a topic.
-
#to_s ⇒ Object
Generates a string representation of the current socket state.
-
#type_str ⇒ Object
Generates a string representation of this socket type.
-
#unsubscribe(value) ⇒ Object
Unsubscribes this SUB socket from a topic.
-
#verbose=(true) ⇒ nil
Let this socket be verbose - dumps a lot of data to stdout for debugging.
-
#writable? ⇒ Boolean
Determines if this socket is in a writable state.
-
#xpub_verbose=(true) ⇒ nil
Sets the socket XPUB_VERBOSE value.
Class Method Details
.handle_fsm_errors(error, *methods) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/zmq/socket.rb', line 14 def self.handle_fsm_errors(error, *methods) methods.each do |m| class_eval <<-"evl", __FILE__, __LINE__ def #{m}(*args); super rescue SystemCallError => e raise(ZMQ::Error, "#{error} Please assert that you're not sending / receiving out of band data when using the REQ / REP socket pairs.") if e.errno == ZMQ::EFSM raise end evl end end |
.unsupported_api(*methods) ⇒ Object
6 7 8 9 10 11 12 |
# File 'lib/zmq/socket.rb', line 6 def self.unsupported_api(*methods) methods.each do |m| class_eval <<-"evl", __FILE__, __LINE__ def #{m}(*args); raise(ZMQ::Error, "API #{m} not supported for #{const_get(:TYPE_STR)} sockets!"); end evl end end |
Instance Method Details
#affinity ⇒ Fixnum
886 887 888 889 890 891 |
# File 'ext/rbczmq/socket.c', line 886
static VALUE rb_czmq_socket_opt_affinity(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_affinity(sock->socket));
}
|
#affinity=(1) ⇒ nil
906 907 908 909 910 |
# File 'ext/rbczmq/socket.c', line 906
static VALUE rb_czmq_socket_set_opt_affinity(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_affinity, "AFFINITY", value);
}
|
#backlog ⇒ Fixnum
1315 1316 1317 1318 1319 1320 |
# File 'ext/rbczmq/socket.c', line 1315
static VALUE rb_czmq_socket_opt_backlog(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_backlog(sock->socket));
}
|
#backlog=(200) ⇒ nil
1335 1336 1337 1338 1339 |
# File 'ext/rbczmq/socket.c', line 1335
static VALUE rb_czmq_socket_set_opt_backlog(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_backlog, "BACKLOG", value);
}
|
#bind(uri) ⇒ Object
Binds to a given endpoint. Attemps to resolve URIs without a protocol through DNS SRV records.
socket = ctx.socket(:PUB) socket.bind “tcp://127.0.0.1:9000”
socket.bind “collector.domain.com” # resolves 10.0.0.2:9000
99 100 101 102 |
# File 'lib/zmq/socket.rb', line 99 def bind(uri) uri = resolve(uri) if uri && uri !~ PROTO_REXP real_bind(uri) end |
#close ⇒ nil
Closes a socket. The GC will take the same action if a socket object is not reachable anymore on the next GC cycle. This is a lower level API.
Examples
sock.close => nil
95 96 97 98 99 100 101 102 103 104 105 |
# File 'ext/rbczmq/socket.c', line 95
static VALUE rb_czmq_socket_close(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
/* This is useless for production / real use cases as we can't query the state again OR assume
anything about the underlying connection. Merely doing the right thing. */
sock->state = ZMQ_SOCKET_PENDING;
rb_czmq_free_sock(sock);
return Qnil;
}
|
#connect(uri) ⇒ Object
Connects to a given endpoint. Attemps to resolve URIs without a protocol through DNS SRV records.
socket = ctx.socket(:PUB) socket.connect “tcp://127.0.0.1:9000”
socket.connect “collector.domain.com” # resolves 10.0.0.2:9000
111 112 113 114 |
# File 'lib/zmq/socket.rb', line 111 def connect(uri) uri = resolve(uri) if uri && uri !~ PROTO_REXP real_connect(uri) end |
#connect_all(uri) ⇒ Object
Connects to all endpoints that are returned from a SRV record lookup.
socket = ctx.socket(:PUB)
socket.connect “collector.domain.com” # resolves 10.0.0.2:9000 10.0.0.3:9000
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/zmq/socket.rb', line 122 def connect_all(uri) if uri =~ PROTO_REXP real_connect(uri) return end addresses = resolve_all(uri) if addresses.empty? real_connect(uri) else addresses.each do |address| host = Resolv.getaddress(address.target.to_s) real_connect("tcp://#{host}:#{address.port}") end end self end |
#delay_attach_on_connect=(true) ⇒ nil
1122 1123 1124 1125 1126 |
# File 'ext/rbczmq/socket.c', line 1122
static VALUE rb_czmq_socket_set_opt_delay_attach_on_connect(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_delay_attach_on_connect, "DELAY_ATTACH_ON_CONNECT", value);
}
|
#disconnect("tcp: //localhost:3456") ⇒ Boolean
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 |
# File 'ext/rbczmq/socket.c', line 303
static VALUE rb_czmq_socket_disconnect(VALUE obj, VALUE endpoint)
{
struct nogvl_conn_args args;
int rc;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
Check_Type(endpoint, T_STRING);
args.socket = sock;
args.endpoint = StringValueCStr(endpoint);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_disconnect, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: disconnected \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
sock->state = ZMQ_SOCKET_DISCONNECTED;
rb_ary_delete(sock->endpoints, endpoint);
return Qtrue;
}
|
#endpoint ⇒ Object
35 36 37 |
# File 'lib/zmq/socket.rb', line 35 def endpoint endpoints.first end |
#endpoints ⇒ Array of Strings
122 123 124 125 126 127 |
# File 'ext/rbczmq/socket.c', line 122
static VALUE rb_czmq_socket_endpoints(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return sock->endpoints;
}
|
#events ⇒ Fixnum
1523 1524 1525 1526 1527 1528 |
# File 'ext/rbczmq/socket.c', line 1523
static VALUE rb_czmq_socket_opt_events(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_events(sock->socket));
}
|
#fd ⇒ Fixnum Also known as: to_i
167 168 169 170 171 172 173 |
# File 'ext/rbczmq/socket.c', line 167
static VALUE rb_czmq_socket_fd(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
if (sock->state == ZMQ_SOCKET_PENDING || sock->state == ZMQ_SOCKET_DISCONNECTED) return INT2NUM(-1);
return INT2NUM(zsocket_fd(sock->socket));
}
|
#identity=(value) ⇒ Object
1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 |
# File 'ext/rbczmq/socket.c', line 1432
static VALUE rb_czmq_socket_set_opt_identity(VALUE obj, VALUE value)
{
char *val;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
Check_Type(value, T_STRING);
if (RSTRING_LEN(value) == 0) rb_raise(rb_eZmqError, "socket identity cannot be empty.");
if (RSTRING_LEN(value) > 255) rb_raise(rb_eZmqError, "maximum socket identity is 255 chars.");
val = StringValueCStr(value);
zsocket_set_identity(sock->socket, val);
if (sock->verbose)
zclock_log ("I: %s socket %p: set option \"IDENTITY\" \"%s\"", zsocket_type_str(sock->socket), obj, val);
return Qnil;
}
|
#ipv4only=(true) ⇒ nil
1103 1104 1105 1106 1107 |
# File 'ext/rbczmq/socket.c', line 1103
static VALUE rb_czmq_socket_set_opt_ipv4only(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_ipv4only, "IPV4ONLY", value);
}
|
#ipv4only ⇒ Boolean
1081 1082 1083 1084 1085 1086 1087 1088 |
# File 'ext/rbczmq/socket.c', line 1081
static VALUE rb_czmq_socket_opt_ipv4only(VALUE obj)
{
int ipv4only;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ipv4only = zsocket_ipv4only(sock->socket);
return (ipv4only == 0 ? Qfalse : Qtrue);
}
|
#linger ⇒ Fixnum
1276 1277 1278 1279 1280 1281 |
# File 'ext/rbczmq/socket.c', line 1276
static VALUE rb_czmq_socket_opt_linger(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_linger(sock->socket));
}
|
#linger=(1000) ⇒ nil
1296 1297 1298 1299 1300 |
# File 'ext/rbczmq/socket.c', line 1296
static VALUE rb_czmq_socket_set_opt_linger(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_linger, "LINGER", value);
}
|
#maxmsgsize ⇒ Fixnum
1003 1004 1005 1006 1007 1008 |
# File 'ext/rbczmq/socket.c', line 1003
static VALUE rb_czmq_socket_opt_maxmsgsize(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_maxmsgsize(sock->socket));
}
|
#maxmsgsize=(20) ⇒ nil
1023 1024 1025 1026 1027 |
# File 'ext/rbczmq/socket.c', line 1023
static VALUE rb_czmq_socket_set_opt_maxmsgsize(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_maxmsgsize, "MAXMSGSIZE", value);
}
|
#monitor("inproc: //monitoring", callback, events) ⇒ nil
Registers a monitoring callback for this socket
Examples
ctx = ZMQ::Context.new
rep = ctx.socket(:REP)
rep.monitor("inproc://monitoring.rep", RepMonitor)
req = ctx.socket(:REQ)
req.monitor("inproc://monitoring.req", ReqMonitor, ZMQ_EVENT_DISCONNECTED)
rep.bind("tcp://127.0.0.1:5331")
rep.bind("tcp://127.0.0.1:5332")
1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 |
# File 'ext/rbczmq/socket.c', line 1694
static VALUE rb_czmq_socket_monitor(int argc, VALUE *argv, VALUE obj)
{
VALUE endpoint;
VALUE handler;
VALUE events;
int rc;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
rb_scan_args(argc, argv, "12", &endpoint, &handler, &events);
Check_Type(endpoint, T_STRING);
if (NIL_P(events))
events = rb_const_get_at(rb_mZmq, rb_intern("EVENT_ALL"));
if (NIL_P(handler)) {
handler = rb_class_new_instance(0, NULL, rb_const_get_at(rb_mZmq, rb_intern("Monitor")));
}
Check_Type(events, T_FIXNUM);
rc = zmq_socket_monitor(sock->socket, StringValueCStr(endpoint), NUM2INT(events));
if (rc == 0) {
sock->monitor_endpoint = endpoint;
sock->monitor_handler = handler;
sock->monitor_thread = rb_thread_create(rb_czmq_socket_monitor_thread, (void*)sock);
return Qtrue;
} else {
return Qfalse;
}
}
|
#multicast_hops ⇒ Fixnum
1042 1043 1044 1045 1046 1047 |
# File 'ext/rbczmq/socket.c', line 1042
static VALUE rb_czmq_socket_opt_multicast_hops(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_multicast_hops(sock->socket));
}
|
#multicast_hops=(20) ⇒ nil
1062 1063 1064 1065 1066 |
# File 'ext/rbczmq/socket.c', line 1062
static VALUE rb_czmq_socket_set_opt_multicast_hops(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_multicast_hops, "MULTICAST_HOPS", value);
}
|
#poll(100) ⇒ Boolean
779 780 781 782 783 784 785 786 787 788 789 790 791 |
# File 'ext/rbczmq/socket.c', line 779
static VALUE rb_czmq_socket_poll(VALUE obj, VALUE timeout)
{
bool readable;
struct nogvl_socket_poll_args args;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
Check_Type(timeout, T_FIXNUM);
ZmqSockGuardCrossThread(sock);
args.socket = sock;
args.timeout = FIX2INT(timeout);
readable = (bool)rb_thread_blocking_region(rb_czmq_nogvl_poll, (void *)&args, RUBY_UBF_IO, 0);
return (readable == true) ? Qtrue : Qfalse;
}
|
#poll_readable? ⇒ Boolean
Poll all sockets for readbable states by default
83 84 85 |
# File 'lib/zmq/socket.rb', line 83 def poll_readable? true end |
#poll_writable? ⇒ Boolean
Poll all sockets for writable states by default
88 89 90 |
# File 'lib/zmq/socket.rb', line 88 def poll_writable? true end |
#rate ⇒ Fixnum
925 926 927 928 929 930 |
# File 'ext/rbczmq/socket.c', line 925
static VALUE rb_czmq_socket_opt_rate(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_rate(sock->socket));
}
|
#rate=(50000) ⇒ nil
945 946 947 948 949 |
# File 'ext/rbczmq/socket.c', line 945
static VALUE rb_czmq_socket_set_opt_rate(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rate, "RATE", value);
}
|
#rcvbuf ⇒ Fixnum
1237 1238 1239 1240 1241 1242 |
# File 'ext/rbczmq/socket.c', line 1237
static VALUE rb_czmq_socket_opt_rcvbuf(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_rcvbuf(sock->socket));
}
|
#rcvbuf=(1000) ⇒ nil
1257 1258 1259 1260 1261 |
# File 'ext/rbczmq/socket.c', line 1257
static VALUE rb_czmq_socket_set_opt_rcvbuf(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rcvbuf, "RCVBUF", value);
}
|
#rcvhwm ⇒ Fixnum
846 847 848 849 850 851 |
# File 'ext/rbczmq/socket.c', line 846
static VALUE rb_czmq_socket_opt_rcvhwm(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_rcvhwm(sock->socket));
}
|
#rcvhwm=(100) ⇒ nil
867 868 869 870 871 |
# File 'ext/rbczmq/socket.c', line 867
static VALUE rb_czmq_socket_set_opt_rcvhwm(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rcvhwm, "RCVHWM", value);
}
|
#rcvmore ⇒ Boolean
1503 1504 1505 1506 1507 1508 |
# File 'ext/rbczmq/socket.c', line 1503
static VALUE rb_czmq_socket_opt_rcvmore(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return (zsocket_rcvmore(sock->socket) == 1) ? Qtrue : Qfalse;
}
|
#rcvtimeo ⇒ Fixnum
1543 1544 1545 1546 1547 1548 |
# File 'ext/rbczmq/socket.c', line 1543
static VALUE rb_czmq_socket_opt_rcvtimeo(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_rcvtimeo(sock->socket));
}
|
#rcvtimeout=(200) ⇒ nil
1563 1564 1565 1566 1567 |
# File 'ext/rbczmq/socket.c', line 1563
static VALUE rb_czmq_socket_set_opt_rcvtimeo(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rcvtimeo, "RCVTIMEO", value);
}
|
#readable? ⇒ Boolean
Determines if there are one or more messages to read from this socket. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.
socket.readable? => true
44 45 46 |
# File 'lib/zmq/socket.rb', line 44 def readable? (events & ZMQ::POLLIN) == ZMQ::POLLIN end |
#bind("inproc: //test") ⇒ Fixnum
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
# File 'ext/rbczmq/socket.c', line 219
static VALUE rb_czmq_socket_bind(VALUE obj, VALUE endpoint)
{
struct nogvl_conn_args args;
int rc;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
Check_Type(endpoint, T_STRING);
args.socket = sock;
args.endpoint = StringValueCStr(endpoint);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_bind, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: bound \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
sock->state = ZMQ_SOCKET_BOUND;
rb_ary_push(sock->endpoints, rb_str_new4(endpoint));
return INT2NUM(rc);
}
|
#connect("tcp: //localhost:3456") ⇒ Boolean
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 |
# File 'ext/rbczmq/socket.c', line 253
static VALUE rb_czmq_socket_connect(VALUE obj, VALUE endpoint)
{
struct nogvl_conn_args args;
int rc;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
Check_Type(endpoint, T_STRING);
args.socket = sock;
args.endpoint = StringValueCStr(endpoint);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_connect, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: connected \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
sock->state = ZMQ_SOCKET_CONNECTED;
rb_ary_push(sock->endpoints, rb_str_new4(endpoint));
return Qtrue;
}
|
#reconnect_ivl ⇒ Fixnum
1354 1355 1356 1357 1358 1359 |
# File 'ext/rbczmq/socket.c', line 1354
static VALUE rb_czmq_socket_opt_reconnect_ivl(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_reconnect_ivl(sock->socket));
}
|
#reconnect_ivl=(200) ⇒ nil
1374 1375 1376 1377 1378 |
# File 'ext/rbczmq/socket.c', line 1374
static VALUE rb_czmq_socket_set_opt_reconnect_ivl(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl, "RECONNECT_IVL", value);
}
|
#reconnect_ivl_max ⇒ Fixnum
1393 1394 1395 1396 1397 1398 |
# File 'ext/rbczmq/socket.c', line 1393
static VALUE rb_czmq_socket_opt_reconnect_ivl_max(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_reconnect_ivl_max(sock->socket));
}
|
#reconnect_ivl_max=(5) ⇒ nil
1413 1414 1415 1416 1417 |
# File 'ext/rbczmq/socket.c', line 1413
static VALUE rb_czmq_socket_set_opt_reconnect_ivl_max(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl_max, "RECONNECT_IVL_MAX", value);
}
|
#recovery_ivl ⇒ Fixnum
964 965 966 967 968 969 |
# File 'ext/rbczmq/socket.c', line 964
static VALUE rb_czmq_socket_opt_recovery_ivl(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_recovery_ivl(sock->socket));
}
|
#recovery_ivl=(20) ⇒ nil
984 985 986 987 988 |
# File 'ext/rbczmq/socket.c', line 984
static VALUE rb_czmq_socket_set_opt_recovery_ivl(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_recovery_ivl, "RECOVERY_IVL", value);
}
|
#recv ⇒ String?
463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 |
# File 'ext/rbczmq/socket.c', line 463
static VALUE rb_czmq_socket_recv(VALUE obj)
{
char *str = NULL;
struct nogvl_recv_args args;
VALUE result = Qnil;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
str = (char *)rb_thread_blocking_region(rb_czmq_nogvl_recv, (void *)&args, RUBY_UBF_IO, 0);
if (str == NULL) return result;
ZmqAssertSysError();
if (sock->verbose)
zclock_log ("I: %s socket %p: recv \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
result = ZmqEncode(rb_str_new2(str));
free(str);
return result;
}
|
#recv_frame ⇒ ZMQ::Frame?
658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 |
# File 'ext/rbczmq/socket.c', line 658
static VALUE rb_czmq_socket_recv_frame(VALUE obj)
{
zframe_t *frame = NULL;
struct nogvl_recv_args args;
char print_prefix[255];
char *cur_time = NULL;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
frame = (zframe_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_frame, (void *)&args, RUBY_UBF_IO, 0);
if (frame == NULL) return Qnil;
if (sock->verbose) {
cur_time = rb_czmq_formatted_current_time();
ZmqDumpFrame("recv_frame", frame);
}
return rb_czmq_alloc_frame(frame);
}
|
#recv_frame_nonblock ⇒ ZMQ::Frame?
692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 |
# File 'ext/rbczmq/socket.c', line 692
static VALUE rb_czmq_socket_recv_frame_nonblock(VALUE obj)
{
zframe_t *frame = NULL;
char print_prefix[255];
char *cur_time = NULL;
errno = 0;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
frame = zframe_recv_nowait(sock->socket);
if (frame == NULL) return Qnil;
if (sock->verbose) {
cur_time = rb_czmq_formatted_current_time();
ZmqDumpFrame("recv_frame_nonblock", frame);
}
return rb_czmq_alloc_frame(frame);
}
|
#recv_message ⇒ ZMQ::Message?
738 739 740 741 742 743 744 745 746 747 748 749 750 751 |
# File 'ext/rbczmq/socket.c', line 738
static VALUE rb_czmq_socket_recv_message(VALUE obj)
{
zmsg_t *message = NULL;
struct nogvl_recv_args args;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
message = (zmsg_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_message, (void *)&args, RUBY_UBF_IO, 0);
if (message == NULL) return Qnil;
if (sock->verbose) ZmqDumpMessage("recv_message", message);
return rb_czmq_alloc_message(message);
}
|
#recv_nonblock ⇒ String?
497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 |
# File 'ext/rbczmq/socket.c', line 497
static VALUE rb_czmq_socket_recv_nonblock(VALUE obj)
{
char *str = NULL;
errno = 0;
VALUE result = Qnil;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
str = zstr_recv_nowait(sock->socket);
if (str == NULL) return result;
ZmqAssertSysError();
if (sock->verbose)
zclock_log ("I: %s socket %p: recv_nonblock \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
result = ZmqEncode(rb_str_new2(str));
free(str);
return result;
}
|
#router_mandatory=(true) ⇒ nil
1141 1142 1143 1144 1145 |
# File 'ext/rbczmq/socket.c', line 1141
static VALUE rb_czmq_socket_set_opt_router_mandatory(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_router_mandatory, "ROUTER_MANDATORY", value);
}
|
#router_raw=(true) ⇒ nil
1160 1161 1162 1163 1164 |
# File 'ext/rbczmq/socket.c', line 1160
static VALUE rb_czmq_socket_set_opt_router_raw(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_router_raw, "ROUTER_RAW", value);
}
|
#send("message") ⇒ Boolean
385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 |
# File 'ext/rbczmq/socket.c', line 385
static VALUE rb_czmq_socket_send(VALUE obj, VALUE msg)
{
int rc;
struct nogvl_send_args args;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
Check_Type(msg, T_STRING);
args.socket = sock;
args.msg = StringValueCStr(msg);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_send, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: send \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(msg));
return Qtrue;
}
|
#send_frame(frame) ⇒ nil
546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 |
# File 'ext/rbczmq/socket.c', line 546
static VALUE rb_czmq_socket_send_frame(int argc, VALUE *argv, VALUE obj)
{
struct nogvl_send_frame_args args;
VALUE frame_obj;
VALUE flags;
char print_prefix[255];
char *cur_time = NULL;
zframe_t *print_frame = NULL;
int rc, flgs;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
rb_scan_args(argc, argv, "11", &frame_obj, &flags);
ZmqGetFrame(frame_obj);
if (NIL_P(flags)) {
flgs = 0;
} else {
if (SYMBOL_P(flags)) flags = rb_const_get_at(rb_cZmqFrame, rb_to_id(flags));
Check_Type(flags, T_FIXNUM);
flgs = FIX2INT(flags);
}
if (sock->verbose) {
cur_time = rb_czmq_formatted_current_time();
print_frame = (flgs & ZFRAME_REUSE) ? frame : zframe_dup(frame);
}
args.socket = sock;
args.frame = frame;
args.flags = flgs;
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_send_frame, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose) ZmqDumpFrame("send_frame", print_frame);
return Qtrue;
}
|
#send_message(msg) ⇒ nil
613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 |
# File 'ext/rbczmq/socket.c', line 613
static VALUE rb_czmq_socket_send_message(VALUE obj, VALUE message_obj)
{
struct nogvl_send_message_args args;
zmsg_t *print_message = NULL;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
ZmqGetMessage(message_obj);
if (sock->verbose) print_message = zmsg_dup(message->message);
args.socket = sock;
args.message = message->message;
rb_thread_blocking_region(rb_czmq_nogvl_send_message, (void *)&args, RUBY_UBF_IO, 0);
message->flags |= ZMQ_MESSAGE_DESTROYED;
if (sock->verbose) ZmqDumpMessage("send_message", print_message);
return Qnil;
}
|
#sendm("message") ⇒ Boolean
418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 |
# File 'ext/rbczmq/socket.c', line 418
static VALUE rb_czmq_socket_sendm(VALUE obj, VALUE msg)
{
int rc;
struct nogvl_send_args args;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
Check_Type(msg, T_STRING);
args.socket = sock;
args.msg = StringValueCStr(msg);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_sendm, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: sendm \"%s\"", zsocket_type_str(sock->socket), sock->socket, StringValueCStr(msg));
return Qtrue;
}
|
#sndbuf ⇒ Fixnum
1198 1199 1200 1201 1202 1203 |
# File 'ext/rbczmq/socket.c', line 1198
static VALUE rb_czmq_socket_opt_sndbuf(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_sndbuf(sock->socket));
}
|
#sndbuf=(1000) ⇒ nil
1218 1219 1220 1221 1222 |
# File 'ext/rbczmq/socket.c', line 1218
static VALUE rb_czmq_socket_set_opt_sndbuf(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_sndbuf, "SNDBUF", value);
}
|
#sndhwm ⇒ Fixnum
806 807 808 809 810 811 |
# File 'ext/rbczmq/socket.c', line 806
static VALUE rb_czmq_socket_opt_sndhwm(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_sndhwm(sock->socket));
}
|
#sndhwm=(100) ⇒ nil
827 828 829 830 831 |
# File 'ext/rbczmq/socket.c', line 827
static VALUE rb_czmq_socket_set_opt_sndhwm(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_sndhwm, "SNDHWM", value);
}
|
#sndtimeo ⇒ Fixnum
1582 1583 1584 1585 1586 1587 |
# File 'ext/rbczmq/socket.c', line 1582
static VALUE rb_czmq_socket_opt_sndtimeo(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_sndtimeo(sock->socket));
}
|
#sndtimeout=(200) ⇒ nil
1602 1603 1604 1605 1606 |
# File 'ext/rbczmq/socket.c', line 1602
static VALUE rb_czmq_socket_set_opt_sndtimeo(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_sndtimeo, "SNDTIMEO", value);
}
|
#state ⇒ String
144 145 146 147 148 149 |
# File 'ext/rbczmq/socket.c', line 144
static VALUE rb_czmq_socket_state(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(sock->state);
}
|
#subscribe(value) ⇒ Object
1461 1462 1463 1464 1465 1466 1467 |
# File 'ext/rbczmq/socket.c', line 1461
static VALUE rb_czmq_socket_set_opt_subscribe(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetStringSockOpt(obj, zsocket_set_subscribe, "SUBSCRIBE", value, {
ZmqAssertSockOptFor(ZMQ_SUB)
});
}
|
#to_s ⇒ Object
Generates a string representation of the current socket state
socket = ctx.bind(:PUB, “tcp://127.0.0.1:5000”) socket.to_s => “PUB socket bound to tcp://127.0.0.1:5000”
71 72 73 74 75 76 77 78 79 80 |
# File 'lib/zmq/socket.rb', line 71 def to_s case state when BOUND "#{type_str} socket bound to #{endpoints.join(', ')}" when CONNECTED "#{type_str} socket connected to #{endpoints.join(', ')}" else "#{type_str} socket" end end |
#type_str ⇒ Object
Generates a string representation of this socket type
socket = ctx.socket(:PUB) socket.type_str => “PUB”
62 63 64 |
# File 'lib/zmq/socket.rb', line 62 def type_str self.class.const_get(:TYPE_STR) end |
#unsubscribe(value) ⇒ Object
1482 1483 1484 1485 1486 1487 1488 |
# File 'ext/rbczmq/socket.c', line 1482
static VALUE rb_czmq_socket_set_opt_unsubscribe(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetStringSockOpt(obj, zsocket_set_unsubscribe, "UNSUBSCRIBE", value, {
ZmqAssertSockOptFor(ZMQ_SUB)
});
}
|
#verbose=(true) ⇒ nil
335 336 337 338 339 340 341 342 343 |
# File 'ext/rbczmq/socket.c', line 335
static VALUE rb_czmq_socket_set_verbose(VALUE obj, VALUE level)
{
bool vlevel;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
vlevel = (level == Qtrue) ? true : false;
sock->verbose = vlevel;
return Qnil;
}
|
#writable? ⇒ Boolean
Determines if this socket is in a writable state. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.
socket.writable? => true
53 54 55 |
# File 'lib/zmq/socket.rb', line 53 def writable? (events & ZMQ::POLLOUT) == ZMQ::POLLOUT end |
#xpub_verbose=(true) ⇒ nil
1179 1180 1181 1182 1183 |
# File 'ext/rbczmq/socket.c', line 1179
static VALUE rb_czmq_socket_set_opt_xpub_verbose(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_xpub_verbose, "XPUB_VERBOSE", value);
}
|