Class: ZMQ::Socket
- Inherits:
-
Object
- Object
- ZMQ::Socket
- Defined in:
- lib/zmq/socket.rb,
ext/rbczmq/socket.c
Defined Under Namespace
Classes: Dealer, Pair, Pub, Pull, Push, Rep, Req, Router, Sub
Constant Summary collapse
- PENDING =
INT2NUM(ZMQ_SOCKET_PENDING)
- BOUND =
INT2NUM(ZMQ_SOCKET_BOUND)
- CONNECTED =
INT2NUM(ZMQ_SOCKET_CONNECTED)
Class Method Summary collapse
Instance Method Summary collapse
-
#affinity ⇒ Fixnum
Returns the socket AFFINITY value.
-
#affinity=(1) ⇒ nil
Sets the socket AFFINITY value.
-
#backlog ⇒ Fixnum
Returns the socket BACKLOG value.
-
#backlog=(200) ⇒ nil
Sets the socket BACKLOG value.
-
#bind("inproc://test") ⇒ Fixnum
Binds to a given endpoint.
-
#close ⇒ nil
Closes a socket.
-
#connect("tcp://localhost:3456") ⇒ Boolean
Attempts to connect to a given endpoint.
-
#endpoint ⇒ String?
Returns the endpoint this socket is currently connected to, if any.
-
#events ⇒ Fixnum
Query if this socket is in a readable or writable state.
-
#fd ⇒ Fixnum
(also: #to_i)
Returns a file descriptor reference for integrating this socket with an external event loop or multiplexor.
-
#hwm ⇒ Fixnum
Returns the socket HWM (High Water Mark) value.
-
#hwm=(100) ⇒ nil
Sets the socket HWM (High Water Mark) value.
-
#identity=(value) ⇒ Object
Sets the socket IDENTITY value.
-
#linger ⇒ Fixnum
Returns the socket LINGER value.
-
#linger=(1000) ⇒ nil
Sets the socket LINGER value in ms.
-
#mcast_loop=(false) ⇒ nil
Sets the socket MCAST_LOOP value.
-
#mcast_loop? ⇒ Boolean
Returns the socket MCAST_LOOP status.
-
#poll_readable? ⇒ Boolean
Poll all sockets for readable states by default.
-
#poll_writable? ⇒ Boolean
Poll all sockets for writable states by default.
-
#rate ⇒ Fixnum
Returns the socket RATE value.
-
#rate=(50000) ⇒ nil
Sets the socket RATE value.
-
#rcvbuf ⇒ Fixnum
Returns the socket RCVBUF value.
-
#rcvbuf=(1000) ⇒ nil
Sets the socket RCVBUF value.
-
#rcvmore ⇒ Boolean
Query if there are more messages to receive.
-
#rcvtimeo ⇒ Fixnum
Returns the socket RCVTIMEO value.
-
#rcvtimeout=(200) ⇒ nil
Sets the socket RCVTIMEO value.
-
#readable? ⇒ Boolean
Determines if there are one or more messages to read from this socket.
-
#reconnect_ivl ⇒ Fixnum
Returns the socket RECONNECT_IVL value.
-
#reconnect_ivl=(200) ⇒ nil
Sets the socket RECONNECT_IVL value.
-
#reconnect_ivl_max ⇒ Fixnum
Returns the socket RECONNECT_IVL_MAX value.
-
#reconnect_ivl_max=(5) ⇒ nil
Sets the socket RECONNECT_IVL_MAX value.
-
#recovery_ivl ⇒ Fixnum
Returns the socket RECOVERY_IVL value.
-
#recovery_ivl=(20) ⇒ nil
Sets the socket RECOVERY_IVL value.
-
#recovery_ivl_msec ⇒ Fixnum
Returns the socket RECOVERY_IVL_MSEC value.
-
#recovery_ivl_msec=(20) ⇒ nil
Sets the socket RECOVERY_IVL_MSEC value.
-
#recv ⇒ String?
Receive a string from this ZMQ socket.
-
#recv_frame ⇒ ZMQ::Frame?
Receives a ZMQ frame from this socket.
-
#recv_frame_nonblock ⇒ ZMQ::Frame?
Receives a ZMQ frame from this socket.
-
#recv_message ⇒ ZMQ::Message?
Receives a ZMQ message from this socket.
-
#recv_nonblock ⇒ String?
Receive a string from this ZMQ socket.
-
#recv_timeout ⇒ Fixnum?
Returns the recv timeout currently associated with this socket.
-
#recv_timeout=(5) ⇒ nil
Sets a receive timeout for this socket.
-
#send("message") ⇒ Boolean
Sends a string to this ZMQ socket.
-
#send_frame(frame) ⇒ Boolean
Sends a ZMQ::Frame instance to this socket.
-
#send_message(msg) ⇒ nil
Sends a ZMQ::Message instance to this socket.
-
#send_timeout ⇒ Fixnum?
Returns the send timeout currently associated with this socket.
-
#send_timeout=(5) ⇒ nil
Sets a send timeout for this socket.
-
#sendm("message") ⇒ Boolean
Sends a string to this ZMQ socket, with a more flag set.
-
#sndbuf ⇒ Fixnum
Returns the socket SNDBUF value.
-
#sndbuf=(1000) ⇒ nil
Sets the socket SNDBUF value.
-
#sndtimeo ⇒ Fixnum
Returns the socket SNDTIMEO value.
-
#sndtimeout=(200) ⇒ nil
Sets the socket SNDTIMEO value.
-
#state ⇒ Fixnum
Returns the current socket state, one of ZMQ::Socket::PENDING, ZMQ::Socket::BOUND or ZMQ::Socket::CONNECTED.
-
#subscribe(value) ⇒ Object
Subscribes this SUB socket to a topic.
-
#swap ⇒ Fixnum
Returns the socket SWAP value.
-
#swap=(100) ⇒ nil
Sets the socket SWAP value.
-
#to_s ⇒ Object
Generates a string representation of the current socket state.
-
#type_str ⇒ Object
Generates a string representation of this socket type.
-
#unsubscribe(value) ⇒ Object
Unsubscribes this SUB socket from a topic.
-
#verbose=(true) ⇒ nil
Let this socket be verbose - dumps a lot of data to stdout for debugging.
-
#writable? ⇒ Boolean
Determines if this socket is in a writable state.
Class Method Details
.handle_fsm_errors(error, *methods) ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/zmq/socket.rb', line 12
# Redefines each method named in +methods+ so that it calls +super+ and
# translates a SystemCallError whose errno is ZMQ::EFSM into a ZMQ::Error.
# The ZMQ::Error message is +error+ followed by a hint about REQ / REP usage.
# Any other SystemCallError is re-raised unchanged.
def self.handle_fsm_errors(error, *methods)
  methods.each do |m|
    # The heredoc is double-quoted: #{m} and #{error} are interpolated now,
    # when the wrapper is defined, not when it is called.
    class_eval <<-"evl", __FILE__, __LINE__
      def #{m}(*args);
        super
      rescue SystemCallError => e
        raise(ZMQ::Error, "#{error} Please assert that you're not sending / receiving out of band data when using the REQ / REP socket pairs.") if e.errno == ZMQ::EFSM
        raise
      end
    evl
  end
end
.unsupported_api(*methods) ⇒ Object
4 5 6 7 8 9 10 |
# File 'lib/zmq/socket.rb', line 4
# Defines each method named in +methods+ as a stub that raises ZMQ::Error
# stating that the API is not supported for this socket type.
def self.unsupported_api(*methods)
  methods.each do |m|
    # The heredoc is double-quoted: #{m} and #{const_get(:TYPE_STR)} are
    # interpolated here, while the stub is being defined.
    class_eval <<-"evl", __FILE__, __LINE__
      def #{m}(*args); raise(ZMQ::Error, "API #{m} not supported for #{const_get(:TYPE_STR)} sockets!"); end
    evl
  end
end
Instance Method Details
#affinity ⇒ Fixnum
973 974 975 976 977 978 |
# File 'ext/rbczmq/socket.c', line 973
/*
 * ZMQ::Socket#affinity: returns the socket AFFINITY value as a Ruby Integer (INT2NUM).
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_opt_affinity(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_affinity(sock->socket));
}
|
#affinity=(1) ⇒ nil
993 994 995 996 997 |
# File 'ext/rbczmq/socket.c', line 993
/*
 * ZMQ::Socket#affinity=: sets the socket AFFINITY value through zsocket_set_affinity.
 * NOTE(review): ZmqSetSockOpt is a macro defined elsewhere; it presumably
 * resolves `sock` from `obj` and supplies this function's return - confirm.
 */
static VALUE rb_czmq_socket_set_opt_affinity(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_affinity, "AFFINITY", value);
}
|
#backlog ⇒ Fixnum
1285 1286 1287 1288 1289 1290 |
# File 'ext/rbczmq/socket.c', line 1285
/*
 * ZMQ::Socket#backlog: returns the socket BACKLOG value as a Ruby Integer (INT2NUM).
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_opt_backlog(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_backlog(sock->socket));
}
|
#backlog=(200) ⇒ nil
1305 1306 1307 1308 1309 |
# File 'ext/rbczmq/socket.c', line 1305
/*
 * ZMQ::Socket#backlog=: sets the socket BACKLOG value through zsocket_set_backlog.
 * NOTE(review): ZmqSetSockOpt is a macro defined elsewhere; it presumably
 * resolves `sock` from `obj` and supplies this function's return - confirm.
 */
static VALUE rb_czmq_socket_set_opt_backlog(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_backlog, "BACKLOG", value);
}
|
#bind("inproc://test") ⇒ Fixnum
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
# File 'ext/rbczmq/socket.c', line 219
/*
 * ZMQ::Socket#bind(endpoint): binds the socket to `endpoint`.
 *
 * The bind is performed by rb_czmq_nogvl_socket_bind, invoked through
 * rb_thread_blocking_region. A result of -1 raises via ZmqRaiseSysError.
 * On success the wrapper state becomes ZMQ_SOCKET_BOUND, the endpoint is
 * stored on the wrapper (rb_str_new4) and the result is returned as a Ruby
 * Integer. `endpoint` must be a String (Check_Type).
 */
static VALUE rb_czmq_socket_bind(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_bind, (void *)&args, RUBY_UBF_IO, 0);
    if (rc == -1) ZmqRaiseSysError();
    if (sock->verbose)
        /* %p requires a void * argument; VALUE is an integer type, so cast it. */
        zclock_log ("I: %s socket %p: bound \"%s\"", zsocket_type_str(sock->socket), (void *)obj, StringValueCStr(endpoint));
    sock->state = ZMQ_SOCKET_BOUND;
    sock->endpoint = rb_str_new4(endpoint);
    return INT2NUM(rc);
}
|
#close ⇒ nil
Closes a socket. The GC will take the same action if a socket object is not reachable anymore on the next GC cycle. This is a lower level API.
Examples
sock.close => nil
87 88 89 90 91 92 93 94 95 96 97 |
# File 'ext/rbczmq/socket.c', line 87
/*
 * ZMQ::Socket#close: resets the wrapper state to ZMQ_SOCKET_PENDING, releases
 * the socket through rb_czmq_free_sock and returns nil.
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_close(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqSockGuardCrossThread(sock);
/* This is useless for production / real use cases as we can't query the state again OR assume
anything about the underlying connection. Merely doing the right thing. */
sock->state = ZMQ_SOCKET_PENDING;
rb_czmq_free_sock(sock);
return Qnil;
}
|
#connect("tcp://localhost:3456") ⇒ Boolean
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 |
# File 'ext/rbczmq/socket.c', line 253
/*
 * ZMQ::Socket#connect(endpoint): attempts to connect the socket to `endpoint`.
 *
 * The connect is performed by rb_czmq_nogvl_socket_connect, invoked through
 * rb_thread_blocking_region, and its result is checked with ZmqAssert (a
 * macro defined elsewhere; presumably raises on failure - confirm). On
 * success the wrapper state becomes ZMQ_SOCKET_CONNECTED, the endpoint is
 * stored on the wrapper (rb_str_new4) and true is returned. `endpoint` must
 * be a String (Check_Type).
 */
static VALUE rb_czmq_socket_connect(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_connect, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        /* %p requires a void * argument; VALUE is an integer type, so cast it. */
        zclock_log ("I: %s socket %p: connected \"%s\"", zsocket_type_str(sock->socket), (void *)obj, StringValueCStr(endpoint));
    sock->state = ZMQ_SOCKET_CONNECTED;
    sock->endpoint = rb_str_new4(endpoint);
    return Qtrue;
}
|
#endpoint ⇒ String?
114 115 116 117 118 119 |
# File 'ext/rbczmq/socket.c', line 114
/*
 * ZMQ::Socket#endpoint: returns the `endpoint` value stored on the socket wrapper.
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_endpoint(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return sock->endpoint;
}
|
#events ⇒ Fixnum
1493 1494 1495 1496 1497 1498 |
# File 'ext/rbczmq/socket.c', line 1493
/*
 * ZMQ::Socket#events: returns the socket EVENTS value as a Ruby Integer (INT2NUM).
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_opt_events(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_events(sock->socket));
}
|
#fd ⇒ Fixnum Also known as: to_i
159 160 161 162 163 164 165 |
# File 'ext/rbczmq/socket.c', line 159
/*
 * ZMQ::Socket#fd (also #to_i): returns the socket's file descriptor as a Ruby
 * Integer, or -1 while the wrapper state is ZMQ_SOCKET_PENDING.
 */
static VALUE rb_czmq_socket_fd(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    if (sock->state != ZMQ_SOCKET_PENDING) {
        return INT2NUM(zsocket_fd(sock->socket));
    }
    /* Still pending: report -1 instead of querying the socket. */
    return INT2NUM(-1);
}
|
#hwm ⇒ Fixnum
894 895 896 897 898 899 |
# File 'ext/rbczmq/socket.c', line 894
/*
 * ZMQ::Socket#hwm: returns the socket HWM (High Water Mark) value as a Ruby Integer (INT2NUM).
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_opt_hwm(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_hwm(sock->socket));
}
|
#hwm=(100) ⇒ nil
915 916 917 918 919 |
# File 'ext/rbczmq/socket.c', line 915
/*
 * ZMQ::Socket#hwm=: sets the socket HWM (High Water Mark) value through zsocket_set_hwm.
 * NOTE(review): ZmqSetSockOpt is a macro defined elsewhere; it presumably
 * resolves `sock` from `obj` and supplies this function's return - confirm.
 */
static VALUE rb_czmq_socket_set_opt_hwm(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_hwm, "HWM", value);
}
|
#identity=(value) ⇒ Object
1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 |
# File 'ext/rbczmq/socket.c', line 1402
/*
 * ZMQ::Socket#identity=(value): sets the socket IDENTITY value.
 *
 * `value` must be a String (Check_Type) of 1..255 bytes; an empty or longer
 * string raises rb_eZmqError. Returns nil.
 */
static VALUE rb_czmq_socket_set_opt_identity(VALUE obj, VALUE value)
{
    char *val;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(value, T_STRING);
    if (RSTRING_LEN(value) == 0) rb_raise(rb_eZmqError, "socket identity cannot be empty.");
    if (RSTRING_LEN(value) > 255) rb_raise(rb_eZmqError, "maximum socket identity is 255 chars.");
    val = StringValueCStr(value);
    zsocket_set_identity(sock->socket, val);
    if (sock->verbose)
        /* %p requires a void * argument; VALUE is an integer type, so cast it. */
        zclock_log ("I: %s socket %p: set option \"IDENTITY\" \"%s\"", zsocket_type_str(sock->socket), (void *)obj, val);
    return Qnil;
}
|
#linger ⇒ Fixnum
1246 1247 1248 1249 1250 1251 |
# File 'ext/rbczmq/socket.c', line 1246
/*
 * ZMQ::Socket#linger: returns the socket LINGER value as a Ruby Integer (INT2NUM).
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_opt_linger(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_linger(sock->socket));
}
|
#linger=(1000) ⇒ nil
1266 1267 1268 1269 1270 |
# File 'ext/rbczmq/socket.c', line 1266
/*
 * ZMQ::Socket#linger=: sets the socket LINGER value through zsocket_set_linger.
 * NOTE(review): ZmqSetSockOpt is a macro defined elsewhere; it presumably
 * resolves `sock` from `obj` and supplies this function's return - confirm.
 */
static VALUE rb_czmq_socket_set_opt_linger(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_linger, "LINGER", value);
}
|
#mcast_loop=(false) ⇒ nil
1149 1150 1151 1152 1153 |
# File 'ext/rbczmq/socket.c', line 1149
/*
 * ZMQ::Socket#mcast_loop=: sets the socket MCAST_LOOP value through zsocket_set_mcast_loop.
 * NOTE(review): ZmqSetBooleanSockOpt is a macro defined elsewhere; it presumably
 * resolves `sock` from `obj`, converts the Ruby boolean and supplies this
 * function's return - confirm.
 */
static VALUE rb_czmq_socket_set_opt_mcast_loop(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_mcast_loop, "MCAST_LOOP", value);
}
|
#mcast_loop? ⇒ Boolean
1129 1130 1131 1132 1133 1134 |
# File 'ext/rbczmq/socket.c', line 1129
/*
 * ZMQ::Socket#mcast_loop?: returns true when zsocket_mcast_loop reports
 * exactly 1 for this socket, false for any other value.
 */
static VALUE rb_czmq_socket_opt_mcast_loop(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    if (zsocket_mcast_loop(sock->socket) == 1) {
        return Qtrue;
    }
    return Qfalse;
}
|
#poll_readable? ⇒ Boolean
Poll all sockets for readable states by default
69 70 71 |
# File 'lib/zmq/socket.rb', line 69
# Sockets are polled for readable states by default; always returns true.
def poll_readable?
  true
end
#poll_writable? ⇒ Boolean
Poll all sockets for writable states by default
74 75 76 |
# File 'lib/zmq/socket.rb', line 74
# Sockets are polled for writable states by default; always returns true.
def poll_writable?
  true
end
#rate ⇒ Fixnum
1012 1013 1014 1015 1016 1017 |
# File 'ext/rbczmq/socket.c', line 1012
/*
 * ZMQ::Socket#rate: returns the socket RATE value as a Ruby Integer (INT2NUM).
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_opt_rate(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_rate(sock->socket));
}
|
#rate=(50000) ⇒ nil
1032 1033 1034 1035 1036 |
# File 'ext/rbczmq/socket.c', line 1032
/*
 * ZMQ::Socket#rate=: sets the socket RATE value through zsocket_set_rate.
 * NOTE(review): ZmqSetSockOpt is a macro defined elsewhere; it presumably
 * resolves `sock` from `obj` and supplies this function's return - confirm.
 */
static VALUE rb_czmq_socket_set_opt_rate(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rate, "RATE", value);
}
|
#rcvbuf ⇒ Fixnum
1207 1208 1209 1210 1211 1212 |
# File 'ext/rbczmq/socket.c', line 1207
/*
 * ZMQ::Socket#rcvbuf: returns the socket RCVBUF value as a Ruby Integer (INT2NUM).
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_opt_rcvbuf(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_rcvbuf(sock->socket));
}
|
#rcvbuf=(1000) ⇒ nil
1227 1228 1229 1230 1231 |
# File 'ext/rbczmq/socket.c', line 1227
/*
 * ZMQ::Socket#rcvbuf=: sets the socket RCVBUF value through zsocket_set_rcvbuf.
 * NOTE(review): ZmqSetSockOpt is a macro defined elsewhere; it presumably
 * resolves `sock` from `obj` and supplies this function's return - confirm.
 */
static VALUE rb_czmq_socket_set_opt_rcvbuf(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rcvbuf, "RCVBUF", value);
}
|
#rcvmore ⇒ Boolean
1473 1474 1475 1476 1477 1478 |
# File 'ext/rbczmq/socket.c', line 1473
/*
 * ZMQ::Socket#rcvmore: returns true when zsocket_rcvmore reports exactly 1
 * for this socket (more to receive), false for any other value.
 */
static VALUE rb_czmq_socket_opt_rcvmore(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    if (zsocket_rcvmore(sock->socket) == 1) {
        return Qtrue;
    }
    return Qfalse;
}
|
#rcvtimeo ⇒ Fixnum
1513 1514 1515 1516 1517 1518 |
# File 'ext/rbczmq/socket.c', line 1513
/*
 * ZMQ::Socket#rcvtimeo: returns the socket RCVTIMEO value as a Ruby Integer (INT2NUM).
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_opt_rcvtimeo(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_rcvtimeo(sock->socket));
}
|
#rcvtimeout=(200) ⇒ nil
1533 1534 1535 1536 1537 |
# File 'ext/rbczmq/socket.c', line 1533
/*
 * ZMQ::Socket#rcvtimeout=: sets the socket RCVTIMEO value through zsocket_set_rcvtimeo.
 * NOTE(review): ZmqSetSockOpt is a macro defined elsewhere; it presumably
 * resolves `sock` from `obj` and supplies this function's return - confirm.
 */
static VALUE rb_czmq_socket_set_opt_rcvtimeo(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rcvtimeo, "RCVTIMEO", value);
}
|
#readable? ⇒ Boolean
Determines if there are one or more messages to read from this socket. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.
socket.readable? => true
30 31 32 |
# File 'lib/zmq/socket.rb', line 30
# True when the ZMQ::POLLIN bit is set in the value returned by #events.
def readable?
  (events & ZMQ::POLLIN) == ZMQ::POLLIN
end
#reconnect_ivl ⇒ Fixnum
1324 1325 1326 1327 1328 1329 |
# File 'ext/rbczmq/socket.c', line 1324
/*
 * ZMQ::Socket#reconnect_ivl: returns the socket RECONNECT_IVL value as a Ruby Integer (INT2NUM).
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_opt_reconnect_ivl(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_reconnect_ivl(sock->socket));
}
|
#reconnect_ivl=(200) ⇒ nil
1344 1345 1346 1347 1348 |
# File 'ext/rbczmq/socket.c', line 1344
/*
 * ZMQ::Socket#reconnect_ivl=: sets the socket RECONNECT_IVL value through zsocket_set_reconnect_ivl.
 * NOTE(review): ZmqSetSockOpt is a macro defined elsewhere; it presumably
 * resolves `sock` from `obj` and supplies this function's return - confirm.
 */
static VALUE rb_czmq_socket_set_opt_reconnect_ivl(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl, "RECONNECT_IVL", value);
}
|
#reconnect_ivl_max ⇒ Fixnum
1363 1364 1365 1366 1367 1368 |
# File 'ext/rbczmq/socket.c', line 1363
/*
 * ZMQ::Socket#reconnect_ivl_max: returns the socket RECONNECT_IVL_MAX value as a Ruby Integer (INT2NUM).
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_opt_reconnect_ivl_max(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_reconnect_ivl_max(sock->socket));
}
|
#reconnect_ivl_max=(5) ⇒ nil
1383 1384 1385 1386 1387 |
# File 'ext/rbczmq/socket.c', line 1383
/*
 * ZMQ::Socket#reconnect_ivl_max=: sets the socket RECONNECT_IVL_MAX value through zsocket_set_reconnect_ivl_max.
 * NOTE(review): ZmqSetSockOpt is a macro defined elsewhere; it presumably
 * resolves `sock` from `obj` and supplies this function's return - confirm.
 */
static VALUE rb_czmq_socket_set_opt_reconnect_ivl_max(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl_max, "RECONNECT_IVL_MAX", value);
}
|
#recovery_ivl ⇒ Fixnum
1051 1052 1053 1054 1055 1056 |
# File 'ext/rbczmq/socket.c', line 1051
/*
 * ZMQ::Socket#recovery_ivl: returns the socket RECOVERY_IVL value as a Ruby Integer (INT2NUM).
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_opt_recovery_ivl(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_recovery_ivl(sock->socket));
}
|
#recovery_ivl=(20) ⇒ nil
1071 1072 1073 1074 1075 |
# File 'ext/rbczmq/socket.c', line 1071
/*
 * ZMQ::Socket#recovery_ivl=: sets the socket RECOVERY_IVL value through zsocket_set_recovery_ivl.
 * NOTE(review): ZmqSetSockOpt is a macro defined elsewhere; it presumably
 * resolves `sock` from `obj` and supplies this function's return - confirm.
 */
static VALUE rb_czmq_socket_set_opt_recovery_ivl(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_recovery_ivl, "RECOVERY_IVL", value);
}
|
#recovery_ivl_msec ⇒ Fixnum
1090 1091 1092 1093 1094 1095 |
# File 'ext/rbczmq/socket.c', line 1090
/*
 * ZMQ::Socket#recovery_ivl_msec: returns the socket RECOVERY_IVL_MSEC value as a Ruby Integer (INT2NUM).
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_opt_recovery_ivl_msec(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_recovery_ivl_msec(sock->socket));
}
|
#recovery_ivl_msec=(20) ⇒ nil
1110 1111 1112 1113 1114 |
# File 'ext/rbczmq/socket.c', line 1110
/*
 * ZMQ::Socket#recovery_ivl_msec=: sets the socket RECOVERY_IVL_MSEC value through zsocket_set_recovery_ivl_msec.
 * NOTE(review): ZmqSetSockOpt is a macro defined elsewhere; it presumably
 * resolves `sock` from `obj` and supplies this function's return - confirm.
 */
static VALUE rb_czmq_socket_set_opt_recovery_ivl_msec(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_recovery_ivl_msec, "RECOVERY_IVL_MSEC", value);
}
|
#recv ⇒ String?
450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 |
# File 'ext/rbczmq/socket.c', line 450
/*
 * ZMQ::Socket#recv: receives a string from this socket.
 *
 * The receive is performed by rb_czmq_nogvl_recv, invoked through
 * rb_thread_blocking_region. Returns nil when that yields NULL; otherwise
 * the C string is wrapped with rb_str_new2, passed through ZmqEncode, the C
 * buffer is freed and the Ruby string returned.
 */
static VALUE rb_czmq_socket_recv(VALUE obj)
{
char *str = NULL;
struct nogvl_recv_args args;
VALUE result = Qnil;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
str = (char *)rb_thread_blocking_region(rb_czmq_nogvl_recv, (void *)&args, RUBY_UBF_IO, 0);
/* result is still Qnil here, so a NULL receive returns nil. */
if (str == NULL) return result;
/* NOTE(review): if ZmqAssertSysError (macro defined elsewhere) raises, str is never freed - confirm. */
ZmqAssertSysError();
if (sock->verbose)
zclock_log ("I: %s socket %p: recv \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
result = ZmqEncode(rb_str_new2(str));
free(str);
return result;
}
|
#recv_frame ⇒ ZMQ::Frame?
685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 |
# File 'ext/rbczmq/socket.c', line 685
/*
 * ZMQ::Socket#recv_frame: receives a ZMQ frame from this socket.
 *
 * The receive is performed by rb_czmq_nogvl_recv_frame, invoked through
 * rb_thread_blocking_region. Returns nil when that yields NULL, otherwise
 * the frame wrapped by rb_czmq_alloc_frame.
 */
static VALUE rb_czmq_socket_recv_frame(VALUE obj)
{
zframe_t *frame = NULL;
struct nogvl_recv_args args;
/* print_prefix and cur_time are not read directly in this function;
   presumably the ZmqDumpFrame macro uses them - TODO confirm. */
char print_prefix[255];
char *cur_time = NULL;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
frame = (zframe_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_frame, (void *)&args, RUBY_UBF_IO, 0);
if (frame == NULL) return Qnil;
if (sock->verbose) {
cur_time = rb_czmq_formatted_current_time();
ZmqDumpFrame("recv_frame", frame);
}
return rb_czmq_alloc_frame(frame);
}
|
#recv_frame_nonblock ⇒ ZMQ::Frame?
719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 |
# File 'ext/rbczmq/socket.c', line 719
/*
 * ZMQ::Socket#recv_frame_nonblock: receives a ZMQ frame with
 * zframe_recv_nowait, called directly rather than through
 * rb_thread_blocking_region. Returns nil when no frame is obtained,
 * otherwise the frame wrapped by rb_czmq_alloc_frame.
 */
static VALUE rb_czmq_socket_recv_frame_nonblock(VALUE obj)
{
zframe_t *frame = NULL;
/* print_prefix and cur_time are not read directly in this function;
   presumably the ZmqDumpFrame macro uses them - TODO confirm. */
char print_prefix[255];
char *cur_time = NULL;
/* Clear any stale errno before the receive. */
errno = 0;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
frame = zframe_recv_nowait(sock->socket);
if (frame == NULL) return Qnil;
if (sock->verbose) {
cur_time = rb_czmq_formatted_current_time();
ZmqDumpFrame("recv_frame_nonblock", frame);
}
return rb_czmq_alloc_frame(frame);
}
|
#recv_message ⇒ ZMQ::Message?
780 781 782 783 784 785 786 787 788 789 790 791 792 793 |
# File 'ext/rbczmq/socket.c', line 780
/*
 * ZMQ::Socket#recv_message: receives a ZMQ message from this socket.
 *
 * The receive is performed by rb_czmq_nogvl_recv_message, invoked through
 * rb_thread_blocking_region. Returns nil when that yields NULL, otherwise
 * the message wrapped by rb_czmq_alloc_message.
 */
static VALUE rb_czmq_socket_recv_message(VALUE obj)
{
zmsg_t *message = NULL;
struct nogvl_recv_args args;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
args.socket = sock;
message = (zmsg_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_message, (void *)&args, RUBY_UBF_IO, 0);
if (message == NULL) return Qnil;
if (sock->verbose) ZmqDumpMessage("recv_message", message);
return rb_czmq_alloc_message(message);
}
|
#recv_nonblock ⇒ String?
484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 |
# File 'ext/rbczmq/socket.c', line 484
/*
 * ZMQ::Socket#recv_nonblock: receives a string with zstr_recv_nowait, called
 * directly rather than through rb_thread_blocking_region. Returns nil when
 * no string is obtained; otherwise the C string is wrapped with rb_str_new2,
 * passed through ZmqEncode, the C buffer is freed and the Ruby string returned.
 */
static VALUE rb_czmq_socket_recv_nonblock(VALUE obj)
{
char *str = NULL;
/* Clear any stale errno before the receive. */
errno = 0;
VALUE result = Qnil;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
str = zstr_recv_nowait(sock->socket);
/* result is still Qnil here, so a NULL receive returns nil. */
if (str == NULL) return result;
/* NOTE(review): if ZmqAssertSysError (macro defined elsewhere) raises, str is never freed - confirm. */
ZmqAssertSysError();
if (sock->verbose)
zclock_log ("I: %s socket %p: recv_nonblock \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
result = ZmqEncode(rb_str_new2(str));
free(str);
return result;
}
|
#recv_timeout ⇒ Fixnum?
831 832 833 834 835 836 |
# File 'ext/rbczmq/socket.c', line 831
/*
 * ZMQ::Socket#recv_timeout: returns the `recv_timeout` value stored on the socket wrapper.
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_recv_timeout(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return sock->recv_timeout;
}
|
#recv_timeout=(5) ⇒ nil
808 809 810 811 812 813 814 815 |
# File 'ext/rbczmq/socket.c', line 808
/*
 * ZMQ::Socket#recv_timeout=(timeout): stores a receive timeout on the wrapper.
 *
 * `timeout` must be a Fixnum or a Float; anything else raises TypeError.
 * The error message names the offending value's class, as its wording
 * ("wrong timeout type ...") promises; previously it printed the value's
 * to_s, which is empty for nil. Returns nil.
 */
static VALUE rb_czmq_socket_set_recv_timeout(VALUE obj, VALUE timeout)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    if (TYPE(timeout) != T_FIXNUM && TYPE(timeout) != T_FLOAT) {
        rb_raise(rb_eTypeError, "wrong timeout type %s (expected Fixnum or Float)", rb_obj_classname(timeout));
    }
    sock->recv_timeout = timeout;
    return Qnil;
}
|
#send("message") ⇒ Boolean
357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 |
# File 'ext/rbczmq/socket.c', line 357
/*
 * ZMQ::Socket#send(msg): sends a string to this socket.
 *
 * `msg` must be a String (Check_Type). The send is performed by
 * rb_czmq_nogvl_zstr_send, invoked through rb_thread_blocking_region, and its
 * result is checked with ZmqAssert (a macro defined elsewhere; presumably
 * raises on failure - confirm). Returns true.
 */
static VALUE rb_czmq_socket_send(VALUE obj, VALUE msg)
{
    int rc;
    struct nogvl_send_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    Check_Type(msg, T_STRING);
    args.socket = sock;
    args.msg = StringValueCStr(msg);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_send, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        /* %p requires a void * argument; VALUE is an integer type, so cast it. */
        zclock_log ("I: %s socket %p: send \"%s\"", zsocket_type_str(sock->socket), (void *)obj, StringValueCStr(msg));
    return Qtrue;
}
|
#send_frame(frame) ⇒ Boolean
544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 |
# File 'ext/rbczmq/socket.c', line 544
/*
 * ZMQ::Socket#send_frame(frame, flags = nil): sends a ZMQ::Frame to this socket.
 *
 * `flags` is optional. A Symbol is resolved to a constant on the frame class
 * (rb_cZmqFrame); the resulting value must be a Fixnum. The send is performed
 * by rb_czmq_nogvl_send_frame, invoked through rb_thread_blocking_region.
 * Returns true.
 */
static VALUE rb_czmq_socket_send_frame(int argc, VALUE *argv, VALUE obj)
{
    struct nogvl_send_frame_args args;
    VALUE frame_obj;
    VALUE flags;
    /* print_prefix and cur_time are not read directly in this function;
       presumably the ZmqDumpFrame macro uses them - TODO confirm. */
    char print_prefix[255];
    char *cur_time = NULL;
    zframe_t *print_frame = NULL;
    int rc, flgs;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    rb_scan_args(argc, argv, "11", &frame_obj, &flags);
    /* `frame` is not declared in this function; presumably ZmqGetFrame introduces it - TODO confirm. */
    ZmqGetFrame(frame_obj);

    /* No flags given means 0. */
    flgs = 0;
    if (!NIL_P(flags)) {
        if (SYMBOL_P(flags)) flags = rb_const_get_at(rb_cZmqFrame, rb_to_id(flags));
        Check_Type(flags, T_FIXNUM);
        flgs = FIX2INT(flags);
    }

    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        /* With ZFRAME_REUSE the frame itself is dumped; otherwise a duplicate taken before sending. */
        if (flgs & ZFRAME_REUSE) {
            print_frame = frame;
        } else {
            print_frame = zframe_dup(frame);
        }
    }

    args.flags = flgs;
    args.frame = frame;
    args.socket = sock;
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_send_frame, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose) ZmqDumpFrame("send_frame", print_frame);
    return Qtrue;
}
|
#send_message(msg) ⇒ nil
625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 |
# File 'ext/rbczmq/socket.c', line 625
/*
 * ZMQ::Socket#send_message(msg): sends a ZMQ::Message to this socket.
 *
 * The send is performed by rb_czmq_nogvl_send_message, invoked through
 * rb_thread_blocking_region; its return value is not inspected. Afterwards
 * the message wrapper is flagged ZMQ_MESSAGE_DESTROYED. Returns nil.
 */
static VALUE rb_czmq_socket_send_message(VALUE obj, VALUE message_obj)
{
struct nogvl_send_message_args args;
zmsg_t *print_message = NULL;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
/* `message` is not declared in this function; presumably ZmqGetMessage introduces it - TODO confirm. */
ZmqGetMessage(message_obj);
/* Duplicate before sending so the verbose dump below has its own copy. */
if (sock->verbose) print_message = zmsg_dup(message->message);
args.socket = sock;
args.message = message->message;
rb_thread_blocking_region(rb_czmq_nogvl_send_message, (void *)&args, RUBY_UBF_IO, 0);
message->flags |= ZMQ_MESSAGE_DESTROYED;
if (sock->verbose) ZmqDumpMessage("send_message", print_message);
return Qnil;
}
|
#send_timeout ⇒ Fixnum?
874 875 876 877 878 879 |
# File 'ext/rbczmq/socket.c', line 874
/*
 * ZMQ::Socket#send_timeout: returns the `send_timeout` value stored on the socket wrapper.
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_send_timeout(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return sock->send_timeout;
}
|
#send_timeout=(5) ⇒ nil
851 852 853 854 855 856 857 858 |
# File 'ext/rbczmq/socket.c', line 851
/*
 * ZMQ::Socket#send_timeout=(timeout): stores a send timeout on the wrapper.
 *
 * `timeout` must be a Fixnum or a Float; anything else raises TypeError.
 * The error message names the offending value's class, as its wording
 * ("wrong timeout type ...") promises; previously it printed the value's
 * to_s, which is empty for nil. Returns nil.
 */
static VALUE rb_czmq_socket_set_send_timeout(VALUE obj, VALUE timeout)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    if (TYPE(timeout) != T_FIXNUM && TYPE(timeout) != T_FLOAT) {
        rb_raise(rb_eTypeError, "wrong timeout type %s (expected Fixnum or Float)", rb_obj_classname(timeout));
    }
    sock->send_timeout = timeout;
    return Qnil;
}
|
#sendm("message") ⇒ Boolean
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 |
# File 'ext/rbczmq/socket.c', line 390
/*
 * ZMQ::Socket#sendm(msg): sends a string to this socket with a more flag set.
 *
 * `msg` must be a String (Check_Type). The send is performed by
 * rb_czmq_nogvl_zstr_sendm, invoked through rb_thread_blocking_region, and
 * its result is checked with ZmqAssert (a macro defined elsewhere; presumably
 * raises on failure - confirm). Returns true.
 */
static VALUE rb_czmq_socket_sendm(VALUE obj, VALUE msg)
{
int rc;
struct nogvl_send_args args;
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
ZmqSockGuardCrossThread(sock);
Check_Type(msg, T_STRING);
args.socket = sock;
args.msg = StringValueCStr(msg);
rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_sendm, (void *)&args, RUBY_UBF_IO, 0);
ZmqAssert(rc);
if (sock->verbose)
zclock_log ("I: %s socket %p: sendm \"%s\"", zsocket_type_str(sock->socket), sock->socket, StringValueCStr(msg));
return Qtrue;
}
|
#sndbuf ⇒ Fixnum
1168 1169 1170 1171 1172 1173 |
# File 'ext/rbczmq/socket.c', line 1168
/*
 * ZMQ::Socket#sndbuf: returns the socket SNDBUF value as a Ruby Integer (INT2NUM).
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_opt_sndbuf(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_sndbuf(sock->socket));
}
|
#sndbuf=(1000) ⇒ nil
1188 1189 1190 1191 1192 |
# File 'ext/rbczmq/socket.c', line 1188
/*
 * ZMQ::Socket#sndbuf=: sets the socket SNDBUF value through zsocket_set_sndbuf.
 * NOTE(review): ZmqSetSockOpt is a macro defined elsewhere; it presumably
 * resolves `sock` from `obj` and supplies this function's return - confirm.
 */
static VALUE rb_czmq_socket_set_opt_sndbuf(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_sndbuf, "SNDBUF", value);
}
|
#sndtimeo ⇒ Fixnum
1552 1553 1554 1555 1556 1557 |
# File 'ext/rbczmq/socket.c', line 1552
/*
 * ZMQ::Socket#sndtimeo: returns the socket SNDTIMEO value as a Ruby Integer (INT2NUM).
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_opt_sndtimeo(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_sndtimeo(sock->socket));
}
|
#sndtimeout=(200) ⇒ nil
1572 1573 1574 1575 1576 |
# File 'ext/rbczmq/socket.c', line 1572
/*
 * ZMQ::Socket#sndtimeout=: sets the socket SNDTIMEO value through zsocket_set_sndtimeo.
 * NOTE(review): ZmqSetSockOpt is a macro defined elsewhere; it presumably
 * resolves `sock` from `obj` and supplies this function's return - confirm.
 */
static VALUE rb_czmq_socket_set_opt_sndtimeo(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_sndtimeo, "SNDTIMEO", value);
}
|
#state ⇒ Fixnum
136 137 138 139 140 141 |
# File 'ext/rbczmq/socket.c', line 136
/*
 * ZMQ::Socket#state: returns the wrapper's `state` field as a Ruby Integer
 * (INT2NUM). This function assigns it ZMQ_SOCKET_PENDING, ZMQ_SOCKET_BOUND or
 * ZMQ_SOCKET_CONNECTED elsewhere in this file (close, bind, connect).
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_state(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(sock->state);
}
|
#subscribe(value) ⇒ Object
1431 1432 1433 1434 1435 1436 1437 |
# File 'ext/rbczmq/socket.c', line 1431
/*
 * ZMQ::Socket#subscribe(value): subscribes this SUB socket to a topic through
 * zsocket_set_subscribe.
 * NOTE(review): ZmqSetStringSockOpt is a macro defined elsewhere; it presumably
 * resolves `sock` from `obj`, runs the trailing block (a ZMQ_SUB socket-type
 * assertion) and supplies this function's return - confirm.
 */
static VALUE rb_czmq_socket_set_opt_subscribe(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetStringSockOpt(obj, zsocket_set_subscribe, "SUBSCRIBE", value, {
ZmqAssertSockOptFor(ZMQ_SUB)
});
}
|
#swap ⇒ Fixnum
934 935 936 937 938 939 |
# File 'ext/rbczmq/socket.c', line 934
/*
 * ZMQ::Socket#swap: returns the socket SWAP value as a Ruby Integer (INT2NUM).
 * NOTE(review): `sock` is only assigned NULL here; GetZmqSocket (a macro defined
 * elsewhere) presumably points it at the wrapper behind `obj` - confirm.
 */
static VALUE rb_czmq_socket_opt_swap(VALUE obj)
{
zmq_sock_wrapper *sock = NULL;
GetZmqSocket(obj);
return INT2NUM(zsocket_swap(sock->socket));
}
|
#swap=(100) ⇒ nil
954 955 956 957 958 |
# File 'ext/rbczmq/socket.c', line 954
/*
 * ZMQ::Socket#swap=: sets the socket SWAP value through zsocket_set_swap.
 * NOTE(review): ZmqSetSockOpt is a macro defined elsewhere; it presumably
 * resolves `sock` from `obj` and supplies this function's return - confirm.
 */
static VALUE rb_czmq_socket_set_opt_swap(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_swap, "SWAP", value);
}
|
#to_s ⇒ Object
Generates a string representation of the current socket state
socket = ctx.bind(:PUB, “tcp://127.0.0.1:5000”) socket.to_s => “PUB socket bound to tcp://127.0.0.1:5000”
57 58 59 60 61 62 63 64 65 66 |
# File 'lib/zmq/socket.rb', line 57
# Describes the socket: its type string, plus the endpoint it is bound or
# connected to when the state is BOUND or CONNECTED.
def to_s
  case state
  when BOUND
    "#{type_str} socket bound to #{endpoint}"
  when CONNECTED
    "#{type_str} socket connected to #{endpoint}"
  else
    "#{type_str} socket"
  end
end
#type_str ⇒ Object
Generates a string representation of this socket type
socket = ctx.socket(:PUB) socket.type_str => “PUB”
48 49 50 |
# File 'lib/zmq/socket.rb', line 48
# Returns the TYPE_STR constant looked up on this socket's class.
def type_str
  self.class.const_get(:TYPE_STR)
end
#unsubscribe(value) ⇒ Object
1452 1453 1454 1455 1456 1457 1458 |
# File 'ext/rbczmq/socket.c', line 1452
/*
 * ZMQ::Socket#unsubscribe(value): unsubscribes this SUB socket from a topic
 * through zsocket_set_unsubscribe.
 * NOTE(review): ZmqSetStringSockOpt is a macro defined elsewhere; it presumably
 * resolves `sock` from `obj`, runs the trailing block (a ZMQ_SUB socket-type
 * assertion) and supplies this function's return - confirm.
 */
static VALUE rb_czmq_socket_set_opt_unsubscribe(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetStringSockOpt(obj, zsocket_set_unsubscribe, "UNSUBSCRIBE", value, {
ZmqAssertSockOptFor(ZMQ_SUB)
});
}
|
#verbose=(true) ⇒ nil
285 286 287 288 289 290 291 292 293 |
# File 'ext/rbczmq/socket.c', line 285
/*
 * ZMQ::Socket#verbose=(level): turns verbose logging on or off for this
 * socket. Only the Ruby value `true` enables it; every other value disables
 * it. Returns nil.
 */
static VALUE rb_czmq_socket_set_verbose(VALUE obj, VALUE level)
{
    Bool enabled = FALSE;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    if (level == Qtrue) {
        enabled = TRUE;
    }
    sock->verbose = enabled;
    return Qnil;
}
|