Class: ZMQ::Socket
- Inherits:
-
Object
- Object
- ZMQ::Socket
- Defined in:
- lib/zmq/socket.rb,
ext/rbczmq/socket.c
Defined Under Namespace
Classes: Dealer, Pair, Pub, Pull, Push, Rep, Req, Router, Sub
Constant Summary collapse
- PROTO_REXP =
/^inproc|ipc|tcp|e?pgm:\/\//
- PENDING =
INT2NUM(ZMQ_SOCKET_PENDING)
- BOUND =
INT2NUM(ZMQ_SOCKET_BOUND)
- CONNECTED =
INT2NUM(ZMQ_SOCKET_CONNECTED)
Class Method Summary collapse
Instance Method Summary collapse
-
#affinity ⇒ Fixnum
Returns the socket AFFINITY value.
-
#affinity=(1) ⇒ nil
Sets the socket AFFINITY value.
-
#backlog ⇒ Fixnum
Returns the socket BACKLOG value.
-
#backlog=(200) ⇒ nil
Sets the socket BACKLOG value.
-
#bind(uri) ⇒ Object
Binds to a given endpoint.
-
#close ⇒ nil
Closes a socket.
-
#connect(uri) ⇒ Object
Connects to a given endpoint.
-
#endpoint ⇒ String?
Returns the endpoint this socket is currently connected to, if any.
-
#events ⇒ Fixnum
Query if this socket is in a readable or writable state.
-
#fd ⇒ Fixnum
(also: #to_i)
Returns a file descriptor reference for integrating this socket with an external event loop or multiplexer.
-
#hwm ⇒ Fixnum
Returns the socket HWM (High Water Mark) value.
-
#hwm=(100) ⇒ nil
Sets the socket HWM (High Water Mark) value.
-
#identity=(value) ⇒ Object
Sets the socket IDENTITY value.
-
#linger ⇒ Fixnum
Returns the socket LINGER value.
-
#linger=(1000) ⇒ nil
Sets the socket LINGER value in ms.
-
#mcast_loop=(false) ⇒ nil
Sets the socket MCAST_LOOP value.
-
#mcast_loop? ⇒ Boolean
Returns the socket MCAST_LOOP status.
-
#poll_readable? ⇒ Boolean
Poll all sockets for readable states by default.
-
#poll_writable? ⇒ Boolean
Poll all sockets for writable states by default.
-
#rate ⇒ Fixnum
Returns the socket RATE value.
-
#rate=(50000) ⇒ nil
Sets the socket RATE value.
-
#rcvbuf ⇒ Fixnum
Returns the socket RCVBUF value.
-
#rcvbuf=(1000) ⇒ nil
Sets the socket RCVBUF value.
-
#rcvmore ⇒ Boolean
Query if there are more messages to receive.
-
#rcvtimeo ⇒ Fixnum
Returns the socket RCVTIMEO value.
-
#rcvtimeout=(200) ⇒ nil
Sets the socket RCVTIMEO value.
-
#readable? ⇒ Boolean
Determines if there are one or more messages to read from this socket.
-
#bind("inproc://test") ⇒ Fixnum
Binds to a given endpoint.
-
#connect("tcp://localhost:3456") ⇒ Boolean
Attempts to connect to a given endpoint.
-
#reconnect_ivl ⇒ Fixnum
Returns the socket RECONNECT_IVL value.
-
#reconnect_ivl=(200) ⇒ nil
Sets the socket RECONNECT_IVL value.
-
#reconnect_ivl_max ⇒ Fixnum
Returns the socket RECONNECT_IVL_MAX value.
-
#reconnect_ivl_max=(5) ⇒ nil
Sets the socket RECONNECT_IVL_MAX value.
-
#recovery_ivl ⇒ Fixnum
Returns the socket RECOVERY_IVL value.
-
#recovery_ivl=(20) ⇒ nil
Sets the socket RECOVERY_IVL value.
-
#recovery_ivl_msec ⇒ Fixnum
Returns the socket RECOVERY_IVL_MSEC value.
-
#recovery_ivl_msec=(20) ⇒ nil
Sets the socket RECOVERY_IVL_MSEC value.
-
#recv ⇒ String?
Receive a string from this ZMQ socket.
-
#recv_frame ⇒ ZMQ::Frame?
Receives a ZMQ frame from this socket.
-
#recv_frame_nonblock ⇒ ZMQ::Frame?
Receives a ZMQ frame from this socket without blocking.
-
#recv_message ⇒ ZMQ::Message?
Receives a ZMQ message from this socket.
-
#recv_nonblock ⇒ String?
Receive a string from this ZMQ socket without blocking.
-
#send("message") ⇒ Boolean
Sends a string to this ZMQ socket.
-
#send_frame(frame) ⇒ nil
Sends a ZMQ::Frame instance to this socket.
-
#send_message(msg) ⇒ nil
Sends a ZMQ::Message instance to this socket.
-
#sendm("message") ⇒ Boolean
Sends a string to this ZMQ socket, with a more flag set.
-
#sndbuf ⇒ Fixnum
Returns the socket SNDBUF value.
-
#sndbuf=(1000) ⇒ nil
Sets the socket SNDBUF value.
-
#sndtimeo ⇒ Fixnum
Returns the socket SNDTIMEO value.
-
#sndtimeout=(200) ⇒ nil
Sets the socket SNDTIMEO value.
-
#state ⇒ Fixnum
Returns the current socket state, one of ZMQ::Socket::PENDING, ZMQ::Socket::BOUND or ZMQ::Socket::CONNECTED.
-
#subscribe(value) ⇒ Object
Subscribes this SUB socket to a topic.
-
#swap ⇒ Fixnum
Returns the socket SWAP value.
-
#swap=(100) ⇒ nil
Sets the socket SWAP value.
-
#to_s ⇒ Object
Generates a string representation of the current socket state.
-
#type_str ⇒ Object
Generates a string representation of this socket type.
-
#unsubscribe(value) ⇒ Object
Unsubscribes this SUB socket from a topic.
-
#verbose=(true) ⇒ nil
Let this socket be verbose - dumps a lot of data to stdout for debugging.
-
#writable? ⇒ Boolean
Determines if this socket is in a writable state.
Class Method Details
.handle_fsm_errors(error, *methods) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/zmq/socket.rb', line 14
#
# Redefines each method named in +methods+ so that it calls +super+ and
# intercepts SystemCallError. When the error's errno equals ZMQ::EFSM a
# ZMQ::Error is raised whose message starts with +error+; every other
# SystemCallError is re-raised unchanged.
def self.handle_fsm_errors(error, *methods)
  methods.each do |m|
    class_eval <<-"evl", __FILE__, __LINE__
      def #{m}(*args);
        super
      rescue SystemCallError => e
        raise(ZMQ::Error, "#{error} Please assert that you're not sending / receiving out of band data when using the REQ / REP socket pairs.") if e.errno == ZMQ::EFSM
        raise
      end
    evl
  end
end
.unsupported_api(*methods) ⇒ Object
6 7 8 9 10 11 12 |
# File 'lib/zmq/socket.rb', line 6
#
# Redefines each method named in +methods+ so that calling it raises a
# ZMQ::Error saying the API is not supported. The TYPE_STR constant is
# interpolated into the message when the method is generated.
def self.unsupported_api(*methods)
  methods.each do |m|
    class_eval <<-"evl", __FILE__, __LINE__
      def #{m}(*args); raise(ZMQ::Error, "API #{m} not supported for #{const_get(:TYPE_STR)} sockets!"); end
    evl
  end
end
Instance Method Details
#affinity ⇒ Fixnum
883 884 885 886 887 888 |
# File 'ext/rbczmq/socket.c', line 883
/*
 * Ruby-facing reader for the AFFINITY socket option (ZMQ::Socket#affinity).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns the result of zsocket_affinity() boxed with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_affinity(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    VALUE affinity;

    GetZmqSocket(obj);
    affinity = INT2NUM(zsocket_affinity(sock->socket));
    return affinity;
}
|
#affinity=(1) ⇒ nil
903 904 905 906 907 |
# File 'ext/rbczmq/socket.c', line 903
/*
 * Ruby-facing writer for the AFFINITY socket option (ZMQ::Socket#affinity=).
 *
 * The whole body is the ZmqSetSockOpt macro (defined elsewhere), invoked with
 * the receiver, the zsocket_set_affinity setter, the label "AFFINITY" and the
 * requested value.
 * NOTE(review): `sock` is only declared here and there is no explicit return,
 * so the macro presumably assigns `sock` and produces the return value -
 * confirm against the macro definition.
 */
static VALUE rb_czmq_socket_set_opt_affinity(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_affinity, "AFFINITY", value);
}
|
#backlog ⇒ Fixnum
1195 1196 1197 1198 1199 1200 |
# File 'ext/rbczmq/socket.c', line 1195
/*
 * Ruby-facing reader for the BACKLOG socket option (ZMQ::Socket#backlog).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns the result of zsocket_backlog() boxed with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_backlog(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    VALUE backlog;

    GetZmqSocket(obj);
    backlog = INT2NUM(zsocket_backlog(sock->socket));
    return backlog;
}
|
#backlog=(200) ⇒ nil
1215 1216 1217 1218 1219 |
# File 'ext/rbczmq/socket.c', line 1215
/*
 * Ruby-facing writer for the BACKLOG socket option (ZMQ::Socket#backlog=).
 *
 * The whole body is the ZmqSetSockOpt macro (defined elsewhere), invoked with
 * the receiver, the zsocket_set_backlog setter, the label "BACKLOG" and the
 * requested value.
 * NOTE(review): `sock` is only declared here and there is no explicit return,
 * so the macro presumably assigns `sock` and produces the return value -
 * confirm against the macro definition.
 */
static VALUE rb_czmq_socket_set_opt_backlog(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_backlog, "BACKLOG", value);
}
|
#bind(uri) ⇒ Object
Binds to a given endpoint. Attempts to resolve URIs without a protocol through DNS SRV records.
socket = ctx.socket(:PUB)
socket.bind "tcp://127.0.0.1:9000"
socket.bind "collector.domain.com" # resolves 10.0.0.2:9000
87 88 89 90 |
# File 'lib/zmq/socket.rb', line 87
#
# Binds to +uri+. A non-nil +uri+ that does not match PROTO_REXP is first
# passed through +resolve+; the result is handed to +real_bind+.
def bind(uri)
  if uri && uri !~ PROTO_REXP
    uri = resolve(uri)
  end
  real_bind(uri)
end
#close ⇒ nil
Closes a socket. The GC will take the same action if a socket object is not reachable anymore on the next GC cycle. This is a lower level API.
Examples
sock.close => nil
85 86 87 88 89 90 91 92 93 94 95 |
# File 'ext/rbczmq/socket.c', line 85
/*
 * Closes the socket behind a ZMQ::Socket instance (ZMQ::Socket#close).
 *
 * The wrapper is flagged ZMQ_SOCKET_PENDING before rb_czmq_free_sock() runs.
 * That flag is bookkeeping only: after the close the state cannot be queried
 * again and nothing may be assumed about the underlying connection.
 *
 * Always returns nil.
 */
static VALUE rb_czmq_socket_close(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;

    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);

    sock->state = ZMQ_SOCKET_PENDING;
    rb_czmq_free_sock(sock);

    return Qnil;
}
|
#connect(uri) ⇒ Object
Connects to a given endpoint. Attempts to resolve URIs without a protocol through DNS SRV records.
socket = ctx.socket(:PUB)
socket.connect "tcp://127.0.0.1:9000"
socket.connect "collector.domain.com" # resolves 10.0.0.2:9000
99 100 101 102 |
# File 'lib/zmq/socket.rb', line 99
#
# Connects to +uri+. A non-nil +uri+ that does not match PROTO_REXP is first
# passed through +resolve+; the result is handed to +real_connect+.
def connect(uri)
  if uri && uri !~ PROTO_REXP
    uri = resolve(uri)
  end
  real_connect(uri)
end
#endpoint ⇒ String?
112 113 114 115 116 117 |
# File 'ext/rbczmq/socket.c', line 112
/*
 * Ruby-facing reader for the endpoint stored on the socket wrapper
 * (ZMQ::Socket#endpoint).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns sock->endpoint as stored, without copying or converting it.
 */
static VALUE rb_czmq_socket_endpoint(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    VALUE current_endpoint;

    GetZmqSocket(obj);
    current_endpoint = sock->endpoint;
    return current_endpoint;
}
|
#events ⇒ Fixnum
1403 1404 1405 1406 1407 1408 |
# File 'ext/rbczmq/socket.c', line 1403
/*
 * Ruby-facing reader for the socket's event state (ZMQ::Socket#events).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns the result of zsocket_events() boxed with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_events(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    VALUE events;

    GetZmqSocket(obj);
    events = INT2NUM(zsocket_events(sock->socket));
    return events;
}
|
#fd ⇒ Fixnum Also known as: to_i
157 158 159 160 161 162 163 |
# File 'ext/rbczmq/socket.c', line 157
/*
 * Ruby-facing reader for the socket's file descriptor (ZMQ::Socket#fd,
 * also exposed as #to_i).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns -1 while the wrapper is still in ZMQ_SOCKET_PENDING state,
 * otherwise the result of zsocket_fd(); both are boxed with INT2NUM.
 */
static VALUE rb_czmq_socket_fd(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;

    GetZmqSocket(obj);
    if (sock->state != ZMQ_SOCKET_PENDING) {
        return INT2NUM(zsocket_fd(sock->socket));
    }
    return INT2NUM(-1);
}
|
#hwm ⇒ Fixnum
804 805 806 807 808 809 |
# File 'ext/rbczmq/socket.c', line 804
/*
 * Ruby-facing reader for the HWM (high water mark) socket option
 * (ZMQ::Socket#hwm).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns the result of zsocket_hwm() boxed with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_hwm(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    VALUE hwm;

    GetZmqSocket(obj);
    hwm = INT2NUM(zsocket_hwm(sock->socket));
    return hwm;
}
|
#hwm=(100) ⇒ nil
825 826 827 828 829 |
# File 'ext/rbczmq/socket.c', line 825
/*
 * Ruby-facing writer for the HWM (high water mark) socket option
 * (ZMQ::Socket#hwm=).
 *
 * The whole body is the ZmqSetSockOpt macro (defined elsewhere), invoked with
 * the receiver, the zsocket_set_hwm setter, the label "HWM" and the requested
 * value.
 * NOTE(review): `sock` is only declared here and there is no explicit return,
 * so the macro presumably assigns `sock` and produces the return value -
 * confirm against the macro definition.
 */
static VALUE rb_czmq_socket_set_opt_hwm(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_hwm, "HWM", value);
}
|
#identity=(value) ⇒ Object
1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 |
# File 'ext/rbczmq/socket.c', line 1312
/*
 * Ruby-facing writer for the IDENTITY socket option (ZMQ::Socket#identity=).
 *
 * obj:   the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *        resolves it into the local `sock`, which is NULL until then.
 * value: the new identity; must be a String of 1 to 255 bytes.
 *
 * Raises rb_eZmqError when the identity is empty or longer than 255.
 * Check_Type and StringValueCStr raise on their own for unsuitable values.
 *
 * Always returns nil.
 */
static VALUE rb_czmq_socket_set_opt_identity(VALUE obj, VALUE value)
{
    char *val;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(value, T_STRING);
    if (RSTRING_LEN(value) == 0) rb_raise(rb_eZmqError, "socket identity cannot be empty.");
    if (RSTRING_LEN(value) > 255) rb_raise(rb_eZmqError, "maximum socket identity is 255 chars.");
    val = StringValueCStr(value);
    zsocket_set_identity(sock->socket, val);
    if (sock->verbose) {
        /* VALUE is an integer type; %p requires a void pointer, hence the cast. */
        zclock_log ("I: %s socket %p: set option \"IDENTITY\" \"%s\"", zsocket_type_str(sock->socket), (void *)obj, val);
    }
    return Qnil;
}
|
#linger ⇒ Fixnum
1156 1157 1158 1159 1160 1161 |
# File 'ext/rbczmq/socket.c', line 1156
/*
 * Ruby-facing reader for the LINGER socket option (ZMQ::Socket#linger).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns the result of zsocket_linger() boxed with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_linger(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    VALUE linger;

    GetZmqSocket(obj);
    linger = INT2NUM(zsocket_linger(sock->socket));
    return linger;
}
|
#linger=(1000) ⇒ nil
1176 1177 1178 1179 1180 |
# File 'ext/rbczmq/socket.c', line 1176
/*
 * Ruby-facing writer for the LINGER socket option (ZMQ::Socket#linger=).
 *
 * The whole body is the ZmqSetSockOpt macro (defined elsewhere), invoked with
 * the receiver, the zsocket_set_linger setter, the label "LINGER" and the
 * requested value.
 * NOTE(review): `sock` is only declared here and there is no explicit return,
 * so the macro presumably assigns `sock` and produces the return value -
 * confirm against the macro definition.
 */
static VALUE rb_czmq_socket_set_opt_linger(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_linger, "LINGER", value);
}
|
#mcast_loop=(false) ⇒ nil
1059 1060 1061 1062 1063 |
# File 'ext/rbczmq/socket.c', line 1059
/*
 * Ruby-facing writer for the MCAST_LOOP socket option
 * (ZMQ::Socket#mcast_loop=).
 *
 * The whole body is the ZmqSetBooleanSockOpt macro (defined elsewhere),
 * invoked with the receiver, the zsocket_set_mcast_loop setter, the label
 * "MCAST_LOOP" and the requested value.
 * NOTE(review): `sock` is only declared here and there is no explicit return,
 * so the macro presumably assigns `sock` and produces the return value -
 * confirm against the macro definition.
 */
static VALUE rb_czmq_socket_set_opt_mcast_loop(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetBooleanSockOpt(obj, zsocket_set_mcast_loop, "MCAST_LOOP", value);
}
|
#mcast_loop? ⇒ Boolean
1039 1040 1041 1042 1043 1044 |
# File 'ext/rbczmq/socket.c', line 1039
/*
 * Ruby-facing predicate for the MCAST_LOOP socket option
 * (ZMQ::Socket#mcast_loop?).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns true only when zsocket_mcast_loop() reports exactly 1,
 * false for every other value.
 */
static VALUE rb_czmq_socket_opt_mcast_loop(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;

    GetZmqSocket(obj);
    if (zsocket_mcast_loop(sock->socket) == 1) {
        return Qtrue;
    }
    return Qfalse;
}
|
#poll_readable? ⇒ Boolean
Poll all sockets for readable states by default
71 72 73 |
# File 'lib/zmq/socket.rb', line 71
#
# Default answer for "should this socket be polled for readable states":
# always true.
def poll_readable?
  true
end
#poll_writable? ⇒ Boolean
Poll all sockets for writable states by default
76 77 78 |
# File 'lib/zmq/socket.rb', line 76
#
# Default answer for "should this socket be polled for writable states":
# always true.
def poll_writable?
  true
end
#rate ⇒ Fixnum
922 923 924 925 926 927 |
# File 'ext/rbczmq/socket.c', line 922
/*
 * Ruby-facing reader for the RATE socket option (ZMQ::Socket#rate).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns the result of zsocket_rate() boxed with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_rate(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    VALUE rate;

    GetZmqSocket(obj);
    rate = INT2NUM(zsocket_rate(sock->socket));
    return rate;
}
|
#rate=(50000) ⇒ nil
942 943 944 945 946 |
# File 'ext/rbczmq/socket.c', line 942
/*
 * Ruby-facing writer for the RATE socket option (ZMQ::Socket#rate=).
 *
 * The whole body is the ZmqSetSockOpt macro (defined elsewhere), invoked with
 * the receiver, the zsocket_set_rate setter, the label "RATE" and the
 * requested value.
 * NOTE(review): `sock` is only declared here and there is no explicit return,
 * so the macro presumably assigns `sock` and produces the return value -
 * confirm against the macro definition.
 */
static VALUE rb_czmq_socket_set_opt_rate(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rate, "RATE", value);
}
|
#rcvbuf ⇒ Fixnum
1117 1118 1119 1120 1121 1122 |
# File 'ext/rbczmq/socket.c', line 1117
/*
 * Ruby-facing reader for the RCVBUF socket option (ZMQ::Socket#rcvbuf).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns the result of zsocket_rcvbuf() boxed with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_rcvbuf(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    VALUE rcvbuf;

    GetZmqSocket(obj);
    rcvbuf = INT2NUM(zsocket_rcvbuf(sock->socket));
    return rcvbuf;
}
|
#rcvbuf=(1000) ⇒ nil
1137 1138 1139 1140 1141 |
# File 'ext/rbczmq/socket.c', line 1137
/*
 * Ruby-facing writer for the RCVBUF socket option (ZMQ::Socket#rcvbuf=).
 *
 * The whole body is the ZmqSetSockOpt macro (defined elsewhere), invoked with
 * the receiver, the zsocket_set_rcvbuf setter, the label "RCVBUF" and the
 * requested value.
 * NOTE(review): `sock` is only declared here and there is no explicit return,
 * so the macro presumably assigns `sock` and produces the return value -
 * confirm against the macro definition.
 */
static VALUE rb_czmq_socket_set_opt_rcvbuf(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rcvbuf, "RCVBUF", value);
}
|
#rcvmore ⇒ Boolean
1383 1384 1385 1386 1387 1388 |
# File 'ext/rbczmq/socket.c', line 1383
/*
 * Ruby-facing predicate for the RCVMORE socket option (ZMQ::Socket#rcvmore).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns true only when zsocket_rcvmore() reports exactly 1,
 * false for every other value.
 */
static VALUE rb_czmq_socket_opt_rcvmore(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;

    GetZmqSocket(obj);
    if (zsocket_rcvmore(sock->socket) == 1) {
        return Qtrue;
    }
    return Qfalse;
}
|
#rcvtimeo ⇒ Fixnum
1423 1424 1425 1426 1427 1428 |
# File 'ext/rbczmq/socket.c', line 1423
/*
 * Ruby-facing reader for the RCVTIMEO socket option (ZMQ::Socket#rcvtimeo).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns the result of zsocket_rcvtimeo() boxed with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_rcvtimeo(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    VALUE rcvtimeo;

    GetZmqSocket(obj);
    rcvtimeo = INT2NUM(zsocket_rcvtimeo(sock->socket));
    return rcvtimeo;
}
|
#rcvtimeout=(200) ⇒ nil
1443 1444 1445 1446 1447 |
# File 'ext/rbczmq/socket.c', line 1443
/*
 * Ruby-facing writer for the RCVTIMEO socket option
 * (listed as ZMQ::Socket#rcvtimeout=).
 *
 * The whole body is the ZmqSetSockOpt macro (defined elsewhere), invoked with
 * the receiver, the zsocket_set_rcvtimeo setter, the label "RCVTIMEO" and the
 * requested value.
 * NOTE(review): `sock` is only declared here and there is no explicit return,
 * so the macro presumably assigns `sock` and produces the return value -
 * confirm against the macro definition.
 */
static VALUE rb_czmq_socket_set_opt_rcvtimeo(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_rcvtimeo, "RCVTIMEO", value);
}
|
#readable? ⇒ Boolean
Determines if there are one or more messages to read from this socket. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.
socket.readable? => true
32 33 34 |
# File 'lib/zmq/socket.rb', line 32
#
# True when the ZMQ::POLLIN bit is set in the value returned by +events+.
def readable?
  pollin_bits = events & ZMQ::POLLIN
  pollin_bits == ZMQ::POLLIN
end
#bind("inproc://test") ⇒ Fixnum
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 |
# File 'ext/rbczmq/socket.c', line 209
/*
 * Binds the socket to an endpoint.
 *
 * obj:      the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *           resolves it into the local `sock`, which is NULL until then.
 * endpoint: the endpoint String; Check_Type rejects anything else.
 *
 * The bind itself is performed by rb_czmq_nogvl_socket_bind through
 * rb_thread_blocking_region, and its result is passed to ZmqAssert
 * (presumably raising on failure - confirm against the macro).
 * Afterwards the wrapper is marked ZMQ_SOCKET_BOUND and keeps a copy of the
 * endpoint string.
 *
 * Returns the bind result boxed with INT2NUM.
 */
static VALUE rb_czmq_socket_bind(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_bind, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose) {
        /* VALUE is an integer type; %p requires a void pointer, hence the cast. */
        zclock_log ("I: %s socket %p: bound \"%s\"", zsocket_type_str(sock->socket), (void *)obj, StringValueCStr(endpoint));
    }
    sock->state = ZMQ_SOCKET_BOUND;
    sock->endpoint = rb_str_new4(endpoint);
    return INT2NUM(rc);
}
|
#connect("tcp://localhost:3456") ⇒ Boolean
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 |
# File 'ext/rbczmq/socket.c', line 243
/*
 * Connects the socket to an endpoint.
 *
 * obj:      the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *           resolves it into the local `sock`, which is NULL until then.
 * endpoint: the endpoint String; Check_Type rejects anything else.
 *
 * The connect itself is performed by rb_czmq_nogvl_socket_connect through
 * rb_thread_blocking_region, and its result is passed to ZmqAssert
 * (presumably raising on failure - confirm against the macro).
 * Afterwards the wrapper is marked ZMQ_SOCKET_CONNECTED and keeps a copy of
 * the endpoint string.
 *
 * Returns true.
 */
static VALUE rb_czmq_socket_connect(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_connect, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose) {
        /* VALUE is an integer type; %p requires a void pointer, hence the cast. */
        zclock_log ("I: %s socket %p: connected \"%s\"", zsocket_type_str(sock->socket), (void *)obj, StringValueCStr(endpoint));
    }
    sock->state = ZMQ_SOCKET_CONNECTED;
    sock->endpoint = rb_str_new4(endpoint);
    return Qtrue;
}
|
#reconnect_ivl ⇒ Fixnum
1234 1235 1236 1237 1238 1239 |
# File 'ext/rbczmq/socket.c', line 1234
/*
 * Ruby-facing reader for the RECONNECT_IVL socket option
 * (ZMQ::Socket#reconnect_ivl).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns the result of zsocket_reconnect_ivl() boxed with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_reconnect_ivl(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    VALUE reconnect_ivl;

    GetZmqSocket(obj);
    reconnect_ivl = INT2NUM(zsocket_reconnect_ivl(sock->socket));
    return reconnect_ivl;
}
|
#reconnect_ivl=(200) ⇒ nil
1254 1255 1256 1257 1258 |
# File 'ext/rbczmq/socket.c', line 1254
/*
 * Ruby-facing writer for the RECONNECT_IVL socket option
 * (ZMQ::Socket#reconnect_ivl=).
 *
 * The whole body is the ZmqSetSockOpt macro (defined elsewhere), invoked with
 * the receiver, the zsocket_set_reconnect_ivl setter, the label
 * "RECONNECT_IVL" and the requested value.
 * NOTE(review): `sock` is only declared here and there is no explicit return,
 * so the macro presumably assigns `sock` and produces the return value -
 * confirm against the macro definition.
 */
static VALUE rb_czmq_socket_set_opt_reconnect_ivl(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl, "RECONNECT_IVL", value);
}
|
#reconnect_ivl_max ⇒ Fixnum
1273 1274 1275 1276 1277 1278 |
# File 'ext/rbczmq/socket.c', line 1273
/*
 * Ruby-facing reader for the RECONNECT_IVL_MAX socket option
 * (ZMQ::Socket#reconnect_ivl_max).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns the result of zsocket_reconnect_ivl_max() boxed with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_reconnect_ivl_max(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    VALUE reconnect_ivl_max;

    GetZmqSocket(obj);
    reconnect_ivl_max = INT2NUM(zsocket_reconnect_ivl_max(sock->socket));
    return reconnect_ivl_max;
}
|
#reconnect_ivl_max=(5) ⇒ nil
1293 1294 1295 1296 1297 |
# File 'ext/rbczmq/socket.c', line 1293
/*
 * Ruby-facing writer for the RECONNECT_IVL_MAX socket option
 * (ZMQ::Socket#reconnect_ivl_max=).
 *
 * The whole body is the ZmqSetSockOpt macro (defined elsewhere), invoked with
 * the receiver, the zsocket_set_reconnect_ivl_max setter, the label
 * "RECONNECT_IVL_MAX" and the requested value.
 * NOTE(review): `sock` is only declared here and there is no explicit return,
 * so the macro presumably assigns `sock` and produces the return value -
 * confirm against the macro definition.
 */
static VALUE rb_czmq_socket_set_opt_reconnect_ivl_max(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl_max, "RECONNECT_IVL_MAX", value);
}
|
#recovery_ivl ⇒ Fixnum
961 962 963 964 965 966 |
# File 'ext/rbczmq/socket.c', line 961
/*
 * Ruby-facing reader for the RECOVERY_IVL socket option
 * (ZMQ::Socket#recovery_ivl).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns the result of zsocket_recovery_ivl() boxed with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_recovery_ivl(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    VALUE recovery_ivl;

    GetZmqSocket(obj);
    recovery_ivl = INT2NUM(zsocket_recovery_ivl(sock->socket));
    return recovery_ivl;
}
|
#recovery_ivl=(20) ⇒ nil
981 982 983 984 985 |
# File 'ext/rbczmq/socket.c', line 981
/*
 * Ruby-facing writer for the RECOVERY_IVL socket option
 * (ZMQ::Socket#recovery_ivl=).
 *
 * The whole body is the ZmqSetSockOpt macro (defined elsewhere), invoked with
 * the receiver, the zsocket_set_recovery_ivl setter, the label
 * "RECOVERY_IVL" and the requested value.
 * NOTE(review): `sock` is only declared here and there is no explicit return,
 * so the macro presumably assigns `sock` and produces the return value -
 * confirm against the macro definition.
 */
static VALUE rb_czmq_socket_set_opt_recovery_ivl(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_recovery_ivl, "RECOVERY_IVL", value);
}
|
#recovery_ivl_msec ⇒ Fixnum
1000 1001 1002 1003 1004 1005 |
# File 'ext/rbczmq/socket.c', line 1000
/*
 * Ruby-facing reader for the RECOVERY_IVL_MSEC socket option
 * (ZMQ::Socket#recovery_ivl_msec).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns the result of zsocket_recovery_ivl_msec() boxed with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_recovery_ivl_msec(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    VALUE recovery_ivl_msec;

    GetZmqSocket(obj);
    recovery_ivl_msec = INT2NUM(zsocket_recovery_ivl_msec(sock->socket));
    return recovery_ivl_msec;
}
|
#recovery_ivl_msec=(20) ⇒ nil
1020 1021 1022 1023 1024 |
# File 'ext/rbczmq/socket.c', line 1020
/*
 * Ruby-facing writer for the RECOVERY_IVL_MSEC socket option
 * (ZMQ::Socket#recovery_ivl_msec=).
 *
 * The whole body is the ZmqSetSockOpt macro (defined elsewhere), invoked with
 * the receiver, the zsocket_set_recovery_ivl_msec setter, the label
 * "RECOVERY_IVL_MSEC" and the requested value.
 * NOTE(review): `sock` is only declared here and there is no explicit return,
 * so the macro presumably assigns `sock` and produces the return value -
 * confirm against the macro definition.
 */
static VALUE rb_czmq_socket_set_opt_recovery_ivl_msec(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_recovery_ivl_msec, "RECOVERY_IVL_MSEC", value);
}
|
#recv ⇒ String?
446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 |
# File 'ext/rbczmq/socket.c', line 446
/*
 * Receives a string from the socket (ZMQ::Socket#recv).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * The receive runs in rb_czmq_nogvl_recv through rb_thread_blocking_region.
 * A NULL result yields nil. Otherwise ZmqAssertSysError is evaluated, the C
 * string is turned into a Ruby string via ZmqEncode(rb_str_new2(...)) and
 * the C buffer is released with free().
 *
 * Returns the received String, or nil when nothing was received.
 */
static VALUE rb_czmq_socket_recv(VALUE obj)
{
    struct nogvl_recv_args args;
    zmq_sock_wrapper *sock = NULL;
    char *str = NULL;
    VALUE result = Qnil;

    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);

    args.socket = sock;
    str = (char *)rb_thread_blocking_region(rb_czmq_nogvl_recv, (void *)&args, RUBY_UBF_IO, 0);
    if (str != NULL) {
        ZmqAssertSysError();
        if (sock->verbose) {
            zclock_log ("I: %s socket %p: recv \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
        }
        result = ZmqEncode(rb_str_new2(str));
        free(str);
    }
    return result;
}
|
#recv_frame ⇒ ZMQ::Frame?
681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 |
# File 'ext/rbczmq/socket.c', line 681
/*
 * Receives one frame from the socket (ZMQ::Socket#recv_frame).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * The receive runs in rb_czmq_nogvl_recv_frame through
 * rb_thread_blocking_region. In verbose mode the frame is dumped with
 * ZmqDumpFrame before being wrapped.
 *
 * Returns the frame wrapped by rb_czmq_alloc_frame(), or nil when no frame
 * was received.
 */
static VALUE rb_czmq_socket_recv_frame(VALUE obj)
{
    struct nogvl_recv_args args;
    zmq_sock_wrapper *sock = NULL;
    zframe_t *frame = NULL;
    /* NOTE(review): print_prefix and cur_time are not read directly in this
       function; presumably ZmqDumpFrame consumes them - confirm. */
    char print_prefix[255];
    char *cur_time = NULL;

    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);

    args.socket = sock;
    frame = (zframe_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_frame, (void *)&args, RUBY_UBF_IO, 0);
    if (frame == NULL) {
        return Qnil;
    }

    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        ZmqDumpFrame("recv_frame", frame);
    }
    return rb_czmq_alloc_frame(frame);
}
|
#recv_frame_nonblock ⇒ ZMQ::Frame?
715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 |
# File 'ext/rbczmq/socket.c', line 715
/*
 * Receives one frame from the socket using zframe_recv_nowait()
 * (ZMQ::Socket#recv_frame_nonblock).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * errno is reset to 0 before anything else runs. In verbose mode the frame
 * is dumped with ZmqDumpFrame before being wrapped.
 *
 * Returns the frame wrapped by rb_czmq_alloc_frame(), or nil when no frame
 * was available.
 */
static VALUE rb_czmq_socket_recv_frame_nonblock(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    zframe_t *frame = NULL;
    /* NOTE(review): print_prefix and cur_time are not read directly in this
       function; presumably ZmqDumpFrame consumes them - confirm. */
    char print_prefix[255];
    char *cur_time = NULL;

    errno = 0;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);

    frame = zframe_recv_nowait(sock->socket);
    if (frame == NULL) {
        return Qnil;
    }

    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        ZmqDumpFrame("recv_frame_nonblock", frame);
    }
    return rb_czmq_alloc_frame(frame);
}
|
#recv_message ⇒ ZMQ::Message?
776 777 778 779 780 781 782 783 784 785 786 787 788 789 |
# File 'ext/rbczmq/socket.c', line 776
/*
 * Receives a whole message from the socket (ZMQ::Socket#recv_message).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * The receive runs in rb_czmq_nogvl_recv_message through
 * rb_thread_blocking_region. In verbose mode the message is dumped with
 * ZmqDumpMessage before being wrapped.
 *
 * Returns the message wrapped by rb_czmq_alloc_message(), or nil when no
 * message was received.
 */
static VALUE rb_czmq_socket_recv_message(VALUE obj)
{
    struct nogvl_recv_args args;
    zmq_sock_wrapper *sock = NULL;
    zmsg_t *message = NULL;

    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);

    args.socket = sock;
    message = (zmsg_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_message, (void *)&args, RUBY_UBF_IO, 0);
    if (message == NULL) {
        return Qnil;
    }

    if (sock->verbose) {
        ZmqDumpMessage("recv_message", message);
    }
    return rb_czmq_alloc_message(message);
}
|
#recv_nonblock ⇒ String?
480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 |
# File 'ext/rbczmq/socket.c', line 480
/*
 * Receives a string from the socket using zstr_recv_nowait()
 * (ZMQ::Socket#recv_nonblock).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * errno is reset to 0 before anything else runs. A NULL result yields nil.
 * Otherwise ZmqAssertSysError is evaluated, the C string is turned into a
 * Ruby string via ZmqEncode(rb_str_new2(...)) and the C buffer is released
 * with free().
 *
 * Returns the received String, or nil when nothing was available.
 */
static VALUE rb_czmq_socket_recv_nonblock(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    char *str = NULL;
    VALUE result = Qnil;

    errno = 0;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);

    str = zstr_recv_nowait(sock->socket);
    if (str != NULL) {
        ZmqAssertSysError();
        if (sock->verbose) {
            zclock_log ("I: %s socket %p: recv_nonblock \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
        }
        result = ZmqEncode(rb_str_new2(str));
        free(str);
    }
    return result;
}
|
#send("message") ⇒ Boolean
351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 |
# File 'ext/rbczmq/socket.c', line 351
/*
 * Sends a string on the socket (ZMQ::Socket#send).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 * msg: the payload String; Check_Type rejects anything else.
 *
 * The send runs in rb_czmq_nogvl_zstr_send through
 * rb_thread_blocking_region, and its result is passed to ZmqAssert
 * (presumably raising on failure - confirm against the macro).
 *
 * Returns true.
 */
static VALUE rb_czmq_socket_send(VALUE obj, VALUE msg)
{
    int rc;
    struct nogvl_send_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    Check_Type(msg, T_STRING);
    args.socket = sock;
    args.msg = StringValueCStr(msg);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_send, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose) {
        /* VALUE is an integer type; %p requires a void pointer, hence the cast. */
        zclock_log ("I: %s socket %p: send \"%s\"", zsocket_type_str(sock->socket), (void *)obj, StringValueCStr(msg));
    }
    return Qtrue;
}
|
#send_frame(frame) ⇒ nil
540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 |
# File 'ext/rbczmq/socket.c', line 540
/*
 * Sends a ZMQ::Frame on the socket (ZMQ::Socket#send_frame).
 *
 * argc/argv: one required argument (the frame object) and one optional
 *            argument (flags), as parsed by rb_scan_args with "11".
 * obj:       the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *            resolves it into the local `sock`, which is NULL until then.
 *
 * Flags may be nil (treated as 0), a Fixnum, or a Symbol that is looked up
 * as a constant on rb_cZmqFrame; anything else fails Check_Type.
 * The send runs in rb_czmq_nogvl_send_frame through
 * rb_thread_blocking_region, and its result is passed to ZmqAssert
 * (presumably raising on failure - confirm against the macro).
 *
 * Returns true.
 */
static VALUE rb_czmq_socket_send_frame(int argc, VALUE *argv, VALUE obj)
{
    struct nogvl_send_frame_args args;
    VALUE frame_obj;
    VALUE flags;
    /* NOTE(review): print_prefix and cur_time are not read directly in this
       function; presumably ZmqDumpFrame consumes them - confirm. */
    char print_prefix[255];
    char *cur_time = NULL;
    zframe_t *print_frame = NULL;
    int rc, flgs;
    zmq_sock_wrapper *sock = NULL;

    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    rb_scan_args(argc, argv, "11", &frame_obj, &flags);
    /* NOTE(review): `frame` is not declared in this function; presumably
       ZmqGetFrame introduces it - confirm. */
    ZmqGetFrame(frame_obj);

    flgs = 0;
    if (!NIL_P(flags)) {
        if (SYMBOL_P(flags)) {
            flags = rb_const_get_at(rb_cZmqFrame, rb_to_id(flags));
        }
        Check_Type(flags, T_FIXNUM);
        flgs = FIX2INT(flags);
    }

    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        /* With ZFRAME_REUSE the frame itself is dumped later, otherwise a duplicate is. */
        if (flgs & ZFRAME_REUSE) {
            print_frame = frame;
        } else {
            print_frame = zframe_dup(frame);
        }
    }

    args.socket = sock;
    args.frame = frame;
    args.flags = flgs;
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_send_frame, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);

    if (sock->verbose) {
        ZmqDumpFrame("send_frame", print_frame);
    }
    return Qtrue;
}
|
#send_message(msg) ⇒ nil
621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 |
# File 'ext/rbczmq/socket.c', line 621
/*
 * Sends a ZMQ::Message on the socket (ZMQ::Socket#send_message).
 *
 * obj:         the receiver. GetZmqSocket (macro defined elsewhere)
 *              presumably resolves it into the local `sock`, which is NULL
 *              until then.
 * message_obj: the Ruby message object handed to ZmqGetMessage.
 *
 * In verbose mode a duplicate of the message is taken before sending and
 * dumped afterwards. The send runs in rb_czmq_nogvl_send_message through
 * rb_thread_blocking_region; its result is not inspected. After the send the
 * wrapper is flagged ZMQ_MESSAGE_DESTROYED.
 *
 * Always returns nil.
 */
static VALUE rb_czmq_socket_send_message(VALUE obj, VALUE message_obj)
{
    struct nogvl_send_message_args args;
    zmsg_t *print_message = NULL;
    zmq_sock_wrapper *sock = NULL;

    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    /* NOTE(review): `message` is not declared in this function; presumably
       ZmqGetMessage introduces it - confirm. */
    ZmqGetMessage(message_obj);

    if (sock->verbose) {
        print_message = zmsg_dup(message->message);
    }

    args.socket = sock;
    args.message = message->message;
    rb_thread_blocking_region(rb_czmq_nogvl_send_message, (void *)&args, RUBY_UBF_IO, 0);
    message->flags |= ZMQ_MESSAGE_DESTROYED;

    if (sock->verbose) {
        ZmqDumpMessage("send_message", print_message);
    }
    return Qnil;
}
|
#sendm("message") ⇒ Boolean
384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 |
# File 'ext/rbczmq/socket.c', line 384
/*
 * Sends a string on the socket through rb_czmq_nogvl_zstr_sendm
 * (ZMQ::Socket#sendm, the "more flag set" variant per its description).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 * msg: the payload String; Check_Type rejects anything else.
 *
 * The send runs through rb_thread_blocking_region, and its result is passed
 * to ZmqAssert (presumably raising on failure - confirm against the macro).
 *
 * Returns true.
 */
static VALUE rb_czmq_socket_sendm(VALUE obj, VALUE msg)
{
    struct nogvl_send_args args;
    zmq_sock_wrapper *sock = NULL;
    int rc;

    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    Check_Type(msg, T_STRING);

    args.socket = sock;
    args.msg = StringValueCStr(msg);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_sendm, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);

    if (sock->verbose) {
        zclock_log ("I: %s socket %p: sendm \"%s\"", zsocket_type_str(sock->socket), sock->socket, StringValueCStr(msg));
    }
    return Qtrue;
}
|
#sndbuf ⇒ Fixnum
1078 1079 1080 1081 1082 1083 |
# File 'ext/rbczmq/socket.c', line 1078
/*
 * Ruby-facing reader for the SNDBUF socket option (ZMQ::Socket#sndbuf).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns the result of zsocket_sndbuf() boxed with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_sndbuf(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    VALUE sndbuf;

    GetZmqSocket(obj);
    sndbuf = INT2NUM(zsocket_sndbuf(sock->socket));
    return sndbuf;
}
|
#sndbuf=(1000) ⇒ nil
1098 1099 1100 1101 1102 |
# File 'ext/rbczmq/socket.c', line 1098
/*
 * Ruby-facing writer for the SNDBUF socket option (ZMQ::Socket#sndbuf=).
 *
 * The whole body is the ZmqSetSockOpt macro (defined elsewhere), invoked with
 * the receiver, the zsocket_set_sndbuf setter, the label "SNDBUF" and the
 * requested value.
 * NOTE(review): `sock` is only declared here and there is no explicit return,
 * so the macro presumably assigns `sock` and produces the return value -
 * confirm against the macro definition.
 */
static VALUE rb_czmq_socket_set_opt_sndbuf(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_sndbuf, "SNDBUF", value);
}
|
#sndtimeo ⇒ Fixnum
1462 1463 1464 1465 1466 1467 |
# File 'ext/rbczmq/socket.c', line 1462
/*
 * Ruby-facing reader for the SNDTIMEO socket option (ZMQ::Socket#sndtimeo).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns the result of zsocket_sndtimeo() boxed with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_sndtimeo(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    VALUE sndtimeo;

    GetZmqSocket(obj);
    sndtimeo = INT2NUM(zsocket_sndtimeo(sock->socket));
    return sndtimeo;
}
|
#sndtimeout=(200) ⇒ nil
1482 1483 1484 1485 1486 |
# File 'ext/rbczmq/socket.c', line 1482
/*
 * Ruby-facing writer for the SNDTIMEO socket option
 * (listed as ZMQ::Socket#sndtimeout=).
 *
 * The whole body is the ZmqSetSockOpt macro (defined elsewhere), invoked with
 * the receiver, the zsocket_set_sndtimeo setter, the label "SNDTIMEO" and the
 * requested value.
 * NOTE(review): `sock` is only declared here and there is no explicit return,
 * so the macro presumably assigns `sock` and produces the return value -
 * confirm against the macro definition.
 */
static VALUE rb_czmq_socket_set_opt_sndtimeo(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_sndtimeo, "SNDTIMEO", value);
}
|
#state ⇒ Fixnum
134 135 136 137 138 139 |
# File 'ext/rbczmq/socket.c', line 134
/*
 * Ruby-facing reader for the wrapper's connection state (ZMQ::Socket#state).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns sock->state boxed with INT2NUM, i.e. an integer rather than a
 * string.
 */
static VALUE rb_czmq_socket_state(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    VALUE state;

    GetZmqSocket(obj);
    state = INT2NUM(sock->state);
    return state;
}
|
#subscribe(value) ⇒ Object
1341 1342 1343 1344 1345 1346 1347 |
# File 'ext/rbczmq/socket.c', line 1341
/*
 * Ruby-facing #subscribe: sets the SUBSCRIBE option on the socket.
 *
 * The whole body is the ZmqSetStringSockOpt macro (defined elsewhere),
 * invoked with the receiver, the zsocket_set_subscribe setter, the label
 * "SUBSCRIBE", the value, and a brace-enclosed fragment containing
 * ZmqAssertSockOptFor(ZMQ_SUB).
 * NOTE(review): presumably that fragment restricts the option to SUB
 * sockets, and the macro assigns `sock` and produces the return value -
 * confirm against the macro definitions.
 */
static VALUE rb_czmq_socket_set_opt_subscribe(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetStringSockOpt(obj, zsocket_set_subscribe, "SUBSCRIBE", value, {
ZmqAssertSockOptFor(ZMQ_SUB)
});
}
|
#swap ⇒ Fixnum
844 845 846 847 848 849 |
# File 'ext/rbczmq/socket.c', line 844
/*
 * Ruby-facing reader for the SWAP socket option (ZMQ::Socket#swap).
 *
 * obj: the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *      resolves it into the local `sock`, which is NULL until then.
 *
 * Returns the result of zsocket_swap() boxed with INT2NUM.
 */
static VALUE rb_czmq_socket_opt_swap(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    VALUE swap;

    GetZmqSocket(obj);
    swap = INT2NUM(zsocket_swap(sock->socket));
    return swap;
}
|
#swap=(100) ⇒ nil
864 865 866 867 868 |
# File 'ext/rbczmq/socket.c', line 864
/*
 * Ruby-facing writer for the SWAP socket option (ZMQ::Socket#swap=).
 *
 * The whole body is the ZmqSetSockOpt macro (defined elsewhere), invoked with
 * the receiver, the zsocket_set_swap setter, the label "SWAP" and the
 * requested value.
 * NOTE(review): `sock` is only declared here and there is no explicit return,
 * so the macro presumably assigns `sock` and produces the return value -
 * confirm against the macro definition.
 */
static VALUE rb_czmq_socket_set_opt_swap(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetSockOpt(obj, zsocket_set_swap, "SWAP", value);
}
|
#to_s ⇒ Object
Generates a string representation of the current socket state
socket = ctx.bind(:PUB, "tcp://127.0.0.1:5000")
socket.to_s => "PUB socket bound to tcp://127.0.0.1:5000"
59 60 61 62 63 64 65 66 67 68 |
# File 'lib/zmq/socket.rb', line 59
#
# Describes the socket as "<type_str> socket", extended with
# "bound to <endpoint>" or "connected to <endpoint>" when +state+ is
# BOUND or CONNECTED respectively.
def to_s
  case state
  when BOUND then "#{type_str} socket bound to #{endpoint}"
  when CONNECTED then "#{type_str} socket connected to #{endpoint}"
  else "#{type_str} socket"
  end
end
#type_str ⇒ Object
Generates a string representation of this socket type
socket = ctx.socket(:PUB)
socket.type_str => "PUB"
50 51 52 |
# File 'lib/zmq/socket.rb', line 50
#
# Looks up the TYPE_STR constant on the receiver's class and returns it.
def type_str
  socket_class = self.class
  socket_class.const_get(:TYPE_STR)
end
#unsubscribe(value) ⇒ Object
1362 1363 1364 1365 1366 1367 1368 |
# File 'ext/rbczmq/socket.c', line 1362
/*
 * Ruby-facing #unsubscribe: sets the UNSUBSCRIBE option on the socket.
 *
 * The whole body is the ZmqSetStringSockOpt macro (defined elsewhere),
 * invoked with the receiver, the zsocket_set_unsubscribe setter, the label
 * "UNSUBSCRIBE", the value, and a brace-enclosed fragment containing
 * ZmqAssertSockOptFor(ZMQ_SUB).
 * NOTE(review): presumably that fragment restricts the option to SUB
 * sockets, and the macro assigns `sock` and produces the return value -
 * confirm against the macro definitions.
 */
static VALUE rb_czmq_socket_set_opt_unsubscribe(VALUE obj, VALUE value)
{
zmq_sock_wrapper *sock = NULL;
ZmqSetStringSockOpt(obj, zsocket_set_unsubscribe, "UNSUBSCRIBE", value, {
ZmqAssertSockOptFor(ZMQ_SUB)
});
}
|
#verbose=(true) ⇒ nil
275 276 277 278 279 280 281 282 283 |
# File 'ext/rbczmq/socket.c', line 275
/*
 * Switches verbose logging for this socket on or off
 * (ZMQ::Socket#verbose=).
 *
 * obj:   the receiver. GetZmqSocket (macro defined elsewhere) presumably
 *        resolves it into the local `sock`, which is NULL until then.
 * level: only the Ruby value `true` enables verbosity; every other value,
 *        including other truthy objects, disables it.
 *
 * Always returns nil.
 */
static VALUE rb_czmq_socket_set_verbose(VALUE obj, VALUE level)
{
    zmq_sock_wrapper *sock = NULL;
    Bool vlevel = FALSE;

    GetZmqSocket(obj);
    if (level == Qtrue) {
        vlevel = TRUE;
    }
    sock->verbose = vlevel;
    return Qnil;
}
|