Class: ZMQ::Socket

Inherits:
Object
  • Object
show all
Defined in:
lib/ffi-rzmq/socket.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(context_ptr, type, opts = {:receiver_class => ZMQ::Message}) ⇒ Socket

To avoid rescuing exceptions, use the factory method #create for all socket creation.

Allocates a socket of type type for sending and receiving data.

type can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB, ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::REP, ZMQ::DEALER or ZMQ::ROUTER.

By default, this class uses ZMQ::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use ZMQ::ManagedMessage.

sock = Socket.new(Context.new, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)

Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as ZMQ::Message.

Creation of a new Socket object can raise an exception. This occurs when the context_ptr is null or when the allocation of the 0mq socket within the context fails.

begin
  socket = Socket.new(context.pointer, ZMQ::REQ)
rescue ContextError => e
  # error handling
end


65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/ffi-rzmq/socket.rb', line 65

def initialize context_ptr, type, opts = {:receiver_class => ZMQ::Message}
  # users may override the classes used for receiving; class must conform to the
  # same public API as ZMQ::Message
  @receiver_klass = opts[:receiver_class]

  context_ptr = context_ptr.pointer if context_ptr.kind_of?(ZMQ::Context)

  if context_ptr.nil? || context_ptr.null?
    raise ContextError.new 'zmq_socket', 0, ETERM, "Context pointer was null"
  else
    @socket = LibZMQ.zmq_socket context_ptr, type
    if @socket && !@socket.null?
      @name = SocketTypeNameMap[type]
    else
      raise ContextError.new 'zmq_socket', 0, ETERM, "Socket pointer was null"
    end
  end

  @longlong_cache = @int_cache = nil
  @more_parts_array = []
  @option_lookup = []
  populate_option_lookup

  define_finalizer
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



5
6
7
# File 'lib/ffi-rzmq/socket.rb', line 5

def name
  @name
end

#socketObject (readonly)

Returns the value of attribute socket.



5
6
7
# File 'lib/ffi-rzmq/socket.rb', line 5

def socket
  @socket
end

Class Method Details

.create(context_ptr, type, opts = {:receiver_class => ZMQ::Message}) ⇒ Object

Allocates a socket of type type for sending and receiving data.

type can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB, ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::REP, ZMQ::DEALER or ZMQ::ROUTER.

By default, this class uses ZMQ::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use ZMQ::ManagedMessage.

sock = Socket.create(Context.create, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)

Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as ZMQ::Message.

Creation of a new Socket object can return nil when socket creation fails.

if (socket = Socket.new(context.pointer, ZMQ::REQ))
  ...
else
  STDERR.puts "Socket creation failed"
end


32
33
34
# File 'lib/ffi-rzmq/socket.rb', line 32

def self.create context_ptr, type, opts = {:receiver_class => ZMQ::Message}
  new(context_ptr, type, opts) rescue nil
end

Instance Method Details

#bind(address) ⇒ Object

Binds the socket to an address.

socket.bind("tcp://127.0.0.1:5555")


178
179
180
# File 'lib/ffi-rzmq/socket.rb', line 178

def bind address
  LibZMQ.zmq_bind @socket, address
end

#closeObject

Closes the socket. Any unprocessed messages in queue are sent or dropped depending upon the value of the socket option ZMQ::LINGER.

Returns 0 upon success or when the socket has already been closed. Returns -1 when the operation fails. Check ZMQ::Util.errno for the error code.

rc = socket.close
puts("Given socket was invalid!") unless 0 == rc


199
200
201
202
203
204
205
206
207
208
209
# File 'lib/ffi-rzmq/socket.rb', line 199

def close
  if @socket
    remove_finalizer
    rc = LibZMQ.zmq_close @socket
    @socket = nil
    release_cache
    rc
  else
    0
  end
end

#connect(address) ⇒ Object

Connects the socket to an address.

rc = socket.connect("tcp://127.0.0.1:5555")


186
187
188
# File 'lib/ffi-rzmq/socket.rb', line 186

def connect address
  rc = LibZMQ.zmq_connect @socket, address
end

#disconnect(endpoint) ⇒ Object

Disconnect the socket from the given endpoint.



490
491
492
# File 'lib/ffi-rzmq/socket.rb', line 490

def disconnect(endpoint)
  LibZMQ.zmq_disconnect(socket, endpoint)
end

#getsockopt(name, array) ⇒ Object

Get the options set on this socket.

name determines the socket option to request array should be an empty array; a result of the proper type (numeric, string, boolean) will be inserted into the first position.

Valid option_name values:

ZMQ::RCVMORE - true or false
ZMQ::HWM - integer
ZMQ::SWAP - integer
ZMQ::AFFINITY - bitmap in an integer
ZMQ::IDENTITY - string
ZMQ::RATE - integer
ZMQ::RECOVERY_IVL - integer
ZMQ::SNDBUF - integer
ZMQ::RCVBUF - integer
ZMQ::FD     - fd in an integer
ZMQ::EVENTS - bitmap integer
ZMQ::LINGER - integer measured in milliseconds
ZMQ::RECONNECT_IVL - integer measured in milliseconds
ZMQ::BACKLOG - integer
ZMQ::RECOVER_IVL_MSEC - integer measured in milliseconds
ZMQ::IPV4ONLY - integer

Returns 0 when the operation completed successfully. Returns -1 when this operation failed.

With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.

# retrieve high water mark
array = []
rc = socket.getsockopt(ZMQ::HWM, array)
hwm = array.first if ZMQ::Util.resultcode_ok?(rc)


463
464
465
466
467
468
469
470
471
472
# File 'lib/ffi-rzmq/socket.rb', line 463

def getsockopt name, array
  rc = __getsockopt__ name, array

  if Util.resultcode_ok?(rc) && (RCVMORE == name)
    # convert to boolean
    array[0] = 1 == array[0]
  end

  rc
end

#identityObject

Convenience method for getting the value of the socket IDENTITY.



476
477
478
479
480
# File 'lib/ffi-rzmq/socket.rb', line 476

def identity
  array = []
  getsockopt IDENTITY, array
  array.at(0)
end

#identity=(value) ⇒ Object

Convenience method for setting the value of the socket IDENTITY.



484
485
486
# File 'lib/ffi-rzmq/socket.rb', line 484

def identity=(value)
  setsockopt IDENTITY, value.to_s
end

#more_parts?Boolean

Convenience method for checking on additional message parts.

Equivalent to calling Socket#getsockopt with ZMQ::RCVMORE.

Warning: if the call to #getsockopt fails, this method will return false and swallow the error.

message_parts = []
message = Message.new
rc = socket.recvmsg(message)
if ZMQ::Util.resultcode_ok?(rc)
  message_parts << message
  while more_parts?
    message = Message.new
    rc = socket.recvmsg(message)
    message_parts.push(message) if resulcode_ok?(rc)
  end
end

Returns:

  • (Boolean)


168
169
170
171
172
# File 'lib/ffi-rzmq/socket.rb', line 168

def more_parts?
  rc = getsockopt ZMQ::RCVMORE, @more_parts_array

  Util.resultcode_ok?(rc) ? @more_parts_array.at(0) : false
end

#recv_multipart(list, routing_envelope, flag = 0) ⇒ Object

Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes a list for receiving the message body parts and a routing_envelope for receiving the message parts comprising the 0mq routing information.



408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
# File 'lib/ffi-rzmq/socket.rb', line 408

def recv_multipart list, routing_envelope, flag = 0
  parts = []
  rc = recvmsgs parts, flag

  if Util.resultcode_ok?(rc)
    routing = true
    parts.each do |part|
      if routing
        routing_envelope << part
        routing = part.size > 0
      else
        list << part
      end
    end
  end

  rc
end

#recv_string(string, flags = 0) ⇒ Object

Helper method to make a new #Message instance and convert its payload to a string.

flags may be ZMQ::DONTWAIT.

Returns 0 when the message was successfully dequeued. Returns -1 under two conditions.

  1. The message could not be dequeued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.

The application code is responsible for handling the message object lifecycle when #recv returns an error code.



340
341
342
343
344
345
346
# File 'lib/ffi-rzmq/socket.rb', line 340

def recv_string string, flags = 0
  message = @receiver_klass.new
  rc = recvmsg message, flags
  string.replace(message.copy_out_string) if Util.resultcode_ok?(rc)
  message.close
  rc
end

#recv_strings(list, flag = 0) ⇒ Object

Receive a multipart message as a list of strings.

flag may be ZMQ::DONTWAIT. Any other flag will be removed.



353
354
355
356
357
358
359
360
361
362
363
364
365
# File 'lib/ffi-rzmq/socket.rb', line 353

def recv_strings list, flag = 0
  array = []
  rc = recvmsgs array, flag

  if Util.resultcode_ok?(rc)
    array.each do |message|
      list << message.copy_out_string
      message.close
    end
  end

  rc
end

#recvmsg(message, flags = 0) ⇒ Object

Dequeues a message from the underlying queue. By default, this is a blocking operation.

flags may take two values:

0 (default) - blocking operation
ZMQ::DONTWAIT - non-blocking operation

Returns 0 when the message was successfully dequeued. Returns -1 under two conditions.

  1. The message could not be dequeued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.

The application code is responsible for handling the message object lifecycle when #recv returns an error code.



319
320
321
322
# File 'lib/ffi-rzmq/socket.rb', line 319

def recvmsg message, flags = 0
  #LibZMQ.zmq_recvmsg @socket, message.address, flags
  __recvmsg__(@socket, message.address, flags)
end

#recvmsgs(list, flag = 0) ⇒ Object

Receive a multipart message as an array of objects (by default these are instances of Message).

flag may be ZMQ::DONTWAIT. Any other flag will be removed.



373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
# File 'lib/ffi-rzmq/socket.rb', line 373

def recvmsgs list, flag = 0
  flag = DONTWAIT if dontwait?(flag)

  message = @receiver_klass.new
  rc = recvmsg message, flag

  if Util.resultcode_ok?(rc)
    list << message

    # check rc *first*; necessary because the call to #more_parts? can reset
    # the zmq_errno to a weird value, so the zmq_errno that was set on the
    # call to #recv gets lost
    while Util.resultcode_ok?(rc) && more_parts?
      message = @receiver_klass.new
      rc = recvmsg message, flag

      if Util.resultcode_ok?(rc)
        list << message
      else
        message.close
        list.each { |msg| msg.close }
        list.clear
      end
    end
  else
    message.close
  end

  rc
end

#send_and_close(message, flags = 0) ⇒ Object

Sends a message. This will automatically close the message for both successful and failed sends.

Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.

  1. The message could not be enqueued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.



296
297
298
299
300
# File 'lib/ffi-rzmq/socket.rb', line 296

def send_and_close message, flags = 0
  rc = sendmsg message, flags
  message.close
  rc
end

#send_string(string, flags = 0) ⇒ Object

Helper method to make a new #Message instance out of the string passed in for transmission.

flags may be ZMQ::DONTWAIT and ZMQ::SNDMORE.

Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.

  1. The message could not be enqueued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.



244
245
246
247
# File 'lib/ffi-rzmq/socket.rb', line 244

def send_string string, flags = 0
  message = Message.new string
  send_and_close message, flags
end

#send_strings(parts, flags = 0) ⇒ Object

Send a sequence of strings as a multipart message out of the parts passed in for transmission. Every element of parts should be a String.

flags may be ZMQ::DONTWAIT.

Returns 0 when the messages were successfully enqueued. Returns -1 under two conditions.

  1. A message could not be enqueued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.



263
264
265
# File 'lib/ffi-rzmq/socket.rb', line 263

def send_strings parts, flags = 0
  send_multiple(parts, flags, :send_string)
end

#sendmsg(message, flags = 0) ⇒ Object

Queues the message for transmission. Message is assumed to conform to the same public API as #Message.

flags may take two values:

  • 0 (default) - blocking operation

  • ZMQ::DONTWAIT - non-blocking operation

  • ZMQ::SNDMORE - this message is part of a multi-part message

Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.

  1. The message could not be enqueued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.



227
228
229
# File 'lib/ffi-rzmq/socket.rb', line 227

def sendmsg message, flags = 0
  __sendmsg__(@socket, message.address, flags)
end

#sendmsgs(parts, flags = 0) ⇒ Object

Send a sequence of messages as a multipart message out of the parts passed in for transmission. Every element of parts should be a Message (or subclass).

flags may be ZMQ::DONTWAIT.

Returns 0 when the messages were successfully enqueued. Returns -1 under two conditions.

  1. A message could not be enqueued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.



281
282
283
# File 'lib/ffi-rzmq/socket.rb', line 281

def sendmsgs parts, flags = 0
  send_multiple(parts, flags, :sendmsg)
end

#setsockopt(name, value, length = nil) ⇒ Object

Set the queue options on this socket.

Valid name values that take a numeric value are:

ZMQ::HWM
ZMQ::SWAP (version 2 only)
ZMQ::AFFINITY
ZMQ::RATE
ZMQ::RECOVERY_IVL
ZMQ::MCAST_LOOP (version 2 only)
ZMQ::LINGER
ZMQ::RECONNECT_IVL
ZMQ::BACKLOG
ZMQ::RECOVER_IVL_MSEC (version 2 only)
ZMQ::RECONNECT_IVL_MAX (version 3 only)
ZMQ::MAXMSGSIZE (version 3 only)
ZMQ::SNDHWM (version 3 only)
ZMQ::RCVHWM (version 3 only)
ZMQ::MULTICAST_HOPS (version 3 only)
ZMQ::RCVTIMEO (version 3 only)
ZMQ::SNDTIMEO (version 3 only)

Valid name values that take a string value are:

ZMQ::IDENTITY (version 2/3 only)
ZMQ::SUBSCRIBE
ZMQ::UNSUBSCRIBE

Returns 0 when the operation completed successfully. Returns -1 when this operation failed.

With a -1 return code, the user must check ZMQ::Util.errno to determine the cause.

rc = socket.setsockopt(ZMQ::LINGER, 1_000)
ZMQ::Util.resultcode_ok?(rc) ? puts("succeeded") : puts("failed")


126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/ffi-rzmq/socket.rb', line 126

def setsockopt name, value, length = nil
  if 1 == @option_lookup[name]
    length = 8
    pointer = LibC.malloc length
    pointer.write_long_long value

  elsif 0 == @option_lookup[name]
    length = 4
    pointer = LibC.malloc length
    pointer.write_int value

  elsif 2 == @option_lookup[name]
    # Strings are treated as pointers by FFI so we'll just pass it through
    length ||= value.size
    pointer = value

  end

  rc = LibZMQ.zmq_setsockopt @socket, name, pointer, length
  LibC.free(pointer) unless pointer.is_a?(String) || pointer.nil? || pointer.null?
  rc
end

#unbind(endpoint) ⇒ Object

Unbind the socket from the given endpoint.



496
497
498
# File 'lib/ffi-rzmq/socket.rb', line 496

def unbind(endpoint)
  LibZMQ.zmq_unbind(socket, endpoint)
end