Module: XS::CommonSocketBehavior

Includes:
Util
Included in:
Socket
Defined in:
lib/ffi-rxs/socket.rb

Methods included from Util

bind_to_random_tcp_port, errno, error_string, resultcode_ok?, version

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/ffi-rxs/socket.rb', line 7

def name
  @name
end

#socket ⇒ Object (readonly)

Returns the value of attribute socket.



# File 'lib/ffi-rxs/socket.rb', line 7

def socket
  @socket
end

Class Method Details

.create(context_ptr, type, opts = {:receiver_class => XS::Message}) ⇒ Object

Allocates a socket of type type for sending and receiving data.

type can be one of XS::REQ, XS::REP, XS::PUB, XS::SUB, XS::PAIR, XS::PULL, XS::PUSH, XS::XREQ, XS::XREP, XS::DEALER or XS::ROUTER.

By default, this class uses XS::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use XS::ManagedMessage.

sock = Socket.create(Context.create, XS::REQ, :receiver_class => XS::ManagedMessage)

Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as XS::Message.

.create returns nil when socket creation fails, so check the result before using it.

if (socket = Socket.create(context.pointer, XS::REQ))
  ...
else
  STDERR.puts "Socket creation failed"
end


# File 'lib/ffi-rxs/socket.rb', line 34

def self.create context_ptr, type, opts = {:receiver_class => XS::Message}
  new(context_ptr, type, opts) rescue nil
end
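
The difference between .create and #initialize comes down to the single `rescue nil` in the listing above. As a minimal sketch of that pattern, the following uses a hypothetical DemoSocket class and DemoError exception (stand-ins that are not part of ffi-rxs) so it can run without the Crossroads library installed.

```ruby
# Hypothetical stand-ins for illustration; not part of ffi-rxs.
class DemoError < StandardError; end

class DemoSocket
  # Mirrors Socket.create: swallow the exception and return nil instead.
  def self.create(context_ptr)
    new(context_ptr) rescue nil
  end

  # Mirrors Socket#initialize: raise when the context is unusable.
  def initialize(context_ptr)
    raise DemoError, "Context pointer was null" if context_ptr.nil?
    @context_ptr = context_ptr
  end
end

ok     = DemoSocket.create(:fake_context)
failed = DemoSocket.create(nil)

puts ok.class        # DemoSocket
puts failed.inspect  # nil
```

The trade-off is that a nil result carries no detail about why creation failed; callers who need the reason would call new directly and rescue the exception.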

Instance Method Details

#bind(address) ⇒ Object

Binds the socket to an address.

socket.bind("tcp://127.0.0.1:5555")


# File 'lib/ffi-rxs/socket.rb', line 181

def bind address
  LibXS.xs_bind @socket, address
end

#close ⇒ Object

Closes the socket. Any unprocessed messages in queue are sent or dropped depending upon the value of the socket option XS::LINGER.

Returns 0 upon success or when the socket has already been closed. Returns -1 when the operation fails. Check XS.errno for the error code.

rc = socket.close
puts("Given socket was invalid!") unless 0 == rc


# File 'lib/ffi-rxs/socket.rb', line 202

def close
  if @socket
    remove_finalizer
    rc = LibXS.xs_close @socket
    @socket = nil
    release_cache
    rc
  else
    0
  end
end
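
Because #close clears @socket after the native call, a second call returns 0 without touching the library again. The sketch below imitates that control flow with a hypothetical DemoHandle class (not part of ffi-rxs); the counter exists only to make the behavior observable.

```ruby
# Hypothetical stand-in that mimics the control flow of #close.
class DemoHandle
  attr_reader :native_closes

  def initialize
    @socket = :native_pointer  # placeholder for the FFI pointer
    @native_closes = 0
  end

  def close
    if @socket
      @native_closes += 1  # stands in for the native close call
      rc = 0               # pretend the native close succeeded
      @socket = nil        # drop the handle so later calls are no-ops
      rc
    else
      0
    end
  end
end

handle = DemoHandle.new
first  = handle.close
second = handle.close

p [first, second]        # [0, 0]
p handle.native_closes   # 1
```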

#connect(address) ⇒ Object

Connects the socket to an address.

rc = socket.connect("tcp://127.0.0.1:5555")


# File 'lib/ffi-rxs/socket.rb', line 189

def connect address
  rc = LibXS.xs_connect @socket, address
end

#initialize(context_ptr, type, opts = {:receiver_class => XS::Message}) ⇒ Object

To avoid rescuing exceptions, use the factory method .create for all socket creation.

Allocates a socket of type type for sending and receiving data.

type can be one of XS::REQ, XS::REP, XS::PUB, XS::SUB, XS::PAIR, XS::PULL, XS::PUSH, XS::XREQ, XS::XREP, XS::DEALER or XS::ROUTER.

By default, this class uses XS::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use XS::ManagedMessage.

sock = Socket.new(Context.new, XS::REQ, :receiver_class => XS::ManagedMessage)

Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as XS::Message.

Creation of a new Socket object can raise an exception. This occurs when the context_ptr is null or when the allocation of the Crossroads socket within the context fails.

begin
  socket = Socket.new(context.pointer, XS::REQ)
rescue ContextError => e
  # error handling
end


# File 'lib/ffi-rxs/socket.rb', line 67

def initialize context_ptr, type, opts = {:receiver_class => XS::Message}
  # users may override the classes used for receiving; class must conform to the
  # same public API as XS::Message
  @receiver_klass = opts[:receiver_class]

  context_ptr = context_ptr.pointer if context_ptr.kind_of?(XS::Context)

  unless context_ptr.null?
    @socket = LibXS.xs_socket context_ptr, type
    if @socket && !@socket.null?
      @name = SocketTypeNameMap[type]
    else
      raise ContextError.new 'xs_socket', 0, ETERM, "Socket pointer was null"
    end
  else
    raise ContextError.new 'xs_socket', 0, ETERM, "Context pointer was null"
  end

  @longlong_cache = @int_cache = nil
  @more_parts_array = []
  @option_lookup = []
  populate_option_lookup

  define_finalizer
end

#more_parts? ⇒ Boolean

Convenience method for checking on additional message parts.

Equivalent to calling Socket#getsockopt with XS::RCVMORE.

Warning: if the call to #getsockopt fails, this method will return false and swallow the error.

message_parts = []
message = Message.new
rc = socket.recvmsg(message)
if XS::Util.resultcode_ok?(rc)
  message_parts << message
  while socket.more_parts?
    message = Message.new
    rc = socket.recvmsg(message)
    message_parts.push(message) if XS::Util.resultcode_ok?(rc)
  end
end

Returns:

  • (Boolean)


# File 'lib/ffi-rxs/socket.rb', line 171

def more_parts?
  rc = getsockopt XS::RCVMORE, @more_parts_array

  Util.resultcode_ok?(rc) ? @more_parts_array.at(0) : false
end

#recv_multipart(list, routing_envelope, flag = 0) ⇒ Object

Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes a list for receiving the message body parts and a routing_envelope for receiving the message parts comprising the 0mq routing information.



# File 'lib/ffi-rxs/socket.rb', line 427

def recv_multipart list, routing_envelope, flag = 0
  parts = []
  rc = recvmsgs parts, flag

  if Util.resultcode_ok?(rc)
    routing = true
    parts.each do |part|
      if routing
        routing_envelope << part
        routing = part.size > 0
      else
        list << part
      end
    end
  end

  rc
end
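
The listing above treats every part up to and including the first empty part as routing envelope, and everything after it as message body. The sketch below applies the same rule to plain strings instead of XS::Message objects (an assumption made so it runs without the library); split_envelope is a hypothetical helper name.

```ruby
# Splits parts the same way #recv_multipart does, using plain strings
# in place of message objects.
def split_envelope(parts)
  envelope = []
  body = []
  routing = true
  parts.each do |part|
    if routing
      envelope << part
      routing = part.size > 0  # an empty part ends the envelope
    else
      body << part
    end
  end
  [envelope, body]
end

envelope, body = split_envelope(["peer-id", "", "hello", "world"])
p envelope  # ["peer-id", ""]
p body      # ["hello", "world"]
```

Note that the empty delimiter part ends up in the envelope, not in the body.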

#recv_string(string, flags = 0) ⇒ Object

Helper method to make a new #Message instance and convert its payload to a string.

flags may be XS::NonBlocking.

Returns 0 when the message was successfully dequeued. Returns -1 under two conditions.

  1. The message could not be dequeued

  2. When flags is set with XS::NonBlocking and the socket returned EAGAIN.

With a -1 return code, the user must check XS.errno to determine the cause.

The message object is created and closed inside this method, so there is no message lifecycle for the caller to manage. string is only replaced when the receive succeeds.



# File 'lib/ffi-rxs/socket.rb', line 359

def recv_string string, flags = 0
  message = @receiver_klass.new
  rc = recvmsg message, flags
  string.replace(message.copy_out_string) if Util.resultcode_ok?(rc)
  message.close
  rc
end

#recv_strings(list, flag = 0) ⇒ Object

Receive a multipart message as a list of strings.

flag may be XS::NonBlocking. Any other flag will be removed.



# File 'lib/ffi-rxs/socket.rb', line 372

def recv_strings list, flag = 0
  array = []
  rc = recvmsgs array, flag

  if Util.resultcode_ok?(rc)
    array.each do |message|
      list << message.copy_out_string
      message.close
    end
  end

  rc
end

#recvmsg(message, flags = 0) ⇒ Object

Dequeues a message from the underlying queue. By default, this is a blocking operation.

flags may take two values:

0 (default) - blocking operation
XS::NonBlocking - non-blocking operation

Returns 0 when the message was successfully dequeued. Returns -1 under two conditions.

  1. The message could not be dequeued

  2. When flags is set with XS::NonBlocking and the socket returned EAGAIN.

With a -1 return code, the user must check XS.errno to determine the cause.

The application code is responsible for handling the message object lifecycle when #recvmsg returns an error code.



# File 'lib/ffi-rxs/socket.rb', line 338

def recvmsg message, flags = 0
  #LibXS.xs_recvmsg @socket, message.address, flags
  __recvmsg__(@socket, message.address, flags)
end
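
A -1 return code is ambiguous on its own: in non-blocking mode it may simply mean that nothing was queued yet. The sketch below shows one way to separate "try again" from a real failure. DemoQueue and its recv_nonblocking and errno methods are hypothetical stand-ins, not part of ffi-rxs; with the real library the equivalent check would read XS.errno after #recvmsg.

```ruby
# Hypothetical stand-in for a socket in non-blocking mode.
class DemoQueue
  attr_reader :errno

  def initialize(items)
    @items = items
    @errno = 0
  end

  # Returns 0 and fills buffer when an item is ready,
  # otherwise returns -1 with errno set to EAGAIN.
  def recv_nonblocking(buffer)
    item = @items.shift
    if item.nil?
      @errno = Errno::EAGAIN::Errno
      -1
    else
      buffer.replace(item)
      0
    end
  end
end

queue    = DemoQueue.new([nil, "payload"])  # first poll finds nothing
buffer   = String.new
attempts = 0

loop do
  attempts += 1
  rc = queue.recv_nonblocking(buffer)
  break if rc == 0
  # Only EAGAIN means "nothing yet"; anything else is a real error.
  raise "receive failed" unless queue.errno == Errno::EAGAIN::Errno
end

p attempts  # 2
p buffer    # "payload"
```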

#recvmsgs(list, flag = 0) ⇒ Object

Receive a multipart message as an array of objects (by default these are instances of Message).

flag may be XS::NonBlocking. Any other flag will be removed.



# File 'lib/ffi-rxs/socket.rb', line 392

def recvmsgs list, flag = 0
  flag = NonBlocking if dontwait?(flag)

  message = @receiver_klass.new
  rc = recvmsg message, flag

  if Util.resultcode_ok?(rc)
    list << message

    # check rc *first*; necessary because the call to #more_parts? can reset
    # the xs_errno to a weird value, so the xs_errno that was set on the
    # call to #recv gets lost
    while Util.resultcode_ok?(rc) && more_parts?
      message = @receiver_klass.new
      rc = recvmsg message, flag

      if Util.resultcode_ok?(rc)
        list << message
      else
        message.close
        list.each { |msg| msg.close }
        list.clear
      end
    end
  else
    message.close
  end

  rc
end

#send_and_close(message, flags = 0) ⇒ Object

Sends a message. This will automatically close the message for both successful and failed sends.

Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.

  1. The message could not be enqueued

  2. When flags is set with XS::NonBlocking and the socket returned EAGAIN.

With a -1 return code, the user must check XS.errno to determine the cause.



# File 'lib/ffi-rxs/socket.rb', line 315

def send_and_close message, flags = 0
  rc = sendmsg message, flags
  message.close
  rc
end

#send_string(string, flags = 0) ⇒ Object

Helper method to make a new #Message instance out of the string passed in for transmission.

flags may be XS::NonBlocking, XS::SNDMORE, or both combined with a bitwise OR.

Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.

  1. The message could not be enqueued

  2. When flags is set with XS::NonBlocking and the socket returned EAGAIN.

With a -1 return code, the user must check XS.errno to determine the cause.



# File 'lib/ffi-rxs/socket.rb', line 247

def send_string string, flags = 0
  message = Message.new string
  send_and_close message, flags
end

#send_strings(parts, flags = 0) ⇒ Object

Sends the strings in parts as a single multipart message. Every element of parts should be a String.

flags may be XS::NonBlocking.

Returns 0 when the messages were successfully enqueued. Returns -1 under two conditions.

  1. A message could not be enqueued

  2. When flags is set with XS::NonBlocking and the socket returned EAGAIN.

With a -1 return code, the user must check XS.errno to determine the cause.



# File 'lib/ffi-rxs/socket.rb', line 266

def send_strings parts, flags = 0
  return -1 if !parts || parts.empty?
  flags = NonBlocking if dontwait?(flags)

  parts[0..-2].each do |part|
    rc = send_string part, (flags | XS::SNDMORE)
    return rc unless Util.resultcode_ok?(rc)
  end

  send_string parts[-1], flags
end
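
The listing above adds XS::SNDMORE to every part except the last, which is what marks the end of the multipart message. The sketch below records the flags each part would be sent with. DemoSender is a hypothetical recorder, not part of ffi-rxs, and the numeric values of NON_BLOCKING and SNDMORE are assumed for illustration; the real constants live in the XS module.

```ruby
# Assumed flag values for illustration only.
NON_BLOCKING = 1
SNDMORE      = 2

# Hypothetical recorder standing in for a socket.
class DemoSender
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Records the call instead of sending anything; always succeeds.
  def send_string(string, flags = 0)
    @sent << [string, flags]
    0
  end

  # Same shape as #send_strings: SNDMORE on every part except the last.
  def send_strings(parts, flags = 0)
    return -1 if !parts || parts.empty?

    parts[0..-2].each do |part|
      rc = send_string(part, flags | SNDMORE)
      return rc unless rc >= 0
    end

    send_string(parts[-1], flags)
  end
end

sender = DemoSender.new
rc = sender.send_strings(%w[header body footer], NON_BLOCKING)

p rc           # 0
p sender.sent  # [["header", 3], ["body", 3], ["footer", 1]]
```

An empty or nil parts list returns -1 before anything is sent, matching the guard at the top of the listing.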

#sendmsg(message, flags = 0) ⇒ Object

Queues the message for transmission. Message is assumed to conform to the same public API as #Message.

flags may take the following values, which can be combined with a bitwise OR:

  • 0 (default) - blocking operation

  • XS::NonBlocking - non-blocking operation

  • XS::SNDMORE - this message is part of a multi-part message

Returns 0 when the message was successfully enqueued. Returns -1 under two conditions.

  1. The message could not be enqueued

  2. When flags is set with XS::NonBlocking and the socket returned EAGAIN.

With a -1 return code, the user must check XS.errno to determine the cause.



# File 'lib/ffi-rxs/socket.rb', line 230

def sendmsg message, flags = 0
  __sendmsg__(@socket, message.address, flags)
end

#sendmsgs(parts, flags = 0) ⇒ Object

Sends the messages in parts as a single multipart message. Every element of parts should be a Message (or subclass).

flags may be XS::NonBlocking.

Returns 0 when the messages were successfully enqueued. Returns -1 under two conditions.

  1. A message could not be enqueued

  2. When flags is set with XS::NonBlocking and the socket returned EAGAIN.

With a -1 return code, the user must check XS.errno to determine the cause.



# File 'lib/ffi-rxs/socket.rb', line 292

def sendmsgs parts, flags = 0
  return -1 if !parts || parts.empty?
  flags = NonBlocking if dontwait?(flags)

  parts[0..-2].each do |part|
    rc = sendmsg part, (flags | XS::SNDMORE)
    return rc unless Util.resultcode_ok?(rc)
  end

  sendmsg parts[-1], flags
end

#setsockopt(name, value, length = nil) ⇒ Object

Set the queue options on this socket.

Valid name values that take a numeric value are:

XS::HWM
XS::SWAP (version 2 only)
XS::AFFINITY
XS::RATE
XS::RECOVERY_IVL
XS::MCAST_LOOP (version 2 only)
XS::LINGER
XS::RECONNECT_IVL
XS::BACKLOG
XS::RECOVER_IVL_MSEC (version 2 only)
XS::RECONNECT_IVL_MAX (version 3 only)
XS::MAXMSGSIZE (version 3 only)
XS::SNDHWM (version 3 only)
XS::RCVHWM (version 3 only)
XS::MULTICAST_HOPS (version 3 only)
XS::RCVTIMEO (version 3 only)
XS::SNDTIMEO (version 3 only)

Valid name values that take a string value are:

XS::IDENTITY (version 2/3 only)
XS::SUBSCRIBE
XS::UNSUBSCRIBE

Returns 0 when the operation completed successfully. Returns -1 when this operation failed.

With a -1 return code, the user must check XS.errno to determine the cause.

rc = socket.setsockopt(XS::LINGER, 1_000)
XS::Util.resultcode_ok?(rc) ? puts("succeeded") : puts("failed")


# File 'lib/ffi-rxs/socket.rb', line 128

def setsockopt name, value, length = nil
  if 1 == @option_lookup[name]
    length = 8
    pointer = LibC.malloc length
    pointer.write_long_long value

  elsif 0 == @option_lookup[name]
    length = 4
    pointer = LibC.malloc length
    pointer.write_int value

  elsif 2 == @option_lookup[name]
    length ||= value.size

    # note: not checking errno for failed memory allocations :(
    pointer = LibC.malloc length
    pointer.write_string value
  end

  rc = LibXS.xs_setsockopt @socket, name, pointer, length
  LibC.free(pointer) unless pointer.nil? || pointer.null?
  rc
end
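
The listing above picks a buffer size from the option's type: 4 bytes for an int, 8 bytes for a long long, and the string length otherwise. The sketch below shows the same dispatch using Array#pack instead of malloc so that it runs without FFI. The option ids, the OPTION_KIND table, and encode_option are all hypothetical; the real lookup table is built by populate_option_lookup, which this page does not show.

```ruby
# Hypothetical option ids and lookup table, for illustration only.
LINGER   = 0
AFFINITY = 1
IDENTITY = 2
OPTION_KIND = { LINGER => :int, AFFINITY => :long_long, IDENTITY => :string }

# Returns the bytes and length that would be handed to the native call.
# Unlike the listing above, this raises for an unknown option.
def encode_option(name, value, length = nil)
  case OPTION_KIND[name]
  when :long_long then [[value].pack("q"), 8]  # 64-bit signed, native endian
  when :int       then [[value].pack("l"), 4]  # 32-bit signed, native endian
  when :string    then [value, length || value.bytesize]
  else raise ArgumentError, "unknown option #{name}"
  end
end

int_bytes, int_len   = encode_option(LINGER, 1_000)
long_bytes, long_len = encode_option(AFFINITY, 1)
str_bytes, str_len   = encode_option(IDENTITY, "worker-1")

p [int_bytes.bytesize, int_len]    # [4, 4]
p [long_bytes.bytesize, long_len]  # [8, 8]
p [str_bytes, str_len]             # ["worker-1", 8]
```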