Class: CZTop::Message

Inherits:
Object
Extended by:
HasFFIDelegate::ClassMethods
Includes:
CZMQ::FFI, HasFFIDelegate
Defined in:
lib/cztop/message.rb,
lib/cztop/message/frames.rb

Overview

Represents a CZMQ::FFI::Zmsg.

Defined Under Namespace

Classes: FramesAccessor

Instance Attribute Summary

Attributes included from HasFFIDelegate

#ffi_delegate

Class Method Summary

Instance Method Summary

Methods included from HasFFIDelegate::ClassMethods

ffi_delegate, from_ffi_delegate

Methods included from HasFFIDelegate

#attach_ffi_delegate, #from_ffi_delegate, raise_zmq_err, #raise_zmq_err, #to_ptr

Constructor Details

#initialize(parts = nil) ⇒ Message

Returns a new instance of Message.

Parameters:

  • parts (String, Frame, Array<String>, Array<Frame>) (defaults to: nil)

    initial parts of the message



# File 'lib/cztop/message.rb', line 25

def initialize(parts = nil)
  attach_ffi_delegate(Zmsg.new)
  Array(parts).each { |part| self << part } if parts
end
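
The constructor relies on Kernel#Array to normalize its argument, so a single part and a list of parts take the same code path. The sketch below shows that normalization on plain strings only; normalize_parts is a hypothetical helper for illustration and is not part of CZTop.

```ruby
# Hypothetical helper mirroring the `Array(parts)` call in the constructor.
def normalize_parts(parts)
  Array(parts)
end

p normalize_parts(nil)          # nil becomes an empty list
p normalize_parts("foo")        # a single String is wrapped in a list
p normalize_parts(%w[foo bar])  # an Array is passed through unchanged
```

This is why Message.new("foo") and Message.new(["foo"]) should end up with the same single frame.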

Class Method Details

.coerce(msg) ⇒ Message

Coerces an object into a CZTop::Message.

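Parameters:

  • msg (Message, String, Frame, Array)

    the object to coerce

Returns:

  • (Message)

    the given message itself, or a new one built from msg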

Raises:

  • (ArgumentError)

    if it can’t be coerced



# File 'lib/cztop/message.rb', line 12

def self.coerce(msg)
  case msg
  when Message
    return msg
  when String, Frame, Array
    return new(msg)
  else
    raise ArgumentError, "cannot coerce message: %p" % msg
  end
end

.receive_from(source) ⇒ Message

Receive a CZTop::Message from a Socket or Actor.

Parameters:

  • source (Socket, Actor)

    where to receive the message from

Returns:

  • (Message)

    the newly received message

Raises:

  • (IO::EAGAINWaitReadable)

    if the receive timeout has been reached (see ZsockOptions::OptionsAccessor#rcvtimeo=)

  • (Interrupt)

    if interrupted while waiting for a message

  • (SystemCallError)

    for any other error code set after zmsg_recv returns with failure. Please report as bug.



# File 'lib/cztop/message.rb', line 73

def self.receive_from(source)
  delegate = Zmsg.recv(source)
  return from_ffi_delegate(delegate) unless delegate.null?
  HasFFIDelegate.raise_zmq_err
rescue Errno::EAGAIN
  raise IO::EAGAINWaitReadable
end
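
Because a receive timeout surfaces as IO::EAGAINWaitReadable, callers can tell it apart from other failures. A minimal sketch of a bounded retry follows. The helper with_receive_retries and the block body are hypothetical stand-ins; in real code the block would presumably call CZTop::Message.receive_from on a socket with a receive timeout set.

```ruby
# Hypothetical helper: re-run the block when a receive timeout is signalled,
# giving up after `attempts` tries and re-raising the last timeout.
def with_receive_retries(attempts: 3)
  tries = 0
  begin
    tries += 1
    yield tries
  rescue IO::EAGAINWaitReadable
    retry if tries < attempts
    raise
  end
end

# Stand-in for a source that times out twice before delivering a message.
result = with_receive_retries do |try|
  raise IO::EAGAINWaitReadable if try < 3
  "message received on try #{try}"
end
puts result
```

Interrupt and other SystemCallError subclasses are deliberately not rescued here, so they still propagate to the caller.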

Instance Method Details

#<<(frame) ⇒ self

Note:

If you provide a Frame, do NOT use that frame afterwards anymore, as its native counterpart will have been destroyed.

Append a frame to this message.

Parameters:

  • frame (String, Frame)

    what to append

Returns:

  • (self)

    so it can be chained

Raises:

  • (ArgumentError)

    if frame has an invalid type

  • (SystemCallError)

    if this fails



# File 'lib/cztop/message.rb', line 88

def <<(frame)
  case frame
  when String
    # NOTE: can't use addstr because the data might be binary
    mem = FFI::MemoryPointer.from_string(frame)
    rc = ffi_delegate.addmem(mem, mem.size - 1) # without NULL byte
  when Frame
    rc = ffi_delegate.append(frame.ffi_delegate)
  else
    raise ArgumentError, "invalid frame: %p" % frame
  end
  raise_zmq_err unless rc == 0
  self
end
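
The comment about avoiding addstr can be illustrated without any native code: a C-style string ends at the first NUL byte, whereas a copy made with an explicit length keeps every byte. The sketch below uses only core Ruby (String#unpack1 with the Z* directive) to mimic NUL-terminated reading. It is an analogy for the problem, not a reproduction of what CZMQ does internally.

```ruby
# A payload with an embedded NUL byte, as binary data often has.
payload = "ab\0cd".b

# Reading it as a NUL-terminated C string stops at the first NUL ...
as_c_string = payload.unpack1("Z*")

# ... while copying by explicit byte length keeps everything.
by_length = payload.byteslice(0, payload.bytesize)

puts as_c_string.bytesize  # bytes surviving C-string semantics
puts by_length.bytesize    # bytes surviving a length-based copy
```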

#[](frame_index) ⇒ String?

Return a frame’s content.

Returns:

  • (String)

    the frame’s content, if it exists

  • (nil)

    if frame doesn’t exist at given index



# File 'lib/cztop/message.rb', line 173

def [](frame_index)
  frame = frames[frame_index] or return nil
  frame.to_s
end

#content_size ⇒ Integer

Returns size of this message in bytes.

Returns:

  • (Integer)

    size of this message in bytes

See Also:



# File 'lib/cztop/message.rb', line 135

def content_size
  ffi_delegate.content_size
end

#empty? ⇒ Boolean

Returns whether this message is empty.

Returns:

  • (Boolean)

    whether this message is empty



# File 'lib/cztop/message.rb', line 31

def empty?
  content_size.zero?
end

#frames ⇒ FramesAccessor

Access to this CZTop::Message’s Frames.

Returns:

  • (FramesAccessor)



# File 'lib/cztop/message/frames.rb', line 12

def frames
  FramesAccessor.new(self)
end

#inspect ⇒ String

Inspects this CZTop::Message.

Returns:

  • (String)

    shows class, number of frames, content size, and content (only if it’s up to 500 bytes)



# File 'lib/cztop/message.rb', line 160

def inspect
  "#<%s:0x%x frames=%i content_size=%i content=%s>" % [
    self.class,
    to_ptr.address,
    size,
    content_size,
    content_size <= 500 ? to_a.inspect : "[...]"
  ]
end
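
To make the output format concrete, the sketch below applies the same format string to made-up values. The describe helper, the address, and the frames are illustrative only, and the content size is assumed here to be the sum of the frames' byte sizes.

```ruby
# Hypothetical helper reusing the format string from #inspect.
# `limit` mirrors the 500-byte threshold in the source above.
def describe(class_name, address, frames, limit: 500)
  content_size = frames.sum(&:bytesize)
  "#<%s:0x%x frames=%i content_size=%i content=%s>" % [
    class_name,
    address,
    frames.size,
    content_size,
    content_size <= limit ? frames.inspect : "[...]"
  ]
end

puts describe("CZTop::Message", 0x7f00, %w[foo bar])
puts describe("CZTop::Message", 0x7f00, ["x" * 501])
```

The second call exceeds the threshold, so its content is elided as [...] rather than printed in full.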

#pop ⇒ String?

Removes first part from message and returns it as a string.

Returns:

  • (String, nil)

    first part, if any, or nil



# File 'lib/cztop/message.rb', line 126

def pop
  # NOTE: can't use popstr because the data might be binary
  ptr = ffi_delegate.pop
  return nil if ptr.null?
  Frame.from_ffi_delegate(ptr).to_s
end

#prepend(frame) ⇒ void

Note:

If you provide a Frame, do NOT use that frame afterwards anymore, as its native counterpart will have been destroyed.

This method returns an undefined value.

Prepend a frame to this message.

Parameters:

  • frame (String, Frame)

    what to prepend

Raises:

  • (ArgumentError)

    if frame has an invalid type

  • (SystemCallError)

    if this fails



# File 'lib/cztop/message.rb', line 110

def prepend(frame)
  case frame
  when String
    # NOTE: can't use pushstr because the data might be binary
    mem = FFI::MemoryPointer.from_string(frame)
    rc = ffi_delegate.pushmem(mem, mem.size - 1) # without NULL byte
  when Frame
    rc = ffi_delegate.prepend(frame.ffi_delegate)
  else
    raise ArgumentError, "invalid frame: %p" % frame
  end
  raise_zmq_err unless rc == 0
end

#routing_id ⇒ Integer

Note:

This is only set when the frame came from a Socket::SERVER socket.

Gets the routing ID.

Returns:

  • (Integer)

    the routing ID, or 0 if unset



# File 'lib/cztop/message.rb', line 182

ffi_delegate :routing_id

#routing_id=(new_routing_id) ⇒ new_routing_id

Note:

This is used when the message is sent to a Socket::SERVER socket.

Sets a new routing ID.

Parameters:

  • new_routing_id (Integer)

    new routing ID

Returns:

  • (new_routing_id)

Raises:

  • (ArgumentError)

    if new routing ID is not an Integer

  • (RangeError)

    if new routing ID is out of uint32_t range



# File 'lib/cztop/message.rb', line 191

def routing_id=(new_routing_id)
  raise ArgumentError unless new_routing_id.is_a? Integer

  # need to raise manually, as FFI lacks this feature.
  # @see https://github.com/ffi/ffi/issues/473
  raise RangeError if new_routing_id < 0
  ffi_delegate.set_routing_id(new_routing_id)
end
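
The source above rejects only negative values, while the documented contract is the uint32_t range. A caller who wants to enforce the upper bound before assignment could use something like the following. The checked_routing_id helper and the UINT32_RANGE constant are hypothetical and not part of CZTop.

```ruby
# Hypothetical constant: the inclusive range of values a uint32_t can hold.
UINT32_RANGE = (0..0xFFFF_FFFF).freeze

# Hypothetical helper: validate a routing ID against the full uint32_t range.
def checked_routing_id(id)
  raise ArgumentError, "routing ID must be an Integer: %p" % [id] unless id.is_a?(Integer)
  raise RangeError, "routing ID out of uint32_t range: #{id}" unless UINT32_RANGE.cover?(id)
  id
end

puts checked_routing_id(42)
```

The validated value could then be passed on, for example as message.routing_id = checked_routing_id(id).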

#send_to(destination) ⇒ void

Note:

Do NOT use this CZTop::Message anymore afterwards. Its native counterpart will have been destroyed.

This method returns an undefined value.

Send CZTop::Message to a Socket or Actor.

Parameters:

  • destination (Socket, Actor)

    where to send this message to

Raises:

  • (IO::EAGAINWaitWritable)

    if the send timeout has been reached (see ZsockOptions::OptionsAccessor#sndtimeo=)

  • (SocketError)

    if the message can’t be routed to the destination: either the ROUTER_MANDATORY flag is set on a Socket::ROUTER socket and the peer isn’t connected or its SNDHWM is reached (see ZsockOptions::OptionsAccessor#router_mandatory=), or it’s a Socket::SERVER socket and there’s no connected CLIENT corresponding to the given routing ID

  • (ArgumentError)

    if the message is invalid, e.g. when trying to send a multi-part message over a CLIENT/SERVER socket

  • (SystemCallError)

    for any other error code set after zmsg_send returns with failure. Please report as bug.



# File 'lib/cztop/message.rb', line 57

def send_to(destination)
  rc = Zmsg.send(ffi_delegate, destination)
  return if rc == 0
  raise_zmq_err
rescue Errno::EAGAIN
  raise IO::EAGAINWaitWritable
end

#size ⇒ Integer

Returns number of frames.

Returns:

  • (Integer)

    number of frames

See Also:



# File 'lib/cztop/message/frames.rb', line 6

def size
  frames.count
end

#to_a ⇒ Array<String>

Note:

It’ll read all frames in the message and turn them into Ruby strings. This can be a problem if the message is huge or has huge frames.

Returns all frames as strings in an array. This is useful for quick inspection of the message.

Returns:

  • (Array<String>)

    all frames



# File 'lib/cztop/message.rb', line 144

def to_a
  ffi_delegate = ffi_delegate()
  frame = ffi_delegate.first
  return [] if frame.null?

  arr = [ frame.data.read_bytes(frame.size) ]
  while frame = ffi_delegate.next and not frame.null?
    arr << frame.data.read_bytes(frame.size)
  end

  return arr
end