Module: ZMQMachine::Socket::Base

Included in:
Pair, Pub, Rep, Req, Sub, XRep, XReq
Defined in:
lib/zm/sockets/base.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#kindObject (readonly)

Returns the value of attribute kind.



42
43
44
# File 'lib/zm/sockets/base.rb', line 42

def kind
  @kind
end

#poll_optionsObject (readonly)

Returns the value of attribute poll_options.



43
44
45
# File 'lib/zm/sockets/base.rb', line 43

def poll_options
  @poll_options
end

#raw_socketObject (readonly)

Returns the value of attribute raw_socket.



42
43
44
# File 'lib/zm/sockets/base.rb', line 42

def raw_socket
  @raw_socket
end

Instance Method Details

#attach(handler) ⇒ Object

Call the handler’s #on_attach method and pass itself so the handler may complete its setup.

The #on_attach method is passed a single argument named socket. The method should probably #bind or #connect to an address and potentially schedule (via timer) an operation or begin sending messages immediately.

Raises:

  • (ArgumentError)


64
65
66
67
# File 'lib/zm/sockets/base.rb', line 64

def attach handler
  raise ArgumentError, "Handler must provide an 'on_attach' method" unless handler.respond_to? :on_attach
  handler.on_attach self
end

#bind(address) ⇒ Object

Creates a 0mq socket endpoint for the transport given in the address. Other 0mq sockets may then #connect to this bound endpoint.



73
74
75
76
77
78
79
80
81
82
# File 'lib/zm/sockets/base.rb', line 73

def bind address
  begin
    @bindings << address
    @raw_socket.bind address.to_s
    true
  rescue ZMQ::ZeroMQError
    @bindings.pop
    false
  end
end

#connect(address) ⇒ Object

Connect this 0mq socket to the 0mq socket bound to the endpoint described by the address.



87
88
89
90
91
92
93
94
95
96
# File 'lib/zm/sockets/base.rb', line 87

def connect address
  begin
    @connections << address
    @raw_socket.connect address.to_s
    true
  rescue ZMQ::ZeroMQError
    @connections.pop
    false
  end
end

#identityObject

Retrieve the IDENTITY value assigned to this socket.



153
# File 'lib/zm/sockets/base.rb', line 153

def identity() @raw_socket.identity; end

#identity=(value) ⇒ Object

Assign a custom IDENTITY value to this socket. Limit is 255 bytes and must be greater than 0 bytes.



158
# File 'lib/zm/sockets/base.rb', line 158

def identity=(value) @raw_socket.identity = value; end

#initialize(context, handler) ⇒ Object



45
46
47
48
49
50
51
52
53
54
# File 'lib/zm/sockets/base.rb', line 45

def initialize context, handler
  @state = :init
  @context = context
  @bindings = []
  @connections = []

  @handler = handler
  @raw_socket = allocate_socket @context
  attach @handler
end

#inspectObject



196
197
198
# File 'lib/zm/sockets/base.rb', line 196

def inspect
  "kind [#{@kind}] poll options [#{@poll_options}] state [#{@state}]"
end

#resume_readObject

Used by the reactor. Never called by user code.

FIXME: need to rework all of this rc stuff. The underlying lib returns nil when a NOBLOCK socket gets EAGAIN. It returns true when a message was successfully dequeued. The use of rc here is really ugly and wrong.



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/zm/sockets/base.rb', line 166

def resume_read
  rc = 0
  
  # loop and deliver all messages until the socket returns EAGAIN
  while 0 == rc
    messages = []
    rc = read_message_part messages
    #puts "resume_read: rc1 [#{rc}], more_parts? [#{@raw_socket.more_parts?}]"

    while 0 == rc && @raw_socket.more_parts?
      #puts "get next part"
      rc = read_message_part messages
      #puts "resume_read: rc2 [#{rc}]"
    end
    #puts "no more parts, ready to deliver"

    # only deliver the messages when rc is 0; otherwise, we
    # may have gotten EAGAIN and no message was read;
    # don't deliver empty messages
    deliver messages, rc if 0 == rc
  end
end

#resume_writeObject

Used by the reactor. Never called by user code.



191
192
193
194
# File 'lib/zm/sockets/base.rb', line 191

def resume_write
  @state = :ready
  @handler.on_writable self
end

#send_message(message, multipart = false) ⇒ Object

Called to send a ZMQ::Message that was populated with data.

Returns true on success, false otherwise.

May raise a ZMQ::SocketError.



104
105
106
107
108
109
110
111
# File 'lib/zm/sockets/base.rb', line 104

def send_message message, multipart = false
  begin
    queued = @raw_socket.send message, ZMQ::NOBLOCK | (multipart ? ZMQ::SNDMORE : 0)
  rescue ZMQ::ZeroMQError => e
    queued = false
  end
  queued
end

#send_message_string(message, multipart = false) ⇒ Object

Convenience method to send a string on the socket. It handles the creation of a ZMQ::Message and populates it appropriately.

Returns true on success, false otherwise.

May raise a ZMQ::SocketError.



120
121
122
123
# File 'lib/zm/sockets/base.rb', line 120

def send_message_string message, multipart = false
  queued = @raw_socket.send_string message, ZMQ::NOBLOCK | (multipart ? ZMQ::SNDMORE : 0)
  queued
end

#send_messages(messages) ⇒ Object

Convenience method for sending a multi-part message. The messages argument must respond to :size, :at and :last (like an Array).

May raise a ZMQ::SocketError.



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/zm/sockets/base.rb', line 131

def send_messages messages
  rc = true
  i = 0
  size = messages.size

  # loop through all messages but the last
  while rc && size > 1 && i < size - 1 do
    rc = send_message messages.at(i), true
    i += 1
  end
  
  # FIXME: bug; if any of the message parts fail (rc != 0) we don't see that here; the
  # #send_message function should capture exceptions and turn them into integers for bubbling

  # send the last message without the multipart arg to flush
  # the message to the 0mq queue
  rc = send_message messages.last if rc && size > 0
  rc
end