Class: CZTop::Message

Inherits:
Object
Extended by:
HasFFIDelegate::ClassMethods
Includes:
CZMQ::FFI, HasFFIDelegate
Defined in:
lib/cztop/message.rb,
lib/cztop/message/frames.rb

Overview

Represents a CZMQ::FFI::Zmsg.

Defined Under Namespace

Classes: FramesAccessor

Instance Attribute Summary

Attributes included from HasFFIDelegate

#ffi_delegate

Class Method Summary

Instance Method Summary

Methods included from HasFFIDelegate::ClassMethods

ffi_delegate, from_ffi_delegate

Methods included from HasFFIDelegate

#attach_ffi_delegate, #from_ffi_delegate, raise_zmq_err, #to_ptr

Constructor Details

#initialize(parts = nil) ⇒ Message

Returns a new instance of Message.

Parameters:

  • parts (String, Frame, Array<String>, Array<Frame>) (defaults to: nil)

    initial parts of the message



# File 'lib/cztop/message.rb', line 29

def initialize(parts = nil)
  attach_ffi_delegate(Zmsg.new)
  Array(parts).each { |part| self << part } if parts
end

Class Method Details

.coerce(msg) ⇒ Message

Coerces an object into a CZTop::Message.

Parameters:

  • msg (Message, String, Frame, Array<String>, Array<Frame>)

    the object to coerce

Returns:

  • (Message)

Raises:

  • (ArgumentError)

    if it can’t be coerced



# File 'lib/cztop/message.rb', line 15

def self.coerce(msg)
  case msg
  when Message
    msg
  when String, Frame, Array
    new(msg)
  else
    raise ArgumentError, format('cannot coerce message: %p', msg)
  end
end
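
The dispatch above can be tried without the native library. The following is a minimal sketch, assuming a hypothetical stand-in class SketchMessage that keeps its parts in a plain Ruby array. It is not CZTop's implementation and leaves out the Frame case.

```ruby
# Hypothetical stand-in for CZTop::Message; parts live in a plain Array
# instead of a native zmsg_t.
class SketchMessage
  attr_reader :parts

  def initialize(parts = nil)
    @parts = Array(parts).map(&:to_s)
  end

  # Mirrors the case/when structure of Message.coerce shown above.
  def self.coerce(msg)
    case msg
    when SketchMessage
      msg # already a message: returned as-is, not copied
    when String, Array
      new(msg)
    else
      raise ArgumentError, format('cannot coerce message: %p', msg)
    end
  end
end

existing = SketchMessage.new(%w[a b])
same     = SketchMessage.coerce(existing)      # the identical object
wrapped  = SketchMessage.coerce('hello')       # single-part message
multi    = SketchMessage.coerce(%w[head body]) # two-part message
```

Note that an existing message is passed through rather than duplicated, so callers that coerce their input share the object with whoever supplied it.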

.receive_from(source) ⇒ Message

Receive a CZTop::Message from a Socket or Actor.

Parameters:

  • source (Socket, Actor)

    the source to receive the message from

Returns:

  • (Message)

    the newly received message

Raises:

  • (IO::EAGAINWaitReadable)

    if the receive timeout has been reached (see ZsockOptions::OptionsAccessor#rcvtimeo=)

  • (Interrupt)

    if interrupted while waiting for a message

  • (SystemCallError)

    for any other error code set after zmsg_recv returns with failure. Please report as bug.



# File 'lib/cztop/message.rb', line 85

def self.receive_from(source)
  source.wait_readable

  delegate = Zmsg.recv(source)
  return from_ffi_delegate(delegate) unless delegate.null?

  HasFFIDelegate.raise_zmq_err
rescue Errno::EAGAIN
  raise IO::EAGAINWaitReadable
end
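
Because a receive timeout surfaces as IO::EAGAINWaitReadable, callers usually rescue it and decide whether to try again. The sketch below shows one way to bound the retries. receive_with_retries is a hypothetical helper, and the block stands in for a call such as CZTop::Message.receive_from(socket).

```ruby
# Hypothetical helper: retries a receive attempt a bounded number of times
# when it times out. The block performs the actual receive.
def receive_with_retries(max_attempts: 3)
  attempts = 0
  begin
    attempts += 1
    yield
  rescue IO::EAGAINWaitReadable
    retry if attempts < max_attempts
    raise # give up and let the caller see the timeout
  end
end

# Stand-in for a socket that times out twice before delivering a message.
calls = 0
result = receive_with_retries do
  calls += 1
  raise IO::EAGAINWaitReadable if calls < 3

  %w[hello world]
end
```

Only the timeout is retried here. Interrupt and other SystemCallError subclasses propagate unchanged, which matches the error list above.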

Instance Method Details

#<<(frame) ⇒ self

Note:

If you provide a Frame, do NOT use that frame afterwards anymore, as its native counterpart will have been destroyed.

Append a frame to this message.

Parameters:

  • frame (String, Frame)

    what to append

Returns:

  • (self)

    so it can be chained

Raises:

  • (ArgumentError)

    if frame has an invalid type

  • (SystemCallError)

    if this fails



# File 'lib/cztop/message.rb', line 104

def <<(frame)
  case frame
  when String
    # NOTE: can't use addstr because the data might be binary
    mem = FFI::MemoryPointer.from_string(frame)
    rc  = ffi_delegate.addmem(mem, mem.size - 1) # without NULL byte
  when Frame
    rc = ffi_delegate.append(frame.ffi_delegate)
  else
    raise ArgumentError, format('invalid frame: %p', frame)
  end
  raise_zmq_err unless rc.zero?
  self
end
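
The comment about addstr refers to C-string semantics: a NUL-terminated API stops at the first zero byte, whereas a pointer-plus-length API such as addmem keeps every byte. The snippet below is a pure-Ruby illustration of that difference. It uses unpack1('Z*') to imitate a C-string read, which is an analogy and not what CZTop does internally.

```ruby
payload = "ab\x00cd".b # binary string with an embedded NUL byte

# Read the way a NUL-terminated C-string API would: stops at \x00.
as_c_string = payload.unpack1('Z*')

# Read with an explicit length, as a (pointer, size) API would.
with_length = payload.byteslice(0, payload.bytesize)

puts as_c_string.bytesize
puts with_length.bytesize
```

The explicit length is also why the source passes mem.size - 1: the memory created from the string carries a trailing NUL byte that is not part of the frame.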

#[](frame_index) ⇒ String?

Return a frame’s content.

Returns:

  • (String)

    the frame’s content, if it exists

  • (nil)

    if frame doesn’t exist at given index



# File 'lib/cztop/message.rb', line 191

def [](frame_index)
  frame = frames[frame_index] or return nil
  frame.to_s
end

#content_size ⇒ Integer

Returns size of this message in bytes.

Returns:

  • (Integer)

    size of this message in bytes

See Also:



# File 'lib/cztop/message.rb', line 155

def content_size
  ffi_delegate.content_size
end

#empty? ⇒ Boolean

Returns whether this message is empty (no frames, or every frame has length zero).

Returns:

  • (Boolean)

    whether this message is empty (no frames, or every frame has length zero)



# File 'lib/cztop/message.rb', line 36

def empty?
  content_size.zero?
end
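
Because empty? is defined through content_size, a message can hold frames and still count as empty. The sketch below shows that distinction with plain arrays of strings standing in for a message's frames. The sketch_* helper names are hypothetical.

```ruby
# Hypothetical helpers: an Array<String> stands in for a message's frames.
def sketch_size(frames)
  frames.size
end

def sketch_content_size(frames)
  frames.sum(&:bytesize)
end

def sketch_empty?(frames)
  sketch_content_size(frames).zero?
end

no_frames    = []
blank_frames = ['', '']     # two frames, zero bytes of content
with_content = ['', 'data'] # an empty delimiter frame plus a payload

puts sketch_empty?(no_frames)
puts sketch_empty?(blank_frames)
puts sketch_size(blank_frames)
puts sketch_empty?(with_content)
```

Code that needs to know whether any frames are present should therefore check size and not rely on empty?.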

#frames ⇒ FramesAccessor

Access to this CZTop::Message’s Frames.

Returns:

  • (FramesAccessor)



# File 'lib/cztop/message/frames.rb', line 15

def frames
  FramesAccessor.new(self)
end

#inspect ⇒ String

Inspects this CZTop::Message.

Returns:

  • (String)

    shows class, number of frames, content size, and content (only if it’s up to 200 bytes)



# File 'lib/cztop/message.rb', line 182

def inspect
  format('#<%s:0x%x frames=%i content_size=%i content=%s>', self.class, to_ptr.address, size, content_size,
         content_size <= 200 ? to_a.inspect : '[...]')
end

#pop ⇒ String?

Removes first part from message and returns it as a string.

Returns:

  • (String, nil)

    first part, if any, or nil



# File 'lib/cztop/message.rb', line 144

def pop
  # NOTE: can't use popstr because the data might be binary
  ptr = ffi_delegate.pop
  return nil if ptr.null?

  Frame.from_ffi_delegate(ptr).to_s
end

#prepend(frame) ⇒ void

Note:

If you provide a Frame, do NOT use that frame afterwards anymore, as its native counterpart will have been destroyed.

This method returns an undefined value.

Prepend a frame to this message.

Parameters:

  • frame (String, Frame)

    what to prepend

Raises:

  • (ArgumentError)

    if frame has an invalid type

  • (SystemCallError)

    if this fails



# File 'lib/cztop/message.rb', line 127

def prepend(frame)
  case frame
  when String
    # NOTE: can't use pushstr because the data might be binary
    mem = FFI::MemoryPointer.from_string(frame)
    rc  = ffi_delegate.pushmem(mem, mem.size - 1) # without NULL byte
  when Frame
    rc = ffi_delegate.prepend(frame.ffi_delegate)
  else
    raise ArgumentError, format('invalid frame: %p', frame)
  end
  raise_zmq_err unless rc.zero?
end

#routing_id ⇒ Integer

Note:

This is only set when the frame came from a Socket::SERVER socket.

Gets the routing ID.

Returns:

  • (Integer)

    the routing ID, or 0 if unset



# File 'lib/cztop/message.rb', line 200

ffi_delegate :routing_id

#routing_id=(new_routing_id) ⇒ new_routing_id

Note:

This is used when the message is sent to a Socket::SERVER socket.

Sets a new routing ID.

Parameters:

  • new_routing_id (Integer)

    new routing ID

Returns:

  • (new_routing_id)

Raises:

  • (ArgumentError)

    if new routing ID is not an Integer

  • (RangeError)

    if new routing ID is out of uint32_t range



# File 'lib/cztop/message.rb', line 209

def routing_id=(new_routing_id)
  raise ArgumentError unless new_routing_id.is_a? Integer

  # need to raise manually, as FFI lacks this feature.
  # @see https://github.com/ffi/ffi/issues/473
  raise RangeError if new_routing_id.negative?

  ffi_delegate.set_routing_id(new_routing_id)
end
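
The documented RangeError covers the whole uint32_t range, while the listed source itself only rejects negative values. The sketch below checks both bounds explicitly. validate_routing_id and UINT32_MAX are hypothetical names and not part of CZTop.

```ruby
UINT32_MAX = 0xFFFF_FFFF # largest value representable in a uint32_t

# Hypothetical validator mirroring the documented error contract:
# ArgumentError for non-Integers, RangeError outside 0..UINT32_MAX.
def validate_routing_id(id)
  raise ArgumentError, format('not an Integer: %p', id) unless id.is_a?(Integer)
  raise RangeError, format('out of uint32_t range: %p', id) unless (0..UINT32_MAX).cover?(id)

  id
end

validate_routing_id(0)
validate_routing_id(UINT32_MAX)
```

Returning the validated value lets such a check be used inline in front of a setter call.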

#send_to(destination) ⇒ void

Note:

Do NOT use this CZTop::Message anymore afterwards. Its native counterpart will have been destroyed.

This method returns an undefined value.

Send CZTop::Message to a Socket or Actor.

Parameters:

  • destination (Socket, Actor)

    where to send this message to

Raises:

  • (IO::EAGAINWaitWritable)

    if the send timeout has been reached (see ZsockOptions::OptionsAccessor#sndtimeo=)

  • (SocketError)

    if the message can’t be routed to the destination: either the ZMQ_ROUTER_MANDATORY flag is set on a Socket::ROUTER socket and the peer isn’t connected or its SNDHWM is reached (see ZsockOptions::OptionsAccessor#router_mandatory=), or it’s a Socket::SERVER socket and there’s no connected CLIENT corresponding to the given routing ID

  • (ArgumentError)

    if the message is invalid, e.g. when trying to send a message with no parts, or a multi-part message over a CLIENT/SERVER socket

  • (SystemCallError)

    for any other error code set after zmsg_send returns with failure. Please report as bug.



# File 'lib/cztop/message.rb', line 63

def send_to(destination)
  fail ArgumentError, "message has no frames" if size.zero?

  destination.wait_writable

  rc = Zmsg.send(ffi_delegate, destination)
  return if rc.zero?

  raise_zmq_err
rescue Errno::EAGAIN
  raise IO::EAGAINWaitWritable
end

#size ⇒ Integer

Returns number of frames.

Returns:

  • (Integer)

    number of frames

See Also:



# File 'lib/cztop/message/frames.rb', line 8

def size
  ffi_delegate.size
end

#to_a ⇒ Array<String>

Note:

It reads all frames in the message and turns them into Ruby strings. This can be a problem if the message is huge or has huge frames.

Returns all frames as strings in an array. This is useful for quick inspection of the message.

Returns:

  • (Array<String>)

    all frames



# File 'lib/cztop/message.rb', line 165

def to_a
  ffi_delegate = ffi_delegate()
  frame        = ffi_delegate.first
  return [] if frame.null?

  arr          = [frame.data.read_bytes(frame.size)]
  while (frame = ffi_delegate.next) && !frame.null?
    arr << frame.data.read_bytes(frame.size)
  end

  arr
end
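
The method above walks the message with a first/next cursor and copies each frame into a Ruby string. The sketch below reproduces the same loop shape against a hypothetical cursor class, SketchCursor, in which nil plays the role of a null pointer.

```ruby
# Hypothetical cursor over an Array, imitating a first/next traversal.
class SketchCursor
  def initialize(items)
    @items = items
    @pos   = 0
  end

  # Rewinds to the first item; nil if there is none.
  def first
    @pos = 0
    @items[@pos]
  end

  # Advances and returns the next item; nil once past the end.
  def next
    @pos += 1
    @items[@pos]
  end
end

# Collects a copy of every item, in order, starting from the first.
def collect_all(cursor)
  item = cursor.first
  return [] if item.nil?

  arr = [item.dup]
  while (item = cursor.next)
    arr << item.dup
  end
  arr
end

copied = collect_all(SketchCursor.new(%w[a b c]))
```

As with to_a, every item is copied, so the cost grows with the total size of the content being traversed.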