Class: ZMQ::Socket

Inherits:
Object
  • Object
show all
Defined in:
lib/zmq/socket.rb,
ext/rbczmq/socket.c

Direct Known Subclasses

Dealer, Pair, Pub, Pull, Push, Rep, Req, Router, Sub

Defined Under Namespace

Classes: Dealer, Pair, Pub, Pull, Push, Rep, Req, Router, Sub

Constant Summary collapse

PROTO_REXP =
/^inproc|ipc|tcp|e?pgm:\/\//
PENDING =
INT2NUM(ZMQ_SOCKET_PENDING)
BOUND =
INT2NUM(ZMQ_SOCKET_BOUND)
CONNECTED =
INT2NUM(ZMQ_SOCKET_CONNECTED)

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.handle_fsm_errors(error, *methods) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/zmq/socket.rb', line 14

def self.handle_fsm_errors(error, *methods)
  methods.each do |m|
    class_eval <<-"evl", __FILE__, __LINE__
      def #{m}(*args);
        super
      rescue SystemCallError => e
        raise(ZMQ::Error, "#{error} Please assert that you're not sending / receiving out of band data when using the REQ / REP socket pairs.") if e.errno == ZMQ::EFSM
        raise
      end
    evl
  end
end

.unsupported_api(*methods) ⇒ Object



6
7
8
9
10
11
12
# File 'lib/zmq/socket.rb', line 6

def self.unsupported_api(*methods)
  methods.each do |m|
    class_eval <<-"evl", __FILE__, __LINE__
      def #{m}(*args); raise(ZMQ::Error, "API #{m} not supported for #{const_get(:TYPE_STR)} sockets!");  end
    evl
  end
end

Instance Method Details

#affinityFixnum

Returns the socket AFFINITY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.affinity  =>  0

Returns:

  • (Fixnum)


883
884
885
886
887
888
# File 'ext/rbczmq/socket.c', line 883

static VALUE rb_czmq_socket_opt_affinity(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_affinity(sock->socket));
}

#affinity=(1) ⇒ nil

Sets the socket AFFINITY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.affinity = 1  =>  nil

Returns:

  • (nil)


903
904
905
906
907
# File 'ext/rbczmq/socket.c', line 903

static VALUE rb_czmq_socket_set_opt_affinity(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_affinity, "AFFINITY", value);
}

#backlogFixnum

Returns the socket BACKLOG value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.backlog  =>  100

Returns:

  • (Fixnum)


1195
1196
1197
1198
1199
1200
# File 'ext/rbczmq/socket.c', line 1195

static VALUE rb_czmq_socket_opt_backlog(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_backlog(sock->socket));
}

#backlog=(200) ⇒ nil

Sets the socket BACKLOG value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.backlog = 200  =>  nil

Returns:

  • (nil)


1215
1216
1217
1218
1219
# File 'ext/rbczmq/socket.c', line 1215

static VALUE rb_czmq_socket_set_opt_backlog(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_backlog, "BACKLOG", value);
}

#bind(uri) ⇒ Object

Binds to a given endpoint. Attemps to resolve URIs without a protocol through DNS SRV records.

socket = ctx.socket(:PUB) socket.bind “tcp://127.0.0.1:9000”

socket.bind “collector.domain.com” # resolves 10.0.0.2:9000



87
88
89
90
# File 'lib/zmq/socket.rb', line 87

def bind(uri)
  uri = resolve(uri) if uri && uri !~ PROTO_REXP
  real_bind(uri)
end

#closenil

Closes a socket. The GC will take the same action if a socket object is not reachable anymore on the next GC cycle. This is a lower level API.

Examples

sock.close    =>  nil

Returns:

  • (nil)


85
86
87
88
89
90
91
92
93
94
95
# File 'ext/rbczmq/socket.c', line 85

static VALUE rb_czmq_socket_close(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    /* This is useless for production / real use cases as we can't query the state again OR assume
    anything about the underlying connection. Merely doing the right thing. */
    sock->state = ZMQ_SOCKET_PENDING;
    rb_czmq_free_sock(sock);
    return Qnil;
}

#connect(uri) ⇒ Object

Connects to a given endpoint. Attemps to resolve URIs without a protocol through DNS SRV records.

socket = ctx.socket(:PUB) socket.connect “tcp://127.0.0.1:9000”

socket.connect “collector.domain.com” # resolves 10.0.0.2:9000



99
100
101
102
# File 'lib/zmq/socket.rb', line 99

def connect(uri)
  uri = resolve(uri) if uri && uri !~ PROTO_REXP
  real_connect(uri)
end

#endpointString?

Returns the endpoint this socket is currently connected to, if any.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.endpoint    =>   nil
sock.bind("inproc://test")
sock.endpoint    =>  "inproc://test"

Returns:

  • (String, nil)


112
113
114
115
116
117
# File 'ext/rbczmq/socket.c', line 112

static VALUE rb_czmq_socket_endpoint(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return sock->endpoint;
}

#eventsFixnum

Query if this socket is in a readable or writable state.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.events => ZMQ::POLLIN

Returns:

  • (Fixnum)


1403
1404
1405
1406
1407
1408
# File 'ext/rbczmq/socket.c', line 1403

static VALUE rb_czmq_socket_opt_events(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_events(sock->socket));
}

#fdFixnum Also known as: to_i

Returns a file descriptor reference for integrating this socket with an externel event loop or multiplexor. Edge-triggered notification of I/O state changes.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.fd       =>   -1
sock.bind("inproc://test")
sock.fd       =>   4

Returns:

  • (Fixnum)


157
158
159
160
161
162
163
# File 'ext/rbczmq/socket.c', line 157

static VALUE rb_czmq_socket_fd(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    if (sock->state == ZMQ_SOCKET_PENDING) return INT2NUM(-1);
    return INT2NUM(zsocket_fd(sock->socket));
}

#hwmFixnum

Returns the socket HWM (High Water Mark) value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.hwm  =>  0

Returns:

  • (Fixnum)


804
805
806
807
808
809
# File 'ext/rbczmq/socket.c', line 804

static VALUE rb_czmq_socket_opt_hwm(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_hwm(sock->socket));
}

#hwm=(100) ⇒ nil

Sets the socket HWM (High Water Mark() value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.hwm = 100  =>  nil
sock.hwm  =>  100

Returns:

  • (nil)


825
826
827
828
829
# File 'ext/rbczmq/socket.c', line 825

static VALUE rb_czmq_socket_set_opt_hwm(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_hwm, "HWM", value);
}

#identity=(value) ⇒ Object

Sets the socket IDENTITY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.identity = "anonymous"  =>  nil


1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
# File 'ext/rbczmq/socket.c', line 1312

static VALUE rb_czmq_socket_set_opt_identity(VALUE obj, VALUE value)
{
    char *val;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(value, T_STRING);
    if (RSTRING_LEN(value) == 0) rb_raise(rb_eZmqError, "socket identity cannot be empty.");
    if (RSTRING_LEN(value) > 255) rb_raise(rb_eZmqError, "maximum socket identity is 255 chars.");
    val = StringValueCStr(value);
    zsocket_set_identity(sock->socket, val);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: set option \"IDENTITY\" \"%s\"", zsocket_type_str(sock->socket), obj, val);
    return Qnil;
}

#lingerFixnum

Returns the socket LINGER value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.linger  =>  -1

Returns:

  • (Fixnum)


1156
1157
1158
1159
1160
1161
# File 'ext/rbczmq/socket.c', line 1156

static VALUE rb_czmq_socket_opt_linger(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_linger(sock->socket));
}

#linger=(1000) ⇒ nil

Sets the socket LINGER value in ms.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.linger = 1000  =>  nil

Returns:

  • (nil)


1176
1177
1178
1179
1180
# File 'ext/rbczmq/socket.c', line 1176

static VALUE rb_czmq_socket_set_opt_linger(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_linger, "LINGER", value);
}

#mcast_loop=(false) ⇒ nil

Sets the socket MCAST_LOOP value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.mcast_loop = false  =>  nil

Returns:

  • (nil)


1059
1060
1061
1062
1063
# File 'ext/rbczmq/socket.c', line 1059

static VALUE rb_czmq_socket_set_opt_mcast_loop(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetBooleanSockOpt(obj, zsocket_set_mcast_loop, "MCAST_LOOP", value);
}

#mcast_loop?Boolean

Returns the socket MCAST_LOOP status.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.mcast_loop?  =>  true

Returns:

  • (Boolean)


1039
1040
1041
1042
1043
1044
# File 'ext/rbczmq/socket.c', line 1039

static VALUE rb_czmq_socket_opt_mcast_loop(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return (zsocket_mcast_loop(sock->socket) == 1) ? Qtrue : Qfalse;
}

#poll_readable?Boolean

Poll all sockets for readbable states by default

Returns:

  • (Boolean)


71
72
73
# File 'lib/zmq/socket.rb', line 71

def poll_readable?
  true
end

#poll_writable?Boolean

Poll all sockets for writable states by default

Returns:

  • (Boolean)


76
77
78
# File 'lib/zmq/socket.rb', line 76

def poll_writable?
  true
end

#rateFixnum

Returns the socket RATE value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rate  =>  40000

Returns:

  • (Fixnum)


922
923
924
925
926
927
# File 'ext/rbczmq/socket.c', line 922

static VALUE rb_czmq_socket_opt_rate(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_rate(sock->socket));
}

#rate=(50000) ⇒ nil

Sets the socket RATE value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rate = 50000  =>  nil

Returns:

  • (nil)


942
943
944
945
946
# File 'ext/rbczmq/socket.c', line 942

static VALUE rb_czmq_socket_set_opt_rate(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_rate, "RATE", value);
}

#rcvbufFixnum

Returns the socket RCVBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvbuf  =>  0

Returns:

  • (Fixnum)


1117
1118
1119
1120
1121
1122
# File 'ext/rbczmq/socket.c', line 1117

static VALUE rb_czmq_socket_opt_rcvbuf(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_rcvbuf(sock->socket));
}

#rcvbuf=(1000) ⇒ nil

Sets the socket RCVBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvbuf = 1000  =>  nil

Returns:

  • (nil)


1137
1138
1139
1140
1141
# File 'ext/rbczmq/socket.c', line 1137

static VALUE rb_czmq_socket_set_opt_rcvbuf(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_rcvbuf, "RCVBUF", value);
}

#rcvmoreBoolean

Query if there’s more messages to receive.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.rcvmore =>  true

Returns:

  • (Boolean)


1383
1384
1385
1386
1387
1388
# File 'ext/rbczmq/socket.c', line 1383

static VALUE rb_czmq_socket_opt_rcvmore(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return (zsocket_rcvmore(sock->socket) == 1) ? Qtrue : Qfalse;
}

#rcvtimeoFixnum

Returns the socket RCVTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvtimeo  =>  -1

Returns:

  • (Fixnum)


1423
1424
1425
1426
1427
1428
# File 'ext/rbczmq/socket.c', line 1423

static VALUE rb_czmq_socket_opt_rcvtimeo(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_rcvtimeo(sock->socket));
}

#rcvtimeout=(200) ⇒ nil

Sets the socket RCVTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvtimeo = 200  =>  nil

Returns:

  • (nil)


1443
1444
1445
1446
1447
# File 'ext/rbczmq/socket.c', line 1443

static VALUE rb_czmq_socket_set_opt_rcvtimeo(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_rcvtimeo, "RCVTIMEO", value);
}

#readable?Boolean

Determines if there are one or more messages to read from this socket. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.

socket.readable? => true

Returns:

  • (Boolean)


32
33
34
# File 'lib/zmq/socket.rb', line 32

def readable?
  (events & ZMQ::POLLIN) == ZMQ::POLLIN
end

#bind("inproc: //test") ⇒ Fixnum

Binds to a given endpoint. When the port number is ‘*’, attempts to bind to a free port. Always returns the port number on success.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.bind("tcp://localhost:*")    =>  5432

Returns:

  • (Fixnum)


209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'ext/rbczmq/socket.c', line 209

static VALUE rb_czmq_socket_bind(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_bind, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: bound \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
    sock->state = ZMQ_SOCKET_BOUND;
    sock->endpoint = rb_str_new4(endpoint);
    return INT2NUM(rc);
}

#connect("tcp: //localhost:3456") ⇒ Boolean

Attempts to connect to a given endpoint.

Examples

ctx = ZMQ::Context.new
rep = ctx.socket(:REP)
port = rep.bind("tcp://localhost:*")    =>  5432
req = ctx.socket(:REQ)
req.connect("tcp://localhost:#{port}")   => true

Returns:

  • (Boolean)


243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# File 'ext/rbczmq/socket.c', line 243

static VALUE rb_czmq_socket_connect(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_connect, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: connected \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
    sock->state = ZMQ_SOCKET_CONNECTED;
    sock->endpoint = rb_str_new4(endpoint);
    return Qtrue;
}

#reconnect_ivlFixnum

Returns the socket RECONNECT_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl  =>  100

Returns:

  • (Fixnum)


1234
1235
1236
1237
1238
1239
# File 'ext/rbczmq/socket.c', line 1234

static VALUE rb_czmq_socket_opt_reconnect_ivl(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_reconnect_ivl(sock->socket));
}

#reconnect_ivl=(200) ⇒ nil

Sets the socket RECONNECT_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl = 200  =>  nil

Returns:

  • (nil)


1254
1255
1256
1257
1258
# File 'ext/rbczmq/socket.c', line 1254

static VALUE rb_czmq_socket_set_opt_reconnect_ivl(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl, "RECONNECT_IVL", value);
}

#reconnect_ivl_maxFixnum

Returns the socket RECONNECT_IVL_MAX value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl_max  =>  0

Returns:

  • (Fixnum)


1273
1274
1275
1276
1277
1278
# File 'ext/rbczmq/socket.c', line 1273

static VALUE rb_czmq_socket_opt_reconnect_ivl_max(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_reconnect_ivl_max(sock->socket));
}

#reconnect_ivl_max=(5) ⇒ nil

Sets the socket RECONNECT_IVL_MAX value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl_max = 5  =>  nil

Returns:

  • (nil)


1293
1294
1295
1296
1297
# File 'ext/rbczmq/socket.c', line 1293

static VALUE rb_czmq_socket_set_opt_reconnect_ivl_max(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl_max, "RECONNECT_IVL_MAX", value);
}

#recovery_ivlFixnum

Returns the socket RECOVERY_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.recovery_ivl  =>  10

Returns:

  • (Fixnum)


961
962
963
964
965
966
# File 'ext/rbczmq/socket.c', line 961

static VALUE rb_czmq_socket_opt_recovery_ivl(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_recovery_ivl(sock->socket));
}

#recovery_ivl=(20) ⇒ nil

Sets the socket RECOVERY_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.recovery_ivl = 20  =>  nil

Returns:

  • (nil)


981
982
983
984
985
# File 'ext/rbczmq/socket.c', line 981

static VALUE rb_czmq_socket_set_opt_recovery_ivl(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_recovery_ivl, "RECOVERY_IVL", value);
}

#recovery_ivl_msecFixnum

Returns the socket RECOVERY_IVL_MSEC value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.recovery_ivl_msec  =>  -1

Returns:

  • (Fixnum)


1000
1001
1002
1003
1004
1005
# File 'ext/rbczmq/socket.c', line 1000

static VALUE rb_czmq_socket_opt_recovery_ivl_msec(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_recovery_ivl_msec(sock->socket));
}

#recovery_ivl_msec=(20) ⇒ nil

Sets the socket RECOVERY_IVL_MSEC value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.recovery_ivl_msec = 20  =>  nil

Returns:

  • (nil)


1020
1021
1022
1023
1024
# File 'ext/rbczmq/socket.c', line 1020

static VALUE rb_czmq_socket_set_opt_recovery_ivl_msec(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_recovery_ivl_msec, "RECOVERY_IVL_MSEC", value);
}

#recvString?

Receive a string from this ZMQ socket. May block depending on the socket type.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv    =>  "message"

Returns:

  • (String, nil)


446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
# File 'ext/rbczmq/socket.c', line 446

static VALUE rb_czmq_socket_recv(VALUE obj)
{
    char *str = NULL;
    struct nogvl_recv_args args;
    VALUE result = Qnil;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    str = (char *)rb_thread_blocking_region(rb_czmq_nogvl_recv, (void *)&args, RUBY_UBF_IO, 0);
    if (str == NULL) return result;
    ZmqAssertSysError();
    if (sock->verbose)
        zclock_log ("I: %s socket %p: recv \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
    result = ZmqEncode(rb_str_new2(str));
    free(str);
    return result;
}

#recv_frameZMQ::Frame?

Receives a ZMQ frame from this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_frame  =>  ZMQ::Frame or nil

Returns:



681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
# File 'ext/rbczmq/socket.c', line 681

static VALUE rb_czmq_socket_recv_frame(VALUE obj)
{
    zframe_t *frame = NULL;
    struct nogvl_recv_args args;
    char print_prefix[255];
    char *cur_time = NULL;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    frame = (zframe_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_frame, (void *)&args, RUBY_UBF_IO, 0);
    if (frame == NULL) return Qnil;
    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        ZmqDumpFrame("recv_frame", frame);
    }
    return rb_czmq_alloc_frame(frame);
}

#recv_frame_nonblockZMQ::Frame?

Receives a ZMQ frame from this socket. Does not block

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_frame_nonblock  =>  ZMQ::Frame or nil

Returns:



715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
# File 'ext/rbczmq/socket.c', line 715

static VALUE rb_czmq_socket_recv_frame_nonblock(VALUE obj)
{
    zframe_t *frame = NULL;
    char print_prefix[255];
    char *cur_time = NULL;
    errno = 0;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    frame = zframe_recv_nowait(sock->socket);
    if (frame == NULL) return Qnil;
    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        ZmqDumpFrame("recv_frame_nonblock", frame);
    }
    return rb_czmq_alloc_frame(frame);
}

#recv_messageZMQ::Message?

Receives a ZMQ message from this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_message  =>  ZMQ::Message or nil

Returns:



776
777
778
779
780
781
782
783
784
785
786
787
788
789
# File 'ext/rbczmq/socket.c', line 776

static VALUE rb_czmq_socket_recv_message(VALUE obj)
{
    zmsg_t *message = NULL;
    struct nogvl_recv_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    message = (zmsg_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_message, (void *)&args, RUBY_UBF_IO, 0);
    if (message == NULL) return Qnil;
    if (sock->verbose) ZmqDumpMessage("recv_message", message);
    return rb_czmq_alloc_message(message);
}

#recv_nonblockString?

Receive a string from this ZMQ socket. Does not block.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_nonblock    =>  "message"

Returns:

  • (String, nil)


480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
# File 'ext/rbczmq/socket.c', line 480

static VALUE rb_czmq_socket_recv_nonblock(VALUE obj)
{
    char *str = NULL;
    errno = 0;
    VALUE result = Qnil;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    str = zstr_recv_nowait(sock->socket);
    if (str == NULL) return result;
    ZmqAssertSysError();
    if (sock->verbose)
        zclock_log ("I: %s socket %p: recv_nonblock \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
    result = ZmqEncode(rb_str_new2(str));
    free(str);
    return result;
}

#send("message") ⇒ Boolean

Sends a string to this ZMQ socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REQ)
sock.connect("inproc://test")
sock.send("message")    =>  true

Returns:

  • (Boolean)


351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
# File 'ext/rbczmq/socket.c', line 351

static VALUE rb_czmq_socket_send(VALUE obj, VALUE msg)
{
    int rc;
    struct nogvl_send_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    Check_Type(msg, T_STRING);
    args.socket = sock;
    args.msg = StringValueCStr(msg);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_send, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: send \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(msg));
    return Qtrue;
}

#send_frame(frame) ⇒ nil

Sends a ZMQ::Frame instance to this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
frame = ZMQ::Frame("frame")
sock.send_frame(frame)    =>  nil
frame = ZMQ::Frame("multi")
sock.send_frame(frame, ZMQ::Frame::MORE)

Returns:

  • (nil)


540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
# File 'ext/rbczmq/socket.c', line 540

static VALUE rb_czmq_socket_send_frame(int argc, VALUE *argv, VALUE obj)
{
    struct nogvl_send_frame_args args;
    VALUE frame_obj;
    VALUE flags;
    char print_prefix[255];
    char *cur_time = NULL;
    zframe_t *print_frame = NULL;
    int rc, flgs;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    rb_scan_args(argc, argv, "11", &frame_obj, &flags);
    ZmqGetFrame(frame_obj);

    if (NIL_P(flags)) {
        flgs = 0;
    } else {
        if (SYMBOL_P(flags)) flags = rb_const_get_at(rb_cZmqFrame, rb_to_id(flags));
        Check_Type(flags, T_FIXNUM);
        flgs = FIX2INT(flags);
    }

    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        print_frame = (flgs & ZFRAME_REUSE) ? frame : zframe_dup(frame);
    }
    args.socket = sock;
    args.frame = frame;
    args.flags = flgs;
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_send_frame, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose) ZmqDumpFrame("send_frame", print_frame);
    return Qtrue;
}

#send_message(msg) ⇒ nil

Sends a ZMQ::Message instance to this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
msg = ZMQ::Message.new
msg.push ZMQ::Frame("header")
sock.send_message(msg)   =>  nil

Returns:

  • (nil)


621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
# File 'ext/rbczmq/socket.c', line 621

static VALUE rb_czmq_socket_send_message(VALUE obj, VALUE message_obj)
{
    struct nogvl_send_message_args args;
    zmsg_t *print_message = NULL;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    ZmqGetMessage(message_obj);
    if (sock->verbose) print_message = zmsg_dup(message->message);
    args.socket = sock;
    args.message = message->message;
    rb_thread_blocking_region(rb_czmq_nogvl_send_message, (void *)&args, RUBY_UBF_IO, 0);
    message->flags |= ZMQ_MESSAGE_DESTROYED;
    if (sock->verbose) ZmqDumpMessage("send_message", print_message);
    return Qnil;
}

#sendm("message") ⇒ Boolean

Sends a string to this ZMQ socket, with a more flag set.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REQ)
sock.connect("inproc://test")
sock.sendm("mes")    =>  true
sock.sendm("sage")    =>  true

Returns:

  • (Boolean)


384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
# File 'ext/rbczmq/socket.c', line 384

static VALUE rb_czmq_socket_sendm(VALUE obj, VALUE msg)
{
    int rc;
    struct nogvl_send_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    Check_Type(msg, T_STRING);
    args.socket = sock;
    args.msg = StringValueCStr(msg);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_sendm, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: sendm \"%s\"", zsocket_type_str(sock->socket), sock->socket, StringValueCStr(msg));
    return Qtrue;
}

#sndbufFixnum

Returns the socket SNDBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndbuf  =>  0

Returns:

  • (Fixnum)


1078
1079
1080
1081
1082
1083
# File 'ext/rbczmq/socket.c', line 1078

static VALUE rb_czmq_socket_opt_sndbuf(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_sndbuf(sock->socket));
}

#sndbuf=(1000) ⇒ nil

Sets the socket SNDBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndbuf = 1000  =>  nil

Returns:

  • (nil)


1098
1099
1100
1101
1102
# File 'ext/rbczmq/socket.c', line 1098

static VALUE rb_czmq_socket_set_opt_sndbuf(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_sndbuf, "SNDBUF", value);
}

#sndtimeoFixnum

Returns the socket SNDTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndtimeo  =>  -1

Returns:

  • (Fixnum)


1462
1463
1464
1465
1466
1467
# File 'ext/rbczmq/socket.c', line 1462

static VALUE rb_czmq_socket_opt_sndtimeo(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_sndtimeo(sock->socket));
}

#sndtimeout=(200) ⇒ nil

Sets the socket SNDTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndtimeo = 200  =>  nil

Returns:

  • (nil)


1482
1483
1484
1485
1486
# File 'ext/rbczmq/socket.c', line 1482

static VALUE rb_czmq_socket_set_opt_sndtimeo(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_sndtimeo, "SNDTIMEO", value);
}

#stateString

Returns the current socket state, one of ZMQ::Socket::PENDING, ZMQ::Socket::BOUND or ZMQ::Socket::CONNECTED

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.state       =>   ZMQ::Socket::PENDING
sock.bind("inproc://test")
sock.state       =>   ZMQ::Socket::BOUND

Returns:

  • (String)


134
135
136
137
138
139
# File 'ext/rbczmq/socket.c', line 134

static VALUE rb_czmq_socket_state(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(sock->state);
}

#subscribe(value) ⇒ Object

Subscribes this SUB socket to a topic.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.subscribe "ruby"  =>  nil


1341
1342
1343
1344
1345
1346
1347
# File 'ext/rbczmq/socket.c', line 1341

static VALUE rb_czmq_socket_set_opt_subscribe(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetStringSockOpt(obj, zsocket_set_subscribe, "SUBSCRIBE", value, {
       ZmqAssertSockOptFor(ZMQ_SUB)
    });
}

#swapFixnum

Returns the socket SWAP value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.swap  =>  0

Returns:

  • (Fixnum)


844
845
846
847
848
849
# File 'ext/rbczmq/socket.c', line 844

static VALUE rb_czmq_socket_opt_swap(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_swap(sock->socket));
}

#swap=(100) ⇒ nil

Sets the socket SWAP value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.swap = 100  =>  nil

Returns:

  • (nil)


864
865
866
867
868
# File 'ext/rbczmq/socket.c', line 864

static VALUE rb_czmq_socket_set_opt_swap(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_swap, "SWAP", value);
}

#to_sObject

Generates a string representation of the current socket state

socket = ctx.bind(:PUB, “tcp://127.0.0.1:5000”) socket.to_s => “PUB socket bound to tcp://127.0.0.1:5000”



59
60
61
62
63
64
65
66
67
68
# File 'lib/zmq/socket.rb', line 59

def to_s
  case state
  when BOUND
    "#{type_str} socket bound to #{endpoint}"
  when CONNECTED
    "#{type_str} socket connected to #{endpoint}"
  else
    "#{type_str} socket"
  end
end

#type_strObject

Generates a string representation of this socket type

socket = ctx.socket(:PUB) socket.type_str => “PUB”



50
51
52
# File 'lib/zmq/socket.rb', line 50

def type_str
  self.class.const_get(:TYPE_STR)
end

#unsubscribe(value) ⇒ Object

Unsubscribes this SUB socket from a topic.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.unsubscribe "ruby"  =>  nil


1362
1363
1364
1365
1366
1367
1368
# File 'ext/rbczmq/socket.c', line 1362

static VALUE rb_czmq_socket_set_opt_unsubscribe(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetStringSockOpt(obj, zsocket_set_unsubscribe, "UNSUBSCRIBE", value, {
       ZmqAssertSockOptFor(ZMQ_SUB)
    });
}

#verbose=(true) ⇒ nil

Let this socket be verbose - dumps a lot of data to stdout for debugging.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.verbose = true    =>  nil

Returns:

  • (nil)


275
276
277
278
279
280
281
282
283
# File 'ext/rbczmq/socket.c', line 275

static VALUE rb_czmq_socket_set_verbose(VALUE obj, VALUE level)
{
    Bool vlevel;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    vlevel = (level == Qtrue) ? TRUE : FALSE;
    sock->verbose = vlevel;
    return Qnil;
}

#writable?Boolean

Determines if this socket is in a writable state. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.

socket.writable? => true

Returns:

  • (Boolean)


41
42
43
# File 'lib/zmq/socket.rb', line 41

def writable?
  (events & ZMQ::POLLOUT) == ZMQ::POLLOUT
end