Module: XS::CommonSocketBehavior
Instance Attribute Summary
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#socket ⇒ Object
readonly
Returns the value of attribute socket.
Class Method Summary
-
.create(context_ptr, type, opts = {:receiver_class => XS::Message}) ⇒ Socket
Allocates a socket of the given type for sending and receiving data.
Instance Method Summary
-
#bind(address) ⇒ Object
Binds the socket to an address.
-
#close ⇒ Object
Closes the socket.
-
#connect(address) ⇒ Object
Connects the socket to an address.
-
#initialize(context_ptr, type, opts = {:receiver_class => XS::Message}) ⇒ Socket
Allocates a socket of the given type for sending and receiving data.
-
#more_parts? ⇒ Boolean
Convenience method for checking on additional message parts.
-
#recv_multipart(list, routing_envelope, flag = 0) ⇒ Object
Should only be used for XREQ, XREP, DEALER and ROUTER type sockets.
-
#recv_string(string, flag = 0) ⇒ Object
Helper method to make a new #Message instance and convert its payload to a string.
-
#recv_strings(list, flag = 0) ⇒ Object
Receive a multipart message as a list of strings.
-
#recvmsg(message, flag = 0) ⇒ Object
Dequeues a message from the underlying queue.
-
#recvmsgs(list, flag = 0) ⇒ Object
Receive a multipart message as an array of objects (by default these are instances of Message).
-
#send_and_close(message, flag = 0) ⇒ Object
Sends a message.
-
#send_string(string, flag = 0) ⇒ Object
Helper method to make a new #Message instance out of the string passed in for transmission.
-
#send_strings(parts, flag = 0) ⇒ Object
Sends the strings in parts as a single multipart message.
-
#sendmsg(message, flag = 0) ⇒ Object
Queues the message for transmission.
-
#sendmsgs(parts, flag = 0) ⇒ Object
Sends the messages in parts as a single multipart message.
-
#setsockopt(name, value, length = nil) ⇒ Object
Set the queue options on this socket.
Methods included from Util
bind_to_random_tcp_port, errno, error_string, resultcode_ok?, version
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
# File 'lib/ffi-rxs/socket.rb', line 8
def name
  @name
end
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
# File 'lib/ffi-rxs/socket.rb', line 8
def socket
  @socket
end
Class Method Details
.create(context_ptr, type, opts = {:receiver_class => XS::Message}) ⇒ Socket
Allocates a socket of the given type for sending and receiving data. Returns nil instead of raising if the socket cannot be created.
By default, this class uses XS::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use XS::ManagedMessage.
Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as XS::Message.
# File 'lib/ffi-rxs/socket.rb', line 40
def self.create context_ptr, type, opts = {:receiver_class => XS::Message}
  new(context_ptr, type, opts) rescue nil
end
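The trailing rescue nil in .create means a failed allocation shows up as a nil return value rather than an exception. A minimal sketch of that pattern follows. FakeSocket is a hypothetical stand-in, not an ffi-rxs class; it only imitates a constructor that raises on a missing context.

```ruby
# FakeSocket is a hypothetical stand-in, not part of ffi-rxs. Its constructor
# raises when handed a nil context, loosely imitating Socket#initialize
# raising for a null context pointer.
class FakeSocket
  attr_reader :type

  def initialize(context, type)
    raise ArgumentError, "Context pointer was null" if context.nil?
    @type = type
  end

  # Factory in the style of .create: any StandardError raised by the
  # constructor is converted into a nil return value.
  def self.create(context, type)
    new(context, type)
  rescue StandardError
    nil
  end
end

ok     = FakeSocket.create(:context, :req)
failed = FakeSocket.create(nil, :req)

puts ok.type.inspect   # the socket was built
puts failed.inspect    # nil signals that construction failed
```

Callers of a factory like this need an explicit nil check, because the reason for the failure is discarded along with the exception.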
Instance Method Details
#bind(address) ⇒ Object
Binds the socket to an address.
# File 'lib/ffi-rxs/socket.rb', line 187
def bind address
  LibXS.xs_bind @socket, address
end
#close ⇒ Object
Closes the socket. Any unprocessed messages in queue are sent or dropped depending upon the value of the socket option XS::LINGER.
# File 'lib/ffi-rxs/socket.rb', line 213
def close
  if @socket
    remove_finalizer
    rc = LibXS.xs_close @socket
    @socket = nil
    release_cache
    rc
  else
    0
  end
end
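The listing shows that #close is safe to call more than once: the native close runs only while @socket is set, and later calls return 0. A small sketch of that shape, using a hypothetical FakeHandle class (not part of ffi-rxs) with a counter in place of the native call:

```ruby
# FakeHandle is a hypothetical stand-in for a socket wrapper; it is not part
# of ffi-rxs. It reproduces only the "close once, then return 0" shape.
class FakeHandle
  attr_reader :native_closes

  def initialize
    @socket = Object.new   # pretend native handle
    @native_closes = 0
  end

  def close
    if @socket
      @native_closes += 1  # stands in for the native close call
      @socket = nil
      0                    # pretend the native close succeeded
    else
      0                    # already closed: nothing left to release
    end
  end
end

handle = FakeHandle.new
first  = handle.close
second = handle.close
puts [first, second, handle.native_closes].inspect
```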
#connect(address) ⇒ Object
Connects the socket to an address.
# File 'lib/ffi-rxs/socket.rb', line 200
def connect address
  rc = LibXS.xs_connect @socket, address
end
#initialize(context_ptr, type, opts = {:receiver_class => XS::Message}) ⇒ Socket
Allocates a socket of the given type for sending and receiving data.
To avoid rescuing exceptions, use the factory method .create for all socket creation; it returns nil on failure.
By default, this class uses XS::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use XS::ManagedMessage.
Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as XS::Message.
Creation of a new Socket object can raise an exception. This occurs when the context_ptr is null or when the allocation of the Crossroads socket within the context fails.
# File 'lib/ffi-rxs/socket.rb', line 80
def initialize context_ptr, type, opts = {:receiver_class => XS::Message}
  # users may override the classes used for receiving; class must conform to the
  # same public API as XS::Message
  @receiver_klass = opts[:receiver_class]

  context_ptr = context_ptr.pointer if context_ptr.kind_of?(XS::Context)

  unless context_ptr.null?
    @socket = LibXS.xs_socket context_ptr, type
    if @socket && !@socket.null?
      @name = SocketTypeNameMap[type]
    else
      raise ContextError.new 'xs_socket', 0, ETERM, "Socket pointer was null"
    end
  else
    raise ContextError.new 'xs_socket', 0, ETERM, "Context pointer was null"
  end

  @longlong_cache = @int_cache = nil
  @more_parts_array = []
  @option_lookup = []
  populate_option_lookup

  define_finalizer
end
#more_parts? ⇒ Boolean
Convenience method for checking on additional message parts.
Equivalent to calling Socket#getsockopt with XS::RCVMORE.
Warning: if the call to #getsockopt fails, this method will return false and swallow the error.
# File 'lib/ffi-rxs/socket.rb', line 175
def more_parts?
  rc = getsockopt XS::RCVMORE, @more_parts_array

  Util.resultcode_ok?(rc) ? @more_parts_array.at(0) : false
end
#recv_multipart(list, routing_envelope, flag = 0) ⇒ Object
Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes a list for receiving the message body parts and a routing_envelope for receiving the message parts comprising the 0mq routing information.
# File 'lib/ffi-rxs/socket.rb', line 457
def recv_multipart list, routing_envelope, flag = 0
  parts = []
  rc = recvmsgs parts, flag

  if Util.resultcode_ok?(rc)
    routing = true
    parts.each do |part|
      if routing
        routing_envelope << part
        routing = part.size > 0
      else
        list << part
      end
    end
  end

  rc
end
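The splitting rule in the listing is easier to see in isolation: parts go to the routing envelope up to and including the first empty part, and everything after it is message body. The sketch below applies that rule to plain strings. Both the split_envelope helper and the use of strings in place of message objects are assumptions made for illustration.

```ruby
# split_envelope is a hypothetical helper, not an ffi-rxs method. It applies
# the same rule as the recv_multipart loop: parts belong to the envelope up to
# and including the first empty part; the remainder is body. Plain strings
# stand in for message objects.
def split_envelope(parts)
  envelope = []
  body = []
  routing = true
  parts.each do |part|
    if routing
      envelope << part
      routing = part.size > 0   # an empty part ends the envelope
    else
      body << part
    end
  end
  [envelope, body]
end

envelope, body = split_envelope(["peer-1", "", "hello", "world"])
puts envelope.inspect
puts body.inspect
```

One consequence of this rule is that a message with no empty delimiter part ends up entirely in the envelope, leaving the body list empty.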
#recv_string(string, flag = 0) ⇒ Object
Helper method that creates a new message instance (of the configured receiver class), receives into it, and copies its payload into string.
With a -1 return code, the user must check XS.errno to determine the cause.
The temporary message is created and closed inside this method, so the caller has no message lifecycle to manage. When an error code is returned, string is left unchanged.
# File 'lib/ffi-rxs/socket.rb', line 374
def recv_string string, flag = 0
  message = @receiver_klass.new
  rc = recvmsg message, flag
  string.replace(message.copy_out_string) if Util.resultcode_ok?(rc)
  message.close
  rc
end
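Note the calling convention: the received payload is written into the string argument via String#replace, and the return value is only a status code. A minimal sketch of that out-parameter style, using a hypothetical fetch_into helper that is not part of ffi-rxs:

```ruby
# fetch_into is a hypothetical helper, not an ffi-rxs method. It follows the
# same convention as recv_string: the result is written into the caller's
# string and the return value is only a status code (0 ok, -1 failure).
def fetch_into(buffer, payload)
  return -1 if payload.nil?   # failure: buffer is left untouched
  buffer.replace(payload)     # success: overwrite the caller's string in place
  0
end

buffer = String.new("stale")
rc_fail = fetch_into(buffer, nil)
after_fail = buffer.dup
rc_ok = fetch_into(buffer, "fresh")
puts [rc_fail, after_fail, rc_ok, buffer].inspect
```

Because the buffer is mutated in place, the caller must pass an unfrozen String and check the return code before trusting its contents.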
#recv_strings(list, flag = 0) ⇒ Object
Receive a multipart message as a list of strings.
# File 'lib/ffi-rxs/socket.rb', line 391
def recv_strings list, flag = 0
  array = []
  rc = recvmsgs array, flag

  if Util.resultcode_ok?(rc)
    array.each do |message|
      list << message.copy_out_string
      message.close
    end
  end

  rc
end
#recvmsg(message, flag = 0) ⇒ Object
Dequeues a message from the underlying queue. By default, this is a blocking operation.
With a -1 return code, the user must check XS.errno to determine the cause.
The application code is responsible for handling the message object lifecycle when #recvmsg returns an error code.
# File 'lib/ffi-rxs/socket.rb', line 353
def recvmsg message, flag = 0
  __recvmsg__(@socket, message.address, flag)
end
#recvmsgs(list, flag = 0) ⇒ Object
Receive a multipart message as an array of objects (by default these are instances of Message).
# File 'lib/ffi-rxs/socket.rb', line 415
def recvmsgs list, flag = 0
  flag = NonBlocking if dontwait?(flag)

  message = @receiver_klass.new
  rc = recvmsg message, flag

  if Util.resultcode_ok?(rc)
    list << message

    # check rc *first*; necessary because the call to #more_parts? can reset
    # the xs_errno to a weird value, so the xs_errno that was set on the
    # call to #recv gets lost
    while Util.resultcode_ok?(rc) && more_parts?
      message = @receiver_klass.new
      rc = recvmsg message, flag

      if Util.resultcode_ok?(rc)
        list << message
      else
        message.close
        list.each { |msg| msg.close }
        list.clear
      end
    end
  else
    message.close
  end

  rc
end
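The control flow of recvmsgs has one subtle property: if any part after the first fails to arrive, every part collected so far is closed and the list is emptied, so the caller never sees a partial message. The simulation below reproduces that flow with hypothetical stand-ins (FakeMessage, FakeQueue and receive_all are not ffi-rxs names), and uses rc >= 0 in place of Util.resultcode_ok?.

```ruby
# Hypothetical stand-ins (not ffi-rxs classes): FakeMessage records whether it
# was closed, and FakeQueue hands out prepared parts or fails with -1.
class FakeMessage
  attr_accessor :payload
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

class FakeQueue
  # parts: array of payload strings; fail_at: index at which recv returns -1
  def initialize(parts, fail_at = nil)
    @parts = parts
    @fail_at = fail_at
    @index = 0
  end

  def recv(message)
    return -1 if @index == @fail_at
    message.payload = @parts[@index]
    @index += 1
    0
  end

  def more_parts?
    @index < @parts.size
  end
end

# Same control flow as recvmsgs: on a failure partway through, close every
# part collected so far and hand back an empty list with the error code.
def receive_all(queue, list)
  message = FakeMessage.new
  rc = queue.recv(message)
  if rc >= 0
    list << message
    while rc >= 0 && queue.more_parts?
      message = FakeMessage.new
      rc = queue.recv(message)
      if rc >= 0
        list << message
      else
        message.close
        list.each(&:close)
        list.clear
      end
    end
  else
    message.close
  end
  rc
end

good = []
rc_good = receive_all(FakeQueue.new(%w[a b c]), good)
bad = []
rc_bad = receive_all(FakeQueue.new(%w[a b c], 1), bad)
puts [rc_good, good.map(&:payload), rc_bad, bad.size].inspect
```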
#send_and_close(message, flag = 0) ⇒ Object
Sends a message. This will automatically close the message for both successful and failed sends.
With a -1 return code, the user must check XS.errno to determine the cause.
# File 'lib/ffi-rxs/socket.rb', line 331
def send_and_close message, flag = 0
  rc = sendmsg message, flag
  message.close
  rc
end
#send_string(string, flag = 0) ⇒ Object
Helper method that wraps the given string in a new Message instance, sends it, and closes the message.
With a -1 return code, the user must check XS.errno to determine the cause.
# File 'lib/ffi-rxs/socket.rb', line 258
def send_string string, flag = 0
  message = Message.new string
  send_and_close message, flag
end
#send_strings(parts, flag = 0) ⇒ Object
Sends the strings in parts as a single multipart message. Every element of parts should be a String. Returns -1 immediately if parts is nil or empty, and stops at the first part that fails to send.
With a -1 return code, the user must check XS.errno to determine the cause.
# File 'lib/ffi-rxs/socket.rb', line 278
def send_strings parts, flag = 0
  return -1 if !parts || parts.empty?
  flag = NonBlocking if dontwait?(flag)

  parts[0..-2].each do |part|
    rc = send_string part, (flag | XS::SNDMORE)
    return rc unless Util.resultcode_ok?(rc)
  end

  send_string parts[-1], flag
end
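The flag handling is the core of multipart sending: every part except the last has the send-more flag OR-ed into the caller's flag, and the last part is sent with the caller's flag unchanged. The sketch below only computes that plan without sending anything. plan_multipart is a hypothetical helper, and the literal value 2 for SNDMORE is an assumption for illustration; real code should use XS::SNDMORE.

```ruby
# SNDMORE is given the value 2 here purely as an assumption for illustration;
# real code should reference XS::SNDMORE from the library.
SNDMORE = 2

# plan_multipart is a hypothetical helper, not an ffi-rxs method. It returns
# [part, flag] pairs in sending order: every part except the last gets SNDMORE
# OR-ed into its flag. Unlike send_strings, it returns an empty plan (rather
# than -1) for nil or empty input.
def plan_multipart(parts, flag = 0)
  return [] if parts.nil? || parts.empty?
  last_index = parts.size - 1
  parts.each_with_index.map do |part, index|
    [part, index == last_index ? flag : (flag | SNDMORE)]
  end
end

puts plan_multipart(%w[header body footer]).inspect
```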
#sendmsg(message, flag = 0) ⇒ Object
Queues the message for transmission. The message argument is assumed to conform to the same public API as XS::Message.
With a -1 return code, the user must check XS.errno to determine the cause.
# File 'lib/ffi-rxs/socket.rb', line 240
def sendmsg message, flag = 0
  __sendmsg__(@socket, message.address, flag)
end
#sendmsgs(parts, flag = 0) ⇒ Object
Sends the messages in parts as a single multipart message. Every element of parts should be a Message (or subclass). Returns -1 immediately if parts is nil or empty, and stops at the first part that fails to send.
With a -1 return code, the user must check XS.errno to determine the cause.
# File 'lib/ffi-rxs/socket.rb', line 305
def sendmsgs parts, flag = 0
  return -1 if !parts || parts.empty?
  flag = NonBlocking if dontwait?(flag)

  parts[0..-2].each do |part|
    rc = sendmsg part, (flag | XS::SNDMORE)
    return rc unless Util.resultcode_ok?(rc)
  end

  sendmsg parts[-1], flag
end
#setsockopt(name, value, length = nil) ⇒ Object
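Sets a queue option on this socket. The value is written as an 8-byte integer, a 4-byte integer, or a string, depending on the option; length is only consulted for string options and defaults to the size of value.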
With a -1 return code, the user must check XS.errno to determine the cause.
# File 'lib/ffi-rxs/socket.rb', line 129
def setsockopt name, value, length = nil
  if 1 == @option_lookup[name]
    length = 8
    pointer = LibC.malloc length
    pointer.write_long_long value

  elsif 0 == @option_lookup[name]
    length = 4
    pointer = LibC.malloc length
    pointer.write_int value

  elsif 2 == @option_lookup[name]
    length ||= value.size

    # note: not checking errno for failed memory allocations :(
    pointer = LibC.malloc length
    pointer.write_string value
  end

  rc = LibXS.xs_setsockopt @socket, name, pointer, length
  LibC.free(pointer) unless pointer.nil? || pointer.null?
  rc
end
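The three branches in the listing differ only in how the value is sized and written: kind 0 is a 4-byte int, kind 1 an 8-byte long long, and kind 2 a raw string. The sketch below mirrors that dispatch with Array#pack standing in for the malloc and write calls. encode_option is a hypothetical helper, the kind codes are taken from the comparisons against @option_lookup in the listing, and byte counts are used where the listing uses value.size.

```ruby
# encode_option is a hypothetical helper, not an ffi-rxs method. It encodes an
# option value the way the three branches size it:
#   kind 0 -> 4-byte int, kind 1 -> 8-byte long long, kind 2 -> raw string.
# Array#pack stands in for the native malloc/write calls.
def encode_option(kind, value, length = nil)
  case kind
  when 0 then [value].pack("l")   # native-endian signed 32-bit integer
  when 1 then [value].pack("q")   # native-endian signed 64-bit integer
  when 2 then value.byteslice(0, length || value.bytesize)
  else raise ArgumentError, "unknown option kind: #{kind.inspect}"
  end
end

puts encode_option(0, 1000).bytesize
puts encode_option(1, 1000).bytesize
puts encode_option(2, "client-42").bytesize
```

This sketch raises on an unrecognised kind. The listing has no such branch, so in that case no buffer is allocated before the native call is made.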