Module: ZMQ::CommonSocketBehavior
- Included in:
- Socket
- Defined in:
- lib/ffi-rzmq/socket.rb
Instance Attribute Summary
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#socket ⇒ Object
readonly
Returns the value of attribute socket.
Class Method Summary
-
.create(context_ptr, type, opts = {:receiver_class => ZMQ::Message}) ⇒ Object
Allocates a socket of type type for sending and receiving data.
Instance Method Summary
-
#bind(address) ⇒ Object
Binds the socket to an address.
-
#close ⇒ Object
Closes the socket.
-
#connect(address) ⇒ Object
Connects the socket to an address.
-
#initialize(context_ptr, type, opts = {:receiver_class => ZMQ::Message}) ⇒ Object
To avoid rescuing exceptions, use the factory method .create for all socket creation.
-
#more_parts? ⇒ Boolean
Convenience method for checking on additional message parts.
-
#recv_multipart(list, routing_envelope, flag = 0) ⇒ Object
Should only be used for XREQ, XREP, DEALER and ROUTER type sockets.
-
#recv_string(string, flags = 0) ⇒ Object
Helper method to make a new #Message instance and convert its payload to a string.
-
#recv_strings(list, flag = 0) ⇒ Object
Receive a multipart message as a list of strings.
-
#recvmsg(message, flags = 0) ⇒ Object
Dequeues a message from the underlying queue.
-
#recvmsgs(list, flag = 0) ⇒ Object
Receive a multipart message as an array of objects (by default these are instances of Message).
-
#send_and_close(message, flags = 0) ⇒ Object
Sends a message.
-
#send_string(string, flags = 0) ⇒ Object
Helper method to make a new #Message instance out of the string passed in for transmission.
-
#send_strings(parts, flags = 0) ⇒ Object
Send a sequence of strings as a multipart message out of the parts passed in for transmission.
-
#sendmsg(message, flags = 0) ⇒ Object
Queues the message for transmission.
-
#sendmsgs(parts, flags = 0) ⇒ Object
Send a sequence of messages as a multipart message out of the parts passed in for transmission.
-
#setsockopt(name, value, length = nil) ⇒ Object
Set the queue options on this socket.
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
# File 'lib/ffi-rzmq/socket.rb', line 6
def name
  @name
end
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
# File 'lib/ffi-rzmq/socket.rb', line 6
def socket
  @socket
end
Class Method Details
.create(context_ptr, type, opts = {:receiver_class => ZMQ::Message}) ⇒ Object
Allocates a socket of type type for sending and receiving data.
type can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB, ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::XREP, ZMQ::DEALER or ZMQ::ROUTER.
By default, this class uses ZMQ::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use ZMQ::ManagedMessage.
sock = Socket.create(Context.create, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)
Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as ZMQ::Message.
Socket.create returns nil when socket creation fails.
if (socket = Socket.create(context.pointer, ZMQ::REQ))
  ...
else
  STDERR.puts "Socket creation failed"
end
# File 'lib/ffi-rzmq/socket.rb', line 33
def self.create context_ptr, type, opts = {:receiver_class => ZMQ::Message}
  new(context_ptr, type, opts) rescue nil
end
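The `rescue nil` modifier in the listing above is what turns a raising constructor into a nil-returning factory. The following is a minimal sketch of that pattern in plain Ruby, not the library's code: `FakeSocket` and `FakeContextError` are hypothetical stand-ins so the example runs without ffi-rzmq installed.

```ruby
# Hypothetical stand-ins for illustration; neither class is part of ffi-rzmq.
class FakeContextError < StandardError; end

class FakeSocket
  attr_reader :type

  # Same shape as .create: delegate to new and convert any StandardError
  # raised by the constructor into a nil return value.
  def self.create(context_ptr, type)
    new(context_ptr, type) rescue nil
  end

  def initialize(context_ptr, type)
    raise FakeContextError, "Context pointer was null" if context_ptr.nil?
    @type = type
  end
end

ok  = FakeSocket.create(:some_context, :req) # constructor succeeds
bad = FakeSocket.create(nil, :req)           # constructor raises, factory returns nil
```

The trade-off of this design is that the caller learns only that creation failed; the exception message is discarded along with the exception.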
Instance Method Details
#bind(address) ⇒ Object
Binds the socket to an address.
socket.bind("tcp://127.0.0.1:5555")
# File 'lib/ffi-rzmq/socket.rb', line 180
def bind address
  LibZMQ.zmq_bind @socket, address
end
#close ⇒ Object
Closes the socket. Any unprocessed messages in queue are sent or dropped depending upon the value of the socket option ZMQ::LINGER.
Returns 0 upon success or when the socket has already been closed. Returns -1 when the operation fails. Check ZMQ.errno for the error code.
rc = socket.close
puts("Given socket was invalid!") unless 0 == rc
# File 'lib/ffi-rzmq/socket.rb', line 201
def close
  if @socket
    remove_finalizer
    rc = LibZMQ.zmq_close @socket
    @socket = nil
    release_cache
    rc
  else
    0
  end
end
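One consequence of the listing above is that the socket pointer is cleared regardless of the native return code, so a second call to close reports 0 even if the first one failed. A small sketch of that control flow, assuming a hypothetical `FakeHandle` class whose native close call is simulated by a lambda:

```ruby
# Hypothetical stand-in for a socket wrapper; the native close call is
# simulated by a lambda so the example runs without libzmq.
class FakeHandle
  def initialize(native_close = -> { 0 })
    @socket = :native_pointer
    @native_close = native_close
  end

  def close
    return 0 unless @socket   # already closed: report success
    rc = @native_close.call
    @socket = nil             # pointer is dropped even when rc is -1
    rc
  end
end

healthy = FakeHandle.new
first   = healthy.close       # native call runs
second  = healthy.close       # no native call, returns 0

failing      = FakeHandle.new(-> { -1 })
failed_rc    = failing.close  # native failure is passed through
after_failed = failing.close  # pointer already cleared, returns 0
```

If the error code matters, it therefore has to be inspected after the first call.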
#connect(address) ⇒ Object
Connects the socket to an address.
rc = socket.connect("tcp://127.0.0.1:5555")
# File 'lib/ffi-rzmq/socket.rb', line 188
def connect address
  rc = LibZMQ.zmq_connect @socket, address
end
#initialize(context_ptr, type, opts = {:receiver_class => ZMQ::Message}) ⇒ Object
To avoid rescuing exceptions, use the factory method .create for all socket creation.
Allocates a socket of type type for sending and receiving data.
type can be one of ZMQ::REQ, ZMQ::REP, ZMQ::PUB, ZMQ::SUB, ZMQ::PAIR, ZMQ::PULL, ZMQ::PUSH, ZMQ::XREQ, ZMQ::XREP, ZMQ::DEALER or ZMQ::ROUTER.
By default, this class uses ZMQ::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use ZMQ::ManagedMessage.
sock = Socket.new(Context.new, ZMQ::REQ, :receiver_class => ZMQ::ManagedMessage)
Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as ZMQ::Message.
Creation of a new Socket object can raise an exception. This occurs when the context_ptr is null or when the allocation of the 0mq socket within the context fails.
begin
  socket = Socket.new(context.pointer, ZMQ::REQ)
rescue ContextError => e
  # error handling
end
# File 'lib/ffi-rzmq/socket.rb', line 66
def initialize context_ptr, type, opts = {:receiver_class => ZMQ::Message}
  # users may override the classes used for receiving; class must conform to the
  # same public API as ZMQ::Message
  @receiver_klass = opts[:receiver_class]

  context_ptr = context_ptr.pointer if context_ptr.kind_of?(ZMQ::Context)

  unless context_ptr.null?
    @socket = LibZMQ.zmq_socket context_ptr, type
    if @socket && !@socket.null?
      @name = SocketTypeNameMap[type]
    else
      raise ContextError.new 'zmq_socket', 0, ETERM, "Socket pointer was null"
    end
  else
    raise ContextError.new 'zmq_socket', 0, ETERM, "Context pointer was null"
  end

  @longlong_cache = @int_cache = nil
  @more_parts_array = []
  @option_lookup = []
  populate_option_lookup

  define_finalizer
end
#more_parts? ⇒ Boolean
Convenience method for checking on additional message parts.
Equivalent to calling Socket#getsockopt with ZMQ::RCVMORE.
Warning: if the call to #getsockopt fails, this method will return false and swallow the error.
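message_parts = []
message = Message.new
rc = socket.recvmsg(message)
if ZMQ::Util.resultcode_ok?(rc)
  message_parts << message
  while socket.more_parts?
    message = Message.new
    rc = socket.recvmsg(message)
    message_parts.push(message) if ZMQ::Util.resultcode_ok?(rc)
  end
end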
# File 'lib/ffi-rzmq/socket.rb', line 170
def more_parts?
  rc = getsockopt ZMQ::RCVMORE, @more_parts_array

  Util.resultcode_ok?(rc) ? @more_parts_array.at(0) : false
end
#recv_multipart(list, routing_envelope, flag = 0) ⇒ Object
Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes a list for receiving the message body parts and a routing_envelope for receiving the message parts comprising the 0mq routing information.
# File 'lib/ffi-rzmq/socket.rb', line 410
def recv_multipart list, routing_envelope, flag = 0
  parts = []
  rc = recvmsgs parts, flag

  if Util.resultcode_ok?(rc)
    routing = true
    parts.each do |part|
      if routing
        routing_envelope << part
        routing = part.size > 0
      else
        list << part
      end
    end
  end

  rc
end
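The loop in the listing above treats every frame up to and including the first empty frame as routing information, and everything after it as body. The sketch below reproduces only that splitting rule; plain strings stand in for message objects, and the helper name `split_envelope` is hypothetical.

```ruby
# Splits received frames into routing envelope and body, following the
# same rule as the recv_multipart loop. Plain strings stand in for
# ZMQ::Message objects; split_envelope is a hypothetical helper name.
def split_envelope(parts)
  envelope = []
  body = []
  routing = true
  parts.each do |part|
    if routing
      envelope << part
      routing = part.size > 0   # an empty frame ends the envelope
    else
      body << part
    end
  end
  [envelope, body]
end

envelope, body = split_envelope(["client-id", "", "hello", "world"])
# With no empty delimiter frame, every part is treated as routing data.
no_delim_envelope, no_delim_body = split_envelope(["a", "b"])
```

Note that the empty delimiter frame itself ends up in the envelope, not in the body.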
#recv_string(string, flags = 0) ⇒ Object
Helper method to make a new #Message instance and convert its payload to a string.
flags may be ZMQ::NonBlocking.
Returns 0 when the message was successfully dequeued. Returns -1 under two conditions:
- The message could not be dequeued.
- flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ.errno to determine the cause.
The intermediate message object is closed internally whether or not the call succeeds; string is only modified on success.
# File 'lib/ffi-rzmq/socket.rb', line 342
def recv_string string, flags = 0
  message = @receiver_klass.new
  rc = recvmsg message, flags
  string.replace(message.copy_out_string) if Util.resultcode_ok?(rc)
  message.close
  rc
end
#recv_strings(list, flag = 0) ⇒ Object
Receive a multipart message as a list of strings.
flag may be ZMQ::NonBlocking. Any other flag will be removed.
# File 'lib/ffi-rzmq/socket.rb', line 355
def recv_strings list, flag = 0
  array = []
  rc = recvmsgs array, flag

  if Util.resultcode_ok?(rc)
    array.each do |message|
      list << message.copy_out_string
      message.close
    end
  end

  rc
end
#recvmsg(message, flags = 0) ⇒ Object
Dequeues a message from the underlying queue. By default, this is a blocking operation.
flags may take two values:
- 0 (default) - blocking operation
- ZMQ::NonBlocking - non-blocking operation
Returns 0 when the message was successfully dequeued. Returns -1 under two conditions:
- The message could not be dequeued.
- flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ.errno to determine the cause.
The application code is responsible for handling the message object lifecycle when #recvmsg returns an error code.
# File 'lib/ffi-rzmq/socket.rb', line 321
def recvmsg message, flags = 0
  #LibZMQ.zmq_recvmsg @socket, message.address, flags
  __recvmsg__(@socket, message.address, flags)
end
#recvmsgs(list, flag = 0) ⇒ Object
Receive a multipart message as an array of objects (by default these are instances of Message).
flag may be ZMQ::NonBlocking. Any other flag will be removed.
# File 'lib/ffi-rzmq/socket.rb', line 375
def recvmsgs list, flag = 0
  flag = NonBlocking if dontwait?(flag)

  message = @receiver_klass.new
  rc = recvmsg message, flag

  if Util.resultcode_ok?(rc)
    list << message

    # check rc *first*; necessary because the call to #more_parts? can reset
    # the zmq_errno to a weird value, so the zmq_errno that was set on the
    # call to #recv gets lost
    while Util.resultcode_ok?(rc) && more_parts?
      message = @receiver_klass.new
      rc = recvmsg message, flag

      if Util.resultcode_ok?(rc)
        list << message
      else
        message.close
        list.each { |msg| msg.close }
        list.clear
      end
    end
  else
    message.close
  end

  rc
end
#send_and_close(message, flags = 0) ⇒ Object
Sends a message. This will automatically close the message for both successful and failed sends.
Returns 0 when the message was successfully enqueued. Returns -1 under two conditions:
- The message could not be enqueued.
- flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ.errno to determine the cause.
# File 'lib/ffi-rzmq/socket.rb', line 298
def send_and_close message, flags = 0
  rc = sendmsg message, flags
  message.close
  rc
end
#send_string(string, flags = 0) ⇒ Object
Helper method to make a new #Message instance out of the string passed in for transmission.
flags may be ZMQ::NonBlocking and/or ZMQ::SNDMORE.
Returns 0 when the message was successfully enqueued. Returns -1 under two conditions:
- The message could not be enqueued.
- flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ.errno to determine the cause.
# File 'lib/ffi-rzmq/socket.rb', line 246
def send_string string, flags = 0
  message = Message.new string
  send_and_close message, flags
end
#send_strings(parts, flags = 0) ⇒ Object
Send a sequence of strings as a multipart message out of the parts passed in for transmission. Every element of parts should be a String.
flags may be ZMQ::NonBlocking.
Returns 0 when the messages were successfully enqueued. Returns -1 under two conditions:
- A message could not be enqueued.
- flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ.errno to determine the cause.
# File 'lib/ffi-rzmq/socket.rb', line 265
def send_strings parts, flags = 0
  send_multiple(parts, flags, :send_string)
end
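The body of send_multiple is not shown on this page. The sketch below illustrates one plausible way a multipart send can be built on top of send_string: every part except the last is flagged with SNDMORE, and sending stops at the first failure. Everything here is an assumption for illustration: `RecordingSocket` is a hypothetical stand-in, the `SNDMORE` value is a local placeholder for ZMQ::SNDMORE, and returning -1 for an empty list is a guess, not documented behavior.

```ruby
# Hypothetical recorder that plays the role of a socket so the example
# runs without ffi-rzmq. It stores [string, flags] pairs instead of sending.
class RecordingSocket
  SNDMORE = 2 # local placeholder for ZMQ::SNDMORE; value assumed

  attr_reader :sent

  def initialize
    @sent = []
  end

  def send_string(string, flags = 0)
    @sent << [string, flags]
    0
  end

  # Assumed multipart behavior: flag every part except the last with
  # SNDMORE and stop at the first failing send.
  def send_strings(parts, flags = 0)
    return -1 if parts.nil? || parts.empty?
    parts[0..-2].each do |part|
      rc = send_string(part, flags | SNDMORE)
      return rc unless rc >= 0
    end
    send_string(parts[-1], flags)
  end
end

sock = RecordingSocket.new
rc = sock.send_strings(%w[header body footer])
empty_rc = RecordingSocket.new.send_strings([])
```

The last part carries no SNDMORE flag, which is what tells the receiver that the multipart message is complete.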
#sendmsg(message, flags = 0) ⇒ Object
Queues the message for transmission. The message is assumed to conform to the same public API as #Message.
flags may take three values:
- 0 (default) - blocking operation
- ZMQ::NonBlocking - non-blocking operation
- ZMQ::SNDMORE - this message is part of a multi-part message
Returns 0 when the message was successfully enqueued. Returns -1 under two conditions:
- The message could not be enqueued.
- flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ.errno to determine the cause.
# File 'lib/ffi-rzmq/socket.rb', line 229
def sendmsg message, flags = 0
  __sendmsg__(@socket, message.address, flags)
end
#sendmsgs(parts, flags = 0) ⇒ Object
Send a sequence of messages as a multipart message out of the parts passed in for transmission. Every element of parts should be a Message (or subclass).
flags may be ZMQ::NonBlocking.
Returns 0 when the messages were successfully enqueued. Returns -1 under two conditions:
- A message could not be enqueued.
- flags is set with ZMQ::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check ZMQ.errno to determine the cause.
# File 'lib/ffi-rzmq/socket.rb', line 283
def sendmsgs parts, flags = 0
  send_multiple(parts, flags, :sendmsg)
end
#setsockopt(name, value, length = nil) ⇒ Object
Set the queue options on this socket.
Valid name values that take a numeric value are:
ZMQ::HWM
ZMQ::SWAP (version 2 only)
ZMQ::AFFINITY
ZMQ::RATE
ZMQ::RECOVERY_IVL
ZMQ::MCAST_LOOP (version 2 only)
ZMQ::LINGER
ZMQ::RECONNECT_IVL
ZMQ::BACKLOG
ZMQ::RECOVERY_IVL_MSEC (version 2 only)
ZMQ::RECONNECT_IVL_MAX (version 3 only)
ZMQ::MAXMSGSIZE (version 3 only)
ZMQ::SNDHWM (version 3 only)
ZMQ::RCVHWM (version 3 only)
ZMQ::MULTICAST_HOPS (version 3 only)
ZMQ::RCVTIMEO (version 3 only)
ZMQ::SNDTIMEO (version 3 only)
Valid name values that take a string value are:
ZMQ::IDENTITY (version 2/3 only)
ZMQ::SUBSCRIBE
ZMQ::UNSUBSCRIBE
Returns 0 when the operation completed successfully. Returns -1 when this operation failed.
With a -1 return code, the user must check ZMQ.errno to determine the cause.
rc = socket.setsockopt(ZMQ::LINGER, 1_000)
ZMQ::Util.resultcode_ok?(rc) ? puts("succeeded") : puts("failed")
# File 'lib/ffi-rzmq/socket.rb', line 127
def setsockopt name, value, length = nil
  if 1 == @option_lookup[name]
    length = 8
    pointer = LibC.malloc length
    pointer.write_long_long value

  elsif 0 == @option_lookup[name]
    length = 4
    pointer = LibC.malloc length
    pointer.write_int value

  elsif 2 == @option_lookup[name]
    length ||= value.size

    # note: not checking errno for failed memory allocations :(
    pointer = LibC.malloc length
    pointer.write_string value
  end

  rc = LibZMQ.zmq_setsockopt @socket, name, pointer, length
  LibC.free(pointer) unless pointer.nil? || pointer.null?
  rc
end
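The three branches in the listing above pick a buffer size from the option's kind: 8 bytes for long-long options, 4 bytes for int options, and the value's length for string options. The sketch below mimics that dispatch with Array#pack in place of native memory. The `OPTION_KIND` table and the `encode_option` helper are hypothetical, and the assignment of each symbolic option to a kind is made up for the example, not taken from the library.

```ruby
# Hypothetical option table: 0 = 4-byte int, 1 = 8-byte integer, 2 = string.
# Which real option falls into which kind is an assumption for illustration.
OPTION_KIND = { linger: 0, affinity: 1, identity: 2 }.freeze

# Returns [bytes, length], mirroring what the listing writes into the
# buffer and the length it passes along. encode_option is a made-up name.
def encode_option(name, value, length = nil)
  case OPTION_KIND.fetch(name)
  when 1 then [[value].pack("q"), 8]              # native-endian signed 64-bit
  when 0 then [[value].pack("l"), 4]              # native-endian signed 32-bit
  when 2 then [value.dup, length || value.size]   # caller may override length
  end
end

int_bytes, int_len   = encode_option(:linger, 1_000)
long_bytes, long_len = encode_option(:affinity, 1)
str_bytes, str_len   = encode_option(:identity, "worker-1")
```

Because String#size counts characters, not bytes, a string value containing multibyte characters may be better passed with an explicit length of value.bytesize.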