Module: XS::CommonSocketBehavior

Includes:
Util
Included in:
Socket
Defined in:
lib/ffi-rxs/socket.rb

Methods included from Util

bind_to_random_tcp_port, errno, error_string, resultcode_ok?, version

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/ffi-rxs/socket.rb', line 8

def name
  @name
end

#socket ⇒ Object (readonly)

Returns the value of attribute socket.



# File 'lib/ffi-rxs/socket.rb', line 8

def socket
  @socket
end

Class Method Details

.create(context_ptr, type, opts = {:receiver_class => XS::Message}) ⇒ Socket

Allocates a socket of type type for sending and receiving data.

By default, this class uses XS::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use XS::ManagedMessage.

Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as XS::Message.

Examples:

Socket creation

sock = Socket.create(Context.create, XS::REQ, :receiver_class => XS::ManagedMessage)
if (socket = Socket.create(context.pointer, XS::REQ))
  ...
else
  STDERR.puts "Socket creation failed"
end

Parameters:

  • context_ptr
  • type (Constant)

    One of XS::REQ, XS::REP, XS::PUB, XS::SUB, XS::PAIR, XS::PULL, XS::PUSH, XS::XREQ, XS::XREP, XS::DEALER or XS::ROUTER

  • opts (Hash)

Returns:

  • (Socket)

    when successful

  • nil when unsuccessful



# File 'lib/ffi-rxs/socket.rb', line 40

def self.create context_ptr, type, opts = {:receiver_class => XS::Message}
  new(context_ptr, type, opts) rescue nil
end
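
The difference between .create and #initialize is only in how failure is reported. The following is a minimal sketch of that pattern, assuming stand-in classes (FakeSocket and FakeContextError are hypothetical and not part of ffi-rxs). It shows a factory that turns a raised exception into nil, which is what the one-line rescue in the listing above does.

```ruby
# Hypothetical stand-ins for illustration; not part of ffi-rxs.
class FakeContextError < StandardError; end

class FakeSocket
  attr_reader :type

  # Mirrors the documented behaviour of #initialize: raise on a missing context.
  def initialize(context_ptr, type)
    raise FakeContextError, "Context pointer was null" if context_ptr.nil?

    @type = type
  end

  # Mirrors the documented behaviour of .create: return nil instead of raising.
  def self.create(context_ptr, type)
    new(context_ptr, type)
  rescue StandardError
    nil
  end
end

ok     = FakeSocket.create(:context, :req) # a FakeSocket instance
failed = FakeSocket.create(nil, :req)      # nil, no exception reaches the caller
```

Callers of the factory therefore test the return value for nil, while callers of the constructor need a begin/rescue block.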

Instance Method Details

#bind(address) ⇒ Object

Binds the socket to an address.

Examples:

socket.bind("tcp://127.0.0.1:5555")

Parameters:

  • address


# File 'lib/ffi-rxs/socket.rb', line 187

def bind address
  LibXS.xs_bind @socket, address
end

#close ⇒ Object

Closes the socket. Any unprocessed messages in the queue are sent or dropped depending on the value of the socket option XS::LINGER.

Examples:

rc = socket.close
puts("Given socket was invalid!") unless 0 == rc

Returns:

  • 0 upon success or when the socket has already been closed

  • -1 when the operation fails. Check XS.errno for the error code



# File 'lib/ffi-rxs/socket.rb', line 213

def close
  if @socket
    remove_finalizer
    rc = LibXS.xs_close @socket
    @socket = nil
    release_cache
    rc
  else
    0
  end
end
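
The listing above makes #close safe to call more than once. A minimal sketch of that guard, assuming a stand-in class (FakeHandle is hypothetical and a plain counter replaces the native LibXS.xs_close call, so this version always returns 0):

```ruby
# Hypothetical stand-in for a socket wrapper; not part of ffi-rxs.
class FakeHandle
  attr_reader :native_close_calls

  def initialize
    @socket = Object.new # placeholder for the native socket pointer
    @native_close_calls = 0
  end

  # Same shape as #close: release the native handle once, then report 0.
  def close
    return 0 unless @socket

    @native_close_calls += 1 # stands in for LibXS.xs_close
    @socket = nil
    0
  end
end

handle = FakeHandle.new
first  = handle.close # releases the handle
second = handle.close # no-op, the handle is already nil
```

In the real method the first call returns whatever xs_close reports, so it can still be -1.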

#connect(address) ⇒ Object

Connects the socket to an address.

Examples:

rc = socket.connect("tcp://127.0.0.1:5555")

Parameters:

  • address

Returns:

  • 0 if successful

  • -1 if unsuccessful



# File 'lib/ffi-rxs/socket.rb', line 200

def connect address
  rc = LibXS.xs_connect @socket, address
end

#initialize(context_ptr, type, opts = {:receiver_class => XS::Message}) ⇒ Socket

Allocates a socket of type type for sending and receiving data.

To avoid rescuing exceptions, use the factory method .create for all socket creation.

By default, this class uses XS::Message for manual memory management. For automatic garbage collection of received messages, it is possible to override the :receiver_class to use XS::ManagedMessage.

Advanced users may want to replace the receiver class with their own custom class. The custom class must conform to the same public API as XS::Message.

Creation of a new Socket object can raise an exception. This occurs when the context_ptr is null or when the allocation of the Crossroads socket within the context fails.

Examples:

Socket creation

sock = Socket.new(Context.new, XS::REQ, :receiver_class => XS::ManagedMessage)
begin
  socket = Socket.new(context.pointer, XS::REQ)
rescue ContextError => e
  # error handling
end

Parameters:

  • context_ptr
  • type (Constant)

    One of XS::REQ, XS::REP, XS::PUB, XS::SUB, XS::PAIR, XS::PULL, XS::PUSH, XS::XREQ, XS::XREP, XS::DEALER or XS::ROUTER

  • opts (Hash)

Returns:

  • (Socket)

    a new instance of Socket

Raises:

  • (ContextError)

    when context_ptr is null or the socket cannot be allocated



# File 'lib/ffi-rxs/socket.rb', line 80

def initialize context_ptr, type, opts = {:receiver_class => XS::Message}
  # users may override the classes used for receiving; class must conform to the
  # same public API as XS::Message
  @receiver_klass = opts[:receiver_class]

  context_ptr = context_ptr.pointer if context_ptr.kind_of?(XS::Context)

  unless context_ptr.null?
    @socket = LibXS.xs_socket context_ptr, type
    if @socket && !@socket.null?
      @name = SocketTypeNameMap[type]
    else
      raise ContextError.new 'xs_socket', 0, ETERM, "Socket pointer was null"
    end
  else
    raise ContextError.new 'xs_socket', 0, ETERM, "Context pointer was null"
  end

  @longlong_cache = @int_cache = nil
  @more_parts_array = []
  @option_lookup = []
  populate_option_lookup

  define_finalizer
end

#more_parts? ⇒ Boolean

Convenience method for checking on additional message parts.

Equivalent to calling Socket#getsockopt with XS::RCVMORE.

Warning: if the call to #getsockopt fails, this method will return false and swallow the error.

Examples:

message_parts = []
message = Message.new
rc = socket.recvmsg(message)
if XS::Util.resultcode_ok?(rc)
  message_parts << message
  while socket.more_parts?
    message = Message.new
    rc = socket.recvmsg(message)
    message_parts.push(message) if XS::Util.resultcode_ok?(rc)
  end
end

Returns:

  • (Boolean)

    true if more message parts follow

  • false if there are none, or if the underlying #getsockopt call failed



# File 'lib/ffi-rxs/socket.rb', line 175

def more_parts?
  rc = getsockopt XS::RCVMORE, @more_parts_array

  Util.resultcode_ok?(rc) ? @more_parts_array.at(0) : false
end

#recv_multipart(list, routing_envelope, flag = 0) ⇒ Object

Should only be used for XREQ, XREP, DEALER and ROUTER type sockets. Takes a list for receiving the message body parts and a routing_envelope for receiving the message parts comprising the 0mq routing information.

Parameters:

  • list (Array)
  • routing_envelope
  • flag (defaults to: 0)

    Either 0 (default) or XS::NonBlocking

Returns:

  • 0 if successful

  • -1 if unsuccessful



# File 'lib/ffi-rxs/socket.rb', line 457

def recv_multipart list, routing_envelope, flag = 0
  parts = []
  rc = recvmsgs parts, flag

  if Util.resultcode_ok?(rc)
    routing = true
    parts.each do |part|
      if routing
        routing_envelope << part
        routing = part.size > 0
      else
        list << part
      end
    end
  end

  rc
end
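
The split between routing envelope and body in the listing above can be tried without a socket. A minimal sketch, assuming plain strings stand in for message objects (split_envelope is a hypothetical helper, not part of ffi-rxs): every part up to and including the first empty part goes to the envelope, and everything after it goes to the body.

```ruby
# Splits an ordered list of message parts into routing envelope and body,
# following the same rule as the loop in #recv_multipart.
def split_envelope(parts)
  routing_envelope = []
  body = []
  routing = true

  parts.each do |part|
    if routing
      routing_envelope << part
      # An empty part is the delimiter; it is the last envelope entry.
      routing = part.size > 0
    else
      body << part
    end
  end

  [routing_envelope, body]
end

envelope, body = split_envelope(["peer-1", "", "hello", "world"])
# envelope is ["peer-1", ""] and body is ["hello", "world"]
```

If no empty delimiter part arrives, every part ends up in the envelope and the body stays empty.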

#recv_string(string, flag = 0) ⇒ Object

Helper method to make a new #Message instance and convert its payload to a string.

With a -1 return code, the user must check XS.errno to determine the cause.

The temporary message object is created and closed inside this method, whether or not the receive succeeds. On failure, string is left unchanged.

Parameters:

  • string
  • flag (defaults to: 0)

    Either 0 (default) or XS::NonBlocking

Returns:

  • 0 when the message was successfully dequeued

  • -1 if the message could not be dequeued, or if flag is set to XS::NonBlocking and the socket returned EAGAIN



# File 'lib/ffi-rxs/socket.rb', line 374

def recv_string string, flag = 0
  message = @receiver_klass.new
  rc = recvmsg message, flag
  string.replace(message.copy_out_string) if Util.resultcode_ok?(rc)
  message.close
  rc
end
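
#recv_string reports its result through the return code and delivers the payload by overwriting the string argument in place. A minimal sketch of that calling convention, assuming a stand-in function (fake_recv_string is hypothetical and takes the payload and return code as arguments instead of reading from a socket):

```ruby
# Hypothetical stand-in: copies a payload into the caller's buffer on success,
# the same way #recv_string uses String#replace.
def fake_recv_string(string, payload, rc)
  string.replace(payload) if rc >= 0
  rc
end

buffer = String.new
rc = fake_recv_string(buffer, "hello", 0)
# buffer now holds "hello"

untouched = String.new("previous")
failed_rc = fake_recv_string(untouched, "ignored", -1)
# untouched still holds "previous" because the return code signalled failure
```

Because String#replace mutates its receiver, the buffer passed in must not be frozen.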

#recv_strings(list, flag = 0) ⇒ Object

Receive a multipart message as a list of strings.

Parameters:

  • list (Array)
  • flag (defaults to: 0)

    Either 0 (default) or XS::NonBlocking. Any other flag will be removed.

Returns:

  • 0 if successful

  • -1 if unsuccessful



# File 'lib/ffi-rxs/socket.rb', line 391

def recv_strings list, flag = 0
  array = []
  rc = recvmsgs array, flag

  if Util.resultcode_ok?(rc)
    array.each do |message|
      list << message.copy_out_string
      message.close
    end
  end

  rc
end

#recvmsg(message, flag = 0) ⇒ Object

Dequeues a message from the underlying queue. By default, this is a blocking operation.

With a -1 return code, the user must check XS.errno to determine the cause.

The application code is responsible for handling the message object lifecycle when #recvmsg returns an error code.

Parameters:

  • message
  • flag (defaults to: 0)

    Either 0 (default) for a blocking operation or XS::NonBlocking for a non-blocking operation

Returns:

  • 0 when the message was successfully dequeued

  • -1 if the message could not be dequeued, or if flag is set to XS::NonBlocking and the socket returned EAGAIN



# File 'lib/ffi-rxs/socket.rb', line 353

def recvmsg message, flag = 0
  __recvmsg__(@socket, message.address, flag)
end

#recvmsgs(list, flag = 0) ⇒ Object

Receive a multipart message as an array of objects (by default these are instances of Message).

Parameters:

  • list (Array)
  • flag (defaults to: 0)

    Either 0 (default) or XS::NonBlocking. Any other flag will be removed.

Returns:

  • 0 if successful

  • -1 if unsuccessful



# File 'lib/ffi-rxs/socket.rb', line 415

def recvmsgs list, flag = 0
  flag = NonBlocking if dontwait?(flag)

  message = @receiver_klass.new
  rc = recvmsg message, flag

  if Util.resultcode_ok?(rc)
    list << message

    # check rc *first*; necessary because the call to #more_parts? can reset
    # the xs_errno to a weird value, so the xs_errno that was set on the
    # call to #recv gets lost
    while Util.resultcode_ok?(rc) && more_parts?
      message = @receiver_klass.new
      rc = recvmsg message, flag

      if Util.resultcode_ok?(rc)
        list << message
      else
        message.close
        list.each { |msg| msg.close }
        list.clear
      end
    end
  else
    message.close
  end

  rc
end
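
The cleanup behaviour in the listing above matters to callers: if any part of a multipart message fails to arrive, every part already collected is closed and the list is emptied. A minimal sketch of that control flow, assuming stand-in classes (FakeMessage and ScriptedSocket are hypothetical, and a script of [rc, payload, more] steps replaces the network; flag handling is omitted):

```ruby
# Hypothetical message stand-in that records whether it was closed.
class FakeMessage
  attr_accessor :payload
  attr_reader :closed

  def initialize
    @closed = false
  end

  def close
    @closed = true
  end
end

# Hypothetical socket stand-in driven by a script of [rc, payload, more] steps.
class ScriptedSocket
  attr_reader :created

  def initialize(script)
    @script = script
    @created = []
    @more = false
  end

  def recvmsg(message)
    rc, payload, @more = @script.shift
    message.payload = payload
    rc
  end

  def more_parts?
    @more
  end

  # Same control flow as #recvmsgs in the listing above.
  def recvmsgs(list)
    message = new_message
    rc = recvmsg(message)

    if rc >= 0
      list << message
      while rc >= 0 && more_parts?
        message = new_message
        rc = recvmsg(message)
        if rc >= 0
          list << message
        else
          # A later part failed: discard everything received so far.
          message.close
          list.each(&:close)
          list.clear
        end
      end
    else
      message.close
    end

    rc
  end

  private

  def new_message
    FakeMessage.new.tap { |m| @created << m }
  end
end

socket = ScriptedSocket.new([[0, "a", true], [0, "b", true], [-1, nil, false]])
list = []
rc = socket.recvmsgs(list)
# rc is -1, list is empty, and all three messages have been closed
```

A caller therefore never sees a partially received multipart message: the list is either complete or empty.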

#send_and_close(message, flag = 0) ⇒ Object

Sends a message. This will automatically close the message for both successful and failed sends.

With a -1 return code, the user must check XS.errno to determine the cause.

Parameters:

  • message
  • flag (defaults to: 0)

    Either 0 (default) or XS::NonBlocking

Returns:

  • 0 when the message was successfully enqueued

  • -1 if the message could not be enqueued, or if flag is set to XS::NonBlocking and the socket returned EAGAIN



# File 'lib/ffi-rxs/socket.rb', line 331

def send_and_close message, flag = 0
  rc = sendmsg message, flag
  message.close
  rc
end

#send_string(string, flag = 0) ⇒ Object

Helper method to make a new #Message instance out of the string passed in for transmission.

With a -1 return code, the user must check XS.errno to determine the cause.

Parameters:

  • string
  • flag (defaults to: 0)

    One of 0 (default), XS::NonBlocking and XS::SNDMORE

Returns:

  • 0 when the message was successfully enqueued

  • -1 if the message could not be enqueued, or if flag is set to XS::NonBlocking and the socket returned EAGAIN



# File 'lib/ffi-rxs/socket.rb', line 258

def send_string string, flag = 0
  message = Message.new string
  send_and_close message, flag
end

#send_strings(parts, flag = 0) ⇒ Object

Sends the strings in parts as a single multipart message. Every element of parts should be a String.

With a -1 return code, the user must check XS.errno to determine the cause.

Parameters:

  • parts (Array)
  • flag (defaults to: 0)

    Either 0 (default) or XS::NonBlocking

Returns:

  • 0 when the messages were successfully enqueued

  • -1 if a message could not be enqueued, or if flag is set to XS::NonBlocking and the socket returned EAGAIN



# File 'lib/ffi-rxs/socket.rb', line 278

def send_strings parts, flag = 0
  return -1 if !parts || parts.empty?
  flag = NonBlocking if dontwait?(flag)

  parts[0..-2].each do |part|
    rc = send_string part, (flag | XS::SNDMORE)
    return rc unless Util.resultcode_ok?(rc)
  end

  send_string parts[-1], flag
end
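
The flag handling in the listing above is what turns separate sends into one multipart message: every part except the last is sent with XS::SNDMORE. A minimal sketch, assuming a stand-in class that records what would be sent (RecordingSocket is hypothetical, and the SNDMORE value of 2 is an assumption for illustration; the sketch also skips the NonBlocking normalisation done by the real method):

```ruby
# Assumed flag value for illustration only; use XS::SNDMORE in real code.
SNDMORE = 2

# Hypothetical recorder standing in for a socket; not part of ffi-rxs.
class RecordingSocket
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send_string(string, flag = 0)
    @sent << [string, flag]
    0
  end

  # Same shape as #send_strings: SNDMORE on every part except the last.
  def send_strings(parts, flag = 0)
    return -1 if !parts || parts.empty?

    parts[0..-2].each do |part|
      rc = send_string(part, flag | SNDMORE)
      return rc unless rc >= 0
    end

    send_string(parts[-1], flag)
  end
end

sock = RecordingSocket.new
rc = sock.send_strings(%w[header body footer])
# sock.sent is [["header", 2], ["body", 2], ["footer", 0]]
```

An empty or nil parts list returns -1 without sending anything, so that return value does not always indicate a socket error.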

#sendmsg(message, flag = 0) ⇒ Object

Queues the message for transmission. Message is assumed to conform to the same public API as #Message.

With a -1 return code, the user must check XS.errno to determine the cause.

Parameters:

  • message
  • flag (defaults to: 0)

    One of 0 (default) - blocking operation, XS::NonBlocking - non-blocking operation, XS::SNDMORE - this message is part of a multi-part message

Returns:

  • 0 when the message was successfully enqueued

  • -1 if the message could not be enqueued, or if flag is set to XS::NonBlocking and the socket returned EAGAIN



# File 'lib/ffi-rxs/socket.rb', line 240

def sendmsg message, flag = 0
  __sendmsg__(@socket, message.address, flag)
end

#sendmsgs(parts, flag = 0) ⇒ Object

Sends the messages in parts as a single multipart message. Every element of parts should be a Message (or subclass).

With a -1 return code, the user must check XS.errno to determine the cause.

Parameters:

  • parts (Array)
  • flag (defaults to: 0)

    Either 0 (default) or XS::NonBlocking

Returns:

  • 0 when the messages were successfully enqueued

  • -1 if a message could not be enqueued, or if flag is set to XS::NonBlocking and the socket returned EAGAIN



# File 'lib/ffi-rxs/socket.rb', line 305

def sendmsgs parts, flag = 0
  return -1 if !parts || parts.empty?
  flag = NonBlocking if dontwait?(flag)

  parts[0..-2].each do |part|
    rc = sendmsg part, (flag | XS::SNDMORE)
    return rc unless Util.resultcode_ok?(rc)
  end

  sendmsg parts[-1], flag
end

#setsockopt(name, value, length = nil) ⇒ Object

Sets a queue option on this socket.

With a -1 return code, the user must check XS.errno to determine the cause.

Examples:

rc = socket.setsockopt(XS::LINGER, 1_000)
XS::Util.resultcode_ok?(rc) ? puts("succeeded") : puts("failed")

Parameters:

  • name (Constant)

    For numeric values, one of XS::AFFINITY, XS::RATE, XS::RECOVERY_IVL, XS::LINGER, XS::RECONNECT_IVL, XS::BACKLOG, XS::RECONNECT_IVL_MAX, XS::MAXMSGSIZE, XS::SNDHWM, XS::RCVHWM, XS::MULTICAST_HOPS, XS::RCVTIMEO, XS::SNDTIMEO, XS::IPV4ONLY, XS::KEEPALIVE, XS::SUBSCRIBE, XS::UNSUBSCRIBE, XS::IDENTITY, XS::SNDBUF, XS::RCVBUF

    For string values, one of XS::IDENTITY, XS::SUBSCRIBE or XS::UNSUBSCRIBE

  • value
  • length (defaults to: nil)

    Length of a string value; defaults to value.size. Ignored for numeric options.

Returns:

  • 0 when the operation completed successfully

  • -1 when this operation fails



# File 'lib/ffi-rxs/socket.rb', line 129

def setsockopt name, value, length = nil
  if 1 == @option_lookup[name]
    length = 8
    pointer = LibC.malloc length
    pointer.write_long_long value

  elsif 0 == @option_lookup[name]
    length = 4
    pointer = LibC.malloc length
    pointer.write_int value

  elsif 2 == @option_lookup[name]
    length ||= value.size

    # note: not checking errno for failed memory allocations :(
    pointer = LibC.malloc length
    pointer.write_string value
  end

  rc = LibXS.xs_setsockopt @socket, name, pointer, length
  LibC.free(pointer) unless pointer.nil? || pointer.null?
  rc
end
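
The three branches in the listing above pick a buffer size from the option's type code: 0 for a 4-byte int, 1 for an 8-byte long long, 2 for a string. A minimal sketch of that encoding, assuming Array#pack in place of LibC.malloc and the FFI pointer writes (encode_option is a hypothetical helper, not part of ffi-rxs):

```ruby
# Encodes an option value into the bytes that would be handed to the library.
# Type codes follow the @option_lookup table used by #setsockopt:
# 0 = int (4 bytes), 1 = long long (8 bytes), 2 = string.
def encode_option(type_code, value, length = nil)
  case type_code
  when 0 then [value].pack("l") # native 32-bit signed integer
  when 1 then [value].pack("q") # native 64-bit signed integer
  when 2
    length ||= value.bytesize
    value.byteslice(0, length)
  else
    raise ArgumentError, "unknown option type #{type_code}"
  end
end

linger_bytes   = encode_option(0, 1_000)   # 4 bytes
affinity_bytes = encode_option(1, 1)       # 8 bytes
topic_bytes    = encode_option(2, "topic") # 5 bytes
```

This sketch counts bytes with bytesize, whereas the listing defaults length to value.size, so the two can differ for strings that contain multi-byte characters.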