Class: ZMQ::Socket

Inherits:
Object
  • Object
show all
Defined in:
lib/zmq/socket.rb,
ext/rbczmq/socket.c

Direct Known Subclasses

Dealer, Pair, Pub, Pull, Push, Rep, Req, Router, Sub

Defined Under Namespace

Classes: Dealer, Pair, Pub, Pull, Push, Rep, Req, Router, Sub

Constant Summary collapse

# Matches endpoint strings that begin with a supported transport scheme
# (inproc, ipc, tcp, pgm or epgm) followed by "://". The alternatives are
# grouped so the start anchor and the "://" suffix apply to every scheme;
# ungrouped, a bare "tcp" or "ipc" anywhere in a host name would match.
PROTO_REXP =
/\A(?:inproc|ipc|tcp|e?pgm):\/\//
PENDING =
INT2NUM(ZMQ_SOCKET_PENDING)
BOUND =
INT2NUM(ZMQ_SOCKET_BOUND)
CONNECTED =
INT2NUM(ZMQ_SOCKET_CONNECTED)

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.handle_fsm_errors(error, *methods) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/zmq/socket.rb', line 14

# Wraps each named instance method so that a state-machine violation reported
# by the underlying implementation surfaces as a ZMQ::Error.
#
# For every name in +methods+ a method is generated on the receiving class
# that calls +super+ and rescues SystemCallError. When the rescued error's
# errno equals ZMQ::EFSM, a ZMQ::Error whose message starts with +error+ is
# raised instead; any other SystemCallError is re-raised unchanged.
#
# error   - message prefix. It is interpolated into a double-quoted literal in
#           the generated source, so it must not contain an unescaped quote.
# methods - names of the methods to wrap.
#
# Returns the +methods+ array (the result of +each+).
def self.handle_fsm_errors(error, *methods)
  methods.each do |m|
    # __LINE__ + 1 is the first line of the heredoc body, so backtraces from
    # the generated method point at the line that actually raised.
    class_eval <<-"evl", __FILE__, __LINE__ + 1
      def #{m}(*args);
        super
      rescue SystemCallError => e
        raise(ZMQ::Error, "#{error} Please assert that you're not sending / receiving out of band data when using the REQ / REP socket pairs.") if e.errno == ZMQ::EFSM
        raise
      end
    evl
  end
end

.unsupported_api(*methods) ⇒ Object



6
7
8
9
10
11
12
# File 'lib/zmq/socket.rb', line 6

# Generates, for each name in +methods+, an instance method on the receiving
# class that accepts any arguments and always raises ZMQ::Error.
#
# The error text embeds the method name and the value of the receiver's
# TYPE_STR constant. Both are interpolated when the method is generated, so
# TYPE_STR must already be defined on the receiver at that point.
#
# Returns the +methods+ array (the result of +each+).
def self.unsupported_api(*methods)
  methods.each do |m|
    # __LINE__ + 1 is the heredoc body line, keeping backtraces accurate.
    class_eval <<-"evl", __FILE__, __LINE__ + 1
      def #{m}(*args); raise(ZMQ::Error, "API #{m} not supported for #{const_get(:TYPE_STR)} sockets!");  end
    evl
  end
end

Instance Method Details

#affinityFixnum

Returns the socket AFFINITY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.affinity  =>  0

Returns:

  • (Fixnum)


983
984
985
986
987
988
# File 'ext/rbczmq/socket.c', line 983

/*
 * Returns the socket's AFFINITY option as a Ruby Integer: the value comes
 * from zsocket_affinity() on the wrapped socket and is converted with INT2NUM.
 * GetZmqSocket is a macro defined elsewhere; it presumably assigns `sock`.
 */
static VALUE rb_czmq_socket_opt_affinity(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_affinity(sock->socket));
}

#affinity=(1) ⇒ nil

Sets the socket AFFINITY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.affinity = 1  =>  nil

Returns:

  • (nil)


1003
1004
1005
1006
1007
# File 'ext/rbczmq/socket.c', line 1003

/*
 * Sets the socket's AFFINITY option from `value`. The work is delegated to
 * the ZmqSetSockOpt macro (defined elsewhere), given the zsocket_set_affinity
 * setter and the option name "AFFINITY".
 * NOTE(review): there is no explicit return here, so the macro presumably
 * looks up `sock` and supplies the return value -- confirm.
 */
static VALUE rb_czmq_socket_set_opt_affinity(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_affinity, "AFFINITY", value);
}

#backlogFixnum

Returns the socket BACKLOG value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.backlog  =>  100

Returns:

  • (Fixnum)


1297
1298
1299
1300
1301
1302
# File 'ext/rbczmq/socket.c', line 1297

/*
 * Returns the socket's BACKLOG option as a Ruby Integer: the value comes
 * from zsocket_backlog() on the wrapped socket and is converted with INT2NUM.
 * GetZmqSocket is a macro defined elsewhere; it presumably assigns `sock`.
 */
static VALUE rb_czmq_socket_opt_backlog(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_backlog(sock->socket));
}

#backlog=(200) ⇒ nil

Sets the socket BACKLOG value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.backlog = 200  =>  nil

Returns:

  • (nil)


1317
1318
1319
1320
1321
# File 'ext/rbczmq/socket.c', line 1317

/*
 * Sets the socket's BACKLOG option from `value`. The work is delegated to
 * the ZmqSetSockOpt macro (defined elsewhere), given the zsocket_set_backlog
 * setter and the option name "BACKLOG".
 * NOTE(review): there is no explicit return here, so the macro presumably
 * looks up `sock` and supplies the return value -- confirm.
 */
static VALUE rb_czmq_socket_set_opt_backlog(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_backlog, "BACKLOG", value);
}

#bind(uri) ⇒ Object

Binds to a given endpoint. Attempts to resolve URIs without a protocol through DNS SRV records.

socket = ctx.socket(:PUB)
socket.bind “tcp://127.0.0.1:9000”

socket.bind “collector.domain.com” # resolves 10.0.0.2:9000



99
100
101
102
# File 'lib/zmq/socket.rb', line 99

# Binds the socket to +uri+.
#
# A non-nil +uri+ that does not match PROTO_REXP is first passed through
# +resolve+; the resulting (or original) value is handed to +real_bind+,
# whose result is returned.
def bind(uri)
  lacks_protocol = uri && uri !~ PROTO_REXP
  real_bind(lacks_protocol ? resolve(uri) : uri)
end

#closenil

Closes a socket. The GC will take the same action if a socket object is not reachable anymore on the next GC cycle. This is a lower level API.

Examples

sock.close    =>  nil

Returns:

  • (nil)


100
101
102
103
104
105
106
107
108
109
110
# File 'ext/rbczmq/socket.c', line 100

/*
 * Closes the socket wrapped by obj: resets the wrapper state to
 * ZMQ_SOCKET_PENDING, releases it through rb_czmq_free_sock() and returns nil.
 * ZmqSockGuardCrossThread is a macro defined elsewhere; judging by its name
 * it presumably rejects use from a thread other than the owner -- confirm.
 */
static VALUE rb_czmq_socket_close(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    /* This is useless for production / real use cases as we can't query the state again OR assume
    anything about the underlying connection. Merely doing the right thing. */
    sock->state = ZMQ_SOCKET_PENDING;
    rb_czmq_free_sock(sock);
    return Qnil;
}

#connect(uri) ⇒ Object

Connects to a given endpoint. Attempts to resolve URIs without a protocol through DNS SRV records.

socket = ctx.socket(:PUB)
socket.connect “tcp://127.0.0.1:9000”

socket.connect “collector.domain.com” # resolves 10.0.0.2:9000



111
112
113
114
# File 'lib/zmq/socket.rb', line 111

# Connects the socket to +uri+.
#
# A non-nil +uri+ that does not match PROTO_REXP is first passed through
# +resolve+; the resulting (or original) value is handed to +real_connect+,
# whose result is returned.
def connect(uri)
  lacks_protocol = uri && uri !~ PROTO_REXP
  real_connect(lacks_protocol ? resolve(uri) : uri)
end

#connect_all(uri) ⇒ Object

Connects to all endpoints that are returned from a SRV record lookup.

socket = ctx.socket(:PUB)

socket.connect_all “collector.domain.com” # resolves 10.0.0.2:9000 10.0.0.3:9000



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/zmq/socket.rb', line 122

# Connects to every endpoint that +uri+ stands for.
#
# When +uri+ matches PROTO_REXP, or +resolve_all+ yields no records, +uri+
# itself is passed to +real_connect+. Otherwise each record's target is turned
# into an address with Resolv.getaddress and connected as
# "tcp://<address>:<port>".
#
# Returns self on every path, so calls can be chained; previously the
# protocol branch returned nil while the other branches returned self.
def connect_all(uri)
  # A URI that already names its transport needs no SRV lookup.
  addresses = uri =~ PROTO_REXP ? [] : resolve_all(uri)

  if addresses.empty?
    real_connect(uri)
  else
    addresses.each do |address|
      host = Resolv.getaddress(address.target.to_s)
      real_connect("tcp://#{host}:#{address.port}")
    end
  end
  self
end

#endpointObject

Returns the endpoint this socket is currently connected to, if any.

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.endpoint    =>   nil
sock.bind("inproc://test")
sock.endpoint    =>  "inproc://test"


35
36
37
# File 'lib/zmq/socket.rb', line 35

# Returns the first entry of +endpoints+, or nil when there is none.
def endpoint
  known = endpoints
  known.first
end

#endpointsArray of Strings

Returns the endpoints this socket is currently connected to, if any.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.endpoints    =>   []
sock.bind("inproc://test")
sock.endpoints    =>  ["inproc://test"]

Returns:

  • (Array of Strings)


127
128
129
130
131
132
# File 'ext/rbczmq/socket.c', line 127

/*
 * Returns the `endpoints` VALUE stored on the socket wrapper. The stored
 * object itself is returned, not a copy.
 * GetZmqSocket is a macro defined elsewhere; it presumably assigns `sock`.
 */
static VALUE rb_czmq_socket_endpoints(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return sock->endpoints;
}

#eventsFixnum

Query if this socket is in a readable or writable state.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.events => ZMQ::POLLIN

Returns:

  • (Fixnum)


1505
1506
1507
1508
1509
1510
# File 'ext/rbczmq/socket.c', line 1505

/*
 * Returns the socket's EVENTS value as a Ruby Integer: the value comes from
 * zsocket_events() on the wrapped socket and is converted with INT2NUM.
 * GetZmqSocket is a macro defined elsewhere; it presumably assigns `sock`.
 */
static VALUE rb_czmq_socket_opt_events(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_events(sock->socket));
}

#fdFixnum Also known as: to_i

Returns a file descriptor reference for integrating this socket with an external event loop or multiplexer. Edge-triggered notification of I/O state changes.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.fd       =>   -1
sock.bind("inproc://test")
sock.fd       =>   4

Returns:

  • (Fixnum)


172
173
174
175
176
177
178
# File 'ext/rbczmq/socket.c', line 172

/*
 * Returns the socket's file descriptor as a Ruby Integer.
 * While the wrapper state is ZMQ_SOCKET_PENDING the result is -1; otherwise
 * it is whatever zsocket_fd() reports for the wrapped socket.
 */
static VALUE rb_czmq_socket_fd(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    if (sock->state == ZMQ_SOCKET_PENDING) return INT2NUM(-1);
    return INT2NUM(zsocket_fd(sock->socket));
}

#hwmFixnum

Returns the socket HWM (High Water Mark) value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.hwm  =>  0

Returns:

  • (Fixnum)


821
822
823
824
825
826
# File 'ext/rbczmq/socket.c', line 821

/*
 * Returns the socket's HWM option as a Ruby Integer: the value comes from
 * zsocket_hwm() on the wrapped socket and is converted with INT2NUM.
 * GetZmqSocket is a macro defined elsewhere; it presumably assigns `sock`.
 */
static VALUE rb_czmq_socket_opt_hwm(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_hwm(sock->socket));
}

#hwm=(100) ⇒ nil

Sets the socket HWM (High Water Mark) value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.hwm = 100  =>  nil
sock.hwm  =>  100

Returns:

  • (nil)


842
843
844
845
846
# File 'ext/rbczmq/socket.c', line 842

/*
 * Sets the socket's HWM option from `value`. The work is delegated to the
 * ZmqSetSockOpt macro (defined elsewhere), given the zsocket_set_hwm setter
 * and the option name "HWM".
 * NOTE(review): there is no explicit return here, so the macro presumably
 * looks up `sock` and supplies the return value -- confirm.
 */
static VALUE rb_czmq_socket_set_opt_hwm(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_hwm, "HWM", value);
}

#identity=(value) ⇒ Object

Sets the socket IDENTITY value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.identity = "anonymous"  =>  nil


1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
# File 'ext/rbczmq/socket.c', line 1414

/*
 * Sets the socket's IDENTITY option from a Ruby String and returns nil.
 *
 * Raises a TypeError (via Check_Type) when `value` is not a String, and
 * rb_eZmqError when it is empty or longer than 255 bytes. When the wrapper
 * is verbose the new identity is also logged through zclock_log.
 */
static VALUE rb_czmq_socket_set_opt_identity(VALUE obj, VALUE value)
{
    char *val;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(value, T_STRING);
    /* RSTRING_LEN counts bytes, so both limits are byte limits. */
    if (RSTRING_LEN(value) == 0) rb_raise(rb_eZmqError, "socket identity cannot be empty.");
    if (RSTRING_LEN(value) > 255) rb_raise(rb_eZmqError, "maximum socket identity is 255 chars.");
    val = StringValueCStr(value);
    zsocket_set_identity(sock->socket, val);
    if (sock->verbose)
        /* NOTE(review): `obj` is a VALUE but is printed with %p -- confirm intent. */
        zclock_log ("I: %s socket %p: set option \"IDENTITY\" \"%s\"", zsocket_type_str(sock->socket), obj, val);
    return Qnil;
}

#lingerFixnum

Returns the socket LINGER value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.linger  =>  -1

Returns:

  • (Fixnum)


1258
1259
1260
1261
1262
1263
# File 'ext/rbczmq/socket.c', line 1258

/*
 * Returns the socket's LINGER option as a Ruby Integer: the value comes from
 * zsocket_linger() on the wrapped socket and is converted with INT2NUM.
 * GetZmqSocket is a macro defined elsewhere; it presumably assigns `sock`.
 */
static VALUE rb_czmq_socket_opt_linger(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_linger(sock->socket));
}

#linger=(1000) ⇒ nil

Sets the socket LINGER value in ms.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.linger = 1000  =>  nil

Returns:

  • (nil)


1278
1279
1280
1281
1282
# File 'ext/rbczmq/socket.c', line 1278

/*
 * Sets the socket's LINGER option from `value`. The work is delegated to the
 * ZmqSetSockOpt macro (defined elsewhere), given the zsocket_set_linger
 * setter and the option name "LINGER".
 * NOTE(review): there is no explicit return here, so the macro presumably
 * looks up `sock` and supplies the return value -- confirm.
 */
static VALUE rb_czmq_socket_set_opt_linger(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_linger, "LINGER", value);
}

#mcast_loop=(false) ⇒ nil

Sets the socket MCAST_LOOP value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.mcast_loop = false  =>  nil

Returns:

  • (nil)


1160
1161
1162
1163
1164
# File 'ext/rbczmq/socket.c', line 1160

/*
 * Sets the socket's MCAST_LOOP option from `value`. The work is delegated to
 * the ZmqSetBooleanSockOpt macro (defined elsewhere), given the
 * zsocket_set_mcast_loop setter and the option name "MCAST_LOOP".
 * NOTE(review): there is no explicit return here, so the macro presumably
 * looks up `sock`, converts the boolean and supplies the return -- confirm.
 */
static VALUE rb_czmq_socket_set_opt_mcast_loop(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetBooleanSockOpt(obj, zsocket_set_mcast_loop, "MCAST_LOOP", value);
}

#mcast_loop?Boolean

Returns the socket MCAST_LOOP status.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.mcast_loop?  =>  true

Returns:

  • (Boolean)


1140
1141
1142
1143
1144
1145
# File 'ext/rbczmq/socket.c', line 1140

/*
 * Returns Qtrue when zsocket_mcast_loop() reports exactly 1 for the wrapped
 * socket, and Qfalse for any other value.
 * GetZmqSocket is a macro defined elsewhere; it presumably assigns `sock`.
 */
static VALUE rb_czmq_socket_opt_mcast_loop(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return (zsocket_mcast_loop(sock->socket) == 1) ? Qtrue : Qfalse;
}

#monitor("inproc://monitoring", callback, events) ⇒ Boolean

Registers a monitoring callback for this socket. Returns true when monitoring was started and false otherwise.

Examples

ctx = ZMQ::Context.new
rep = ctx.socket(:REP)
rep.monitor("inproc://monitoring.rep", RepMonitor)
req = ctx.socket(:REQ)
req.monitor("inproc://monitoring.req", ReqMonitor, ZMQ::EVENT_DISCONNECTED)
rep.bind("tcp://127.0.0.1:5331")
rep.bind("tcp://127.0.0.1:5332")

Returns:

  • (Boolean)


1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
# File 'ext/rbczmq/socket.c', line 1710

/*
 * Starts monitoring this socket on the given endpoint.
 *
 * Arguments (rb_scan_args "12": one required, two optional):
 *   endpoint - String, required; passed to zmq_socket_monitor().
 *   handler  - optional; when nil, a new instance of the "Monitor" constant
 *              looked up under rb_mZmq is created with no arguments.
 *   events   - optional Fixnum mask; when nil, the "EVENT_ALL" constant
 *              looked up under rb_mZmq is used.
 *
 * Returns Qtrue when zmq_socket_monitor() returns 0, after the endpoint and
 * handler have been stored on the wrapper and the monitor thread has been
 * created and run; returns Qfalse for any other return code.
 */
static VALUE rb_czmq_socket_monitor(int argc, VALUE *argv, VALUE obj)
{
    VALUE endpoint;
    VALUE handler;
    VALUE events;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    rb_scan_args(argc, argv, "12", &endpoint, &handler, &events);
    Check_Type(endpoint, T_STRING);
    if (NIL_P(events))
        events = rb_const_get_at(rb_mZmq, rb_intern("EVENT_ALL"));
    if (NIL_P(handler)) {
        handler = rb_class_new_instance(0, NULL, rb_const_get_at(rb_mZmq, rb_intern("Monitor")));
    }
    /* Checked after defaulting, so the default constant is validated too. */
    Check_Type(events, T_FIXNUM);
    rc = zmq_socket_monitor(sock->socket, StringValueCStr(endpoint), NUM2INT(events));
    if (rc == 0) {
        sock->monitor_endpoint = endpoint;
        sock->monitor_handler = handler;
        sock->monitor_thread = rb_thread_create(rb_czmq_socket_monitor_thread, (void*)sock);
        rb_thread_run(sock->monitor_thread);
        return Qtrue;
    } else {
        return Qfalse;
    }
}

#poll_readable?Boolean

Poll all sockets for readable states by default

Returns:

  • (Boolean)


83
84
85
# File 'lib/zmq/socket.rb', line 83

# Whether this socket should be polled for readable state.
# This implementation always answers true.
def poll_readable?
  true
end

#poll_writable?Boolean

Poll all sockets for writable states by default

Returns:

  • (Boolean)


88
89
90
# File 'lib/zmq/socket.rb', line 88

# Whether this socket should be polled for writable state.
# This implementation always answers true.
def poll_writable?
  true
end

#rateFixnum

Returns the socket RATE value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rate  =>  40000

Returns:

  • (Fixnum)


1022
1023
1024
1025
1026
1027
# File 'ext/rbczmq/socket.c', line 1022

/*
 * Returns the socket's RATE option as a Ruby Integer: the value comes from
 * zsocket_rate() on the wrapped socket and is converted with INT2NUM.
 * GetZmqSocket is a macro defined elsewhere; it presumably assigns `sock`.
 */
static VALUE rb_czmq_socket_opt_rate(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_rate(sock->socket));
}

#rate=(50000) ⇒ nil

Sets the socket RATE value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rate = 50000  =>  nil

Returns:

  • (nil)


1042
1043
1044
1045
1046
# File 'ext/rbczmq/socket.c', line 1042

/*
 * Sets the socket's RATE option from `value`. The work is delegated to the
 * ZmqSetSockOpt macro (defined elsewhere), given the zsocket_set_rate setter
 * and the option name "RATE".
 * NOTE(review): there is no explicit return here, so the macro presumably
 * looks up `sock` and supplies the return value -- confirm.
 */
static VALUE rb_czmq_socket_set_opt_rate(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_rate, "RATE", value);
}

#raw=(true) ⇒ nil

Define this as a RAW socket - applicable to ROUTER sockets only.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:ROUTER)
sock.raw = true  =>  nil

Returns:

  • (nil)


1604
1605
1606
1607
1608
# File 'ext/rbczmq/socket.c', line 1604

/*
 * Sets the socket's ROUTER_RAW option from `value`. The work is delegated to
 * the ZmqSetBooleanSockOpt macro (defined elsewhere), given the
 * zsocket_set_router_raw setter and the option name "ROUTER_RAW".
 * NOTE(review): there is no explicit return here, so the macro presumably
 * looks up `sock`, converts the boolean and supplies the return -- confirm.
 */
static VALUE rb_czmq_socket_set_opt_raw(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetBooleanSockOpt(obj, zsocket_set_router_raw, "ROUTER_RAW", value);
}

#rcvbufFixnum

Returns the socket RCVBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvbuf  =>  0

Returns:

  • (Fixnum)


1219
1220
1221
1222
1223
1224
# File 'ext/rbczmq/socket.c', line 1219

/*
 * Returns the socket's RCVBUF option as a Ruby Integer: the value comes from
 * zsocket_rcvbuf() on the wrapped socket and is converted with INT2NUM.
 * GetZmqSocket is a macro defined elsewhere; it presumably assigns `sock`.
 */
static VALUE rb_czmq_socket_opt_rcvbuf(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_rcvbuf(sock->socket));
}

#rcvbuf=(1000) ⇒ nil

Sets the socket RCVBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvbuf = 1000  =>  nil

Returns:

  • (nil)


1239
1240
1241
1242
1243
# File 'ext/rbczmq/socket.c', line 1239

/*
 * Sets the socket's RCVBUF option from `value`. The work is delegated to the
 * ZmqSetSockOpt macro (defined elsewhere), given the zsocket_set_rcvbuf
 * setter and the option name "RCVBUF".
 * NOTE(review): there is no explicit return here, so the macro presumably
 * looks up `sock` and supplies the return value -- confirm.
 */
static VALUE rb_czmq_socket_set_opt_rcvbuf(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_rcvbuf, "RCVBUF", value);
}

#rcvhwmFixnum

Returns the socket receive HWM (High Water Mark) value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvhwm  =>  0

Returns:

  • (Fixnum)


942
943
944
945
946
947
# File 'ext/rbczmq/socket.c', line 942

/*
 * Returns the socket's RCVHWM option as a Ruby Integer: the value comes from
 * zsocket_rcvhwm() on the wrapped socket and is converted with INT2NUM.
 * GetZmqSocket is a macro defined elsewhere; it presumably assigns `sock`.
 */
static VALUE rb_czmq_socket_opt_rcvhwm(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_rcvhwm(sock->socket));
}

#rcvhwm=(100) ⇒ nil

Sets the socket receive HWM (High Water Mark) value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvhwm = 100  =>  nil
sock.rcvhwm  =>  100

Returns:

  • (nil)


963
964
965
966
967
# File 'ext/rbczmq/socket.c', line 963

/*
 * Sets the socket's RCVHWM option from `value`. The work is delegated to the
 * ZmqSetSockOpt macro (defined elsewhere), given the zsocket_set_rcvhwm
 * setter and the option name "RCVHWM".
 * NOTE(review): there is no explicit return here, so the macro presumably
 * looks up `sock` and supplies the return value -- confirm.
 */
static VALUE rb_czmq_socket_set_opt_rcvhwm(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_rcvhwm, "RCVHWM", value);
}

#rcvmoreBoolean

Query if there are more message parts to receive.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.rcvmore =>  true

Returns:

  • (Boolean)


1485
1486
1487
1488
1489
1490
# File 'ext/rbczmq/socket.c', line 1485

/*
 * Returns Qtrue when zsocket_rcvmore() reports exactly 1 for the wrapped
 * socket, and Qfalse for any other value.
 * GetZmqSocket is a macro defined elsewhere; it presumably assigns `sock`.
 */
static VALUE rb_czmq_socket_opt_rcvmore(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return (zsocket_rcvmore(sock->socket) == 1) ? Qtrue : Qfalse;
}

#rcvtimeoFixnum

Returns the socket RCVTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvtimeo  =>  -1

Returns:

  • (Fixnum)


1525
1526
1527
1528
1529
1530
# File 'ext/rbczmq/socket.c', line 1525

/*
 * Returns the socket's RCVTIMEO option as a Ruby Integer: the value comes
 * from zsocket_rcvtimeo() on the wrapped socket and is converted with INT2NUM.
 * GetZmqSocket is a macro defined elsewhere; it presumably assigns `sock`.
 */
static VALUE rb_czmq_socket_opt_rcvtimeo(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_rcvtimeo(sock->socket));
}

#rcvtimeo=(200) ⇒ nil

Sets the socket RCVTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.rcvtimeo = 200  =>  nil

Returns:

  • (nil)


1545
1546
1547
1548
1549
# File 'ext/rbczmq/socket.c', line 1545

/*
 * Sets the socket's RCVTIMEO option from `value`. The work is delegated to
 * the ZmqSetSockOpt macro (defined elsewhere), given the zsocket_set_rcvtimeo
 * setter and the option name "RCVTIMEO".
 * NOTE(review): there is no explicit return here, so the macro presumably
 * looks up `sock` and supplies the return value -- confirm.
 */
static VALUE rb_czmq_socket_set_opt_rcvtimeo(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_rcvtimeo, "RCVTIMEO", value);
}

#readable?Boolean

Determines if there are one or more messages to read from this socket. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.

socket.readable? => true

Returns:

  • (Boolean)


44
45
46
# File 'lib/zmq/socket.rb', line 44

# True when every bit of ZMQ::POLLIN is set in the value returned by +events+.
def readable?
  readable_mask = ZMQ::POLLIN
  (events & readable_mask) == readable_mask
end

#real_bind("inproc://test") ⇒ Fixnum

Binds to a given endpoint. When the port number is ‘*’, attempts to bind to a free port. Always returns the port number on success.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.bind("tcp://localhost:*")    =>  5432

Returns:

  • (Fixnum)


224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'ext/rbczmq/socket.c', line 224

/*
 * Binds the wrapped socket to `endpoint` (a Ruby String) and returns the
 * result code of the bind call as a Ruby Integer.
 *
 * The bind itself runs in rb_czmq_nogvl_socket_bind through
 * rb_thread_blocking_region. On success the wrapper state becomes
 * ZMQ_SOCKET_BOUND and a copy of the endpoint string (rb_str_new4) is
 * appended to sock->endpoints. ZmqAssert is a macro defined elsewhere; it
 * presumably raises when rc signals failure -- confirm.
 */
static VALUE rb_czmq_socket_bind(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_bind, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        /* NOTE(review): `obj` is a VALUE but is printed with %p -- confirm intent. */
        zclock_log ("I: %s socket %p: bound \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
    sock->state = ZMQ_SOCKET_BOUND;
    rb_ary_push(sock->endpoints, rb_str_new4(endpoint));
    return INT2NUM(rc);
}

#real_connect("tcp://localhost:3456") ⇒ Boolean

Attempts to connect to a given endpoint.

Examples

ctx = ZMQ::Context.new
rep = ctx.socket(:REP)
port = rep.bind("tcp://localhost:*")    =>  5432
req = ctx.socket(:REQ)
req.connect("tcp://localhost:#{port}")   => true

Returns:

  • (Boolean)


258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'ext/rbczmq/socket.c', line 258

/*
 * Connects the wrapped socket to `endpoint` (a Ruby String) and returns
 * Qtrue.
 *
 * The connect itself runs in rb_czmq_nogvl_socket_connect through
 * rb_thread_blocking_region. On success the wrapper state becomes
 * ZMQ_SOCKET_CONNECTED and a copy of the endpoint string (rb_str_new4) is
 * appended to sock->endpoints. ZmqAssert is a macro defined elsewhere; it
 * presumably raises when rc signals failure -- confirm.
 */
static VALUE rb_czmq_socket_connect(VALUE obj, VALUE endpoint)
{
    struct nogvl_conn_args args;
    int rc;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqSockGuardCrossThread(sock);
    Check_Type(endpoint, T_STRING);
    args.socket = sock;
    args.endpoint = StringValueCStr(endpoint);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_socket_connect, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        /* NOTE(review): `obj` is a VALUE but is printed with %p -- confirm intent. */
        zclock_log ("I: %s socket %p: connected \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(endpoint));
    sock->state = ZMQ_SOCKET_CONNECTED;
    rb_ary_push(sock->endpoints, rb_str_new4(endpoint));
    return Qtrue;
}

#reconnect_ivlFixnum

Returns the socket RECONNECT_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl  =>  100

Returns:

  • (Fixnum)


1336
1337
1338
1339
1340
1341
# File 'ext/rbczmq/socket.c', line 1336

/*
 * Returns the socket's RECONNECT_IVL option as a Ruby Integer: the value
 * comes from zsocket_reconnect_ivl() on the wrapped socket and is converted
 * with INT2NUM.
 * GetZmqSocket is a macro defined elsewhere; it presumably assigns `sock`.
 */
static VALUE rb_czmq_socket_opt_reconnect_ivl(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_reconnect_ivl(sock->socket));
}

#reconnect_ivl=(200) ⇒ nil

Sets the socket RECONNECT_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl = 200  =>  nil

Returns:

  • (nil)


1356
1357
1358
1359
1360
# File 'ext/rbczmq/socket.c', line 1356

/*
 * Sets the socket's RECONNECT_IVL option from `value`. The work is delegated
 * to the ZmqSetSockOpt macro (defined elsewhere), given the
 * zsocket_set_reconnect_ivl setter and the option name "RECONNECT_IVL".
 * NOTE(review): there is no explicit return here, so the macro presumably
 * looks up `sock` and supplies the return value -- confirm.
 */
static VALUE rb_czmq_socket_set_opt_reconnect_ivl(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl, "RECONNECT_IVL", value);
}

#reconnect_ivl_maxFixnum

Returns the socket RECONNECT_IVL_MAX value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl_max  =>  0

Returns:

  • (Fixnum)


1375
1376
1377
1378
1379
1380
# File 'ext/rbczmq/socket.c', line 1375

/*
 * Returns the socket's RECONNECT_IVL_MAX option as a Ruby Integer: the value
 * comes from zsocket_reconnect_ivl_max() on the wrapped socket and is
 * converted with INT2NUM.
 * GetZmqSocket is a macro defined elsewhere; it presumably assigns `sock`.
 */
static VALUE rb_czmq_socket_opt_reconnect_ivl_max(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_reconnect_ivl_max(sock->socket));
}

#reconnect_ivl_max=(5) ⇒ nil

Sets the socket RECONNECT_IVL_MAX value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.reconnect_ivl_max = 5  =>  nil

Returns:

  • (nil)


1395
1396
1397
1398
1399
# File 'ext/rbczmq/socket.c', line 1395

/*
 * Sets the socket's RECONNECT_IVL_MAX option from `value`. The work is
 * delegated to the ZmqSetSockOpt macro (defined elsewhere), given the
 * zsocket_set_reconnect_ivl_max setter and the option name
 * "RECONNECT_IVL_MAX".
 * NOTE(review): there is no explicit return here, so the macro presumably
 * looks up `sock` and supplies the return value -- confirm.
 */
static VALUE rb_czmq_socket_set_opt_reconnect_ivl_max(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_reconnect_ivl_max, "RECONNECT_IVL_MAX", value);
}

#recovery_ivlFixnum

Returns the socket RECOVERY_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.recovery_ivl  =>  10

Returns:

  • (Fixnum)


1061
1062
1063
1064
1065
1066
# File 'ext/rbczmq/socket.c', line 1061

/*
 * Returns the socket's RECOVERY_IVL option as a Ruby Integer: the value
 * comes from zsocket_recovery_ivl() on the wrapped socket and is converted
 * with INT2NUM.
 * GetZmqSocket is a macro defined elsewhere; it presumably assigns `sock`.
 */
static VALUE rb_czmq_socket_opt_recovery_ivl(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_recovery_ivl(sock->socket));
}

#recovery_ivl=(20) ⇒ nil

Sets the socket RECOVERY_IVL value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.recovery_ivl = 20  =>  nil

Returns:

  • (nil)


1081
1082
1083
1084
1085
# File 'ext/rbczmq/socket.c', line 1081

/*
 * Sets the socket's RECOVERY_IVL option from `value`. The work is delegated
 * to the ZmqSetSockOpt macro (defined elsewhere), given the
 * zsocket_set_recovery_ivl setter and the option name "RECOVERY_IVL".
 * NOTE(review): there is no explicit return here, so the macro presumably
 * looks up `sock` and supplies the return value -- confirm.
 */
static VALUE rb_czmq_socket_set_opt_recovery_ivl(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_recovery_ivl, "RECOVERY_IVL", value);
}

#recovery_ivl_msecFixnum

Returns the socket RECOVERY_IVL_MSEC value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.recovery_ivl_msec  =>  -1

Returns:

  • (Fixnum)


1101
1102
1103
1104
1105
1106
# File 'ext/rbczmq/socket.c', line 1101

/*
 * Returns the socket's RECOVERY_IVL_MSEC option as a Ruby Integer: the value
 * comes from zsocket_recovery_ivl_msec() on the wrapped socket and is
 * converted with INT2NUM.
 * GetZmqSocket is a macro defined elsewhere; it presumably assigns `sock`.
 */
static VALUE rb_czmq_socket_opt_recovery_ivl_msec(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    return INT2NUM(zsocket_recovery_ivl_msec(sock->socket));
}

#recovery_ivl_msec=(20) ⇒ nil

Sets the socket RECOVERY_IVL_MSEC value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.recovery_ivl_msec = 20  =>  nil

Returns:

  • (nil)


1121
1122
1123
1124
1125
# File 'ext/rbczmq/socket.c', line 1121

/*
 * Sets the socket's RECOVERY_IVL_MSEC option from `value`. The work is
 * delegated to the ZmqSetSockOpt macro (defined elsewhere), given the
 * zsocket_set_recovery_ivl_msec setter and the option name
 * "RECOVERY_IVL_MSEC".
 * NOTE(review): there is no explicit return here, so the macro presumably
 * looks up `sock` and supplies the return value -- confirm.
 */
static VALUE rb_czmq_socket_set_opt_recovery_ivl_msec(VALUE obj, VALUE value)
{
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_recovery_ivl_msec, "RECOVERY_IVL_MSEC", value);
}

#recvString?

Receive a string from this ZMQ socket. May block depending on the socket type.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv    =>  "message"

Returns:

  • (String, nil)


461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
# File 'ext/rbczmq/socket.c', line 461

/*
 * Receives one string from the socket and returns it as a Ruby String, or
 * nil when the receive call yields NULL.
 *
 * The receive runs in rb_czmq_nogvl_recv through rb_thread_blocking_region.
 * A non-NULL result is copied into a Ruby String with rb_str_new2, passed
 * through ZmqEncode, and the C buffer is then released with free().
 * ZmqAssertSocketNotPending and ZmqAssertSysError are macros defined
 * elsewhere; they presumably raise on a pending socket / a set errno.
 */
static VALUE rb_czmq_socket_recv(VALUE obj)
{
    char *str = NULL;
    struct nogvl_recv_args args;
    VALUE result = Qnil;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    str = (char *)rb_thread_blocking_region(rb_czmq_nogvl_recv, (void *)&args, RUBY_UBF_IO, 0);
    /* result is still Qnil here: nothing was received. */
    if (str == NULL) return result;
    ZmqAssertSysError();
    if (sock->verbose)
        zclock_log ("I: %s socket %p: recv \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
    result = ZmqEncode(rb_str_new2(str));
    free(str);
    return result;
}

#recv_frameZMQ::Frame?

Receives a ZMQ frame from this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_frame  =>  ZMQ::Frame or nil

Returns:



696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
# File 'ext/rbczmq/socket.c', line 696

/*
 * Receives one frame from the socket and returns it wrapped by
 * rb_czmq_alloc_frame(), or nil when the receive call yields NULL.
 *
 * The receive runs in rb_czmq_nogvl_recv_frame through
 * rb_thread_blocking_region. When the wrapper is verbose the frame is dumped
 * through the ZmqDumpFrame macro.
 * NOTE(review): print_prefix and cur_time are not read in this function
 * body; presumably ZmqDumpFrame (defined elsewhere) uses them -- confirm.
 */
static VALUE rb_czmq_socket_recv_frame(VALUE obj)
{
    zframe_t *frame = NULL;
    struct nogvl_recv_args args;
    char print_prefix[255];
    char *cur_time = NULL;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    frame = (zframe_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_frame, (void *)&args, RUBY_UBF_IO, 0);
    if (frame == NULL) return Qnil;
    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        ZmqDumpFrame("recv_frame", frame);
    }
    return rb_czmq_alloc_frame(frame);
}

#recv_frame_nonblockZMQ::Frame?

Receives a ZMQ frame from this socket. Does not block

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_frame_nonblock  =>  ZMQ::Frame or nil

Returns:



730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
# File 'ext/rbczmq/socket.c', line 730

/*
 * Non-blocking variant of frame receive: calls zframe_recv_nowait() directly
 * on the wrapped socket and returns the frame wrapped by
 * rb_czmq_alloc_frame(), or nil when no frame was returned.
 *
 * errno is cleared on entry. When the wrapper is verbose the frame is dumped
 * through the ZmqDumpFrame macro.
 * NOTE(review): print_prefix and cur_time are not read in this function
 * body; presumably ZmqDumpFrame (defined elsewhere) uses them -- confirm.
 */
static VALUE rb_czmq_socket_recv_frame_nonblock(VALUE obj)
{
    zframe_t *frame = NULL;
    char print_prefix[255];
    char *cur_time = NULL;
    errno = 0;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    frame = zframe_recv_nowait(sock->socket);
    if (frame == NULL) return Qnil;
    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        ZmqDumpFrame("recv_frame_nonblock", frame);
    }
    return rb_czmq_alloc_frame(frame);
}

#recv_messageZMQ::Message?

Receives a ZMQ message from this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_message  =>  ZMQ::Message or nil

Returns:



791
792
793
794
795
796
797
798
799
800
801
802
803
804
# File 'ext/rbczmq/socket.c', line 791

/*
 * Receives one message from the socket and returns it wrapped by
 * rb_czmq_alloc_message(), or nil when the receive call yields NULL.
 *
 * The receive runs in rb_czmq_nogvl_recv_message through
 * rb_thread_blocking_region. When the wrapper is verbose the message is
 * dumped through the ZmqDumpMessage macro before being wrapped.
 */
static VALUE rb_czmq_socket_recv_message(VALUE obj)
{
    zmsg_t *message = NULL;
    struct nogvl_recv_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    args.socket = sock;
    message = (zmsg_t *)rb_thread_blocking_region(rb_czmq_nogvl_recv_message, (void *)&args, RUBY_UBF_IO, 0);
    if (message == NULL) return Qnil;
    if (sock->verbose) ZmqDumpMessage("recv_message", message);
    return rb_czmq_alloc_message(message);
}

#recv_nonblockString?

Receive a string from this ZMQ socket. Does not block.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
sock.recv_nonblock    =>  "message"

Returns:

  • (String, nil)


495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
# File 'ext/rbczmq/socket.c', line 495

/*
 * Non-blocking variant of string receive: calls zstr_recv_nowait() directly
 * on the wrapped socket and returns the result as a Ruby String, or nil when
 * nothing was returned.
 *
 * errno is cleared on entry. A non-NULL result is copied into a Ruby String
 * with rb_str_new2, passed through ZmqEncode, and the C buffer is then
 * released with free().
 */
static VALUE rb_czmq_socket_recv_nonblock(VALUE obj)
{
    char *str = NULL;
    errno = 0;
    VALUE result = Qnil;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only receive on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    str = zstr_recv_nowait(sock->socket);
    /* result is still Qnil here: nothing was available. */
    if (str == NULL) return result;
    ZmqAssertSysError();
    if (sock->verbose)
        zclock_log ("I: %s socket %p: recv_nonblock \"%s\"", zsocket_type_str(sock->socket), sock->socket, str);
    result = ZmqEncode(rb_str_new2(str));
    free(str);
    return result;
}

#send("message") ⇒ Boolean

Sends a string to this ZMQ socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REQ)
sock.connect("inproc://test")
sock.send("message")    =>  true

Returns:

  • (Boolean)


366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
# File 'ext/rbczmq/socket.c', line 366

/*
 * Sends a Ruby String over the socket and returns Qtrue.
 *
 * `msg` must be a String (Check_Type); it is passed as a C string to
 * rb_czmq_nogvl_zstr_send through rb_thread_blocking_region. ZmqAssert is a
 * macro defined elsewhere; it presumably raises when rc signals failure.
 */
static VALUE rb_czmq_socket_send(VALUE obj, VALUE msg)
{
    int rc;
    struct nogvl_send_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    Check_Type(msg, T_STRING);
    args.socket = sock;
    args.msg = StringValueCStr(msg);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_send, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        /* NOTE(review): logs `obj` with %p here, while sendm logs sock->socket. */
        zclock_log ("I: %s socket %p: send \"%s\"", zsocket_type_str(sock->socket), obj, StringValueCStr(msg));
    return Qtrue;
}

#send_frame(frame) ⇒ Boolean

Sends a ZMQ::Frame instance to this socket. An optional second argument carries send flags such as ZMQ::Frame::MORE.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
frame = ZMQ::Frame("frame")
sock.send_frame(frame)    =>  true
frame = ZMQ::Frame("multi")
sock.send_frame(frame, ZMQ::Frame::MORE)

Returns:

  • (Boolean)


555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
# File 'ext/rbczmq/socket.c', line 555

/*
 * Sends a frame over the socket and returns Qtrue.
 *
 * Arguments (rb_scan_args "11": one required, one optional):
 *   frame_obj - the frame to send; ZmqGetFrame (macro defined elsewhere)
 *               presumably unwraps it into the `frame` variable used below.
 *   flags     - optional; nil means 0. A Symbol is resolved to the constant
 *               of that name under rb_cZmqFrame; the result must be a Fixnum.
 *
 * The send runs in rb_czmq_nogvl_send_frame through
 * rb_thread_blocking_region. In verbose mode the frame to print is picked
 * before sending: the frame itself when ZFRAME_REUSE is set, a duplicate
 * otherwise.
 * NOTE(review): print_prefix and cur_time are not read in this function
 * body; presumably ZmqDumpFrame (defined elsewhere) uses them -- confirm.
 */
static VALUE rb_czmq_socket_send_frame(int argc, VALUE *argv, VALUE obj)
{
    struct nogvl_send_frame_args args;
    VALUE frame_obj;
    VALUE flags;
    char print_prefix[255];
    char *cur_time = NULL;
    zframe_t *print_frame = NULL;
    int rc, flgs;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    rb_scan_args(argc, argv, "11", &frame_obj, &flags);
    ZmqGetFrame(frame_obj);

    if (NIL_P(flags)) {
        flgs = 0;
    } else {
        if (SYMBOL_P(flags)) flags = rb_const_get_at(rb_cZmqFrame, rb_to_id(flags));
        Check_Type(flags, T_FIXNUM);
        flgs = FIX2INT(flags);
    }

    if (sock->verbose) {
        cur_time = rb_czmq_formatted_current_time();
        print_frame = (flgs & ZFRAME_REUSE) ? frame : zframe_dup(frame);
    }
    args.socket = sock;
    args.frame = frame;
    args.flags = flgs;
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_send_frame, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose) ZmqDumpFrame("send_frame", print_frame);
    return Qtrue;
}

#send_message(msg) ⇒ nil

Sends a ZMQ::Message instance to this socket.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.bind("inproc://test")
msg = ZMQ::Message.new
msg.push ZMQ::Frame("header")
sock.send_message(msg)   =>  nil

Returns:

  • (nil)


636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
# File 'ext/rbczmq/socket.c', line 636

/*
 * Sends a message over the socket and returns nil.
 *
 * ZmqGetMessage (macro defined elsewhere) presumably unwraps message_obj
 * into the `message` variable used below. The send runs in
 * rb_czmq_nogvl_send_message through rb_thread_blocking_region; its return
 * value is not inspected. Afterwards the wrapper is flagged
 * ZMQ_MESSAGE_DESTROYED. In verbose mode a duplicate of the message is taken
 * before sending and dumped afterwards.
 */
static VALUE rb_czmq_socket_send_message(VALUE obj, VALUE message_obj)
{
    struct nogvl_send_message_args args;
    zmsg_t *print_message = NULL;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    ZmqGetMessage(message_obj);
    if (sock->verbose) print_message = zmsg_dup(message->message);
    args.socket = sock;
    args.message = message->message;
    rb_thread_blocking_region(rb_czmq_nogvl_send_message, (void *)&args, RUBY_UBF_IO, 0);
    message->flags |= ZMQ_MESSAGE_DESTROYED;
    if (sock->verbose) ZmqDumpMessage("send_message", print_message);
    return Qnil;
}

#sendm("message") ⇒ Boolean

Sends a string to this ZMQ socket, with a more flag set.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REQ)
sock.connect("inproc://test")
sock.sendm("mes")    =>  true
sock.sendm("sage")    =>  true

Returns:

  • (Boolean)


399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
# File 'ext/rbczmq/socket.c', line 399

/*
 * Sends a Ruby String over the socket through rb_czmq_nogvl_zstr_sendm (the
 * "more" variant of string send) and returns Qtrue.
 *
 * `msg` must be a String (Check_Type); the call runs through
 * rb_thread_blocking_region. ZmqAssert is a macro defined elsewhere; it
 * presumably raises when rc signals failure.
 */
static VALUE rb_czmq_socket_sendm(VALUE obj, VALUE msg)
{
    int rc;
    struct nogvl_send_args args;
    zmq_sock_wrapper *sock = NULL;
    GetZmqSocket(obj);
    ZmqAssertSocketNotPending(sock, "can only send on a bound or connected socket!");
    ZmqSockGuardCrossThread(sock);
    Check_Type(msg, T_STRING);
    args.socket = sock;
    args.msg = StringValueCStr(msg);
    rc = (int)rb_thread_blocking_region(rb_czmq_nogvl_zstr_sendm, (void *)&args, RUBY_UBF_IO, 0);
    ZmqAssert(rc);
    if (sock->verbose)
        zclock_log ("I: %s socket %p: sendm \"%s\"", zsocket_type_str(sock->socket), sock->socket, StringValueCStr(msg));
    return Qtrue;
}

#sndbufFixnum

Returns the socket SNDBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndbuf  =>  0

Returns:

  • (Fixnum)


1180
1181
1182
1183
1184
1185
# File 'ext/rbczmq/socket.c', line 1180

/*
 * Reads the SNDBUF option of the wrapped socket with zsocket_sndbuf() and
 * returns it converted to a Ruby number with INT2NUM.
 *
 * obj - the socket the method was invoked on.
 */
static VALUE rb_czmq_socket_opt_sndbuf(VALUE obj)
{
    int sndbuf;
    zmq_sock_wrapper *sock = NULL;
    /* Presumably resolves `obj` into `sock` (macro defined elsewhere - confirm). */
    GetZmqSocket(obj);
    sndbuf = zsocket_sndbuf(sock->socket);
    return INT2NUM(sndbuf);
}

#sndbuf=(1000) ⇒ nil

Sets the socket SNDBUF value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndbuf = 1000  =>  nil

Returns:

  • (nil)


1200
1201
1202
1203
1204
# File 'ext/rbczmq/socket.c', line 1200

/*
 * Sets the SNDBUF socket option from `value`.
 *
 * All of the work is delegated to the ZmqSetSockOpt macro, invoked with the
 * zsocket_set_sndbuf setter and the option name "SNDBUF".
 * NOTE(review): there is no explicit return statement here, so the macro
 * presumably supplies the return value - confirm against its definition.
 */
static VALUE rb_czmq_socket_set_opt_sndbuf(VALUE obj, VALUE value)
{
    /* Not referenced directly below; presumably assigned and used inside ZmqSetSockOpt. */
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_sndbuf, "SNDBUF", value);
}

#sndhwm ⇒ Fixnum

Returns the socket send HWM (High Water Mark) value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndhwm  =>  0

Returns:

  • (Fixnum)


902
903
904
905
906
907
# File 'ext/rbczmq/socket.c', line 902

/*
 * Reads the send high water mark of the wrapped socket with
 * zsocket_sndhwm() and returns it converted to a Ruby number with INT2NUM.
 *
 * obj - the socket the method was invoked on.
 */
static VALUE rb_czmq_socket_opt_sndhwm(VALUE obj)
{
    int sndhwm;
    zmq_sock_wrapper *sock = NULL;
    /* Presumably resolves `obj` into `sock` (macro defined elsewhere - confirm). */
    GetZmqSocket(obj);
    sndhwm = zsocket_sndhwm(sock->socket);
    return INT2NUM(sndhwm);
}

#sndhwm=(100) ⇒ nil

Sets the socket send HWM (High Water Mark) value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndhwm = 100  =>  nil
sock.sndhwm  =>  100

Returns:

  • (nil)


923
924
925
926
927
# File 'ext/rbczmq/socket.c', line 923

/*
 * Sets the send high water mark (SNDHWM) socket option from `value`.
 *
 * All of the work is delegated to the ZmqSetSockOpt macro, invoked with the
 * zsocket_set_sndhwm setter and the option name "SNDHWM".
 * NOTE(review): there is no explicit return statement here, so the macro
 * presumably supplies the return value - confirm against its definition.
 */
static VALUE rb_czmq_socket_set_opt_sndhwm(VALUE obj, VALUE value)
{
    /* Not referenced directly below; presumably assigned and used inside ZmqSetSockOpt. */
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_sndhwm, "SNDHWM", value);
}

#sndtimeo ⇒ Fixnum

Returns the socket SNDTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndtimeo  =>  -1

Returns:

  • (Fixnum)


1564
1565
1566
1567
1568
1569
# File 'ext/rbczmq/socket.c', line 1564

/*
 * Reads the SNDTIMEO option of the wrapped socket with zsocket_sndtimeo()
 * and returns it converted to a Ruby number with INT2NUM.
 *
 * obj - the socket the method was invoked on.
 */
static VALUE rb_czmq_socket_opt_sndtimeo(VALUE obj)
{
    int sndtimeo;
    zmq_sock_wrapper *sock = NULL;
    /* Presumably resolves `obj` into `sock` (macro defined elsewhere - confirm). */
    GetZmqSocket(obj);
    sndtimeo = zsocket_sndtimeo(sock->socket);
    return INT2NUM(sndtimeo);
}

#sndtimeo=(200) ⇒ nil

Sets the socket SNDTIMEO value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.sndtimeo = 200  =>  nil

Returns:

  • (nil)


1584
1585
1586
1587
1588
# File 'ext/rbczmq/socket.c', line 1584

/*
 * Sets the SNDTIMEO socket option from `value`.
 *
 * All of the work is delegated to the ZmqSetSockOpt macro, invoked with the
 * zsocket_set_sndtimeo setter and the option name "SNDTIMEO".
 * NOTE(review): there is no explicit return statement here, so the macro
 * presumably supplies the return value - confirm against its definition.
 */
static VALUE rb_czmq_socket_set_opt_sndtimeo(VALUE obj, VALUE value)
{
    /* Not referenced directly below; presumably assigned and used inside ZmqSetSockOpt. */
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_sndtimeo, "SNDTIMEO", value);
}

#state ⇒ Fixnum

Returns the current socket state, one of ZMQ::Socket::PENDING, ZMQ::Socket::BOUND or ZMQ::Socket::CONNECTED.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:PUSH)
sock.state       =>   ZMQ::Socket::PENDING
sock.bind("inproc://test")
sock.state       =>   ZMQ::Socket::BOUND

Returns:

  • (Fixnum)


149
150
151
152
153
154
# File 'ext/rbczmq/socket.c', line 149

/*
 * Returns the wrapper's `state` field converted to a Ruby number with
 * INT2NUM, i.e. the result is numeric rather than a String.
 *
 * obj - the socket the method was invoked on.
 */
static VALUE rb_czmq_socket_state(VALUE obj)
{
    zmq_sock_wrapper *sock = NULL;
    /* Presumably resolves `obj` into `sock` (macro defined elsewhere - confirm). */
    GetZmqSocket(obj);
    return INT2NUM(sock->state);
}

#subscribe(value) ⇒ Object

Subscribes this SUB socket to a topic.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.subscribe "ruby"  =>  nil


1443
1444
1445
1446
1447
1448
1449
# File 'ext/rbczmq/socket.c', line 1443

/*
 * Subscribes the socket to the topic given in `value`.
 *
 * All of the work is delegated to the ZmqSetStringSockOpt macro, invoked
 * with the zsocket_set_subscribe setter and the option name "SUBSCRIBE".
 * The trailing block passes ZmqAssertSockOptFor(ZMQ_SUB) to the macro,
 * presumably restricting the option to SUB sockets - confirm against the
 * macro definitions.
 * NOTE(review): there is no explicit return statement here, so the macro
 * presumably supplies the return value.
 */
static VALUE rb_czmq_socket_set_opt_subscribe(VALUE obj, VALUE value)
{
    /* Not referenced directly below; presumably assigned and used inside the macro. */
    zmq_sock_wrapper *sock = NULL;
    ZmqSetStringSockOpt(obj, zsocket_set_subscribe, "SUBSCRIBE", value, {
       ZmqAssertSockOptFor(ZMQ_SUB)
    });
}

#swap ⇒ Fixnum

Returns the socket SWAP value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.swap  =>  0

Returns:

  • (Fixnum)


861
862
863
864
865
866
# File 'ext/rbczmq/socket.c', line 861

/*
 * Reads the SWAP option of the wrapped socket with zsocket_swap() and
 * returns it converted to a Ruby number with INT2NUM.
 *
 * obj - the socket the method was invoked on.
 */
static VALUE rb_czmq_socket_opt_swap(VALUE obj)
{
    int swap;
    zmq_sock_wrapper *sock = NULL;
    /* Presumably resolves `obj` into `sock` (macro defined elsewhere - confirm). */
    GetZmqSocket(obj);
    swap = zsocket_swap(sock->socket);
    return INT2NUM(swap);
}

#swap=(100) ⇒ nil

Sets the socket SWAP value.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.swap = 100  =>  nil

Returns:

  • (nil)


881
882
883
884
885
# File 'ext/rbczmq/socket.c', line 881

/*
 * Sets the SWAP socket option from `value`.
 *
 * All of the work is delegated to the ZmqSetSockOpt macro, invoked with the
 * zsocket_set_swap setter and the option name "SWAP".
 * NOTE(review): there is no explicit return statement here, so the macro
 * presumably supplies the return value - confirm against its definition.
 */
static VALUE rb_czmq_socket_set_opt_swap(VALUE obj, VALUE value)
{
    /* Not referenced directly below; presumably assigned and used inside ZmqSetSockOpt. */
    zmq_sock_wrapper *sock = NULL;
    ZmqSetSockOpt(obj, zsocket_set_swap, "SWAP", value);
}

#to_s ⇒ Object

Generates a string representation of the current socket state

socket = ctx.bind(:PUB, "tcp://127.0.0.1:5000")
socket.to_s  =>  "PUB socket bound to tcp://127.0.0.1:5000"



71
72
73
74
75
76
77
78
79
80
# File 'lib/zmq/socket.rb', line 71

# Builds a human-readable description of the socket.
#
# A BOUND or CONNECTED socket is described together with its endpoints,
# joined by ", "; any other state yields just the type name.
#
# @return [String] e.g. "PUB socket bound to tcp://127.0.0.1:5000"
def to_s
  relation =
    case state
    when BOUND then "bound to"
    when CONNECTED then "connected to"
    end

  # Neither bound nor connected: there are no endpoints to report.
  return "#{type_str} socket" unless relation

  "#{type_str} socket #{relation} #{endpoints.join(', ')}"
end

#type_str ⇒ Object

Generates a string representation of this socket type

socket = ctx.socket(:PUB)
socket.type_str  =>  "PUB"



62
63
64
# File 'lib/zmq/socket.rb', line 62

# Looks up the TYPE_STR constant on the receiver's class.
#
# @return [Object] the value of TYPE_STR for this socket's class
def type_str
  socket_class = self.class
  socket_class.const_get(:TYPE_STR)
end

#unsubscribe(value) ⇒ Object

Unsubscribes this SUB socket from a topic.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:SUB)
sock.unsubscribe "ruby"  =>  nil


1464
1465
1466
1467
1468
1469
1470
# File 'ext/rbczmq/socket.c', line 1464

/*
 * Unsubscribes the socket from the topic given in `value`.
 *
 * All of the work is delegated to the ZmqSetStringSockOpt macro, invoked
 * with the zsocket_set_unsubscribe setter and the option name "UNSUBSCRIBE".
 * The trailing block passes ZmqAssertSockOptFor(ZMQ_SUB) to the macro,
 * presumably restricting the option to SUB sockets - confirm against the
 * macro definitions.
 * NOTE(review): there is no explicit return statement here, so the macro
 * presumably supplies the return value.
 */
static VALUE rb_czmq_socket_set_opt_unsubscribe(VALUE obj, VALUE value)
{
    /* Not referenced directly below; presumably assigned and used inside the macro. */
    zmq_sock_wrapper *sock = NULL;
    ZmqSetStringSockOpt(obj, zsocket_set_unsubscribe, "UNSUBSCRIBE", value, {
       ZmqAssertSockOptFor(ZMQ_SUB)
    });
}

#verbose=(true) ⇒ nil

Let this socket be verbose - dumps a lot of data to stdout for debugging.

Examples

ctx = ZMQ::Context.new
sock = ctx.socket(:REP)
sock.verbose = true    =>  nil

Returns:

  • (nil)


290
291
292
293
294
295
296
297
298
# File 'ext/rbczmq/socket.c', line 290

/*
 * Switches verbose mode for this socket on or off and returns nil.
 *
 * obj   - the socket the method was invoked on.
 * level - verbose mode is enabled only when this is exactly Qtrue; every
 *         other value disables it.
 */
static VALUE rb_czmq_socket_set_verbose(VALUE obj, VALUE level)
{
    zmq_sock_wrapper *sock = NULL;
    /* Presumably resolves `obj` into `sock` (macro defined elsewhere - confirm). */
    GetZmqSocket(obj);
    sock->verbose = (level == Qtrue);
    return Qnil;
}

#writable? ⇒ Boolean

Determines if this socket is in a writable state. Should be used in conjunction with the ZMQ_FD socket option for edge-triggered notifications.

socket.writable? => true

Returns:

  • (Boolean)


53
54
55
# File 'lib/zmq/socket.rb', line 53

# Reports whether the ZMQ::POLLOUT bit is set in this socket's event mask.
#
# @return [Boolean] true when every bit of ZMQ::POLLOUT is present in +events+
def writable?
  ready_mask = events
  pollout = ZMQ::POLLOUT
  (ready_mask & pollout) == pollout
end