Module: XS::CommonSocketBehavior
Instance Attribute Summary
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#socket ⇒ Object
readonly
Returns the value of attribute socket.
Class Method Summary
-
.create(context_ptr, type, opts = {:receiver_class => XS::Message}) ⇒ Object
Allocates a socket of type type for sending and receiving data.
Instance Method Summary
-
#bind(address) ⇒ Object
Binds the socket to an address.
-
#close ⇒ Object
Closes the socket.
-
#connect(address) ⇒ Object
Connects the socket to an address.
-
#initialize(context_ptr, type, opts = {:receiver_class => XS::Message}) ⇒ Object
To avoid rescuing exceptions, use the factory method .create for all socket creation.
-
#more_parts? ⇒ Boolean
Convenience method for checking on additional message parts.
-
#recv_multipart(list, routing_envelope, flag = 0) ⇒ Object
Should only be used for XREQ, XREP, DEALER and ROUTER type sockets.
-
#recv_string(string, flags = 0) ⇒ Object
Helper method to make a new #Message instance and convert its payload to a string.
-
#recv_strings(list, flag = 0) ⇒ Object
Receive a multipart message as a list of strings.
-
#recvmsg(message, flags = 0) ⇒ Object
Dequeues a message from the underlying queue.
-
#recvmsgs(list, flag = 0) ⇒ Object
Receive a multipart message as an array of objects (by default these are instances of Message).
-
#send_and_close(message, flags = 0) ⇒ Object
Sends a message.
-
#send_string(string, flags = 0) ⇒ Object
Helper method to make a new #Message instance out of the string passed in for transmission.
-
#send_strings(parts, flags = 0) ⇒ Object
Send a sequence of strings as a multipart message out of the parts passed in for transmission.
-
#sendmsg(message, flags = 0) ⇒ Object
Queues the message for transmission.
-
#sendmsgs(parts, flags = 0) ⇒ Object
Send a sequence of messages as a multipart message out of the parts passed in for transmission.
-
#setsockopt(name, value, length = nil) ⇒ Object
Set the queue options on this socket.
Methods included from Util
bind_to_random_tcp_port, errno, error_string, resultcode_ok?, version
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
# File 'lib/ffi-rxs/socket.rb', line 7
def name
  @name
end
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
# File 'lib/ffi-rxs/socket.rb', line 7
def socket
  @socket
end
Class Method Details
.create(context_ptr, type, opts = {:receiver_class => XS::Message}) ⇒ Object
Allocates a socket of type type for sending and receiving data.
type can be one of XS::REQ, XS::REP, XS::PUB, XS::SUB, XS::PAIR, XS::PULL, XS::PUSH, XS::XREQ, XS::XREP, XS::DEALER or XS::ROUTER.
By default, this class uses XS::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use XS::ManagedMessage.
sock = Socket.create(Context.create, XS::REQ, :receiver_class => XS::ManagedMessage)
Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as XS::Message.
Creation of a new Socket object can return nil when socket creation fails.
if (socket = Socket.create(context.pointer, XS::REQ))
...
else
STDERR.puts "Socket creation failed"
end
# File 'lib/ffi-rxs/socket.rb', line 34
def self.create context_ptr, type, opts = {:receiver_class => XS::Message}
  new(context_ptr, type, opts) rescue nil
end
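The relationship between .create and .new shown in the listing above can be sketched without the native library. The classes below are hypothetical stand-ins (FakeSocket and FakeContextError are not part of ffi-rxs); they mimic only the pattern of a constructor that raises and a factory that turns the failure into nil. Unlike the `rescue nil` modifier in the real method, this sketch rescues one specific error class.

```ruby
# Hypothetical stand-ins for illustration; not part of ffi-rxs.
class FakeContextError < StandardError; end

class FakeSocket
  attr_reader :type

  # Factory: returns a socket, or nil when construction fails.
  def self.create(context_ptr, type)
    new(context_ptr, type)
  rescue FakeContextError
    nil
  end

  # Constructor: raises when the context pointer is missing.
  def initialize(context_ptr, type)
    raise FakeContextError, "Context pointer was null" if context_ptr.nil?
    @type = type
  end
end

ok     = FakeSocket.create(:ctx, :req)
failed = FakeSocket.create(nil, :req)

puts "created: #{ok.type.inspect}"
puts "failed:  #{failed.inspect}"
```

With the factory, callers branch on nil instead of wrapping every construction in begin/rescue.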
Instance Method Details
#bind(address) ⇒ Object
Binds the socket to an address.
socket.bind("tcp://127.0.0.1:5555")
# File 'lib/ffi-rxs/socket.rb', line 181
def bind address
  LibXS.xs_bind @socket, address
end
#close ⇒ Object
Closes the socket. Any unprocessed messages in queue are sent or dropped depending upon the value of the socket option XS::LINGER.
Returns 0 upon success or when the socket has already been closed. Returns -1 when the operation fails. Check XS.errno for the error code.
rc = socket.close
puts("Given socket was invalid!") unless 0 == rc
# File 'lib/ffi-rxs/socket.rb', line 202
def close
  if @socket
    remove_finalizer
    rc = LibXS.xs_close @socket
    @socket = nil
    release_cache
    rc
  else
    0
  end
end
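The listing above makes #close safe to call more than once: the native close runs only while a handle is held, and later calls return 0. A minimal sketch of that pattern follows, using a hypothetical FakeHandle class that counts native closes instead of calling into LibXS.

```ruby
# Hypothetical stand-in; counts how often the "native" close would run.
class FakeHandle
  attr_reader :native_closes

  def initialize
    @socket = Object.new # placeholder for the native socket pointer
    @native_closes = 0
  end

  # Returns 0 on success and also when the handle is already closed.
  def close
    if @socket
      @native_closes += 1 # the real method calls LibXS.xs_close here
      @socket = nil
      0
    else
      0
    end
  end
end

handle = FakeHandle.new
first  = handle.close
second = handle.close
puts "rc=#{first},#{second} native_closes=#{handle.native_closes}"
```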
#connect(address) ⇒ Object
Connects the socket to an address.
rc = socket.connect("tcp://127.0.0.1:5555")
# File 'lib/ffi-rxs/socket.rb', line 189
def connect address
  rc = LibXS.xs_connect @socket, address
end
#initialize(context_ptr, type, opts = {:receiver_class => XS::Message}) ⇒ Object
To avoid rescuing exceptions, use the factory method .create for all socket creation.
Allocates a socket of type type for sending and receiving data.
type can be one of XS::REQ, XS::REP, XS::PUB, XS::SUB, XS::PAIR, XS::PULL, XS::PUSH, XS::XREQ, XS::XREP, XS::DEALER or XS::ROUTER.
By default, this class uses XS::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use XS::ManagedMessage.
sock = Socket.new(Context.new, XS::REQ, :receiver_class => XS::ManagedMessage)
Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as XS::Message.
Creation of a new Socket object can raise an exception. This occurs when the context_ptr is null or when the allocation of the Crossroads socket within the context fails.
begin
socket = Socket.new(context.pointer, XS::REQ)
rescue ContextError => e
# error handling
end
# File 'lib/ffi-rxs/socket.rb', line 67
def initialize context_ptr, type, opts = {:receiver_class => XS::Message}
  # users may override the classes used for receiving; class must conform to the
  # same public API as XS::Message
  @receiver_klass = opts[:receiver_class]
  context_ptr = context_ptr.pointer if context_ptr.kind_of?(XS::Context)
  unless context_ptr.null?
    @socket = LibXS.xs_socket context_ptr, type
    if @socket && !@socket.null?
      @name = SocketTypeNameMap[type]
    else
      raise ContextError.new 'xs_socket', 0, ETERM, "Socket pointer was null"
    end
  else
    raise ContextError.new 'xs_socket', 0, ETERM, "Context pointer was null"
  end
  @longlong_cache = @int_cache = nil
  @more_parts_array = []
  @option_lookup = []
  populate_option_lookup
  define_finalizer
end
#more_parts? ⇒ Boolean
Convenience method for checking on additional message parts.
Equivalent to calling Socket#getsockopt with XS::RCVMORE.
Warning: if the call to #getsockopt fails, this method will return false and swallow the error.
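message_parts = []
message = Message.new
rc = socket.recvmsg(message)
if XS::Util.resultcode_ok?(rc)
  message_parts << message
  while socket.more_parts?
    message = Message.new
    rc = socket.recvmsg(message)
    message_parts.push(message) if XS::Util.resultcode_ok?(rc)
  end
end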
# File 'lib/ffi-rxs/socket.rb', line 171
def more_parts?
  rc = getsockopt XS::RCVMORE, @more_parts_array
  Util.resultcode_ok?(rc) ? @more_parts_array.at(0) : false
end
#recv_multipart(list, routing_envelope, flag = 0) ⇒ Object
Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes a list for receiving the message body parts and a routing_envelope for receiving the message parts comprising the 0mq routing information.
# File 'lib/ffi-rxs/socket.rb', line 427
def recv_multipart list, routing_envelope, flag = 0
  parts = []
  rc = recvmsgs parts, flag
  if Util.resultcode_ok?(rc)
    routing = true
    parts.each do |part|
      if routing
        routing_envelope << part
        routing = part.size > 0
      else
        list << part
      end
    end
  end
  rc
end
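The envelope-splitting loop in the listing above can be tried on plain strings. The helper below is a hypothetical sketch (split_envelope is not part of ffi-rxs) that applies the same rule to an array: parts go to the envelope up to and including the first empty part, and everything after that goes to the body.

```ruby
# Hypothetical helper mirroring the loop in #recv_multipart, using strings
# in place of message objects.
def split_envelope(parts)
  envelope = []
  body = []
  routing = true
  parts.each do |part|
    if routing
      envelope << part
      # An empty part marks the end of the routing information.
      routing = part.size > 0
    else
      body << part
    end
  end
  [envelope, body]
end

envelope, body = split_envelope(["client-1", "", "hello", "world"])
puts "envelope: #{envelope.inspect}"
puts "body:     #{body.inspect}"
```

Note that the empty delimiter itself lands in the envelope, and that a message with no empty part is treated entirely as routing information.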
#recv_string(string, flags = 0) ⇒ Object
Helper method to make a new #Message instance and convert its payload to a string.
flags may be XS::NonBlocking.
Returns 0 when the message was successfully dequeued. Returns -1 under two conditions.
-
The message could not be dequeued
-
When flags is set with XS::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check XS.errno to determine the cause.
The application code is responsible for handling the message
object lifecycle when #recv returns an error code.
# File 'lib/ffi-rxs/socket.rb', line 359
def recv_string string, flags = 0
  message = @receiver_klass.new
  rc = recvmsg message, flags
  string.replace(message.copy_out_string) if Util.resultcode_ok?(rc)
  message.close
  rc
end
#recv_strings(list, flag = 0) ⇒ Object
Receive a multipart message as a list of strings.
flag may be XS::NonBlocking. Any other flag will be removed.
# File 'lib/ffi-rxs/socket.rb', line 372
def recv_strings list, flag = 0
  array = []
  rc = recvmsgs array, flag
  if Util.resultcode_ok?(rc)
    array.each do |message|
      list << message.copy_out_string
      message.close
    end
  end
  rc
end
#recvmsg(message, flags = 0) ⇒ Object
Dequeues a message from the underlying queue. By default, this is a blocking operation.
flags may take two values:
0 (default) - blocking operation
XS::NonBlocking - non-blocking operation
Returns 0 when the message was successfully dequeued. Returns -1 under two conditions.
-
The message could not be dequeued
-
When flags is set with XS::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check XS.errno to determine the cause.
The application code is responsible for handling the message object lifecycle when #recvmsg returns an error code.
# File 'lib/ffi-rxs/socket.rb', line 338
def recvmsg message, flags = 0
  #LibXS.xs_recvmsg @socket, message.address, flags
  __recvmsg__(@socket, message.address, flags)
end
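Because a non-blocking receive reports "nothing available yet" as -1 with EAGAIN, callers usually separate that case from a real failure. The sketch below is a hypothetical helper (receive_with_retry is not part of ffi-rxs): the block stands in for a call to #recvmsg followed by an errno lookup and must return a [rc, errno] pair. It relies only on Ruby's built-in Errno constants.

```ruby
# Error number reported when a non-blocking call would have blocked.
EAGAIN_CODE = Errno::EAGAIN::Errno

# Hypothetical helper: calls the block until it succeeds, fails with an
# error other than EAGAIN, or max_attempts is reached.
# Returns [rc, attempts].
def receive_with_retry(max_attempts)
  attempts = 0
  loop do
    attempts += 1
    rc, errno = yield
    return [rc, attempts] if rc == 0                  # dequeued a message
    return [rc, attempts] unless errno == EAGAIN_CODE # real failure
    return [rc, attempts] if attempts >= max_attempts # give up
  end
end

# Simulated outcomes: two "try again" results, then success.
outcomes = [[-1, EAGAIN_CODE], [-1, EAGAIN_CODE], [0, 0]]
rc, attempts = receive_with_retry(5) { outcomes.shift }
puts "rc=#{rc} after #{attempts} attempts"
```

A real loop would normally sleep or poll between attempts rather than spin.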
#recvmsgs(list, flag = 0) ⇒ Object
Receive a multipart message as an array of objects (by default these are instances of Message).
flag may be XS::NonBlocking. Any other flag will be removed.
# File 'lib/ffi-rxs/socket.rb', line 392
def recvmsgs list, flag = 0
  flag = NonBlocking if dontwait?(flag)
  message = @receiver_klass.new
  rc = recvmsg message, flag
  if Util.resultcode_ok?(rc)
    list << message
    # check rc *first*; necessary because the call to #more_parts? can reset
    # the xs_errno to a weird value, so the xs_errno that was set on the
    # call to #recv gets lost
    while Util.resultcode_ok?(rc) && more_parts?
      message = @receiver_klass.new
      rc = recvmsg message, flag
      if Util.resultcode_ok?(rc)
        list << message
      else
        message.close
        list.each { |msg| msg.close }
        list.clear
      end
    end
  else
    message.close
  end
  rc
end
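The listing above has an all-or-nothing property: if any part of a multipart message fails to arrive, every part already collected is closed and the list is emptied. The sketch below reproduces that control flow with hypothetical stand-ins (FakeMessage, FakeFrameSocket and receive_all are not part of ffi-rxs), so it can be run without the native library.

```ruby
# Hypothetical message stand-in that records whether it was closed.
class FakeMessage
  attr_accessor :payload
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Hypothetical socket stand-in fed from an array of frames; the symbol
# :fail simulates a failed receive.
class FakeFrameSocket
  def initialize(frames)
    @frames = frames
    @more = false
  end

  def recvmsg(message)
    frame = @frames.shift
    if frame.nil? || frame == :fail
      @more = false
      return -1
    end
    message.payload = frame
    @more = !@frames.empty?
    0
  end

  def more_parts?
    @more
  end
end

# Same control flow as #recvmsgs, minus the flag handling.
def receive_all(socket, list)
  message = FakeMessage.new
  rc = socket.recvmsg(message)
  if rc == 0
    list << message
    while rc == 0 && socket.more_parts?
      message = FakeMessage.new
      rc = socket.recvmsg(message)
      if rc == 0
        list << message
      else
        message.close
        list.each(&:close)
        list.clear
      end
    end
  else
    message.close
  end
  rc
end

good = []
receive_all(FakeFrameSocket.new(["a", "b", "c"]), good)
puts good.map(&:payload).inspect

bad = []
rc = receive_all(FakeFrameSocket.new(["a", :fail, "c"]), bad)
puts "rc=#{rc} parts=#{bad.size}"
```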
#send_and_close(message, flags = 0) ⇒ Object
Sends a message. This will automatically close the message for both successful and failed sends.
Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.
-
The message could not be enqueued
-
When flags is set with XS::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check XS.errno to determine the cause.
# File 'lib/ffi-rxs/socket.rb', line 315
def send_and_close message, flags = 0
  rc = sendmsg message, flags
  message.close
  rc
end
#send_string(string, flags = 0) ⇒ Object
Helper method to make a new #Message instance out of the string passed in for transmission.
flags may be XS::NonBlocking and/or XS::SNDMORE.
Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.
-
The message could not be enqueued
-
When flags is set with XS::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check XS.errno to determine the cause.
# File 'lib/ffi-rxs/socket.rb', line 247
def send_string string, flags = 0
  message = Message.new string
  send_and_close message, flags
end
#send_strings(parts, flags = 0) ⇒ Object
Send a sequence of strings as a multipart message out of the parts passed in for transmission. Every element of parts should be a String.
flags may be XS::NonBlocking.
Returns 0 when the messages were successfully enqueued. Returns -1 under two conditions.
-
A message could not be enqueued
-
When flags is set with XS::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check XS.errno to determine the cause.
# File 'lib/ffi-rxs/socket.rb', line 266
def send_strings parts, flags = 0
  return -1 if !parts || parts.empty?
  flags = NonBlocking if dontwait?(flags)
  parts[0..-2].each do |part|
    rc = send_string part, (flags | XS::SNDMORE)
    return rc unless Util.resultcode_ok?(rc)
  end
  send_string parts[-1], flags
end
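The flag handling in the listing above follows one rule: every part except the last is sent with SNDMORE added, and the last part is sent with the caller's flags unchanged. The sketch below shows that rule in isolation. plan_sends is a hypothetical helper, and the two constants are placeholder values chosen for this illustration; real code should use XS::NonBlocking and XS::SNDMORE. Unlike #send_strings, the sketch returns an empty plan rather than -1 for empty input.

```ruby
# Placeholder flag values for illustration only.
NONBLOCKING_FLAG = 1
SNDMORE_FLAG     = 2

# Hypothetical helper: returns [part, flags] pairs in send order.
def plan_sends(parts, flags = 0)
  return [] if parts.nil? || parts.empty?
  # All parts but the last announce that more parts follow.
  plan = parts[0..-2].map { |part| [part, flags | SNDMORE_FLAG] }
  # The final part carries only the caller's flags.
  plan << [parts[-1], flags]
end

plan_sends(["header", "body", "footer"], NONBLOCKING_FLAG).each do |part, flags|
  puts "#{part}: flags=#{flags}"
end
```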
#sendmsg(message, flags = 0) ⇒ Object
Queues the message for transmission. Message is assumed to conform to the same public API as #Message.
flags may take the following values:
-
0 (default) - blocking operation
-
XS::NonBlocking - non-blocking operation
-
XS::SNDMORE - this message is part of a multi-part message
Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.
-
The message could not be enqueued
-
When flags is set with XS::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check XS.errno to determine the cause.
# File 'lib/ffi-rxs/socket.rb', line 230
def sendmsg message, flags = 0
  __sendmsg__(@socket, message.address, flags)
end
#sendmsgs(parts, flags = 0) ⇒ Object
Send a sequence of messages as a multipart message out of the parts passed in for transmission. Every element of parts should be a Message (or subclass).
flags may be XS::NonBlocking.
Returns 0 when the messages were successfully enqueued. Returns -1 under two conditions.
-
A message could not be enqueued
-
When flags is set with XS::NonBlocking and the socket returned EAGAIN.
With a -1 return code, the user must check XS.errno to determine the cause.
# File 'lib/ffi-rxs/socket.rb', line 292
def sendmsgs parts, flags = 0
  return -1 if !parts || parts.empty?
  flags = NonBlocking if dontwait?(flags)
  parts[0..-2].each do |part|
    rc = sendmsg part, (flags | XS::SNDMORE)
    return rc unless Util.resultcode_ok?(rc)
  end
  sendmsg parts[-1], flags
end
#setsockopt(name, value, length = nil) ⇒ Object
Set the queue options on this socket.
Valid name values that take a numeric value are:
XS::HWM
XS::SWAP (version 2 only)
XS::AFFINITY
XS::RATE
XS::RECOVERY_IVL
XS::MCAST_LOOP (version 2 only)
XS::LINGER
XS::RECONNECT_IVL
XS::BACKLOG
XS::RECOVER_IVL_MSEC (version 2 only)
XS::RECONNECT_IVL_MAX (version 3 only)
XS::MAXMSGSIZE (version 3 only)
XS::SNDHWM (version 3 only)
XS::RCVHWM (version 3 only)
XS::MULTICAST_HOPS (version 3 only)
XS::RCVTIMEO (version 3 only)
XS::SNDTIMEO (version 3 only)
Valid name values that take a string value are:
XS::IDENTITY (version 2/3 only)
XS::SUBSCRIBE
XS::UNSUBSCRIBE
Returns 0 when the operation completed successfully. Returns -1 when this operation failed.
With a -1 return code, the user must check XS.errno to determine the cause.
rc = socket.setsockopt(XS::LINGER, 1_000)
XS::Util.resultcode_ok?(rc) ? puts("succeeded") : puts("failed")
# File 'lib/ffi-rxs/socket.rb', line 128
def setsockopt name, value, length = nil
  if 1 == @option_lookup[name]
    length = 8
    pointer = LibC.malloc length
    pointer.write_long_long value
  elsif 0 == @option_lookup[name]
    length = 4
    pointer = LibC.malloc length
    pointer.write_int value
  elsif 2 == @option_lookup[name]
    length ||= value.size
    # note: not checking errno for failed memory allocations :(
    pointer = LibC.malloc length
    pointer.write_string value
  end
  rc = LibXS.xs_setsockopt @socket, name, pointer, length
  LibC.free(pointer) unless pointer.nil? || pointer.null?
  rc
end
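The listing above picks a native encoding from the option's kind: 0 means a 4-byte int, 1 an 8-byte long long, and 2 a string. A rough pure-Ruby sketch of the same dispatch is shown below, with Array#pack standing in for the malloc-and-write steps. encode_option is a hypothetical helper, and raising on an unknown kind is an assumption of this sketch rather than the behaviour of #setsockopt.

```ruby
# Hypothetical helper: returns [bytes, length] for a socket option value.
# kind follows the lookup codes in the listing: 0 = int, 1 = long long,
# 2 = string.
def encode_option(kind, value, length = nil)
  case kind
  when 0
    [[value].pack("l"), 4] # 32-bit signed, native byte order
  when 1
    [[value].pack("q"), 8] # 64-bit signed, native byte order
  when 2
    length ||= value.size
    [value.byteslice(0, length), length]
  else
    # Assumption of this sketch: reject unknown kinds loudly.
    raise ArgumentError, "unknown option kind: #{kind.inspect}"
  end
end

bytes, length = encode_option(0, 1_000)
puts "int option: #{length} bytes, round-trips to #{bytes.unpack1('l')}"

bytes, length = encode_option(2, "topic")
puts "string option: #{length} bytes, #{bytes.inspect}"
```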