Class: ZMQ::Socket

Inherits:
Object
  • Object
show all
Defined in:
lib/zmq/socket.rb,
ext/rbczmq/socket.c

Direct Known Subclasses

Dealer, Pair, Pub, Pull, Push, Rep, Req, Router, Sub, XPub, XSub

Defined Under Namespace

Classes: Dealer, Pair, Pub, Pull, Push, Rep, Req, Router, Sub, XPub, XSub

Constant Summary collapse

PROTO_REXP =
/^inproc|ipc|tcp|e?pgm:\/\//
PENDING =
INT2NUM(ZMQ_SOCKET_PENDING)
BOUND =
INT2NUM(ZMQ_SOCKET_BOUND)
CONNECTED =
INT2NUM(ZMQ_SOCKET_CONNECTED)
DISCONNECTED =
INT2NUM(ZMQ_SOCKET_DISCONNECTED)

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.handle_fsm_errors(error, *methods) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/zmq/socket.rb', line 14

def self.handle_fsm_errors(error, *methods)
  methods.each do |m|
    class_eval <<-"evl", __FILE__, __LINE__
      def #{m}(*args);
        super
      rescue SystemCallError => e
        raise(ZMQ::Error, "#{error} Please assert that you're not sending / receiving out of band data when using the REQ / REP socket pairs.") if e.errno == ZMQ::EFSM
        raise
      end
    evl
  end
end

.unsupported_api(*methods) ⇒ Object



6
7
8
9
10
11
12
# File 'lib/zmq/socket.rb', line 6

def self.unsupported_api(*methods)
  methods.each do |m|
    class_eval <<-"evl", __FILE__, __LINE__
      def #{m}(*args); raise(ZMQ::Error, "API #{m} not supported for #{const_get(:TYPE_STR)} sockets!");  end
    evl
  end
end

Instance Method Details

#affinityFixnum

Returns the socket AFFINITY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.affinity  =>  0

Returns:

  • (Fixnum)


943
944
945
946
947
948
# File 'ext/rbczmq/socket.c', line 943

static VALUE rb_czmq_socket_opt_affinity(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_affinity(sock->socket));
}

#affinity=(1) ⇒ nil

Sets the socket AFFINITY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.affinity = 1  =>  nil

Returns:

  • (nil)


963
964
965
966
967
# File 'ext/rbczmq/socket.c', line 963

static VALUE rb_czmq_socket_set_opt_affinity(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_affinity, "AFFINITY", value);
}

#backlogFixnum

Returns the socket BACKLOG value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.backlog  =>  100

Returns:

  • (Fixnum)


1374
1375
1376
1377
1378
1379
# File 'ext/rbczmq/socket.c', line 1374

static VALUE rb_czmq_socket_opt_backlog(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_backlog(sock->socket));
}

#backlog=(200) ⇒ nil

Sets the socket BACKLOG value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.backlog = 200  =>  nil

Returns:

  • (nil)


1394
1395
1396
1397
1398
# File 'ext/rbczmq/socket.c', line 1394

static VALUE rb_czmq_socket_set_opt_backlog(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_backlog, "BACKLOG", value);
}

#bind(uri) ⇒ Object

Binds to a given endpoint. Attemps to resolve URIs without a protocol through DNS SRV records.

socket = ctx.socket(:PUB) socket.bind “tcp://127.0.0.1:9000”

socket.bind “collector.domain.com” # resolves 10.0.0.2:9000



99
100
101
102
# File 'lib/zmq/socket.rb', line 99

def bind(uri)
  uri = resolve(uri) if uri && uri !~ PROTO_REXP
  real_bind(uri)
end

#closenil

Closes a socket. The GC will take the same action if a socket object is not reachable anymore on the next GC cycle. This is a lower level API.

Examples

sock.close    =>  nil

Returns:

  • (nil)


96
97
98
99
100
101
102
103
104
105
106
# File 'ext/rbczmq/socket.c', line 96

static VALUE rb_czmq_socket_close(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    /* This is useless for production / real use cases as we can't query the state again OR assume
    anything about the underlying connection. Merely doing the right thing. */
    sock->state = ZMQ_SOCKET_PENDING;
    rb_czmq_free_sock(sock);
    return Qnil;
}

#connect(uri) ⇒ Object

Connects to a given endpoint. Attemps to resolve URIs without a protocol through DNS SRV records.

socket = ctx.socket(:PUB) socket.connect “tcp://127.0.0.1:9000”

socket.connect “collector.domain.com” # resolves 10.0.0.2:9000



111
112
113
114
# File 'lib/zmq/socket.rb', line 111

def connect(uri)
  uri = resolve(uri) if uri && uri !~ PROTO_REXP
  real_connect(uri)
end

#connect_all(uri) ⇒ Object

Connects to all endpoints that are returned from a SRV record lookup.

socket = ctx.socket(:PUB)

socket.connect “collector.domain.com” # resolves 10.0.0.2:9000 10.0.0.3:9000



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/zmq/socket.rb', line 122

def connect_all(uri)
  if uri =~ PROTO_REXP
    real_connect(uri)
    return
  end

  addresses = resolve_all(uri)
  if addresses.empty?
    real_connect(uri)
  else
    addresses.each do |address|
      host = Resolv.getaddress(address.target.to_s)
      real_connect("tcp://#{host}:#{address.port}")
    end
  end
  self
end

#delay_attach_on_connect=(true) ⇒ nil

Sets the socket DELAY_ATTACH_ON_CONNECT value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.delay_attach_on_connect = true  =>  nil

Returns:

  • (nil)


1179
1180
1181
1182
1183
# File 'ext/rbczmq/socket.c', line 1179

static VALUE rb_czmq_socket_set_opt_delay_attach_on_connect(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetBooleanSockOpt(obj, zsocket_set_delay_attach_on_connect, "DELAY_ATTACH_ON_CONNECT", value);
}

#disconnect("tcp: //localhost:3456") ⇒ Boolean

Attempts to disconnect from a given endpoint.

Examples

ctx = ZMQ::Context.new
rep = ctx.socket(:REP)
port = rep.bind("tcp://localhost:*")    =>  5432
req = ctx.socket(:REQ)
req.connect("tcp://localhost:#{port}")   => true
req.disconnect("tcp://localhost:#{port}")   => true

Returns:

  • (Boolean)


307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'ext/rbczmq/socket.c', line 307

static VALUE rb_czmq_socket_disconnect(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_disconnect, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: disconnected \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
    sock->state = ZMQ_SOCKET_DISCONNECTED;
    rb_ary_delete(sock->endpoints, endpoint);
    return Qtrue;
}

#endpointObject

Returns the endpoint this socket is currently connected to, if any.

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.endpoint    =>   nil
sock.bind("inproc://test")
sock.endpoint    =>  "inproc://test"


35
36
37
# File 'lib/zmq/socket.rb', line 35

def endpoint
  endpoints.first
end

#endpointsArray of Strings

Returns the endpoints this socket is currently connected to, if any.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.endpoints    =>   []
sock.bind("inproc://test")
sock.endpoints    =>  ["inproc://test"]

Returns:

  • (Array of Strings)


123
124
125
126
127
128
# File 'ext/rbczmq/socket.c', line 123

static VALUE rb_czmq_socket_endpoints(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return sock->endpoints;
}

#eventsFixnum

Query if this socket is in a readable or writable state.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.events => ZMQ::POLLIN

Returns:

  • (Fixnum)


1582
1583
1584
1585
1586
1587
# File 'ext/rbczmq/socket.c', line 1582

static VALUE rb_czmq_socket_opt_events(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_events(sock->socket));
}

#fdFixnum Also known as: to_i

Returns a file descriptor reference for integrating this socket with an externel event loop or multiplexor. Edge-triggered notification of I/O state changes.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.fd       =>   -1
sock.bind("inproc://test")
sock.fd       =>   4

Returns:

  • (Fixnum)


168
169
170
171
172
173
174
# File 'ext/rbczmq/socket.c', line 168

static VALUE rb_czmq_socket_fd(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    if (sock->state == ZMQ_SOCKET_PENDING || sock->state == ZMQ_SOCKET_DISCONNECTED) return INT2NUM(-1);
    return INT2NUM(zsocket_fd(sock->socket));
}

#identity=(value) ⇒ Object

Sets the socket IDENTITY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.identity = "anonymous"  =>  nil


1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
# File 'ext/rbczmq/socket.c', line 1491

static VALUE rb_czmq_socket_set_opt_identity(VALUE obj, VALUE value)
{
    char *val;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(value, T_STRING);
    if (RSTRING_LEN(value) == 0) rb_raise(rb_eZmqError, "socket identity cannot be empty.");
    if (RSTRING_LEN(value) > 255) rb_raise(rb_eZmqError, "maximum socket identity is 255 chars.");
    val = StringValueCStr(value);
    zsocket_set_identity(sock->socket, val);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: set option \"IDENTITY\" \"%s\"", zsocket_type_str(sock->socket), obj, val);
    return Qnil;
}

#ipv4only=(true) ⇒ nil

Sets the socket IPV4ONLY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.ipv4only = true  =>  nil

Returns:

  • (nil)


1160
1161
1162
1163
1164
# File 'ext/rbczmq/socket.c', line 1160

static VALUE rb_czmq_socket_set_opt_ipv4only(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetBooleanSockOpt(obj, zsocket_set_ipv4only, "IPV4ONLY", value);
}

#ipv4onlyBoolean

Returns the socket IPV4ONLY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.ipv4only  =>  false

Returns:

  • (Boolean)


1138
1139
1140
1141
1142
1143
1144
1145
# File 'ext/rbczmq/socket.c', line 1138

static VALUE rb_czmq_socket_opt_ipv4only(VALUE obj)
{
    int ipv4only;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ipv4only = zsocket_ipv4only(sock->socket);
    return (ipv4only == 0 ? Qfalse : Qtrue);
}

#lingerFixnum

Returns the socket LINGER value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.linger  =>  -1

Returns:

  • (Fixnum)


1333
1334
1335
1336
1337
1338
1339
1340
# File 'ext/rbczmq/socket.c', line 1333

static VALUE rb_czmq_socket_opt_linger(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    // return INT2NUM(zsocket_linger(sock->socket));
    // TODO: how to get the linger value in ZMQ4/CZMQ2?
    return INT2NUM(-1);
}

#linger=(1000) ⇒ nil

Sets the socket LINGER value in ms.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.linger = 1000  =>  nil

Returns:

  • (nil)


1355
1356
1357
1358
1359
# File 'ext/rbczmq/socket.c', line 1355

static VALUE rb_czmq_socket_set_opt_linger(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_linger, "LINGER", value);
}

#maxmsgsizeFixnum

Returns the socket MAXMSGSIZE value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.maxmsgsize  =>  10

Returns:

  • (Fixnum)


1060
1061
1062
1063
1064
1065
# File 'ext/rbczmq/socket.c', line 1060

static VALUE rb_czmq_socket_opt_maxmsgsize(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_maxmsgsize(sock->socket));
}

#maxmsgsize=(20) ⇒ nil

Sets the socket MAXMSGSIZE value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.maxmsgsize = 20  =>  nil

Returns:

  • (nil)


1080
1081
1082
1083
1084
# File 'ext/rbczmq/socket.c', line 1080

static VALUE rb_czmq_socket_set_opt_maxmsgsize(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_maxmsgsize, "MAXMSGSIZE", value);
}

#monitor("inproc: //monitoring", callback, events) ⇒ nil

Registers a monitoring callback for this socket

Examples

ctx = ZMQ::Context.new
rep = ctx.socket(:REP)
rep.monitor("inproc://monitoring.rep", RepMonitor)
req = ctx.socket(:REQ)
req.monitor("inproc://monitoring.req", ReqMonitor, ZMQ_EVENT_DISCONNECTED)
rep.bind("tcp://127.0.0.1:5331")
rep.bind("tcp://127.0.0.1:5332")

Returns:

  • (nil)


1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
# File 'ext/rbczmq/socket.c', line 1780

static VALUE rb_czmq_socket_monitor(int argc, VALUE *argv, VALUE obj)
{
    VALUE endpoint;
    VALUE handler;
    VALUE events;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    rb_scan_args(argc, argv, "12", &endpoint, &handler, &events);
    Check_Type(endpoint, T_STRING);
    if (NIL_P(events))
        events = rb_const_get_at(rb_mZmq, rb_intern("EVENT_ALL"));
    if (NIL_P(handler)) {
        handler = rb_class_new_instance(0, NULL, rb_const_get_at(rb_mZmq, rb_intern("Monitor")));
    }
    Check_Type(events, T_FIXNUM);
    rc = zmq_socket_monitor(sock->socket, StringValueCStr(endpoint), NUM2INT(events));
    if (rc == 0) {
        sock->monitor_endpoint = endpoint;
        sock->monitor_handler = handler;
        sock->monitor_thread = rb_thread_create(rb_czmq_socket_monitor_thread, (void*)sock);
        return Qtrue;
    } else {
        return Qfalse;
    }
}

#multicast_hopsFixnum

Returns the socket MULTICAST_HOPS value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.multicast_hops  =>  10

Returns:

  • (Fixnum)


1099
1100
1101
1102
1103
1104
# File 'ext/rbczmq/socket.c', line 1099

static VALUE rb_czmq_socket_opt_multicast_hops(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_multicast_hops(sock->socket));
}

#multicast_hops=(20) ⇒ nil

Sets the socket MULTICAST_HOPS value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.multicast_hops = 20  =>  nil

Returns:

  • (nil)


1119
1120
1121
1122
1123
# File 'ext/rbczmq/socket.c', line 1119

static VALUE rb_czmq_socket_set_opt_multicast_hops(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_multicast_hops, "MULTICAST_HOPS", value);
}

#poll(100) ⇒ Boolean

Poll for input events on the socket. Returns true if there is input, otherwise false.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.poll(100)  =>  true

Returns:

  • (Boolean)


836
837
838
839
840
841
842
843
844
845
846
847
848
# File 'ext/rbczmq/socket.c', line 836

static VALUE rb_czmq_socket_poll(VALUE obj, VALUE timeout)
{
    bool readable;
    struct nogvl_socket_poll_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    Check_Type(timeout, T_FIXNUM);
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    args.timeout = FIX2INT(timeout);
    readable = (bool)rb_thread_blocking_region(rb_czmq_nogvl_poll, (void *)&args, RUBY_UBF_IO, 0);
    return (readable == true) ? Qtrue : Qfalse;
}

#poll_readable?Boolean

Poll all sockets for readbable states by default

Returns:

  • (Boolean)


83
84
85
# File 'lib/zmq/socket.rb', line 83

def poll_readable?
  true
end

#poll_writable?Boolean

Poll all sockets for writable states by default

Returns:

  • (Boolean)


88
89
90
# File 'lib/zmq/socket.rb', line 88

def poll_writable?
  true
end

#rateFixnum

Returns the socket RATE value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rate  =>  40000

Returns:

  • (Fixnum)


982
983
984
985
986
987
# File 'ext/rbczmq/socket.c', line 982

static VALUE rb_czmq_socket_opt_rate(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_rate(sock->socket));
}

#rate=(50000) ⇒ nil

Sets the socket RATE value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rate = 50000  =>  nil

Returns:

  • (nil)


1002
1003
1004
1005
1006
# File 'ext/rbczmq/socket.c', line 1002

static VALUE rb_czmq_socket_set_opt_rate(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_rate, "RATE", value);
}

#rcvbufFixnum

Returns the socket RCVBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvbuf  =>  0

Returns:

  • (Fixnum)


1294
1295
1296
1297
1298
1299
# File 'ext/rbczmq/socket.c', line 1294

static VALUE rb_czmq_socket_opt_rcvbuf(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_rcvbuf(sock->socket));
}

#rcvbuf=(1000) ⇒ nil

Sets the socket RCVBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvbuf = 1000  =>  nil

Returns:

  • (nil)


1314
1315
1316
1317
1318
# File 'ext/rbczmq/socket.c', line 1314

static VALUE rb_czmq_socket_set_opt_rcvbuf(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_rcvbuf, "RCVBUF", value);
}

#rcvhwmFixnum

Returns the socket receive HWM (High Water Mark) value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndhwm  =>  0

Returns:

  • (Fixnum)


903
904
905
906
907
908
# File 'ext/rbczmq/socket.c', line 903

static VALUE rb_czmq_socket_opt_rcvhwm(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_rcvhwm(sock->socket));
}

#rcvhwm=(100) ⇒ nil

Sets the socket receive HWM (High Water Mark() value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvhwm = 100  =>  nil
sock.rcvhwm  =>  100

Returns:

  • (nil)


924
925
926
927
928
# File 'ext/rbczmq/socket.c', line 924

static VALUE rb_czmq_socket_set_opt_rcvhwm(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_rcvhwm, "RCVHWM", value);
}

#rcvmoreBoolean

Query if there’s more messages to receive.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.rcvmore =>  true

Returns:

  • (Boolean)


1562
1563
1564
1565
1566
1567
# File 'ext/rbczmq/socket.c', line 1562

static VALUE rb_czmq_socket_opt_rcvmore(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return (zsocket_rcvmore(sock->socket) == 1) ? Qtrue : Qfalse;
}

#rcvtimeoFixnum

Returns the socket RCVTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvtimeo  =>  -1

Returns:

  • (Fixnum)


1602
1603
1604
1605
1606
1607
# File 'ext/rbczmq/socket.c', line 1602

static VALUE rb_czmq_socket_opt_rcvtimeo(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_rcvtimeo(sock->socket));
}

#rcvtimeout=(200) ⇒ nil

Sets the socket RCVTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvtimeo = 200  =>  nil

Returns:

  • (nil)


1622
1623
1624
1625
1626
# File 'ext/rbczmq/socket.c', line 1622

static VALUE rb_czmq_socket_set_opt_rcvtimeo(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_rcvtimeo, "RCVTIMEO", value);
}

#readable?Boolean

Determines if there are one or more messages to read from this socket. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.

socket.readable? => true

Returns:

  • (Boolean)


44
45
46
# File 'lib/zmq/socket.rb', line 44

def readable?
  (events & ZMQ::POLLIN) == ZMQ::POLLIN
end

#bind("inproc: //test") ⇒ Fixnum

Binds to a given endpoint. When the port number is ‘*’, attempts to bind to a free port. Always returns the port number on success.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.bind("tcp://localhost:*")    =>  5432

Returns:

  • (Fixnum)


220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# File 'ext/rbczmq/socket.c', line 220

static VALUE rb_czmq_socket_bind(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_bind, (void *)&args, RUBY_UBF_IO, 0);
    /* ZmqAssert will return false on any non-zero return code. Bind returns the port number */
    if (rc < 0) {
        ZmqAssert(rc);
    }
    if (sock->verbose)
        zclock_log ("I: %s socket %p: bound \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
    sock->state = ZMQ_SOCKET_BOUND;
    rb_ary_push(sock->endpoints, rb_str_new4(endpoint));
    return INT2NUM(rc);
}

#connect("tcp: //localhost:3456") ⇒ Boolean

Attempts to connect to a given endpoint.

Examples

ctx = ZMQ::Context.new
rep = ctx.socket(:REP)
port = rep.bind("tcp://localhost:*")    =>  5432
req = ctx.socket(:REQ)
req.connect("tcp://localhost:#{port}")   => true

Returns:

  • (Boolean)


257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'ext/rbczmq/socket.c', line 257

static VALUE rb_czmq_socket_connect(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_connect, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: connected \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
    sock->state = ZMQ_SOCKET_CONNECTED;
    rb_ary_push(sock->endpoints, rb_str_new4(endpoint));
    return Qtrue;
}

#reconnect_ivlFixnum

Returns the socket RECONNECT_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl  =>  100

Returns:

  • (Fixnum)


1413
1414
1415
1416
1417
1418
# File 'ext/rbczmq/socket.c', line 1413

static VALUE rb_czmq_socket_opt_reconnect_ivl(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_reconnect_ivl(sock->socket));
}

#reconnect_ivl=(200) ⇒ nil

Sets the socket RECONNECT_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl = 200  =>  nil

Returns:

  • (nil)


1433
1434
1435
1436
1437
# File 'ext/rbczmq/socket.c', line 1433

static VALUE rb_czmq_socket_set_opt_reconnect_ivl(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl, "RECONNECT_IVL", value);
}

#reconnect_ivl_maxFixnum

Returns the socket RECONNECT_IVL_MAX value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl_max  =>  0

Returns:

  • (Fixnum)


1452
1453
1454
1455
1456
1457
# File 'ext/rbczmq/socket.c', line 1452

static VALUE rb_czmq_socket_opt_reconnect_ivl_max(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_reconnect_ivl_max(sock->socket));
}

#reconnect_ivl_max=(5) ⇒ nil

Sets the socket RECONNECT_IVL_MAX value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl_max = 5  =>  nil

Returns:

  • (nil)


1472
1473
1474
1475
1476
# File 'ext/rbczmq/socket.c', line 1472

static VALUE rb_czmq_socket_set_opt_reconnect_ivl_max(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl_max, "RECONNECT_IVL_MAX", value);
}

#recovery_ivlFixnum

Returns the socket RECOVERY_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.recovery_ivl  =>  10

Returns:

  • (Fixnum)


1021
1022
1023
1024
1025
1026
# File 'ext/rbczmq/socket.c', line 1021

static VALUE rb_czmq_socket_opt_recovery_ivl(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_recovery_ivl(sock->socket));
}

#recovery_ivl=(20) ⇒ nil

Sets the socket RECOVERY_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.recovery_ivl = 20  =>  nil

Returns:

  • (nil)


1041
1042
1043
1044
1045
# File 'ext/rbczmq/socket.c', line 1041

static VALUE rb_czmq_socket_set_opt_recovery_ivl(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_recovery_ivl, "RECOVERY_IVL", value);
}

#recvString?

Receive a string from this ZMQ socket. May block depending on the socket type.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv    =>  "message"

Returns:

  • (String, nil)


496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
# File 'ext/rbczmq/socket.c', line 496

static VALUE rb_czmq_socket_recv(VALUE obj)
{
    char *str = NULL;
    struct nogvl_recv_args args;
    VALUE result = Qnil;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    zmq_msg_init(&args.message);

    int rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_recv, (void *)&args, RUBY_UBF_IO, 0);
    if (rc < 0) {
        zmq_msg_close(&args.message);
        return Qnil;
    }
    ZmqAssertSysError();
    if (sock->verbose)
        zclock_log ("I: %s socket %p: recv \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);

    result = rb_str_new(zmq_msg_data(&args.message), zmq_msg_size(&args.message));
    zmq_msg_close(&args.message);

    result = ZmqEncode(result);
    return result;
}

#recv_frameZMQ::Frame?

Receives a ZMQ frame from this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_frame  =>  ZMQ::Frame or nil

Returns:



715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
# File 'ext/rbczmq/socket.c', line 715

static VALUE rb_czmq_socket_recv_frame(VALUE obj)
{
    zframe_t *frame = NULL;
    struct nogvl_recv_args args;
    char print_prefix[255];
    char *cur_time = NULL;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    frame = (zframe_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_frame, (void *)&args, RUBY_UBF_IO, 0);
    if (frame == NULL) return Qnil;
    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        ZmqDumpFrame("recv_frame", frame);
    }
    return rb_czmq_alloc_frame(frame);
}

#recv_frame_nonblockZMQ::Frame?

Receives a ZMQ frame from this socket. Does not block

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_frame_nonblock  =>  ZMQ::Frame or nil

Returns:



749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
# File 'ext/rbczmq/socket.c', line 749

static VALUE rb_czmq_socket_recv_frame_nonblock(VALUE obj)
{
    zframe_t *frame = NULL;
    char print_prefix[255];
    char *cur_time = NULL;
    errno = 0;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    frame = zframe_recv_nowait(sock->socket);
    if (frame == NULL) return Qnil;
    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        ZmqDumpFrame("recv_frame_nonblock", frame);
    }
    return rb_czmq_alloc_frame(frame);
}

#recv_messageZMQ::Message?

Receives a ZMQ message from this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_message  =>  ZMQ::Message or nil

Returns:



795
796
797
798
799
800
801
802
803
804
805
806
807
808
# File 'ext/rbczmq/socket.c', line 795

static VALUE rb_czmq_socket_recv_message(VALUE obj)
{
    zmsg_t *message = NULL;
    struct nogvl_recv_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    message = (zmsg_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_message, (void *)&args, RUBY_UBF_IO, 0);
    if (message == NULL) return Qnil;
    if (sock->verbose) ZmqDumpMessage("recv_message", message);
    return rb_czmq_alloc_message(message);
}

#recv_nonblockString?

Receive a string from this ZMQ socket. Does not block.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_nonblock    =>  "message"

Returns:

  • (String, nil)


538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
# File 'ext/rbczmq/socket.c', line 538

static VALUE rb_czmq_socket_recv_nonblock(VALUE obj)
{
    char *str = NULL;
    struct nogvl_recv_args args;
    errno = 0;
    VALUE result = Qnil;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);

    zmq_msg_init(&args.message);

    int rc = zmq_recvmsg(sock->socket, &args.message, ZMQ_DONTWAIT);
    if (rc < 0) {
        zmq_msg_close(&args.message);
        return Qnil;
    }
    ZmqAssertSysError();
    if (sock->verbose)
        zclock_log ("I: %s socket %p: recv \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);

    result = rb_str_new(zmq_msg_data(&args.message), zmq_msg_size(&args.message));
    zmq_msg_close(&args.message);

    result = ZmqEncode(result);
    return result;
}

#router_mandatory=(true) ⇒ nil

Sets the socket ROUTER_MANDATORY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.router_mandatory = true  =>  nil

Returns:

  • (nil)


1198
1199
1200
1201
1202
# File 'ext/rbczmq/socket.c', line 1198

static VALUE rb_czmq_socket_set_opt_router_mandatory(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetBooleanSockOpt(obj, zsocket_set_router_mandatory, "ROUTER_MANDATORY", value);
}

#router_raw=(true) ⇒ nil

Sets the socket ROUTER_RAW value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.router_raw = true  =>  nil

Returns:

  • (nil)


1217
1218
1219
1220
1221
# File 'ext/rbczmq/socket.c', line 1217

static VALUE rb_czmq_socket_set_opt_router_raw(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetBooleanSockOpt(obj, zsocket_set_router_raw, "ROUTER_RAW", value);
}

#send("message") ⇒ Boolean

Sends a string to this ZMQ socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REQ)
sock.connect("inproc://test")
sock.send("message")    =>  true

Returns:

  • (Boolean)


408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
# File 'ext/rbczmq/socket.c', line 408

static VALUE rb_czmq_socket_send(VALUE obj, VALUE msg)
{
    int rc;
    struct nogvl_send_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    StringValue(msg);
    Check_Type(msg, T_STRING);
    args.msg = RSTRING_PTR(msg);
    args.length = RSTRING_LEN(msg);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_send, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: send \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(msg));
    return Qtrue;
}

#send_frame(frame) ⇒ nil

Sends a ZMQ::Frame instance to this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
frame = ZMQ::Frame("frame")
sock.send_frame(frame)    =>  nil
frame = ZMQ::Frame("multi")
sock.send_frame(frame, ZMQ::Frame::MORE)

Returns:

  • (nil)


597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
# File 'ext/rbczmq/socket.c', line 597

static VALUE rb_czmq_socket_send_frame(int argc, VALUE *argv, VALUE obj)
{
    struct nogvl_send_frame_args args;
    VALUE frame_obj;
    VALUE flags;
    char print_prefix[255];
    char *cur_time = NULL;
    zframe_t *print_frame = NULL;
    int rc, flgs;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    rb_scan_args(argc, argv, "11", &frame_obj, &flags);
    ZmqGetFrame(frame_obj);
    ZmqAssertFrameOwnedNoMessage(frame);

    if (NIL_P(flags)) {
        flgs = 0;
    } else {
        if (SYMBOL_P(flags)) flags = rb_const_get_at(rb_cZmqFrame, rb_to_id(flags));
        Check_Type(flags, T_FIXNUM);
        flgs = FIX2INT(flags);
    }

    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        print_frame = (flgs & ZFRAME_REUSE) ? frame->frame : zframe_dup(frame->frame);
    }
    args.socket = sock;
    args.frame = frame->frame;
    args.flags = flgs;
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_send_frame, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if ((flgs & ZFRAME_REUSE) == 0) {
        /* frame has been destroyed, clear the owns flag */
        frame->flags &= ~ZMQ_FRAME_OWNED;
    }
    if (sock->verbose) ZmqDumpFrame("send_frame", print_frame);
    return Qtrue;
}

#send_message(msg) ⇒ nil

Sends a ZMQ::Message instance to this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
msg = ZMQ::Message.new
msg.push ZMQ::Frame("header")
sock.send_message(msg)   =>  nil

Returns:

  • (nil)


669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
# File 'ext/rbczmq/socket.c', line 669

static VALUE rb_czmq_socket_send_message(VALUE obj, VALUE message_obj)
{
    struct nogvl_send_message_args args;
    zmsg_t *print_message = NULL;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    ZmqGetMessage(message_obj);
    ZmqAssertMessageOwned(message);
    if (sock->verbose) print_message = zmsg_dup(message->message);
    args.socket = sock;
    args.message = message->message;
    rb_thread_blocking_region(rb_czmq_nogvl_send_message, (void *)&args, RUBY_UBF_IO, 0);
    message->flags &= ~ZMQ_MESSAGE_OWNED;
    if (sock->verbose) ZmqDumpMessage("send_message", print_message);
    return Qnil;
}

#sendm("message") ⇒ Boolean

Sends a string to this ZMQ socket, with a more flag set.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REQ)
sock.connect("inproc://test")
sock.sendm("mes")    =>  true
sock.sendm("sage")    =>  true

Returns:

  • (Boolean)


443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
# File 'ext/rbczmq/socket.c', line 443

static VALUE rb_czmq_socket_sendm(VALUE obj, VALUE msg)
{
    int rc;
    struct nogvl_send_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    StringValue(msg);
    Check_Type(msg, T_STRING);
    args.msg = RSTRING_PTR(msg);
    args.length = RSTRING_LEN(msg);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_sendm, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: sendm \"%s\"", zsocket_type_str(sock->socket), sock->socket, StringValueCStr(msg));
    return Qtrue;
}

#sndbufFixnum

Returns the socket SNDBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndbuf  =>  0

Returns:

  • (Fixnum)


1255
1256
1257
1258
1259
1260
# File 'ext/rbczmq/socket.c', line 1255

static VALUE rb_czmq_socket_opt_sndbuf(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_sndbuf(sock->socket));
}

#sndbuf=(1000) ⇒ nil

Sets the socket SNDBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndbuf = 1000  =>  nil

Returns:

  • (nil)


1275
1276
1277
1278
1279
# File 'ext/rbczmq/socket.c', line 1275

static VALUE rb_czmq_socket_set_opt_sndbuf(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_sndbuf, "SNDBUF", value);
}

#sndhwmFixnum

Returns the socket send HWM (High Water Mark) value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndhwm  =>  0

Returns:

  • (Fixnum)


863
864
865
866
867
868
# File 'ext/rbczmq/socket.c', line 863

static VALUE rb_czmq_socket_opt_sndhwm(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_sndhwm(sock->socket));
}

#sndhwm=(100) ⇒ nil

Sets the socket send HWM (High Water Mark() value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndhwm = 100  =>  nil
sock.sndhwm  =>  100

Returns:

  • (nil)


884
885
886
887
888
# File 'ext/rbczmq/socket.c', line 884

static VALUE rb_czmq_socket_set_opt_sndhwm(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_sndhwm, "SNDHWM", value);
}

#sndtimeoFixnum

Returns the socket SNDTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndtimeo  =>  -1

Returns:

  • (Fixnum)


1641
1642
1643
1644
1645
1646
# File 'ext/rbczmq/socket.c', line 1641

static VALUE rb_czmq_socket_opt_sndtimeo(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_sndtimeo(sock->socket));
}

#sndtimeout=(200) ⇒ nil

Sets the socket SNDTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndtimeo = 200  =>  nil

Returns:

  • (nil)


1661
1662
1663
1664
1665
# File 'ext/rbczmq/socket.c', line 1661

static VALUE rb_czmq_socket_set_opt_sndtimeo(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_sndtimeo, "SNDTIMEO", value);
}

#stateString

Returns the current socket state, one of ZMQ::Socket::PENDING, ZMQ::Socket::BOUND or ZMQ::Socket::CONNECTED

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.state       =>   ZMQ::Socket::PENDING
sock.bind("inproc://test")
sock.state       =>   ZMQ::Socket::BOUND

Returns:

  • (String)


145
146
147
148
149
150
# File 'ext/rbczmq/socket.c', line 145

static VALUE rb_czmq_socket_state(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(sock->state);
}

#subscribe(value) ⇒ Object

Subscribes this SUB socket to a topic.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.subscribe "ruby"  =>  nil


1520
1521
1522
1523
1524
1525
1526
# File 'ext/rbczmq/socket.c', line 1520

static VALUE rb_czmq_socket_set_opt_subscribe(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetStringSockOpt(obj, zsocket_set_subscribe, "SUBSCRIBE", value, {
       ZmqAssertSockOptFor(ZMQ_SUB)
    });
}

#to_sObject

Generates a string representation of the current socket state

socket = ctx.bind(:PUB, “tcp://127.0.0.1:5000”) socket.to_s => “PUB socket bound to tcp://127.0.0.1:5000”



71
72
73
74
75
76
77
78
79
80
# File 'lib/zmq/socket.rb', line 71

def to_s
  case state
  when BOUND
    "#{type_str} socket bound to #{endpoints.join(', ')}"
  when CONNECTED
    "#{type_str} socket connected to #{endpoints.join(', ')}"
  else
    "#{type_str} socket"
  end
end

#type_strObject

Generates a string representation of this socket type

socket = ctx.socket(:PUB) socket.type_str => “PUB”



62
63
64
# File 'lib/zmq/socket.rb', line 62

def type_str
  self.class.const_get(:TYPE_STR)
end

#unsubscribe(value) ⇒ Object

Unsubscribes this SUB socket from a topic.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.unsubscribe "ruby"  =>  nil


1541
1542
1543
1544
1545
1546
1547
# File 'ext/rbczmq/socket.c', line 1541

static VALUE rb_czmq_socket_set_opt_unsubscribe(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetStringSockOpt(obj, zsocket_set_unsubscribe, "UNSUBSCRIBE", value, {
       ZmqAssertSockOptFor(ZMQ_SUB)
    });
}

#verbose=(true) ⇒ nil

Let this socket be verbose - dumps a lot of data to stdout for debugging.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.verbose = true    =>  nil

Returns:

  • (nil)


339
340
341
342
343
344
345
346
347
# File 'ext/rbczmq/socket.c', line 339

static VALUE rb_czmq_socket_set_verbose(VALUE obj, VALUE level)
{
    bool vlevel;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    vlevel = (level == Qtrue) ? true : false;
    sock->verbose = vlevel;
    return Qnil;
}

#writable?Boolean

Determines if this socket is in a writable state. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.

socket.writable? => true

Returns:

  • (Boolean)


53
54
55
# File 'lib/zmq/socket.rb', line 53

def writable?
  (events & ZMQ::POLLOUT) == ZMQ::POLLOUT
end

#xpub_verbose=(true) ⇒ nil

Sets the socket XPUB_VERBOSE value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.xpub_verbose = true  =>  nil

Returns:

  • (nil)


1236
1237
1238
1239
1240
# File 'ext/rbczmq/socket.c', line 1236

static VALUE rb_czmq_socket_set_opt_xpub_verbose(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetBooleanSockOpt(obj, zsocket_set_xpub_verbose, "XPUB_VERBOSE", value);
}