Module: ZMQ::CommonSocketBehavior

Includes: Util
Included in: Socket
Defined in: lib/ffi-rzmq/socket.rb

Methods included from Util

errno, error_string, nonblocking_flag, resultcode_ok?, version

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/ffi-rzmq/socket.rb', line 7

def name
  @name
end

#socket ⇒ Object (readonly)

Returns the value of attribute socket.



# File 'lib/ffi-rzmq/socket.rb', line 7

def socket
  @socket
end

Class Method Details

.create(context_ptr, type, opts = {:receiver_class => ZMQ::Message}) ⇒ Object

Allocates a socket of type type for sending and receiving data.

type can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB, ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::XREP, ZMQ::DEALER or ZMQ::ROUTER.

By default, this class uses ZMQ::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use ZMQ::ManagedMessage.

sock = Socket.create(Context.create, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)

Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as ZMQ::Message.

Socket.create returns nil when socket creation fails, so check the result before using it.

if (socket = Socket.create(context.pointer, ZMQ::REQ))
  ...
else
  STDERR.puts "Socket creation failed"
end


# File 'lib/ffi-rzmq/socket.rb', line 34

def self.create context_ptr, type, opts = {:receiver_class => ZMQ::Message}
  new(context_ptr, type, opts) rescue nil
end
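
The difference between .create and #initialize can be sketched without a running 0mq context. DemoSocket and DemoContextError below are hypothetical stand-ins, not part of ffi-rzmq; they only mirror the `new(...) rescue nil` shape of the factory shown above, under the assumption that the raised error descends from StandardError.

```ruby
# Hypothetical stand-ins for illustration only; not part of ffi-rzmq.
class DemoContextError < StandardError; end

class DemoSocket
  attr_reader :type

  # Mirrors #initialize: raises when the context pointer is unusable.
  def initialize(context_ptr, type)
    raise DemoContextError, "Context pointer was null" if context_ptr.nil?
    @type = type
  end

  # Mirrors .create: the rescue modifier turns any StandardError
  # raised by new into a nil return value.
  def self.create(context_ptr, type)
    new(context_ptr, type) rescue nil
  end
end

ok     = DemoSocket.create(:fake_context, :req)
failed = DemoSocket.create(nil, :req)

puts ok.nil? ? "creation failed" : "created #{ok.type} socket"
puts failed.nil? ? "creation failed" : "created #{failed.type} socket"
```

The trade-off is that the nil return discards the reason for the failure. Code that needs the error details would call new directly and rescue the exception.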

Instance Method Details

#bind(address) ⇒ Object

Binds the socket to an address.

socket.bind("tcp://127.0.0.1:5555")


# File 'lib/ffi-rzmq/socket.rb', line 180

def bind address
  LibZMQ.zmq_bind @socket, address
end

#close ⇒ Object

Closes the socket. Any unprocessed messages still in the queue are sent or dropped, depending on the value of the ZMQ::LINGER socket option.

Returns 0 upon success or when the socket has already been closed. Returns -1 when the operation fails. Check ZMQ.errno for the error code.

rc = socket.close
puts("Given socket was invalid!") unless 0 == rc


# File 'lib/ffi-rzmq/socket.rb', line 201

def close
  if @socket
    remove_finalizer
    rc = LibZMQ.zmq_close @socket
    @socket = nil
    release_cache
    rc
  else
    0
  end
end

#connect(address) ⇒ Object

Connects the socket to an address.

socket.connect("tcp://127.0.0.1:5555")


# File 'lib/ffi-rzmq/socket.rb', line 188

def connect address
  rc = LibZMQ.zmq_connect @socket, address
end

#initialize(context_ptr, type, opts = {:receiver_class => ZMQ::Message}) ⇒ Object

To avoid rescuing exceptions, use the factory method .create for all socket creation.

Allocates a socket of type type for sending and receiving data.

type can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB, ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::XREP, ZMQ::DEALER or ZMQ::ROUTER.

By default, this class uses ZMQ::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use ZMQ::ManagedMessage.

sock = Socket.new(Context.new, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)

Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as ZMQ::Message.

Creation of a new Socket object can raise an exception. This occurs when the context_ptr is null or when the allocation of the 0mq socket within the context fails.

begin
  socket = Socket.new(context.pointer, ZMQ::REQ)
rescue ContextError => e
  # error handling
end


# File 'lib/ffi-rzmq/socket.rb', line 67

def initialize context_ptr, type, opts = {:receiver_class => ZMQ::Message}
  # users may override the classes used for receiving; class must conform to the
  # same public API as ZMQ::Message
  @receiver_klass = opts[:receiver_class]
  
  context_ptr = context_ptr.pointer if context_ptr.kind_of?(ZMQ::Context)

  unless context_ptr.null?
    @socket = LibZMQ.zmq_socket context_ptr, type
    if @socket && !@socket.null?
      @name = SocketTypeNameMap[type]
    else
      raise ContextError.new 'zmq_socket', 0, ETERM, "Socket pointer was null"
    end
  else
    raise ContextError.new 'zmq_socket', 0, ETERM, "Context pointer was null"
  end

  @sockopt_cache = {}

  define_finalizer
end

#more_parts? ⇒ Boolean

Convenience method for checking on additional message parts.

Equivalent to calling Socket#getsockopt with ZMQ::RCVMORE.

Warning: if the call to #getsockopt fails, this method will return false and swallow the error.

message_parts = []
message = Message.new
rc = socket.recv(message)
if ZMQ::Util.resultcode_ok?(rc)
  message_parts << message
  while socket.more_parts?
    message = Message.new
    rc = socket.recv(message)
    message_parts.push(message) if ZMQ::Util.resultcode_ok?(rc)
  end
end

Returns:

  • (Boolean)


# File 'lib/ffi-rzmq/socket.rb', line 169

def more_parts?
  array = []
  rc = getsockopt ZMQ::RCVMORE, array
  
  Util.resultcode_ok?(rc) ? array.at(0) : false
end
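
The receive loop from the example above can be wrapped in a small helper. This is a sketch under stated assumptions: recv_all_parts, StubSocket and StubMessage are hypothetical names, the stubs exist only so the code runs without 0mq, and `rc >= 0` is assumed to be the same success test that resultcode_ok? performs.

```ruby
# Collects every part of one multipart message.
# socket is any object responding to recv(message) and more_parts?;
# message_class builds one empty message per part.
# Returns the array of parts, or nil as soon as a recv call fails.
def recv_all_parts(socket, message_class)
  parts = []
  loop do
    message = message_class.new
    rc = socket.recv(message)
    return nil unless rc >= 0 # assumed equivalent to resultcode_ok?(rc)
    parts << message
    break unless socket.more_parts?
  end
  parts
end

# Hypothetical in-memory stand-ins so the sketch runs without 0mq.
class StubMessage
  attr_accessor :body
end

class StubSocket
  def initialize(frames)
    @frames = frames.dup
  end

  # Fills the message with the next frame; -1 signals failure.
  def recv(message)
    return -1 if @frames.empty?
    message.body = @frames.shift
    0
  end

  def more_parts?
    !@frames.empty?
  end
end

parts = recv_all_parts(StubSocket.new(%w[header body footer]), StubMessage)
puts parts.map(&:body).inspect
```

Because #more_parts? returns false when the underlying #getsockopt call fails, a loop like this can end early with an incomplete message. Callers that need to tell the two cases apart would call #getsockopt with ZMQ::RCVMORE themselves and inspect the return code.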

#setsockopt(name, value, length = nil) ⇒ Object

Set the queue options on this socket.

Valid name values that take a numeric value are:

ZMQ::HWM
ZMQ::SWAP (version 2 only)
ZMQ::AFFINITY
ZMQ::RATE
ZMQ::RECOVERY_IVL
ZMQ::MCAST_LOOP (version 2 only)
ZMQ::LINGER
ZMQ::RECONNECT_IVL
ZMQ::BACKLOG
ZMQ::RECOVERY_IVL_MSEC (version 2 only)
ZMQ::RECONNECT_IVL_MAX (version 3/4 only)
ZMQ::MAXMSGSIZE (version 3/4 only)
ZMQ::SNDHWM (version 3/4 only)
ZMQ::RCVHWM (version 3/4 only)
ZMQ::MULTICAST_HOPS (version 3/4 only)
ZMQ::RCVTIMEO (version 3/4 only)
ZMQ::SNDTIMEO (version 3/4 only)
ZMQ::RCVLABEL (version 3/4 only)

Valid name values that take a string value are:

ZMQ::IDENTITY (version 2/3 only)
ZMQ::SUBSCRIBE
ZMQ::UNSUBSCRIBE

Returns 0 when the operation completed successfully. Returns -1 when this operation failed.

With a -1 return code, the user must check ZMQ.errno to determine the cause.

rc = socket.setsockopt(ZMQ::LINGER, 1_000)
ZMQ::Util.resultcode_ok?(rc) ? puts("succeeded") : puts("failed")


# File 'lib/ffi-rzmq/socket.rb', line 126

def setsockopt name, value, length = nil
  if long_long_option?(name)
    length = 8
    pointer = LibC.malloc length
    pointer.write_long_long value

  elsif int_option?(name)
    length = 4
    pointer = LibC.malloc length
    pointer.write_int value

  elsif string_option?(name)
    length ||= value.size

    # note: not checking errno for failed memory allocations :(
    pointer = LibC.malloc length
    pointer.write_string value
  end

  rc = LibZMQ.zmq_setsockopt @socket, name, pointer, length
  LibC.free(pointer) unless pointer.nil? || pointer.null?
  rc
end
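
The buffer sizes chosen by the three branches above can be illustrated with Array#pack instead of LibC.malloc. This is only a sketch: pack_option and its kind tag are hypothetical stand-ins for long_long_option?, int_option? and string_option?, and nothing here calls into 0mq. It also deviates from the listing in two labeled ways: it measures strings with bytesize rather than size, and it raises on an unknown kind, whereas the listing appears to pass a nil pointer through in that case.

```ruby
# Packs an option value into the byte layout each branch would write.
# kind is a hypothetical tag, not an ffi-rzmq concept.
def pack_option(kind, value, length = nil)
  case kind
  when :long_long
    [value].pack("q") # 8 bytes, native-endian signed 64-bit
  when :int
    [value].pack("l") # 4 bytes, native-endian signed 32-bit
  when :string
    # Deviation from the listing: bytesize counts bytes, size counts
    # characters, and the two differ for multibyte strings.
    length ||= value.bytesize
    value.byteslice(0, length)
  else
    raise ArgumentError, "unknown option kind: #{kind.inspect}"
  end
end

puts pack_option(:long_long, 1_000).bytesize
puts pack_option(:int, 1_000).bytesize

text = "h\u00e9llo"
puts "characters: #{text.size}, bytes: #{pack_option(:string, text).bytesize}"
```

Passing an explicit length to #setsockopt is one way to stay in control of how many bytes of a string option are handed to the library.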