Class: ZMQ::Socket

Inherits:
Object
Includes:
CommonSocketBehavior, IdentitySupport
Defined in:
lib/ffi-rzmq/socket.rb

Instance Attribute Summary

Attributes included from CommonSocketBehavior

#name, #socket

Instance Method Summary

Methods included from IdentitySupport

#identity, #identity=

Methods included from CommonSocketBehavior

#bind, #close, #connect, create, #initialize, #more_parts?, #setsockopt

Methods included from Util

errno, error_string, nonblocking_flag, resultcode_ok?, version

Instance Method Details

#getsockopt(name, array) ⇒ Object

Get the options set on this socket.

name determines the socket option to request. array should be an empty array; a result of the proper type (numeric, string, boolean) will be inserted into the first position.

Valid name values:

ZMQ::RCVMORE - true or false
ZMQ::MCAST_LOOP - true or false
ZMQ::HWM - integer
ZMQ::SWAP - integer
ZMQ::AFFINITY - bitmap in an integer
ZMQ::IDENTITY - string
ZMQ::RATE - integer
ZMQ::RECOVERY_IVL - integer
ZMQ::SNDBUF - integer
ZMQ::RCVBUF - integer
ZMQ::FD - file descriptor in an integer
ZMQ::EVENTS - bitmap integer
ZMQ::LINGER - integer measured in milliseconds
ZMQ::RECONNECT_IVL - integer measured in milliseconds
ZMQ::BACKLOG - integer
ZMQ::RECOVERY_IVL_MSEC - integer measured in milliseconds

Returns 0 when the operation completed successfully. Returns -1 when this operation failed.

With a -1 return code, the user must check ZMQ.errno to determine the cause.

# retrieve high water mark
array = []
rc = socket.getsockopt(ZMQ::HWM, array)
hwm = array.first if ZMQ::Util.resultcode_ok?(rc)
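
The out-parameter convention and the boolean conversion for ZMQ::RCVMORE can be sketched without a live socket. StubOptionSocket, its placeholder option names, and read_option are hypothetical stand-ins written for this illustration; they are not part of ffi-rzmq and only mirror the contract described above (result in array[0], 0 or -1 as the return code).

```ruby
# Hypothetical stand-in for a socket; NOT part of ffi-rzmq.
# The option "constants" are placeholder symbols, not the real ZMQ values.
class StubOptionSocket
  RCVMORE = :rcvmore
  HWM     = :hwm

  def initialize(raw_values)
    @raw_values = raw_values # option name => raw integer value
  end

  # Mimics the documented contract: fills array[0], returns 0 or -1.
  def getsockopt(name, array)
    return -1 unless @raw_values.key?(name)

    array[0] = @raw_values[name]
    # RCVMORE is reported as true/false rather than 1/0.
    array[0] = (array[0] == 1) if name == RCVMORE
    0
  end
end

# Returns the option value, or nil when the call reports failure.
def read_option(socket, name)
  array = []
  rc = socket.getsockopt(name, array)
  rc.zero? ? array.first : nil
end

socket = StubOptionSocket.new({ StubOptionSocket::RCVMORE => 1,
                                StubOptionSocket::HWM => 1000 })
read_option(socket, StubOptionSocket::RCVMORE)
read_option(socket, StubOptionSocket::HWM)
```

Returning nil on failure is a choice made for this sketch; with the real socket the caller would inspect the error number after a -1 return code.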


# File 'lib/ffi-rzmq/socket.rb', line 387

def getsockopt name, array
  rc = __getsockopt__ name, array
  
  if Util.resultcode_ok?(rc) && (RCVMORE == name || MCAST_LOOP == name)
    # convert to boolean
    array[0] = 1 == array[0]
  end
    
  rc
end

#label? ⇒ Boolean

The last message part received is tested to see if it is a label.

Equivalent to calling Socket#getsockopt with ZMQ::RCVLABEL.

Warning: if the call to #getsockopt fails, this method will return false and swallow the error.

labels = []
message_parts = []
message = Message.new
rc = socket.recv(message)
if ZMQ::Util.resultcode_ok?(rc)
  # sort the first part, then each remaining part, by whether it is a label
  socket.label? ? labels.push(message) : message_parts.push(message)
  while socket.more_parts?
    message = Message.new
    if ZMQ::Util.resultcode_ok?(socket.recv(message))
      socket.label? ? labels.push(message) : message_parts.push(message)
    end
  end
end

Returns:

  • (Boolean)


# File 'lib/ffi-rzmq/socket.rb', line 737

def label?
  array = []
  rc = getsockopt ZMQ::RCVLABEL, array
  
  Util.resultcode_ok?(rc) ? array.at(0) : false
end

#recv(message, flags = 0) ⇒ Object

Dequeues a message from the underlying queue. By default, this is a blocking operation.

flags may take two values:

0 (default) - blocking operation
ZMQ::NOBLOCK - non-blocking operation

Returns 0 when the message was successfully dequeued. Returns -1 under two conditions.

  1. The message could not be dequeued

  2. When flags is set with ZMQ::NOBLOCK and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ.errno to determine the cause.

The application code is responsible for handling the message object lifecycle when #recv returns an error code.
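
The two -1 cases above call for different handling: EAGAIN on a non-blocking read just means nothing is queued, while any other error is a real failure. A minimal sketch of that distinction follows. StubReceiver, EAGAIN_STUB, NOBLOCK_STUB, and drain are hypothetical names for this illustration only; the stub uses a plain String in place of a Message and exposes its own errno reader instead of the library's error lookup.

```ruby
# Hypothetical stand-ins; NOT part of ffi-rzmq. Values are placeholders.
EAGAIN_STUB  = 11
NOBLOCK_STUB = 1

class StubReceiver
  attr_reader :errno

  def initialize(queue)
    @queue = queue
    @errno = 0
  end

  # Mimics #recv: fills the message (a String here) and returns 0,
  # or sets errno and returns -1 when nothing is queued.
  def recv(message, _flags = 0)
    if @queue.empty?
      @errno = EAGAIN_STUB
      return -1
    end
    message.replace(@queue.shift)
    0
  end
end

# Reads everything currently queued and stops cleanly on EAGAIN.
def drain(socket)
  received = []
  loop do
    message = String.new
    rc = socket.recv(message, NOBLOCK_STUB)
    if rc.zero?
      received << message
    elsif socket.errno == EAGAIN_STUB
      break # nothing more to read right now; not a failure
    else
      raise "recv failed with errno #{socket.errno}"
    end
  end
  received
end

drain(StubReceiver.new(%w[first second]))
```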



# File 'lib/ffi-rzmq/socket.rb', line 524

def recv message, flags = 0
  LibZMQ.zmq_recv @socket, message.address, flags
end

#recv_string(string, flags = 0) ⇒ Object

Helper method to make a new #Message instance and convert its payload to a string.

flags may be ZMQ::DONTWAIT.

Returns 0 when the message was successfully dequeued. Returns -1 under two conditions.

  1. The message could not be dequeued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ.errno to determine the cause.

The message created internally is always closed before this method returns, so the caller has no message lifecycle to manage. string is replaced with the payload only when the return code is 0.
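
Because the result arrives through the string argument rather than the return value, a caller typically passes in a scratch buffer and checks the return code before trusting it. A small sketch, assuming a hypothetical StubStringSocket that mirrors only this contract (it is not part of ffi-rzmq, and read_one is an illustrative helper):

```ruby
# Hypothetical stand-in mirroring the recv_string contract; NOT ffi-rzmq.
class StubStringSocket
  def initialize(payloads)
    @payloads = payloads
  end

  # Replaces the contents of string on success (0); leaves it alone on -1.
  def recv_string(string, _flags = 0)
    return -1 if @payloads.empty?

    string.replace(@payloads.shift)
    0
  end
end

# Returns the received payload, or nil when the call reports failure.
def read_one(socket)
  buffer = String.new
  rc = socket.recv_string(buffer)
  rc.zero? ? buffer : nil
end

read_one(StubStringSocket.new(['hello']))
```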



# File 'lib/ffi-rzmq/socket.rb', line 542

def recv_string string, flags = 0
  message = @receiver_klass.new
  rc = recv message, flags
  string.replace(message.copy_out_string) if Util.resultcode_ok?(rc)
  message.close
  rc
end

#recv_strings(list, flag = 0) ⇒ Object

Receive a multipart message as a list of strings.

flag may be ZMQ::DONTWAIT. Any other flag will be removed.



# File 'lib/ffi-rzmq/socket.rb', line 564

def recv_strings list, flag = 0
  array = []
  rc = recvmsgs array, flag
  
  if Util.resultcode_ok?(rc)
    array.each do |message|
      list << message.copy_out_string
      message.close
    end
  end
  
  rc
end

#recvmsg(message, flags = 0) ⇒ Object

Dequeues a message from the underlying queue. By default, this is a blocking operation.

flags may take two values:

0 (default) - blocking operation
ZMQ::DONTWAIT - non-blocking operation

Returns 0 when the message was successfully dequeued. Returns -1 under two conditions.

  1. The message could not be dequeued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ.errno to determine the cause.

The application code is responsible for handling the message object lifecycle when #recvmsg returns an error code.



# File 'lib/ffi-rzmq/socket.rb', line 868

def recvmsg message, flags = 0
  LibZMQ.zmq_recvmsg @socket, message.address, flags
end

#recvmsgs(list, flag = 0) ⇒ Object

Receive a multipart message as an array of objects (by default these are instances of Message).

flag may be ZMQ::DONTWAIT. Any other flag will be removed.

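Returns the same result codes as Socket#recv: 0 on success and -1 on failure. list is appended to only when every part was received without error.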
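
The objects appended to list remain the caller's to close. One way to guarantee that is an ensure block, sketched here with hypothetical stand-ins: StubPart and StubMultipartSocket are not part of ffi-rzmq, and they imitate only the copy_out_string and close methods and the return-code convention shown on this page.

```ruby
# Hypothetical stand-ins; NOT part of ffi-rzmq.
class StubPart
  attr_reader :closed

  def initialize(payload)
    @payload = payload
    @closed = false
  end

  def copy_out_string
    @payload
  end

  def close
    @closed = true
  end
end

class StubMultipartSocket
  def initialize(parts)
    @parts = parts
  end

  # Appends every part to list and returns 0, or returns -1 with list untouched.
  def recvmsgs(list, _flag = 0)
    return -1 if @parts.empty?

    @parts.each { |part| list << part }
    0
  end
end

# Copies the payloads out, then closes every received part no matter what.
def read_multipart(socket)
  messages = []
  rc = socket.recvmsgs(messages)
  return nil unless rc.zero?

  messages.map(&:copy_out_string)
ensure
  messages.each(&:close)
end

read_multipart(StubMultipartSocket.new([StubPart.new('header'), StubPart.new('body')]))
```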



# File 'lib/ffi-rzmq/socket.rb', line 594

def recvmsgs list, flag = 0
  flag = NOBLOCK if noblock?(flag)

  parts = []
  message = @receiver_klass.new
  rc = recv message, flag
  parts << message

  # check rc *first*; necessary because the call to #more_parts? can reset
  # the zmq_errno to a weird value, so the zmq_errno that was set on the
  # call to #recv gets lost
  while Util.resultcode_ok?(rc) && more_parts?
    message = @receiver_klass.new
    rc = recv message, flag
    parts << message
  end

  # only append the received parts if there were no errors
  # FIXME:
  # need to detect EAGAIN if flag is set; EAGAIN means we have read all that we
  # can and should return whatever was already read; need a spec!
  if Util.resultcode_ok?(rc)
    parts.each { |part| list << part }
  end

  rc
end

#send(message, flags = 0) ⇒ Object

Queues the message for transmission. Message is assumed to conform to the same public API as #Message.

flags may take the following values:

  • 0 (default) - blocking operation

  • ZMQ::NOBLOCK - non-blocking operation

  • ZMQ::SNDMORE - this message is part of a multi-part message

Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.

  1. The message could not be enqueued

  2. When flags is set with ZMQ::NOBLOCK and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ.errno to determine the cause.

The application code is responsible for handling the message object lifecycle when #send returns. Regardless of the return code, the user is responsible for calling message.close to free the memory in use.
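
Since the message must be closed whatever the return code, wrapping the send in an ensure block is one straightforward pattern. The sketch below uses hypothetical stand-ins (StubMessage, StubSender, send_with_cleanup) that are not part of ffi-rzmq and model only the 0 / -1 return code and a close method.

```ruby
# Hypothetical stand-ins; NOT part of ffi-rzmq.
class StubMessage
  attr_reader :closed

  def initialize(data)
    @data = data
    @closed = false
  end

  def close
    @closed = true
  end
end

class StubSender
  def initialize(should_fail: false)
    @should_fail = should_fail
  end

  # Mimics #send: 0 when enqueued, -1 otherwise.
  def send(_message, _flags = 0)
    @should_fail ? -1 : 0
  end
end

# Returns the send's return code; the message is closed on every path.
def send_with_cleanup(socket, message, flags = 0)
  socket.send(message, flags)
ensure
  message.close
end

send_with_cleanup(StubSender.new, StubMessage.new('payload'))
```

The ensure clause does not alter the method's return value, so the caller still sees 0 or -1 from the send itself.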



# File 'lib/ffi-rzmq/socket.rb', line 418

def send message, flags = 0
  LibZMQ.zmq_send @socket, message.address, flags
end

#send_and_close(message, flags = 0) ⇒ Object

Sends a message. This will automatically close the message for both successful and failed sends.

Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.

  1. The message could not be enqueued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ.errno to determine the cause.



# File 'lib/ffi-rzmq/socket.rb', line 501

def send_and_close message, flags = 0
  rc = send message, flags
  message.close
  rc
end

#send_string(string, flags = 0) ⇒ Object

Helper method to make a new #Message instance out of the string passed in for transmission.

flags may be ZMQ::DONTWAIT and/or ZMQ::SNDMORE.

Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.

  1. The message could not be enqueued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ.errno to determine the cause.



# File 'lib/ffi-rzmq/socket.rb', line 435

def send_string message_string, flags = 0
  message = Message.new message_string
  send_and_close message, flags
end

#send_strings(parts, flags = 0) ⇒ Object

Sends a sequence of strings as a single multipart message. Every element of parts should be a String.

flags may be ZMQ::DONTWAIT.

Returns 0 when the messages were successfully enqueued. Returns -1 under three conditions.

  1. parts is nil or empty

  2. A message could not be enqueued

  3. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ.errno to determine the cause.
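
The multipart framing amounts to setting the "more" flag on every part except the last and stopping at the first failure. The sketch below replays that logic against a recording stub so the flags are visible. RecordingSocket, SNDMORE_STUB, and send_all are hypothetical names for this illustration, and the flag value is a placeholder rather than the real ZMQ::SNDMORE.

```ruby
# Hypothetical stand-ins; NOT part of ffi-rzmq.
SNDMORE_STUB = 2 # placeholder bit for "more parts follow"

class RecordingSocket
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Records each [string, flags] pair and reports success.
  def send_string(string, flags = 0)
    @sent << [string, flags]
    0
  end
end

# Sends every part but the last with the "more" flag; stops at the first error.
def send_all(socket, parts, flags = 0)
  return -1 if parts.nil? || parts.empty?

  parts[0..-2].each do |part|
    rc = socket.send_string(part, flags | SNDMORE_STUB)
    return rc unless rc.zero?
  end

  socket.send_string(parts[-1], flags)
end

socket = RecordingSocket.new
send_all(socket, %w[envelope header body])
socket.sent
```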



# File 'lib/ffi-rzmq/socket.rb', line 454

def send_strings parts, flags = 0
  return -1 if !parts || parts.empty?

  parts[0..-2].each do |part|
    rc = send_string part, flags | ZMQ::SNDMORE
    return rc unless Util.resultcode_ok?(rc)
  end

  send_string parts[-1], flags
end

#sendmsg(message, flags = 0) ⇒ Object

Queues the message for transmission. Message is assumed to conform to the same public API as #Message.

flags may take the following values:

  • 0 (default) - blocking operation

  • ZMQ::DONTWAIT - non-blocking operation

  • ZMQ::SNDMORE - this message is part of a multi-part message

Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.

  1. The message could not be enqueued

  2. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ.errno to determine the cause.



# File 'lib/ffi-rzmq/socket.rb', line 760

def sendmsg message, flags = 0
  LibZMQ.zmq_sendmsg @socket, message.address, flags
end

#sendmsgs(parts, flags = 0) ⇒ Object

Sends a sequence of messages as a single multipart message. Every element of parts should be a Message (or subclass).

flags may be ZMQ::DONTWAIT.

Returns 0 when the messages were successfully enqueued. Returns -1 under three conditions.

  1. parts is nil or empty

  2. A message could not be enqueued

  3. When flags is set with ZMQ::DONTWAIT and the socket returned EAGAIN.

With a -1 return code, the user must check ZMQ.errno to determine the cause.



# File 'lib/ffi-rzmq/socket.rb', line 479

def sendmsgs parts, flags = 0
  return -1 if !parts || parts.empty?

  parts[0..-2].each do |part|
    rc = send part, flags | ZMQ::SNDMORE
    return rc unless Util.resultcode_ok?(rc)
  end

  send parts[-1], flags
end