Class: ZMQ::Socket

Inherits:
Object
  • Object
show all
Defined in:
lib/zmq/socket.rb,
ext/rbczmq/socket.c

Direct Known Subclasses

Dealer, Pair, Pub, Pull, Push, Rep, Req, Router, Sub

Defined Under Namespace

Classes: Dealer, Pair, Pub, Pull, Push, Rep, Req, Router, Sub

Constant Summary collapse

PENDING =
INT2NUM(ZMQ_SOCKET_PENDING)
BOUND =
INT2NUM(ZMQ_SOCKET_BOUND)
CONNECTED =
INT2NUM(ZMQ_SOCKET_CONNECTED)

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.handle_fsm_errors(error, *methods) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/zmq/socket.rb', line 12

# Wraps the named instance methods so that a ZMQ finite-state-machine
# violation surfaces as a descriptive ZMQ::Error.
#
# For every name in +methods+ a method of that name is generated on the
# receiver. The generated method calls +super+ and rescues SystemCallError:
# when the error's errno equals ZMQ::EFSM it raises ZMQ::Error with +error+
# followed by a fixed hint; any other SystemCallError is re-raised untouched.
#
# @param error [String] prefix of the ZMQ::Error message; it is interpolated
#   into generated source, so it must not contain unescaped double quotes
# @param methods [Array<Symbol, String>] names of the methods to wrap
# @return [Array] the +methods+ list (the result of +each+)
def self.handle_fsm_errors(error, *methods)
  methods.each do |m|
    # The heredoc body begins on the line after this call, hence __LINE__ + 1:
    # it keeps backtraces and source_location pointing at the real lines.
    class_eval <<-"evl", __FILE__, __LINE__ + 1
      def #{m}(*args);
        super
      rescue SystemCallError => e
        raise(ZMQ::Error, "#{error} Please assert that you're not sending / receiving out of band data when using the REQ / REP socket pairs.") if e.errno == ZMQ::EFSM
        raise
      end
    evl
  end
end

.unsupported_api(*methods) ⇒ Object



4
5
6
7
8
9
10
# File 'lib/zmq/socket.rb', line 4

# Generates stub instance methods that reject API calls a socket type does
# not support.
#
# For every name in +methods+ a method of that name is generated on the
# receiver; calling it raises ZMQ::Error. The socket type shown in the
# message is read from the receiver's TYPE_STR constant while the stub is
# being generated, not when it is later called.
#
# @param methods [Array<Symbol, String>] names of the unsupported methods
# @return [Array] the +methods+ list (the result of +each+)
def self.unsupported_api(*methods)
  methods.each do |m|
    # The heredoc body begins on the line after this call, hence __LINE__ + 1:
    # it keeps backtraces and source_location pointing at the real lines.
    class_eval <<-"evl", __FILE__, __LINE__ + 1
      def #{m}(*args)
        raise(ZMQ::Error, "API #{m} not supported for #{const_get(:TYPE_STR)} sockets!")
      end
    evl
  end
end

Instance Method Details

#affinity ⇒ Fixnum

Returns the socket AFFINITY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.affinity  =>  0

Returns:

  • (Fixnum)


973
974
975
976
977
978
# File 'ext/rbczmq/socket.c', line 973

/*
 * Getter for the AFFINITY socket option: the result of zsocket_affinity()
 * on the wrapped socket, converted to a Ruby Integer with INT2NUM.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_opt_affinity(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_affinity(sock->socket));
}

#affinity=(1) ⇒ nil

Sets the socket AFFINITY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.affinity = 1  =>  nil

Returns:

  • (nil)


993
994
995
996
997
# File 'ext/rbczmq/socket.c', line 993

/*
 * Setter for the AFFINITY socket option. All work is delegated to the
 * ZmqSetSockOpt macro, which is handed the zsocket_set_affinity setter, the
 * option label "AFFINITY" and the new value.
 * NOTE(review): neither the assignment of `sock` nor a return statement is
 * visible here -- presumably both live in the macro expansion; confirm.
 */
static VALUE rb_czmq_socket_set_opt_affinity(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_affinity, "AFFINITY", value);
}

#backlog ⇒ Fixnum

Returns the socket BACKLOG value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.backlog  =>  100

Returns:

  • (Fixnum)


1285
1286
1287
1288
1289
1290
# File 'ext/rbczmq/socket.c', line 1285

/*
 * Getter for the BACKLOG socket option: the result of zsocket_backlog()
 * on the wrapped socket, converted to a Ruby Integer with INT2NUM.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_opt_backlog(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_backlog(sock->socket));
}

#backlog=(200) ⇒ nil

Sets the socket BACKLOG value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.backlog = 200  =>  nil

Returns:

  • (nil)


1305
1306
1307
1308
1309
# File 'ext/rbczmq/socket.c', line 1305

/*
 * Setter for the BACKLOG socket option. All work is delegated to the
 * ZmqSetSockOpt macro, which is handed the zsocket_set_backlog setter, the
 * option label "BACKLOG" and the new value.
 * NOTE(review): neither the assignment of `sock` nor a return statement is
 * visible here -- presumably both live in the macro expansion; confirm.
 */
static VALUE rb_czmq_socket_set_opt_backlog(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_backlog, "BACKLOG", value);
}

#bind("inproc://test") ⇒ Fixnum

Binds to a given endpoint. When the port number is ‘*’, attempts to bind to a free port. Always returns the port number on success.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.bind("tcp://localhost:*")    =>  5432

Returns:

  • (Fixnum)


219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'ext/rbczmq/socket.c', line 219

/*
 * Binds the wrapped socket to `endpoint`, which must be a Ruby String
 * (enforced by Check_Type).
 *
 * The bind itself is dispatched through rb_thread_blocking_region() with
 * rb_czmq_nogvl_socket_bind as the worker. A result of -1 triggers
 * ZmqRaiseSysError(). Otherwise the wrapper state becomes ZMQ_SOCKET_BOUND,
 * rb_str_new4(endpoint) is stored as the wrapper's endpoint, and the worker's
 * integer result is returned as a Ruby Integer.
 */
static VALUE rb_czmq_socket_bind(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    /* NOTE(review): GetZmqSocket presumably fills `sock`; ZmqSockGuardCrossThread
       presumably rejects use from another thread -- macro bodies not visible. */
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_bind, (void *)&args, RUBY_UBF_IO, 0);
    if (rc == -1) ZmqRaiseSysError();
    if (sock->verbose)
        zclock_log ("I: %s socket %p: bound \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
    sock->state = ZMQ_SOCKET_BOUND;
    sock->endpoint = rb_str_new4(endpoint);
    return INT2NUM(rc);
}

#close ⇒ nil

Closes a socket. The GC will take the same action if a socket object is not reachable anymore on the next GC cycle. This is a lower level API.

Examples

sock.close    =>  nil

Returns:

  • (nil)


87
88
89
90
91
92
93
94
95
96
97
# File 'ext/rbczmq/socket.c', line 87

/*
 * Closes the socket: resets the wrapper state to ZMQ_SOCKET_PENDING, hands
 * the wrapper to rb_czmq_free_sock() and returns nil.
 * NOTE(review): GetZmqSocket presumably fills `sock`; ZmqSockGuardCrossThread
 * presumably rejects use from another thread -- macro bodies not visible.
 */
static VALUE rb_czmq_socket_close(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    /* This is useless for production / real use cases as we can't query the state again OR assume
    anything about the underlying connection. Merely doing the right thing. */
    sock->state = ZMQ_SOCKET_PENDING;
    rb_czmq_free_sock(sock);
    return Qnil;
}

#connect("tcp://localhost:3456") ⇒ Boolean

Attempts to connect to a given endpoint.

Examples

ctx = ZMQ::Context.new
rep = ctx.socket(:REP)
port = rep.bind("tcp://localhost:*")    =>  5432
req = ctx.socket(:REQ)
req.connect("tcp://localhost:#{port}")   => true

Returns:

  • (Boolean)


253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# File 'ext/rbczmq/socket.c', line 253

/*
 * Connects the wrapped socket to `endpoint`, which must be a Ruby String
 * (enforced by Check_Type).
 *
 * The connect itself is dispatched through rb_thread_blocking_region() with
 * rb_czmq_nogvl_socket_connect as the worker, and its result is passed to
 * ZmqAssert. Afterwards the wrapper state becomes ZMQ_SOCKET_CONNECTED,
 * rb_str_new4(endpoint) is stored as the wrapper's endpoint, and Qtrue is
 * returned.
 */
static VALUE rb_czmq_socket_connect(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_connect, (void *)&args, RUBY_UBF_IO, 0);
    /* NOTE(review): ZmqAssert presumably raises on a failing rc -- macro body not visible. */
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: connected \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
    sock->state = ZMQ_SOCKET_CONNECTED;
    sock->endpoint = rb_str_new4(endpoint);
    return Qtrue;
}

#endpoint ⇒ String?

Returns the endpoint this socket is currently connected to, if any.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.endpoint    =>   nil
sock.bind("inproc://test")
sock.endpoint    =>  "inproc://test"

Returns:

  • (String, nil)


114
115
116
117
118
119
# File 'ext/rbczmq/socket.c', line 114

/*
 * Returns the `endpoint` value stored on the socket wrapper, unchanged.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_endpoint(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return sock->endpoint;
}

#events ⇒ Fixnum

Query if this socket is in a readable or writable state.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.events => ZMQ::POLLIN

Returns:

  • (Fixnum)


1493
1494
1495
1496
1497
1498
# File 'ext/rbczmq/socket.c', line 1493

/*
 * Getter for the EVENTS socket option: the result of zsocket_events() on
 * the wrapped socket, converted to a Ruby Integer with INT2NUM.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_opt_events(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_events(sock->socket));
}

#fd ⇒ Fixnum Also known as: to_i

Returns a file descriptor reference for integrating this socket with an external event loop or multiplexer. Edge-triggered notification of I/O state changes.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.fd       =>   -1
sock.bind("inproc://test")
sock.fd       =>   4

Returns:

  • (Fixnum)


159
160
161
162
163
164
165
# File 'ext/rbczmq/socket.c', line 159

/*
 * Returns the socket's file descriptor as a Ruby Integer.
 * While the wrapper state is ZMQ_SOCKET_PENDING the result is -1 and
 * zsocket_fd() is not consulted; otherwise the zsocket_fd() result is used.
 */
static VALUE rb_czmq_socket_fd(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    if (sock->state == ZMQ_SOCKET_PENDING) return INT2NUM(-1);
    return INT2NUM(zsocket_fd(sock->socket));
}

#hwm ⇒ Fixnum

Returns the socket HWM (High Water Mark) value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.hwm  =>  0

Returns:

  • (Fixnum)


894
895
896
897
898
899
# File 'ext/rbczmq/socket.c', line 894

/*
 * Getter for the HWM socket option: the result of zsocket_hwm() on the
 * wrapped socket, converted to a Ruby Integer with INT2NUM.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_opt_hwm(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_hwm(sock->socket));
}

#hwm=(100) ⇒ nil

Sets the socket HWM (High Water Mark) value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.hwm = 100  =>  nil
sock.hwm  =>  100

Returns:

  • (nil)


915
916
917
918
919
# File 'ext/rbczmq/socket.c', line 915

/*
 * Setter for the HWM socket option. All work is delegated to the
 * ZmqSetSockOpt macro, which is handed the zsocket_set_hwm setter, the
 * option label "HWM" and the new value.
 * NOTE(review): neither the assignment of `sock` nor a return statement is
 * visible here -- presumably both live in the macro expansion; confirm.
 */
static VALUE rb_czmq_socket_set_opt_hwm(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_hwm, "HWM", value);
}

#identity=(value) ⇒ Object

Sets the socket IDENTITY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.identity = "anonymous"  =>  nil


1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
# File 'ext/rbczmq/socket.c', line 1402

/*
 * Setter for the IDENTITY socket option.
 *
 * `value` must be a Ruby String (Check_Type) whose length is between 1 and
 * 255 inclusive; an empty or longer string raises rb_eZmqError. The string
 * is passed to zsocket_set_identity() as a C string, so StringValueCStr will
 * also reject values containing NUL bytes. Returns nil.
 */
static VALUE rb_czmq_socket_set_opt_identity(VALUE obj, VALUE value)
{
    char *val;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(value, T_STRING);
    if (RSTRING_LEN(value) == 0) rb_raise(rb_eZmqError, "socket identity cannot be empty.");
    if (RSTRING_LEN(value) > 255) rb_raise(rb_eZmqError, "maximum socket identity is 255 chars.");
    val = StringValueCStr(value);
    zsocket_set_identity(sock->socket, val);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: set option \"IDENTITY\" \"%s\"", zsocket_type_str(sock->socket), obj, val);
    return Qnil;
}

#linger ⇒ Fixnum

Returns the socket LINGER value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.linger  =>  -1

Returns:

  • (Fixnum)


1246
1247
1248
1249
1250
1251
# File 'ext/rbczmq/socket.c', line 1246

/*
 * Getter for the LINGER socket option: the result of zsocket_linger() on
 * the wrapped socket, converted to a Ruby Integer with INT2NUM.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_opt_linger(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_linger(sock->socket));
}

#linger=(1000) ⇒ nil

Sets the socket LINGER value in ms.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.linger = 1000  =>  nil

Returns:

  • (nil)


1266
1267
1268
1269
1270
# File 'ext/rbczmq/socket.c', line 1266

/*
 * Setter for the LINGER socket option. All work is delegated to the
 * ZmqSetSockOpt macro, which is handed the zsocket_set_linger setter, the
 * option label "LINGER" and the new value.
 * NOTE(review): neither the assignment of `sock` nor a return statement is
 * visible here -- presumably both live in the macro expansion; confirm.
 */
static VALUE rb_czmq_socket_set_opt_linger(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_linger, "LINGER", value);
}

#mcast_loop=(false) ⇒ nil

Sets the socket MCAST_LOOP value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.mcast_loop = false  =>  nil

Returns:

  • (nil)


1149
1150
1151
1152
1153
# File 'ext/rbczmq/socket.c', line 1149

/*
 * Setter for the MCAST_LOOP socket option. All work is delegated to the
 * ZmqSetBooleanSockOpt macro, which is handed the zsocket_set_mcast_loop
 * setter, the option label "MCAST_LOOP" and the new value.
 * NOTE(review): neither the assignment of `sock` nor a return statement is
 * visible here -- presumably both live in the macro expansion; confirm.
 */
static VALUE rb_czmq_socket_set_opt_mcast_loop(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetBooleanSockOpt(obj, zsocket_set_mcast_loop, "MCAST_LOOP", value);
}

#mcast_loop? ⇒ Boolean

Returns the socket MCAST_LOOP status.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.mcast_loop?  =>  true

Returns:

  • (Boolean)


1129
1130
1131
1132
1133
1134
# File 'ext/rbczmq/socket.c', line 1129

/*
 * Getter for the MCAST_LOOP socket option: Qtrue when zsocket_mcast_loop()
 * reports exactly 1 for the wrapped socket, Qfalse for any other value.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_opt_mcast_loop(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return (zsocket_mcast_loop(sock->socket) == 1) ? Qtrue : Qfalse;
}

#poll_readable? ⇒ Boolean

Poll all sockets for readable states by default.

Returns:

  • (Boolean)


69
70
71
# File 'lib/zmq/socket.rb', line 69

# Default polling policy for the readable direction: this base
# implementation always answers true.
#
# @return [Boolean] always true
def poll_readable?
  true
end

#poll_writable? ⇒ Boolean

Poll all sockets for writable states by default

Returns:

  • (Boolean)


74
75
76
# File 'lib/zmq/socket.rb', line 74

# Default polling policy for the writable direction: this base
# implementation always answers true.
#
# @return [Boolean] always true
def poll_writable?
  true
end

#rate ⇒ Fixnum

Returns the socket RATE value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rate  =>  40000

Returns:

  • (Fixnum)


1012
1013
1014
1015
1016
1017
# File 'ext/rbczmq/socket.c', line 1012

/*
 * Getter for the RATE socket option: the result of zsocket_rate() on the
 * wrapped socket, converted to a Ruby Integer with INT2NUM.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_opt_rate(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_rate(sock->socket));
}

#rate=(50000) ⇒ nil

Sets the socket RATE value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rate = 50000  =>  nil

Returns:

  • (nil)


1032
1033
1034
1035
1036
# File 'ext/rbczmq/socket.c', line 1032

/*
 * Setter for the RATE socket option. All work is delegated to the
 * ZmqSetSockOpt macro, which is handed the zsocket_set_rate setter, the
 * option label "RATE" and the new value.
 * NOTE(review): neither the assignment of `sock` nor a return statement is
 * visible here -- presumably both live in the macro expansion; confirm.
 */
static VALUE rb_czmq_socket_set_opt_rate(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_rate, "RATE", value);
}

#rcvbuf ⇒ Fixnum

Returns the socket RCVBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvbuf  =>  0

Returns:

  • (Fixnum)


1207
1208
1209
1210
1211
1212
# File 'ext/rbczmq/socket.c', line 1207

/*
 * Getter for the RCVBUF socket option: the result of zsocket_rcvbuf() on
 * the wrapped socket, converted to a Ruby Integer with INT2NUM.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_opt_rcvbuf(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_rcvbuf(sock->socket));
}

#rcvbuf=(1000) ⇒ nil

Sets the socket RCVBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvbuf = 1000  =>  nil

Returns:

  • (nil)


1227
1228
1229
1230
1231
# File 'ext/rbczmq/socket.c', line 1227

/*
 * Setter for the RCVBUF socket option. All work is delegated to the
 * ZmqSetSockOpt macro, which is handed the zsocket_set_rcvbuf setter, the
 * option label "RCVBUF" and the new value.
 * NOTE(review): neither the assignment of `sock` nor a return statement is
 * visible here -- presumably both live in the macro expansion; confirm.
 */
static VALUE rb_czmq_socket_set_opt_rcvbuf(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_rcvbuf, "RCVBUF", value);
}

#rcvmore ⇒ Boolean

Query if there are more message parts to receive.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.rcvmore =>  true

Returns:

  • (Boolean)


1473
1474
1475
1476
1477
1478
# File 'ext/rbczmq/socket.c', line 1473

/*
 * Getter for the RCVMORE socket option: Qtrue when zsocket_rcvmore()
 * reports exactly 1 for the wrapped socket, Qfalse for any other value.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_opt_rcvmore(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return (zsocket_rcvmore(sock->socket) == 1) ? Qtrue : Qfalse;
}

#rcvtimeo ⇒ Fixnum

Returns the socket RCVTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvtimeo  =>  -1

Returns:

  • (Fixnum)


1513
1514
1515
1516
1517
1518
# File 'ext/rbczmq/socket.c', line 1513

/*
 * Getter for the RCVTIMEO socket option: the result of zsocket_rcvtimeo()
 * on the wrapped socket, converted to a Ruby Integer with INT2NUM.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_opt_rcvtimeo(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_rcvtimeo(sock->socket));
}

#rcvtimeo=(200) ⇒ nil

Sets the socket RCVTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvtimeo = 200  =>  nil

Returns:

  • (nil)


1533
1534
1535
1536
1537
# File 'ext/rbczmq/socket.c', line 1533

/*
 * Setter for the RCVTIMEO socket option. All work is delegated to the
 * ZmqSetSockOpt macro, which is handed the zsocket_set_rcvtimeo setter, the
 * option label "RCVTIMEO" and the new value.
 * NOTE(review): neither the assignment of `sock` nor a return statement is
 * visible here -- presumably both live in the macro expansion; confirm.
 */
static VALUE rb_czmq_socket_set_opt_rcvtimeo(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_rcvtimeo, "RCVTIMEO", value);
}

#readable? ⇒ Boolean

Determines if there are one or more messages to read from this socket. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.

socket.readable? => true

Returns:

  • (Boolean)


30
31
32
# File 'lib/zmq/socket.rb', line 30

# Tells whether the ZMQ::POLLIN bit is set in the value returned by
# +events+.
#
# @return [Boolean] true when every bit of ZMQ::POLLIN is present
def readable?
  pollin = ZMQ::POLLIN
  events & pollin == pollin
end

#reconnect_ivl ⇒ Fixnum

Returns the socket RECONNECT_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl  =>  100

Returns:

  • (Fixnum)


1324
1325
1326
1327
1328
1329
# File 'ext/rbczmq/socket.c', line 1324

/*
 * Getter for the RECONNECT_IVL socket option: the result of
 * zsocket_reconnect_ivl() on the wrapped socket, converted to a Ruby
 * Integer with INT2NUM.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_opt_reconnect_ivl(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_reconnect_ivl(sock->socket));
}

#reconnect_ivl=(200) ⇒ nil

Sets the socket RECONNECT_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl = 200  =>  nil

Returns:

  • (nil)


1344
1345
1346
1347
1348
# File 'ext/rbczmq/socket.c', line 1344

/*
 * Setter for the RECONNECT_IVL socket option. All work is delegated to the
 * ZmqSetSockOpt macro, which is handed the zsocket_set_reconnect_ivl setter,
 * the option label "RECONNECT_IVL" and the new value.
 * NOTE(review): neither the assignment of `sock` nor a return statement is
 * visible here -- presumably both live in the macro expansion; confirm.
 */
static VALUE rb_czmq_socket_set_opt_reconnect_ivl(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl, "RECONNECT_IVL", value);
}

#reconnect_ivl_max ⇒ Fixnum

Returns the socket RECONNECT_IVL_MAX value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl_max  =>  0

Returns:

  • (Fixnum)


1363
1364
1365
1366
1367
1368
# File 'ext/rbczmq/socket.c', line 1363

/*
 * Getter for the RECONNECT_IVL_MAX socket option: the result of
 * zsocket_reconnect_ivl_max() on the wrapped socket, converted to a Ruby
 * Integer with INT2NUM.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_opt_reconnect_ivl_max(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_reconnect_ivl_max(sock->socket));
}

#reconnect_ivl_max=(5) ⇒ nil

Sets the socket RECONNECT_IVL_MAX value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl_max = 5  =>  nil

Returns:

  • (nil)


1383
1384
1385
1386
1387
# File 'ext/rbczmq/socket.c', line 1383

/*
 * Setter for the RECONNECT_IVL_MAX socket option. All work is delegated to
 * the ZmqSetSockOpt macro, which is handed the zsocket_set_reconnect_ivl_max
 * setter, the option label "RECONNECT_IVL_MAX" and the new value.
 * NOTE(review): neither the assignment of `sock` nor a return statement is
 * visible here -- presumably both live in the macro expansion; confirm.
 */
static VALUE rb_czmq_socket_set_opt_reconnect_ivl_max(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl_max, "RECONNECT_IVL_MAX", value);
}

#recovery_ivl ⇒ Fixnum

Returns the socket RECOVERY_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.recovery_ivl  =>  10

Returns:

  • (Fixnum)


1051
1052
1053
1054
1055
1056
# File 'ext/rbczmq/socket.c', line 1051

/*
 * Getter for the RECOVERY_IVL socket option: the result of
 * zsocket_recovery_ivl() on the wrapped socket, converted to a Ruby Integer
 * with INT2NUM.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_opt_recovery_ivl(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_recovery_ivl(sock->socket));
}

#recovery_ivl=(20) ⇒ nil

Sets the socket RECOVERY_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.recovery_ivl = 20  =>  nil

Returns:

  • (nil)


1071
1072
1073
1074
1075
# File 'ext/rbczmq/socket.c', line 1071

/*
 * Setter for the RECOVERY_IVL socket option. All work is delegated to the
 * ZmqSetSockOpt macro, which is handed the zsocket_set_recovery_ivl setter,
 * the option label "RECOVERY_IVL" and the new value.
 * NOTE(review): neither the assignment of `sock` nor a return statement is
 * visible here -- presumably both live in the macro expansion; confirm.
 */
static VALUE rb_czmq_socket_set_opt_recovery_ivl(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_recovery_ivl, "RECOVERY_IVL", value);
}

#recovery_ivl_msec ⇒ Fixnum

Returns the socket RECOVERY_IVL_MSEC value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.recovery_ivl_msec  =>  -1

Returns:

  • (Fixnum)


1090
1091
1092
1093
1094
1095
# File 'ext/rbczmq/socket.c', line 1090

/*
 * Getter for the RECOVERY_IVL_MSEC socket option: the result of
 * zsocket_recovery_ivl_msec() on the wrapped socket, converted to a Ruby
 * Integer with INT2NUM.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_opt_recovery_ivl_msec(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_recovery_ivl_msec(sock->socket));
}

#recovery_ivl_msec=(20) ⇒ nil

Sets the socket RECOVERY_IVL_MSEC value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.recovery_ivl_msec = 20  =>  nil

Returns:

  • (nil)


1110
1111
1112
1113
1114
# File 'ext/rbczmq/socket.c', line 1110

/*
 * Setter for the RECOVERY_IVL_MSEC socket option. All work is delegated to
 * the ZmqSetSockOpt macro, which is handed the zsocket_set_recovery_ivl_msec
 * setter, the option label "RECOVERY_IVL_MSEC" and the new value.
 * NOTE(review): neither the assignment of `sock` nor a return statement is
 * visible here -- presumably both live in the macro expansion; confirm.
 */
static VALUE rb_czmq_socket_set_opt_recovery_ivl_msec(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_recovery_ivl_msec, "RECOVERY_IVL_MSEC", value);
}

#recv ⇒ String?

Receive a string from this ZMQ socket. May block depending on the socket type.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv    =>  "message"

Returns:

  • (String, nil)


450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
# File 'ext/rbczmq/socket.c', line 450

/*
 * Receives a C string from the socket and returns it as a Ruby String, or
 * nil when the receive worker yields NULL.
 *
 * The receive is dispatched through rb_thread_blocking_region() with
 * rb_czmq_nogvl_recv as the worker. A non-NULL buffer is copied into a Ruby
 * string with rb_str_new2(), passed through ZmqEncode, and the C buffer is
 * then released with free().
 */
static VALUE rb_czmq_socket_recv(VALUE obj)
{
    char *str = NULL;
    struct nogvl_recv_args args;
    VALUE result = Qnil;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    /* NOTE(review): presumably raises with this message while the socket is
       still pending -- macro body not visible. */
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    str = (char *)rb_thread_blocking_region(rb_czmq_nogvl_recv, (void *)&args, RUBY_UBF_IO, 0);
    /* result is still Qnil here, so a NULL buffer maps to nil. */
    if (str == NULL) return result;
    /* NOTE(review): if this macro can raise, `str` is not freed on that path -- confirm. */
    ZmqAssertSysError();
    if (sock->verbose)
        zclock_log ("I: %s socket %p: recv \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
    result = ZmqEncode(rb_str_new2(str));
    free(str);
    return result;
}

#recv_frame ⇒ ZMQ::Frame?

Receives a ZMQ frame from this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_frame  =>  ZMQ::Frame or nil

Returns:

  • (ZMQ::Frame, nil)

685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
# File 'ext/rbczmq/socket.c', line 685

/*
 * Receives one frame from the socket and wraps it with
 * rb_czmq_alloc_frame(); returns nil when the receive worker yields NULL.
 *
 * The receive is dispatched through rb_thread_blocking_region() with
 * rb_czmq_nogvl_recv_frame as the worker. In verbose mode the frame is
 * dumped via ZmqDumpFrame before being wrapped.
 */
static VALUE rb_czmq_socket_recv_frame(VALUE obj)
{
    zframe_t *frame = NULL;
    struct nogvl_recv_args args;
    /* NOTE(review): print_prefix and cur_time are not read directly in this
       function -- presumably the ZmqDumpFrame macro uses them; confirm before
       removing. */
    char print_prefix[255];
    char *cur_time = NULL;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    frame = (zframe_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_frame, (void *)&args, RUBY_UBF_IO, 0);
    if (frame == NULL) return Qnil;
    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        ZmqDumpFrame("recv_frame", frame);
    }
    return rb_czmq_alloc_frame(frame);
}

#recv_frame_nonblock ⇒ ZMQ::Frame?

Receives a ZMQ frame from this socket. Does not block

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_frame_nonblock  =>  ZMQ::Frame or nil

Returns:

  • (ZMQ::Frame, nil)

719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
# File 'ext/rbczmq/socket.c', line 719

/*
 * Receives one frame using zframe_recv_nowait() and wraps it with
 * rb_czmq_alloc_frame(); returns nil when no frame is obtained (NULL).
 *
 * Unlike the blocking variant, the receive call is made directly rather than
 * through rb_thread_blocking_region(). errno is cleared on entry. In verbose
 * mode the frame is dumped via ZmqDumpFrame before being wrapped.
 */
static VALUE rb_czmq_socket_recv_frame_nonblock(VALUE obj)
{
    zframe_t *frame = NULL;
    /* NOTE(review): print_prefix and cur_time are not read directly in this
       function -- presumably the ZmqDumpFrame macro uses them; confirm before
       removing. */
    char print_prefix[255];
    char *cur_time = NULL;
    errno = 0;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    frame = zframe_recv_nowait(sock->socket);
    if (frame == NULL) return Qnil;
    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        ZmqDumpFrame("recv_frame_nonblock", frame);
    }
    return rb_czmq_alloc_frame(frame);
}

#recv_message ⇒ ZMQ::Message?

Receives a ZMQ message from this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_message  =>  ZMQ::Message or nil

Returns:

  • (ZMQ::Message, nil)

780
781
782
783
784
785
786
787
788
789
790
791
792
793
# File 'ext/rbczmq/socket.c', line 780

/*
 * Receives one message from the socket and wraps it with
 * rb_czmq_alloc_message(); returns nil when the receive worker yields NULL.
 *
 * The receive is dispatched through rb_thread_blocking_region() with
 * rb_czmq_nogvl_recv_message as the worker. In verbose mode the message is
 * dumped via ZmqDumpMessage before being wrapped.
 */
static VALUE rb_czmq_socket_recv_message(VALUE obj)
{
    zmsg_t *message = NULL;
    struct nogvl_recv_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    /* NOTE(review): presumably raises with this message while the socket is
       still pending -- macro body not visible. */
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    message = (zmsg_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_message, (void *)&args, RUBY_UBF_IO, 0);
    if (message == NULL) return Qnil;
    if (sock->verbose) ZmqDumpMessage("recv_message", message);
    return rb_czmq_alloc_message(message);
}

#recv_nonblock ⇒ String?

Receive a string from this ZMQ socket. Does not block.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_nonblock    =>  "message"

Returns:

  • (String, nil)


484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
# File 'ext/rbczmq/socket.c', line 484

/*
 * Receives a C string using zstr_recv_nowait() and returns it as a Ruby
 * String, or nil when no string is obtained (NULL).
 *
 * The receive call is made directly rather than through
 * rb_thread_blocking_region(). errno is cleared on entry. A non-NULL buffer
 * is copied into a Ruby string with rb_str_new2(), passed through ZmqEncode,
 * and the C buffer is then released with free().
 */
static VALUE rb_czmq_socket_recv_nonblock(VALUE obj)
{
    char *str = NULL;
    errno = 0;
    VALUE result = Qnil;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    str = zstr_recv_nowait(sock->socket);
    /* result is still Qnil here, so a NULL buffer maps to nil. */
    if (str == NULL) return result;
    /* NOTE(review): if this macro can raise, `str` is not freed on that path -- confirm. */
    ZmqAssertSysError();
    if (sock->verbose)
        zclock_log ("I: %s socket %p: recv_nonblock \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
    result = ZmqEncode(rb_str_new2(str));
    free(str);
    return result;
}

#recv_timeout ⇒ Fixnum?

Returns the recv timeout currently associated with this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.recv_timeout = 5
sock.recv_timeout   =>   5

Returns:

  • (Fixnum, nil)


831
832
833
834
835
836
# File 'ext/rbczmq/socket.c', line 831

/*
 * Returns the `recv_timeout` value stored on the socket wrapper, unchanged.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_recv_timeout(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return sock->recv_timeout;
}

#recv_timeout=(5) ⇒ nil

Sets a receive timeout for this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.recv_timeout = 5 => nil

Returns:

  • (nil)


808
809
810
811
812
813
814
815
# File 'ext/rbczmq/socket.c', line 808

/*
 * Stores a receive timeout on the socket wrapper and returns nil.
 *
 * `timeout` must be a Fixnum or a Float; any other object raises TypeError.
 * The error message reports the class name of the offending object
 * (previously it printed the object's string form while labelling it a
 * "type"). The Ruby value is stored as-is in sock->recv_timeout.
 */
static VALUE rb_czmq_socket_set_recv_timeout(VALUE obj, VALUE timeout)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    if (TYPE(timeout) != T_FIXNUM && TYPE(timeout) != T_FLOAT)
        rb_raise(rb_eTypeError, "wrong timeout type %s (expected Fixnum or Float)", rb_obj_classname(timeout));
    sock->recv_timeout = timeout;
    return Qnil;
}

#send("message") ⇒ Boolean

Sends a string to this ZMQ socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REQ)
sock.connect("inproc://test")
sock.send("message")    =>  true

Returns:

  • (Boolean)


357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
# File 'ext/rbczmq/socket.c', line 357

/*
 * Sends a Ruby String over the socket and returns Qtrue.
 *
 * `msg` must be a String (Check_Type) and is handed to the worker as a C
 * string via StringValueCStr. The send is dispatched through
 * rb_thread_blocking_region() with rb_czmq_nogvl_zstr_send as the worker,
 * and its result is passed to ZmqAssert.
 */
static VALUE rb_czmq_socket_send(VALUE obj, VALUE msg)
{
    int rc;
    struct nogvl_send_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    /* NOTE(review): presumably raises with this message while the socket is
       still pending -- macro body not visible. */
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    Check_Type(msg, T_STRING);
    args.socket = sock;
    args.msg = StringValueCStr(msg);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_send, (void *)&args, RUBY_UBF_IO, 0);
    /* NOTE(review): ZmqAssert presumably raises on a failing rc -- macro body not visible. */
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: send \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(msg));
    return Qtrue;
}

#send_frame(frame) ⇒ Boolean

Sends a ZMQ::Frame instance to this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
frame = ZMQ::Frame("frame")
sock.send_frame(frame)    =>  true
frame = ZMQ::Frame("multi")
sock.send_frame(frame, ZMQ::Frame::MORE)

Returns:

  • (Boolean)


544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
# File 'ext/rbczmq/socket.c', line 544

/*
 * Sends a frame over the socket and returns Qtrue.
 *
 * Takes one required argument (the frame object) and one optional flags
 * argument (rb_scan_args spec "11"). Missing flags mean 0. A Symbol is first
 * resolved to a constant of that name on rb_cZmqFrame; the resulting value
 * must be a Fixnum. The send is dispatched through
 * rb_thread_blocking_region() with rb_czmq_nogvl_send_frame as the worker,
 * and its result is passed to ZmqAssert.
 */
static VALUE rb_czmq_socket_send_frame(int argc, VALUE *argv, VALUE obj)
{
    struct nogvl_send_frame_args args;
    VALUE frame_obj;
    VALUE flags;
    /* NOTE(review): print_prefix is not read directly in this function --
       presumably the ZmqDumpFrame macro uses it; confirm before removing. */
    char print_prefix[255];
    char *cur_time = NULL;
    zframe_t *print_frame = NULL;
    int rc, flgs;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    rb_scan_args(argc, argv, "11", &frame_obj, &flags);
    /* NOTE(review): `frame` is never declared in this function -- presumably
       ZmqGetFrame declares and assigns it; macro body not visible. */
    ZmqGetFrame(frame_obj);

    if (NIL_P(flags)) {
        flgs = 0;
    } else {
        if (SYMBOL_P(flags)) flags = rb_const_get_at(rb_cZmqFrame, rb_to_id(flags));
        Check_Type(flags, T_FIXNUM);
        flgs = FIX2INT(flags);
    }

    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        /* The frame to dump is chosen before sending: the frame itself when
           ZFRAME_REUSE is set, otherwise a zframe_dup() copy.
           NOTE(review): presumably because sending consumes the frame unless
           REUSE is set; whether the copy is released afterwards is not visible
           here -- confirm. */
        print_frame = (flgs & ZFRAME_REUSE) ? frame : zframe_dup(frame);
    }
    args.socket = sock;
    args.frame = frame;
    args.flags = flgs;
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_send_frame, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose) ZmqDumpFrame("send_frame", print_frame);
    return Qtrue;
}

#send_message(msg) ⇒ nil

Sends a ZMQ::Message instance to this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
msg = ZMQ::Message.new
msg.push ZMQ::Frame("header")
sock.send_message(msg)   =>  nil

Returns:

  • (nil)


625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
# File 'ext/rbczmq/socket.c', line 625

/*
 * Sends a message over the socket and returns nil.
 *
 * The send is dispatched through rb_thread_blocking_region() with
 * rb_czmq_nogvl_send_message as the worker; the worker's result is not
 * inspected. Afterwards the ZMQ_MESSAGE_DESTROYED bit is set on the message
 * wrapper's flags. In verbose mode a zmsg_dup() copy taken before the send
 * is dumped via ZmqDumpMessage.
 */
static VALUE rb_czmq_socket_send_message(VALUE obj, VALUE message_obj)
{
    struct nogvl_send_message_args args;
    zmsg_t *print_message = NULL;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    /* NOTE(review): `message` is never declared in this function -- presumably
       ZmqGetMessage declares and assigns it; macro body not visible. */
    ZmqGetMessage(message_obj);
    /* NOTE(review): whether this copy is released after dumping is not
       visible here -- confirm. */
    if (sock->verbose) print_message = zmsg_dup(message->message);
    args.socket = sock;
    args.message = message->message;
    rb_thread_blocking_region(rb_czmq_nogvl_send_message, (void *)&args, RUBY_UBF_IO, 0);
    message->flags |= ZMQ_MESSAGE_DESTROYED;
    if (sock->verbose) ZmqDumpMessage("send_message", print_message);
    return Qnil;
}

#send_timeout ⇒ Fixnum?

Returns the send timeout currently associated with this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.send_timeout = 5
sock.send_timeout   =>   5

Returns:

  • (Fixnum, nil)


874
875
876
877
878
879
# File 'ext/rbczmq/socket.c', line 874

/*
 * Returns the `send_timeout` value stored on the socket wrapper, unchanged.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_send_timeout(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return sock->send_timeout;
}

#send_timeout=(5) ⇒ nil

Sets a send timeout for this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.send_timeout = 5 => nil

Returns:

  • (nil)


851
852
853
854
855
856
857
858
# File 'ext/rbczmq/socket.c', line 851

/*
 * Stores a send timeout on the socket wrapper and returns nil.
 *
 * `timeout` must be a Fixnum or a Float; any other object raises TypeError.
 * The error message reports the class name of the offending object
 * (previously it printed the object's string form while labelling it a
 * "type"). The Ruby value is stored as-is in sock->send_timeout.
 */
static VALUE rb_czmq_socket_set_send_timeout(VALUE obj, VALUE timeout)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    if (TYPE(timeout) != T_FIXNUM && TYPE(timeout) != T_FLOAT)
        rb_raise(rb_eTypeError, "wrong timeout type %s (expected Fixnum or Float)", rb_obj_classname(timeout));
    sock->send_timeout = timeout;
    return Qnil;
}

#sendm("message") ⇒ Boolean

Sends a string to this ZMQ socket, with a more flag set.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REQ)
sock.connect("inproc://test")
sock.sendm("mes")    =>  true
sock.sendm("sage")    =>  true

Returns:

  • (Boolean)


390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
# File 'ext/rbczmq/socket.c', line 390

/*
 * Sends a Ruby String over the socket using the rb_czmq_nogvl_zstr_sendm
 * worker and returns Qtrue.
 *
 * `msg` must be a String (Check_Type) and is handed to the worker as a C
 * string via StringValueCStr. The send is dispatched through
 * rb_thread_blocking_region(), and its result is passed to ZmqAssert.
 */
static VALUE rb_czmq_socket_sendm(VALUE obj, VALUE msg)
{
    int rc;
    struct nogvl_send_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    /* NOTE(review): presumably raises with this message while the socket is
       still pending -- macro body not visible. */
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    Check_Type(msg, T_STRING);
    args.socket = sock;
    args.msg = StringValueCStr(msg);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_sendm, (void *)&args, RUBY_UBF_IO, 0);
    /* NOTE(review): ZmqAssert presumably raises on a failing rc -- macro body not visible. */
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: sendm \"%s\"", zsocket_type_str(sock->socket), sock->socket, StringValueCStr(msg));
    return Qtrue;
}

#sndbuf ⇒ Fixnum

Returns the socket SNDBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndbuf  =>  0

Returns:

  • (Fixnum)


1168
1169
1170
1171
1172
1173
# File 'ext/rbczmq/socket.c', line 1168

/*
 * Getter for the SNDBUF socket option: the result of zsocket_sndbuf() on
 * the wrapped socket, converted to a Ruby Integer with INT2NUM.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_opt_sndbuf(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_sndbuf(sock->socket));
}

#sndbuf=(1000) ⇒ nil

Sets the socket SNDBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndbuf = 1000  =>  nil

Returns:

  • (nil)


1188
1189
1190
1191
1192
# File 'ext/rbczmq/socket.c', line 1188

/*
 * Setter for the SNDBUF socket option. All work is delegated to the
 * ZmqSetSockOpt macro, which is handed the zsocket_set_sndbuf setter, the
 * option label "SNDBUF" and the new value.
 * NOTE(review): neither the assignment of `sock` nor a return statement is
 * visible here -- presumably both live in the macro expansion; confirm.
 */
static VALUE rb_czmq_socket_set_opt_sndbuf(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_sndbuf, "SNDBUF", value);
}

#sndtimeo ⇒ Fixnum

Returns the socket SNDTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndtimeo  =>  -1

Returns:

  • (Fixnum)


1552
1553
1554
1555
1556
1557
# File 'ext/rbczmq/socket.c', line 1552

/*
 * Getter for the SNDTIMEO socket option: the result of zsocket_sndtimeo()
 * on the wrapped socket, converted to a Ruby Integer with INT2NUM.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_opt_sndtimeo(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_sndtimeo(sock->socket));
}

#sndtimeo=(200) ⇒ nil

Sets the socket SNDTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndtimeo = 200  =>  nil

Returns:

  • (nil)


1572
1573
1574
1575
1576
# File 'ext/rbczmq/socket.c', line 1572

/*
 * Setter for the SNDTIMEO socket option. All work is delegated to the
 * ZmqSetSockOpt macro, which is handed the zsocket_set_sndtimeo setter, the
 * option label "SNDTIMEO" and the new value.
 * NOTE(review): neither the assignment of `sock` nor a return statement is
 * visible here -- presumably both live in the macro expansion; confirm.
 */
static VALUE rb_czmq_socket_set_opt_sndtimeo(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_sndtimeo, "SNDTIMEO", value);
}

#state ⇒ Fixnum

Returns the current socket state, one of ZMQ::Socket::PENDING, ZMQ::Socket::BOUND or ZMQ::Socket::CONNECTED

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.state       =>   ZMQ::Socket::PENDING
sock.bind("inproc://test")
sock.state       =>   ZMQ::Socket::BOUND

Returns:

  • (Fixnum)


136
137
138
139
140
141
# File 'ext/rbczmq/socket.c', line 136

/*
 * Returns the wrapper's `state` field as a Ruby Integer (INT2NUM), i.e. a
 * number rather than a string.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_state(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(sock->state);
}

#subscribe(value) ⇒ Object

Subscribes this SUB socket to a topic.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.subscribe "ruby"  =>  nil


1431
1432
1433
1434
1435
1436
1437
# File 'ext/rbczmq/socket.c', line 1431

/*
 * Setter for the SUBSCRIBE socket option. All work is delegated to the
 * ZmqSetStringSockOpt macro, which is handed the zsocket_set_subscribe
 * setter, the option label "SUBSCRIBE", the new value and a block containing
 * ZmqAssertSockOptFor(ZMQ_SUB).
 * NOTE(review): the block presumably restricts the option to ZMQ_SUB
 * sockets; neither the assignment of `sock` nor a return statement is
 * visible here -- presumably both live in the macro expansion; confirm.
 */
static VALUE rb_czmq_socket_set_opt_subscribe(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetStringSockOpt(obj, zsocket_set_subscribe, "SUBSCRIBE", value, {
       ZmqAssertSockOptFor(ZMQ_SUB)
    });
}

#swap ⇒ Fixnum

Returns the socket SWAP value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.swap  =>  0

Returns:

  • (Fixnum)


934
935
936
937
938
939
# File 'ext/rbczmq/socket.c', line 934

/*
 * Getter for the SWAP socket option: the result of zsocket_swap() on the
 * wrapped socket, converted to a Ruby Integer with INT2NUM.
 * NOTE(review): GetZmqSocket presumably fills `sock` from obj -- the macro
 * body is not visible here.
 */
static VALUE rb_czmq_socket_opt_swap(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_swap(sock->socket));
}

#swap=(100) ⇒ nil

Sets the socket SWAP value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.swap = 100  =>  nil

Returns:

  • (nil)


954
955
956
957
958
# File 'ext/rbczmq/socket.c', line 954

/*
 * Setter for the SWAP socket option. All work is delegated to the
 * ZmqSetSockOpt macro, which is handed the zsocket_set_swap setter, the
 * option label "SWAP" and the new value.
 * NOTE(review): neither the assignment of `sock` nor a return statement is
 * visible here -- presumably both live in the macro expansion; confirm.
 */
static VALUE rb_czmq_socket_set_opt_swap(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_swap, "SWAP", value);
}

#to_s ⇒ Object

Generates a string representation of the current socket state

socket = ctx.bind(:PUB, "tcp://127.0.0.1:5000")
socket.to_s => "PUB socket bound to tcp://127.0.0.1:5000"



57
58
59
60
61
62
63
64
65
66
# File 'lib/zmq/socket.rb', line 57

# Builds a human-readable description of the socket.
#
# The text always starts with "<type_str> socket". When +state+ matches
# BOUND or CONNECTED, " bound to <endpoint>" or " connected to <endpoint>"
# is appended respectively.
#
# @return [String]
def to_s
  relation =
    case state
    when BOUND then "bound to"
    when CONNECTED then "connected to"
    end
  return "#{type_str} socket" unless relation

  "#{type_str} socket #{relation} #{endpoint}"
end

#type_str ⇒ Object

Generates a string representation of this socket type

socket = ctx.socket(:PUB)
socket.type_str => "PUB"



48
49
50
# File 'lib/zmq/socket.rb', line 48

# Looks up the TYPE_STR constant on the receiver's class and returns it.
#
# @return [Object] whatever the class defines as TYPE_STR
def type_str
  socket_class = self.class
  socket_class.const_get(:TYPE_STR)
end

#unsubscribe(value) ⇒ Object

Unsubscribes this SUB socket from a topic.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.unsubscribe "ruby"  =>  nil


1452
1453
1454
1455
1456
1457
1458
# File 'ext/rbczmq/socket.c', line 1452

/*
 * Setter for the UNSUBSCRIBE socket option. All work is delegated to the
 * ZmqSetStringSockOpt macro, which is handed the zsocket_set_unsubscribe
 * setter, the option label "UNSUBSCRIBE", the new value and a block
 * containing ZmqAssertSockOptFor(ZMQ_SUB).
 * NOTE(review): the block presumably restricts the option to ZMQ_SUB
 * sockets; neither the assignment of `sock` nor a return statement is
 * visible here -- presumably both live in the macro expansion; confirm.
 */
static VALUE rb_czmq_socket_set_opt_unsubscribe(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetStringSockOpt(obj, zsocket_set_unsubscribe, "UNSUBSCRIBE", value, {
       ZmqAssertSockOptFor(ZMQ_SUB)
    });
}

#verbose=(true) ⇒ nil

Let this socket be verbose - dumps a lot of data to stdout for debugging.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.verbose = true    =>  nil

Returns:

  • (nil)


285
286
287
288
289
290
291
292
293
# File 'ext/rbczmq/socket.c', line 285

/*
 * Switches verbose logging for this socket on or off and returns nil.
 * Only the exact value Qtrue enables it; every other value (including other
 * truthy Ruby objects) disables it.
 */
static VALUE rb_czmq_socket_set_verbose(VALUE obj, VALUE level)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    if (level == Qtrue) {
        sock->verbose = TRUE;
    } else {
        sock->verbose = FALSE;
    }
    return Qnil;
}

#writable? ⇒ Boolean

Determines if this socket is in a writable state. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.

socket.writable? => true

Returns:

  • (Boolean)


39
40
41
# File 'lib/zmq/socket.rb', line 39

# Tells whether the ZMQ::POLLOUT bit is set in the value returned by
# +events+.
#
# @return [Boolean] true when every bit of ZMQ::POLLOUT is present
def writable?
  pollout = ZMQ::POLLOUT
  events & pollout == pollout
end