Class: ZMQ::Socket

Inherits:
Object
  • Object
show all
Defined in:
lib/zmq/socket.rb,
ext/rbczmq/socket.c

Direct Known Subclasses

Dealer, Pair, Pub, Pull, Push, Rep, Req, Router, Stream, Sub, XPub, XSub

Defined Under Namespace

Classes: Dealer, Pair, Pub, Pull, Push, Rep, Req, Router, Stream, Sub, XPub, XSub

Constant Summary collapse

PROTO_REXP =
/^inproc|ipc|tcp|e?pgm:\/\//
PENDING =
INT2NUM(ZMQ_SOCKET_PENDING)
BOUND =
INT2NUM(ZMQ_SOCKET_BOUND)
CONNECTED =
INT2NUM(ZMQ_SOCKET_CONNECTED)
DISCONNECTED =
INT2NUM(ZMQ_SOCKET_DISCONNECTED)

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.handle_fsm_errors(error, *methods) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/zmq/socket.rb', line 14

def self.handle_fsm_errors(error, *methods)
  methods.each do |m|
    class_eval <<-"evl", __FILE__, __LINE__
      def #{m}(*args);
        super
      rescue SystemCallError => e
        raise(ZMQ::Error, "#{error} Please assert that you're not sending / receiving out of band data when using the REQ / REP socket pairs.") if e.errno == ZMQ::EFSM
        raise
      end
    evl
  end
end

.unsupported_api(*methods) ⇒ Object



6
7
8
9
10
11
12
# File 'lib/zmq/socket.rb', line 6

def self.unsupported_api(*methods)
  methods.each do |m|
    class_eval <<-"evl", __FILE__, __LINE__
      def #{m}(*args); raise(ZMQ::Error, "API #{m} not supported for #{const_get(:TYPE_STR)} sockets!");  end
    evl
  end
end

Instance Method Details

#affinityFixnum

Returns the socket AFFINITY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.affinity  =>  0

Returns:

  • (Fixnum)


994
995
996
997
998
999
# File 'ext/rbczmq/socket.c', line 994

static VALUE rb_czmq_socket_opt_affinity(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_affinity(sock->socket));
}

#affinity=(1) ⇒ nil

Sets the socket AFFINITY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.affinity = 1  =>  nil

Returns:

  • (nil)


1014
1015
1016
1017
1018
# File 'ext/rbczmq/socket.c', line 1014

static VALUE rb_czmq_socket_set_opt_affinity(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_affinity, "AFFINITY", value);
}

#backlogFixnum

Returns the socket BACKLOG value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.backlog  =>  100

Returns:

  • (Fixnum)


1425
1426
1427
1428
1429
1430
# File 'ext/rbczmq/socket.c', line 1425

static VALUE rb_czmq_socket_opt_backlog(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_backlog(sock->socket));
}

#backlog=(200) ⇒ nil

Sets the socket BACKLOG value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.backlog = 200  =>  nil

Returns:

  • (nil)


1445
1446
1447
1448
1449
# File 'ext/rbczmq/socket.c', line 1445

static VALUE rb_czmq_socket_set_opt_backlog(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_backlog, "BACKLOG", value);
}

#bind(uri) ⇒ Object

Binds to a given endpoint. Attemps to resolve URIs without a protocol through DNS SRV records.

socket = ctx.socket(:PUB) socket.bind “tcp://127.0.0.1:9000”

socket.bind “collector.domain.com” # resolves 10.0.0.2:9000



99
100
101
102
# File 'lib/zmq/socket.rb', line 99

def bind(uri)
  uri = resolve(uri) if uri && uri !~ PROTO_REXP
  real_bind(uri)
end

#closenil

Closes a socket. The GC will take the same action if a socket object is not reachable anymore on the next GC cycle. This is a lower level API.

Examples

sock.close    =>  nil

Returns:

  • (nil)


80
81
82
83
84
85
86
87
88
89
90
# File 'ext/rbczmq/socket.c', line 80

static VALUE rb_czmq_socket_close(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    /* This is useless for production / real use cases as we can't query the state again OR assume
    anything about the underlying connection. Merely doing the right thing. */
    sock->state = ZMQ_SOCKET_DISCONNECTED;
    rb_czmq_context_destroy_socket(sock);
    return Qnil;
}

#connect(uri) ⇒ Object

Connects to a given endpoint. Attemps to resolve URIs without a protocol through DNS SRV records.

socket = ctx.socket(:PUB) socket.connect “tcp://127.0.0.1:9000”

socket.connect “collector.domain.com” # resolves 10.0.0.2:9000



111
112
113
114
# File 'lib/zmq/socket.rb', line 111

def connect(uri)
  uri = resolve(uri) if uri && uri !~ PROTO_REXP
  real_connect(uri)
end

#connect_all(uri) ⇒ Object

Connects to all endpoints that are returned from a SRV record lookup.

socket = ctx.socket(:PUB)

socket.connect “collector.domain.com” # resolves 10.0.0.2:9000 10.0.0.3:9000



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/zmq/socket.rb', line 122

def connect_all(uri)
  if uri =~ PROTO_REXP
    real_connect(uri)
    return
  end

  addresses = resolve_all(uri)
  if addresses.empty?
    real_connect(uri)
  else
    addresses.each do |address|
      host = Resolv.getaddress(address.target.to_s)
      real_connect("tcp://#{host}:#{address.port}")
    end
  end
  self
end

#delay_attach_on_connect=(true) ⇒ nil

Sets the socket DELAY_ATTACH_ON_CONNECT value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.delay_attach_on_connect = true  =>  nil

Returns:

  • (nil)


1230
1231
1232
1233
1234
# File 'ext/rbczmq/socket.c', line 1230

static VALUE rb_czmq_socket_set_opt_delay_attach_on_connect(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetBooleanSockOpt(obj, zsocket_set_delay_attach_on_connect, "DELAY_ATTACH_ON_CONNECT", value);
}

#disconnect("tcp: //localhost:3456") ⇒ Boolean

Attempts to disconnect from a given endpoint.

Examples

ctx = ZMQ::Context.new
rep = ctx.socket(:REP)
port = rep.bind("tcp://localhost:*")    =>  5432
req = ctx.socket(:REQ)
req.connect("tcp://localhost:#{port}")   => true
req.disconnect("tcp://localhost:#{port}")   => true

Returns:

  • (Boolean)


299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
# File 'ext/rbczmq/socket.c', line 299

static VALUE rb_czmq_socket_disconnect(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_socket_disconnect, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: disconnected \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
    rb_ary_delete(sock->endpoints, endpoint);
    long endpoint_count = RARRAY_LEN(sock->endpoints);
    if (endpoint_count == 0) {
        sock->state = ZMQ_SOCKET_DISCONNECTED;
    }
    return Qtrue;
}

#endpointObject

Returns the endpoint this socket is currently connected to, if any.

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.endpoint    =>   nil
sock.bind("inproc://test")
sock.endpoint    =>  "inproc://test"


35
36
37
# File 'lib/zmq/socket.rb', line 35

def endpoint
  endpoints.first
end

#endpointsArray of Strings

Returns the endpoints this socket is currently connected to, if any.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.endpoints    =>   []
sock.bind("inproc://test")
sock.endpoints    =>  ["inproc://test"]

Returns:

  • (Array of Strings)


107
108
109
110
111
112
# File 'ext/rbczmq/socket.c', line 107

static VALUE rb_czmq_socket_endpoints(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return sock->endpoints;
}

#eventsFixnum

Query if this socket is in a readable or writable state.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.events => ZMQ::POLLIN

Returns:

  • (Fixnum)


1633
1634
1635
1636
1637
1638
# File 'ext/rbczmq/socket.c', line 1633

static VALUE rb_czmq_socket_opt_events(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_events(sock->socket));
}

#fdFixnum Also known as: to_i

Returns a file descriptor reference for integrating this socket with an externel event loop or multiplexor. Edge-triggered notification of I/O state changes.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.fd       =>   -1
sock.bind("inproc://test")
sock.fd       =>   4

Returns:

  • (Fixnum)


152
153
154
155
156
157
158
# File 'ext/rbczmq/socket.c', line 152

static VALUE rb_czmq_socket_fd(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    if (sock->state == ZMQ_SOCKET_PENDING || sock->state == ZMQ_SOCKET_DISCONNECTED) return INT2NUM(-1);
    return INT2NUM(zsocket_fd(sock->socket));
}

#identity=(value) ⇒ Object

Sets the socket IDENTITY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.identity = "anonymous"  =>  nil


1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
# File 'ext/rbczmq/socket.c', line 1542

static VALUE rb_czmq_socket_set_opt_identity(VALUE obj, VALUE value)
{
    char *val;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(value, T_STRING);
    if (RSTRING_LEN(value) == 0) rb_raise(rb_eZmqError, "socket identity cannot be empty.");
    if (RSTRING_LEN(value) > 255) rb_raise(rb_eZmqError, "maximum socket identity is 255 chars.");
    val = StringValueCStr(value);
    zsocket_set_identity(sock->socket, val);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: set option \"IDENTITY\" \"%s\"", zsocket_type_str(sock->socket), obj, val);
    return Qnil;
}

#ipv4only=(true) ⇒ nil

Sets the socket IPV4ONLY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.ipv4only = true  =>  nil

Returns:

  • (nil)


1211
1212
1213
1214
1215
# File 'ext/rbczmq/socket.c', line 1211

static VALUE rb_czmq_socket_set_opt_ipv4only(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetBooleanSockOpt(obj, zsocket_set_ipv4only, "IPV4ONLY", value);
}

#ipv4onlyBoolean

Returns the socket IPV4ONLY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.ipv4only  =>  false

Returns:

  • (Boolean)


1189
1190
1191
1192
1193
1194
1195
1196
# File 'ext/rbczmq/socket.c', line 1189

static VALUE rb_czmq_socket_opt_ipv4only(VALUE obj)
{
    int ipv4only;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ipv4only = zsocket_ipv4only(sock->socket);
    return (ipv4only == 0 ? Qfalse : Qtrue);
}

#port=(sock) ⇒ 41415 #last_endpointObject

Gets the last endpoint that this socket connected or bound to.

Overloads:

  • #port=(sock) ⇒ 41415

    Returns:

    • (41415)


1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
# File 'ext/rbczmq/socket.c', line 1728

static VALUE rb_czmq_socket_opt_last_endpoint(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    char* endpoint_string = zsocket_last_endpoint(sock->socket);
    VALUE result = rb_str_new_cstr(endpoint_string);
    if (endpoint_string != NULL) {
        free(endpoint_string);
    }
    return result;
}

#lingerFixnum

Returns the socket LINGER value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.linger  =>  -1

Returns:

  • (Fixnum)


1384
1385
1386
1387
1388
1389
1390
1391
# File 'ext/rbczmq/socket.c', line 1384

static VALUE rb_czmq_socket_opt_linger(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    // return INT2NUM(zsocket_linger(sock->socket));
    // TODO: how to get the linger value in ZMQ4/CZMQ2?
    return INT2NUM(-1);
}

#linger=(1000) ⇒ nil

Sets the socket LINGER value in ms.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.linger = 1000  =>  nil

Returns:

  • (nil)


1406
1407
1408
1409
1410
# File 'ext/rbczmq/socket.c', line 1406

static VALUE rb_czmq_socket_set_opt_linger(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_linger, "LINGER", value);
}

#maxmsgsizeFixnum

Returns the socket MAXMSGSIZE value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.maxmsgsize  =>  10

Returns:

  • (Fixnum)


1111
1112
1113
1114
1115
1116
# File 'ext/rbczmq/socket.c', line 1111

static VALUE rb_czmq_socket_opt_maxmsgsize(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_maxmsgsize(sock->socket));
}

#maxmsgsize=(20) ⇒ nil

Sets the socket MAXMSGSIZE value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.maxmsgsize = 20  =>  nil

Returns:

  • (nil)


1131
1132
1133
1134
1135
# File 'ext/rbczmq/socket.c', line 1131

static VALUE rb_czmq_socket_set_opt_maxmsgsize(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_maxmsgsize, "MAXMSGSIZE", value);
}

#monitor("inproc: //monitoring", callback, events) ⇒ nil

Registers a monitoring callback for this socket

Examples

ctx = ZMQ::Context.new
rep = ctx.socket(:REP)
rep.monitor("inproc://monitoring.rep", RepMonitor)
req = ctx.socket(:REQ)
req.monitor("inproc://monitoring.req", ReqMonitor, ZMQ_EVENT_DISCONNECTED)
rep.bind("tcp://127.0.0.1:5331")
rep.bind("tcp://127.0.0.1:5332")

Returns:

  • (nil)


1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
# File 'ext/rbczmq/socket.c', line 1853

static VALUE rb_czmq_socket_monitor(int argc, VALUE *argv, VALUE obj)
{
    VALUE endpoint;
    VALUE handler;
    VALUE events;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    rb_scan_args(argc, argv, "12", &endpoint, &handler, &events);
    Check_Type(endpoint, T_STRING);
    if (NIL_P(events))
        events = rb_const_get_at(rb_mZmq, rb_intern("EVENT_ALL"));
    if (NIL_P(handler)) {
        handler = rb_class_new_instance(0, NULL, rb_const_get_at(rb_mZmq, rb_intern("Monitor")));
    }
    Check_Type(events, T_FIXNUM);
    rc = zmq_socket_monitor(sock->socket, StringValueCStr(endpoint), NUM2INT(events));
    if (rc == 0) {
        sock->monitor_endpoint = endpoint;
        sock->monitor_handler = handler;
        sock->monitor_thread = rb_thread_create(rb_czmq_socket_monitor_thread, (void*)sock);
        return Qtrue;
    } else {
        return Qfalse;
    }
}

#multicast_hopsFixnum

Returns the socket MULTICAST_HOPS value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.multicast_hops  =>  10

Returns:

  • (Fixnum)


1150
1151
1152
1153
1154
1155
# File 'ext/rbczmq/socket.c', line 1150

static VALUE rb_czmq_socket_opt_multicast_hops(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_multicast_hops(sock->socket));
}

#multicast_hops=(20) ⇒ nil

Sets the socket MULTICAST_HOPS value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.multicast_hops = 20  =>  nil

Returns:

  • (nil)


1170
1171
1172
1173
1174
# File 'ext/rbczmq/socket.c', line 1170

static VALUE rb_czmq_socket_set_opt_multicast_hops(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_multicast_hops, "MULTICAST_HOPS", value);
}

#poll(100) ⇒ Boolean

Poll for input events on the socket. Returns true if there is input, otherwise false.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.poll(100)  =>  true

Returns:

  • (Boolean)


887
888
889
890
891
892
893
894
895
896
897
898
899
# File 'ext/rbczmq/socket.c', line 887

static VALUE rb_czmq_socket_poll(VALUE obj, VALUE timeout)
{
    bool readable;
    struct nogvl_socket_poll_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    Check_Type(timeout, T_FIXNUM);
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    args.timeout = FIX2INT(timeout);
    readable = (bool)rb_thread_call_without_gvl(rb_czmq_nogvl_poll, (void *)&args, RUBY_UBF_IO, 0);
    return (readable == true) ? Qtrue : Qfalse;
}

#poll_readable?Boolean

Poll all sockets for readbable states by default

Returns:

  • (Boolean)


83
84
85
# File 'lib/zmq/socket.rb', line 83

def poll_readable?
  true
end

#poll_writable?Boolean

Poll all sockets for writable states by default

Returns:

  • (Boolean)


88
89
90
# File 'lib/zmq/socket.rb', line 88

def poll_writable?
  true
end

#rateFixnum

Returns the socket RATE value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rate  =>  40000

Returns:

  • (Fixnum)


1033
1034
1035
1036
1037
1038
# File 'ext/rbczmq/socket.c', line 1033

static VALUE rb_czmq_socket_opt_rate(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_rate(sock->socket));
}

#rate=(50000) ⇒ nil

Sets the socket RATE value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rate = 50000  =>  nil

Returns:

  • (nil)


1053
1054
1055
1056
1057
# File 'ext/rbczmq/socket.c', line 1053

static VALUE rb_czmq_socket_set_opt_rate(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_rate, "RATE", value);
}

#rcvbufFixnum

Returns the socket RCVBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvbuf  =>  0

Returns:

  • (Fixnum)


1345
1346
1347
1348
1349
1350
# File 'ext/rbczmq/socket.c', line 1345

static VALUE rb_czmq_socket_opt_rcvbuf(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_rcvbuf(sock->socket));
}

#rcvbuf=(1000) ⇒ nil

Sets the socket RCVBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvbuf = 1000  =>  nil

Returns:

  • (nil)


1365
1366
1367
1368
1369
# File 'ext/rbczmq/socket.c', line 1365

static VALUE rb_czmq_socket_set_opt_rcvbuf(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_rcvbuf, "RCVBUF", value);
}

#rcvhwmFixnum

Returns the socket receive HWM (High Water Mark) value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndhwm  =>  0

Returns:

  • (Fixnum)


954
955
956
957
958
959
# File 'ext/rbczmq/socket.c', line 954

static VALUE rb_czmq_socket_opt_rcvhwm(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_rcvhwm(sock->socket));
}

#rcvhwm=(100) ⇒ nil

Sets the socket receive HWM (High Water Mark() value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvhwm = 100  =>  nil
sock.rcvhwm  =>  100

Returns:

  • (nil)


975
976
977
978
979
# File 'ext/rbczmq/socket.c', line 975

static VALUE rb_czmq_socket_set_opt_rcvhwm(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_rcvhwm, "RCVHWM", value);
}

#rcvmoreBoolean

Query if there’s more messages to receive.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.rcvmore =>  true

Returns:

  • (Boolean)


1613
1614
1615
1616
1617
1618
# File 'ext/rbczmq/socket.c', line 1613

static VALUE rb_czmq_socket_opt_rcvmore(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return (zsocket_rcvmore(sock->socket) == 1) ? Qtrue : Qfalse;
}

#rcvtimeoFixnum

Returns the socket RCVTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvtimeo  =>  -1

Returns:

  • (Fixnum)


1653
1654
1655
1656
1657
1658
# File 'ext/rbczmq/socket.c', line 1653

static VALUE rb_czmq_socket_opt_rcvtimeo(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_rcvtimeo(sock->socket));
}

#rcvtimeout=(200) ⇒ nil

Sets the socket RCVTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvtimeo = 200  =>  nil

Returns:

  • (nil)


1673
1674
1675
1676
1677
# File 'ext/rbczmq/socket.c', line 1673

static VALUE rb_czmq_socket_set_opt_rcvtimeo(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_rcvtimeo, "RCVTIMEO", value);
}

#readable?Boolean

Determines if there are one or more messages to read from this socket. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.

socket.readable? => true

Returns:

  • (Boolean)


44
45
46
# File 'lib/zmq/socket.rb', line 44

def readable?
  (events & ZMQ::POLLIN) == ZMQ::POLLIN
end

#bind("inproc: //test") ⇒ Fixnum

Binds to a given endpoint. When the port number is ‘*’, attempts to bind to a free port. Always returns the port number on success.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.bind("tcp://localhost:*")    =>  5432

Returns:

  • (Fixnum)


204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'ext/rbczmq/socket.c', line 204

static VALUE rb_czmq_socket_bind(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_socket_bind, (void *)&args, RUBY_UBF_IO, 0);
    /* ZmqAssert will return false on any non-zero return code. Bind returns the port number */
    if (rc < 0) {
        ZmqAssert(rc);
    }
    /* get the endpoint name with any ephemeral ports filled in. */
    char* endpoint_string = zsocket_last_endpoint(sock->socket);
    ZmqAssert(endpoint_string != NULL);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: bound \"%s\"", zsocket_type_str(sock->socket), obj, endpoint_string);
    sock->state = ZMQ_SOCKET_BOUND;
    rb_ary_push(sock->endpoints, rb_str_new_cstr(endpoint_string));
    free(endpoint_string);
    return INT2NUM(rc);
}

#connect("tcp: //localhost:3456") ⇒ Boolean

Attempts to connect to a given endpoint.

Examples

ctx = ZMQ::Context.new
rep = ctx.socket(:REP)
port = rep.bind("tcp://localhost:*")    =>  5432
req = ctx.socket(:REQ)
req.connect("tcp://localhost:#{port}")   => true

Returns:

  • (Boolean)


245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'ext/rbczmq/socket.c', line 245

static VALUE rb_czmq_socket_connect(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_socket_connect, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    /* get the endpoint name with any ephemeral ports filled in. */
    char* endpoint_string = zsocket_last_endpoint(sock->socket);
    ZmqAssert(endpoint_string != NULL);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: connected \"%s\"", zsocket_type_str(sock->socket), obj, endpoint_string);
    sock->state = ZMQ_SOCKET_CONNECTED;
    rb_ary_push(sock->endpoints, rb_str_new_cstr(endpoint_string));
    free(endpoint_string);
    return Qtrue;
}

#reconnect_ivlFixnum

Returns the socket RECONNECT_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl  =>  100

Returns:

  • (Fixnum)


1464
1465
1466
1467
1468
1469
# File 'ext/rbczmq/socket.c', line 1464

static VALUE rb_czmq_socket_opt_reconnect_ivl(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_reconnect_ivl(sock->socket));
}

#reconnect_ivl=(200) ⇒ nil

Sets the socket RECONNECT_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl = 200  =>  nil

Returns:

  • (nil)


1484
1485
1486
1487
1488
# File 'ext/rbczmq/socket.c', line 1484

static VALUE rb_czmq_socket_set_opt_reconnect_ivl(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl, "RECONNECT_IVL", value);
}

#reconnect_ivl_maxFixnum

Returns the socket RECONNECT_IVL_MAX value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl_max  =>  0

Returns:

  • (Fixnum)


1503
1504
1505
1506
1507
1508
# File 'ext/rbczmq/socket.c', line 1503

static VALUE rb_czmq_socket_opt_reconnect_ivl_max(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_reconnect_ivl_max(sock->socket));
}

#reconnect_ivl_max=(5) ⇒ nil

Sets the socket RECONNECT_IVL_MAX value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl_max = 5  =>  nil

Returns:

  • (nil)


1523
1524
1525
1526
1527
# File 'ext/rbczmq/socket.c', line 1523

static VALUE rb_czmq_socket_set_opt_reconnect_ivl_max(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl_max, "RECONNECT_IVL_MAX", value);
}

#recovery_ivlFixnum

Returns the socket RECOVERY_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.recovery_ivl  =>  10

Returns:

  • (Fixnum)


1072
1073
1074
1075
1076
1077
# File 'ext/rbczmq/socket.c', line 1072

static VALUE rb_czmq_socket_opt_recovery_ivl(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_recovery_ivl(sock->socket));
}

#recovery_ivl=(20) ⇒ nil

Sets the socket RECOVERY_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.recovery_ivl = 20  =>  nil

Returns:

  • (nil)


1092
1093
1094
1095
1096
# File 'ext/rbczmq/socket.c', line 1092

static VALUE rb_czmq_socket_set_opt_recovery_ivl(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_recovery_ivl, "RECOVERY_IVL", value);
}

#recvString?

Receive a string from this ZMQ socket. May block depending on the socket type.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv    =>  "message"

Returns:

  • (String, nil)


546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
# File 'ext/rbczmq/socket.c', line 546

static VALUE rb_czmq_socket_recv(VALUE obj)
{
    char *str = NULL;
    struct nogvl_recv_args args;
    VALUE result = Qnil;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    zmq_msg_init(&args.message);

    int rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_recv, (void *)&args, RUBY_UBF_IO, 0);
    if (rc < 0) {
        zmq_msg_close(&args.message);
        return Qnil;
    }
    ZmqAssertSysError();
    if (sock->verbose)
        zclock_log ("I: %s socket %p: recv \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);

    result = rb_str_new(zmq_msg_data(&args.message), zmq_msg_size(&args.message));
    zmq_msg_close(&args.message);

    result = ZmqEncode(result);
    return result;
}

#recv_frameZMQ::Frame?

Receives a ZMQ frame from this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_frame  =>  ZMQ::Frame or nil

Returns:



766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
# File 'ext/rbczmq/socket.c', line 766

static VALUE rb_czmq_socket_recv_frame(VALUE obj)
{
    zframe_t *frame = NULL;
    struct nogvl_recv_args args;
    char print_prefix[255];
    char *cur_time = NULL;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    frame = (zframe_t *)rb_thread_call_without_gvl(rb_czmq_nogvl_recv_frame, (void *)&args, RUBY_UBF_IO, 0);
    if (frame == NULL) return Qnil;
    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        ZmqDumpFrame("recv_frame", frame);
    }
    return rb_czmq_alloc_frame(frame);
}

#recv_frame_nonblockZMQ::Frame?

Receives a ZMQ frame from this socket. Does not block

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_frame_nonblock  =>  ZMQ::Frame or nil

Returns:



800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
# File 'ext/rbczmq/socket.c', line 800

static VALUE rb_czmq_socket_recv_frame_nonblock(VALUE obj)
{
    zframe_t *frame = NULL;
    char print_prefix[255];
    char *cur_time = NULL;
    errno = 0;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    frame = zframe_recv_nowait(sock->socket);
    if (frame == NULL) return Qnil;
    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        ZmqDumpFrame("recv_frame_nonblock", frame);
    }
    return rb_czmq_alloc_frame(frame);
}

#recv_messageZMQ::Message?

Receives a ZMQ message from this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_message  =>  ZMQ::Message or nil

Returns:



846
847
848
849
850
851
852
853
854
855
856
857
858
859
# File 'ext/rbczmq/socket.c', line 846

static VALUE rb_czmq_socket_recv_message(VALUE obj)
{
    zmsg_t *message = NULL;
    struct nogvl_recv_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    message = (zmsg_t *)rb_thread_call_without_gvl(rb_czmq_nogvl_recv_message, (void *)&args, RUBY_UBF_IO, 0);
    if (message == NULL) return Qnil;
    if (sock->verbose) ZmqDumpMessage("recv_message", message);
    return rb_czmq_alloc_message(message);
}

#recv_nonblockString?

Receive a string from this ZMQ socket. Does not block.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_nonblock    =>  "message"

Returns:

  • (String, nil)


588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
# File 'ext/rbczmq/socket.c', line 588

static VALUE rb_czmq_socket_recv_nonblock(VALUE obj)
{
    struct nogvl_recv_args args;
    errno = 0;
    VALUE result = Qnil;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);

    zmq_msg_init(&args.message);

    int rc = zmq_recvmsg(sock->socket, &args.message, ZMQ_DONTWAIT);
    if (rc < 0) {
        zmq_msg_close(&args.message);
        return Qnil;
    }
    ZmqAssertSysError();

    result = rb_str_new(zmq_msg_data(&args.message), zmq_msg_size(&args.message));
    zmq_msg_close(&args.message);

    if (sock->verbose) {
        zclock_log ("I: %s socket %p: recv \"%s\"", zsocket_type_str(sock->socket), sock->socket, StringValueCStr(result));
    }

    result = ZmqEncode(result);
    return result;
}

#router_mandatory=(true) ⇒ nil

Sets the socket ROUTER_MANDATORY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.router_mandatory = true  =>  nil

Returns:

  • (nil)


1249
1250
1251
1252
1253
# File 'ext/rbczmq/socket.c', line 1249

static VALUE rb_czmq_socket_set_opt_router_mandatory(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetBooleanSockOpt(obj, zsocket_set_router_mandatory, "ROUTER_MANDATORY", value);
}

#router_raw=(true) ⇒ nil

Sets the socket ROUTER_RAW value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.router_raw = true  =>  nil

Returns:

  • (nil)


1268
1269
1270
1271
1272
# File 'ext/rbczmq/socket.c', line 1268

static VALUE rb_czmq_socket_set_opt_router_raw(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetBooleanSockOpt(obj, zsocket_set_router_raw, "ROUTER_RAW", value);
}

#send("message") ⇒ Boolean

Sends a string to this ZMQ socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REQ)
sock.connect("inproc://test")
sock.send("message")    =>  true

Returns:

  • (Boolean)


458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
# File 'ext/rbczmq/socket.c', line 458

static VALUE rb_czmq_socket_send(VALUE obj, VALUE msg)
{
    int rc;
    struct nogvl_send_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    StringValue(msg);
    Check_Type(msg, T_STRING);
    args.msg = RSTRING_PTR(msg);
    args.length = RSTRING_LEN(msg);
    rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_zstr_send, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: send \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(msg));
    return Qtrue;
}

#send_frame(frame) ⇒ nil

Sends a ZMQ::Frame instance to this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
frame = ZMQ::Frame("frame")
sock.send_frame(frame)    =>  nil
frame = ZMQ::Frame("multi")
sock.send_frame(frame, ZMQ::Frame::MORE)

Returns:

  • (nil)


648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
# File 'ext/rbczmq/socket.c', line 648

static VALUE rb_czmq_socket_send_frame(int argc, VALUE *argv, VALUE obj)
{
    struct nogvl_send_frame_args args;
    VALUE frame_obj;
    VALUE flags;
    char print_prefix[255];
    char *cur_time = NULL;
    zframe_t *print_frame = NULL;
    int rc, flgs;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    rb_scan_args(argc, argv, "11", &frame_obj, &flags);
    ZmqGetFrame(frame_obj);
    ZmqAssertFrameOwnedNoMessage(frame);

    if (NIL_P(flags)) {
        flgs = 0;
    } else {
        if (SYMBOL_P(flags)) flags = rb_const_get_at(rb_cZmqFrame, rb_to_id(flags));
        Check_Type(flags, T_FIXNUM);
        flgs = FIX2INT(flags);
    }

    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        print_frame = (flgs & ZFRAME_REUSE) ? frame->frame : zframe_dup(frame->frame);
    }
    args.socket = sock;
    args.frame = frame->frame;
    args.flags = flgs;
    rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_send_frame, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if ((flgs & ZFRAME_REUSE) == 0) {
        /* frame has been destroyed, clear the owns flag */
        frame->flags &= ~ZMQ_FRAME_OWNED;
    }
    if (sock->verbose) ZmqDumpFrame("send_frame", print_frame);
    return Qtrue;
}

#send_message(msg) ⇒ nil

Sends a ZMQ::Message instance to this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
msg = ZMQ::Message.new
msg.push ZMQ::Frame("header")
sock.send_message(msg)   =>  nil

Returns:

  • (nil)


720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
# File 'ext/rbczmq/socket.c', line 720

static VALUE rb_czmq_socket_send_message(VALUE obj, VALUE message_obj)
{
    struct nogvl_send_message_args args;
    zmsg_t *print_message = NULL;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    ZmqGetMessage(message_obj);
    ZmqAssertMessageOwned(message);
    if (sock->verbose) print_message = zmsg_dup(message->message);
    args.socket = sock;
    args.message = message->message;
    rb_thread_call_without_gvl(rb_czmq_nogvl_send_message, (void *)&args, RUBY_UBF_IO, 0);
    message->flags &= ~ZMQ_MESSAGE_OWNED;
    if (sock->verbose) ZmqDumpMessage("send_message", print_message);
    return Qnil;
}

#sendm("message") ⇒ Boolean

Sends a string to this ZMQ socket, with a more flag set.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REQ)
sock.connect("inproc://test")
sock.sendm("mes")    =>  true
sock.sendm("sage")    =>  true

Returns:

  • (Boolean)


493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
# File 'ext/rbczmq/socket.c', line 493

static VALUE rb_czmq_socket_sendm(VALUE obj, VALUE msg)
{
    int rc;
    struct nogvl_send_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    StringValue(msg);
    Check_Type(msg, T_STRING);
    args.msg = RSTRING_PTR(msg);
    args.length = RSTRING_LEN(msg);
    rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_zstr_sendm, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: sendm \"%s\"", zsocket_type_str(sock->socket), sock->socket, StringValueCStr(msg));
    return Qtrue;
}

#sndbufFixnum

Returns the socket SNDBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndbuf  =>  0

Returns:

  • (Fixnum)


1306
1307
1308
1309
1310
1311
# File 'ext/rbczmq/socket.c', line 1306

static VALUE rb_czmq_socket_opt_sndbuf(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_sndbuf(sock->socket));
}

#sndbuf=(1000) ⇒ nil

Sets the socket SNDBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndbuf = 1000  =>  nil

Returns:

  • (nil)


1326
1327
1328
1329
1330
# File 'ext/rbczmq/socket.c', line 1326

static VALUE rb_czmq_socket_set_opt_sndbuf(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_sndbuf, "SNDBUF", value);
}

#sndhwmFixnum

Returns the socket send HWM (High Water Mark) value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndhwm  =>  0

Returns:

  • (Fixnum)


914
915
916
917
918
919
# File 'ext/rbczmq/socket.c', line 914

static VALUE rb_czmq_socket_opt_sndhwm(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_sndhwm(sock->socket));
}

#sndhwm=(100) ⇒ nil

Sets the socket send HWM (High Water Mark() value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndhwm = 100  =>  nil
sock.sndhwm  =>  100

Returns:

  • (nil)


935
936
937
938
939
# File 'ext/rbczmq/socket.c', line 935

static VALUE rb_czmq_socket_set_opt_sndhwm(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_sndhwm, "SNDHWM", value);
}

#sndtimeoFixnum

Returns the socket SNDTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndtimeo  =>  -1

Returns:

  • (Fixnum)


1692
1693
1694
1695
1696
1697
# File 'ext/rbczmq/socket.c', line 1692

static VALUE rb_czmq_socket_opt_sndtimeo(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_sndtimeo(sock->socket));
}

#sndtimeout=(200) ⇒ nil

Sets the socket SNDTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndtimeo = 200  =>  nil

Returns:

  • (nil)


1712
1713
1714
1715
1716
# File 'ext/rbczmq/socket.c', line 1712

static VALUE rb_czmq_socket_set_opt_sndtimeo(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_sndtimeo, "SNDTIMEO", value);
}

#stateString

Returns the current socket state, one of ZMQ::Socket::PENDING, ZMQ::Socket::BOUND or ZMQ::Socket::CONNECTED

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.state       =>   ZMQ::Socket::PENDING
sock.bind("inproc://test")
sock.state       =>   ZMQ::Socket::BOUND

Returns:

  • (String)


129
130
131
132
133
134
# File 'ext/rbczmq/socket.c', line 129

static VALUE rb_czmq_socket_state(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(sock->state);
}

#subscribe(value) ⇒ Object

Subscribes this SUB socket to a topic.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.subscribe "ruby"  =>  nil


1571
1572
1573
1574
1575
1576
1577
# File 'ext/rbczmq/socket.c', line 1571

static VALUE rb_czmq_socket_set_opt_subscribe(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetStringSockOpt(obj, zsocket_set_subscribe, "SUBSCRIBE", value, {
       ZmqAssertSockOptFor(ZMQ_SUB)
    });
}

#to_sObject

Generates a string representation of the current socket state

socket = ctx.bind(:PUB, “tcp://127.0.0.1:5000”) socket.to_s => “PUB socket bound to tcp://127.0.0.1:5000”



71
72
73
74
75
76
77
78
79
80
# File 'lib/zmq/socket.rb', line 71

def to_s
  case state
  when BOUND
    "#{type_str} socket bound to #{endpoints.join(', ')}"
  when CONNECTED
    "#{type_str} socket connected to #{endpoints.join(', ')}"
  else
    "#{type_str} socket"
  end
end

#type_strObject

Generates a string representation of this socket type

socket = ctx.socket(:PUB) socket.type_str => “PUB”



62
63
64
# File 'lib/zmq/socket.rb', line 62

def type_str
  self.class.const_get(:TYPE_STR)
end

#disconnect("tcp: //localhost:3456") ⇒ Boolean

Attempts to disconnect from a given endpoint.

Examples

ctx = ZMQ::Context.new
rep = ctx.socket(:REP)
port = rep.bind("tcp://localhost:*")    =>  5432
req = ctx.socket(:REQ)
req.connect("tcp://localhost:#{port}")   => true
rep.unbind("tcp://localhost:#{port}")   => true

Returns:

  • (Boolean)


353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
# File 'ext/rbczmq/socket.c', line 353

static VALUE rb_czmq_socket_unbind(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_call_without_gvl(rb_czmq_nogvl_socket_unbind, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: unbound \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
    rb_ary_delete(sock->endpoints, endpoint);
    long endpoint_count = RARRAY_LEN(sock->endpoints);
    if (endpoint_count == 0) {
        sock->state = ZMQ_SOCKET_DISCONNECTED;
    }
    return Qtrue;
}

#unsubscribe(value) ⇒ Object

Unsubscribes this SUB socket from a topic.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.unsubscribe "ruby"  =>  nil


1592
1593
1594
1595
1596
1597
1598
# File 'ext/rbczmq/socket.c', line 1592

static VALUE rb_czmq_socket_set_opt_unsubscribe(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetStringSockOpt(obj, zsocket_set_unsubscribe, "UNSUBSCRIBE", value, {
       ZmqAssertSockOptFor(ZMQ_SUB)
    });
}

#verbose=(true) ⇒ nil

Let this socket be verbose - dumps a lot of data to stdout for debugging.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.verbose = true    =>  nil

Returns:

  • (nil)


389
390
391
392
393
394
395
396
397
# File 'ext/rbczmq/socket.c', line 389

static VALUE rb_czmq_socket_set_verbose(VALUE obj, VALUE level)
{
    bool vlevel;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    vlevel = (level == Qtrue) ? true : false;
    sock->verbose = vlevel;
    return Qnil;
}

#writable?Boolean

Determines if this socket is in a writable state. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.

socket.writable? => true

Returns:

  • (Boolean)


53
54
55
# File 'lib/zmq/socket.rb', line 53

def writable?
  (events & ZMQ::POLLOUT) == ZMQ::POLLOUT
end

#xpub_verbose=(true) ⇒ nil

Sets the socket XPUB_VERBOSE value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.xpub_verbose = true  =>  nil

Returns:

  • (nil)


1287
1288
1289
1290
1291
# File 'ext/rbczmq/socket.c', line 1287

static VALUE rb_czmq_socket_set_opt_xpub_verbose(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetBooleanSockOpt(obj, zsocket_set_xpub_verbose, "XPUB_VERBOSE", value);
}