Module: ZMQMachine::Socket::Base
Instance Attribute Summary collapse
-
#kind ⇒ Object
readonly
Returns the value of attribute kind.
-
#poll_options ⇒ Object
readonly
Returns the value of attribute poll_options.
-
#raw_socket ⇒ Object
readonly
Returns the value of attribute raw_socket.
Instance Method Summary collapse
-
#attach(handler) ⇒ Object
Call the handler’s #on_attach method and pass itself so the handler may complete its setup.
-
#bind(address) ⇒ Object
Creates a 0mq socket endpoint for the transport given in the
address
. -
#connect(address) ⇒ Object
Connect this 0mq socket to the 0mq socket bound to the endpoint described by the
address
. -
#identity ⇒ Object
Retrieve the IDENTITY value assigned to this socket.
-
#identity=(value) ⇒ Object
Assign a custom IDENTITY value to this socket.
- #initialize(context, handler) ⇒ Object
- #inspect ⇒ Object
-
#resume_read ⇒ Object
Used by the reactor.
-
#resume_write ⇒ Object
Used by the reactor.
-
#send_message(message, multipart = false) ⇒ Object
Called to send a ZMQ::Message that was populated with data.
-
#send_message_string(message, multipart = false) ⇒ Object
Convenience method to send a string on the socket.
-
#send_messages(messages) ⇒ Object
Convenience method for sending a multi-part message.
Instance Attribute Details
#kind ⇒ Object (readonly)
Returns the value of attribute kind.
42 43 44 |
# File 'lib/zm/sockets/base.rb', line 42 def kind @kind end |
#poll_options ⇒ Object (readonly)
Returns the value of attribute poll_options.
43 44 45 |
# File 'lib/zm/sockets/base.rb', line 43 def @poll_options end |
#raw_socket ⇒ Object (readonly)
Returns the value of attribute raw_socket.
42 43 44 |
# File 'lib/zm/sockets/base.rb', line 42 def raw_socket @raw_socket end |
Instance Method Details
#attach(handler) ⇒ Object
Call the handler’s #on_attach method and pass itself so the handler may complete its setup.
The #on_attach method is passed a single argument named socket
. The method should probably #bind or #connect to an address and potentially schedule (via timer) an operation or begin sending messages immediately.
64 65 66 67 |
# File 'lib/zm/sockets/base.rb', line 64 def attach handler raise ArgumentError, "Handler must provide an 'on_attach' method" unless handler.respond_to? :on_attach handler.on_attach self end |
#bind(address) ⇒ Object
Creates a 0mq socket endpoint for the transport given in the address
. Other 0mq sockets may then #connect to this bound endpoint.
73 74 75 76 77 78 79 80 81 82 |
# File 'lib/zm/sockets/base.rb', line 73 def bind address begin @bindings << address @raw_socket.bind address.to_s true rescue ZMQ::ZeroMQError @bindings.pop false end end |
#connect(address) ⇒ Object
Connect this 0mq socket to the 0mq socket bound to the endpoint described by the address
.
87 88 89 90 91 92 93 94 95 96 |
# File 'lib/zm/sockets/base.rb', line 87 def connect address begin @connections << address @raw_socket.connect address.to_s true rescue ZMQ::ZeroMQError @connections.pop false end end |
#identity ⇒ Object
Retrieve the IDENTITY value assigned to this socket.
153 |
# File 'lib/zm/sockets/base.rb', line 153 def identity() @raw_socket.identity; end |
#identity=(value) ⇒ Object
Assign a custom IDENTITY value to this socket. Limit is 255 bytes and must be greater than 0 bytes.
158 |
# File 'lib/zm/sockets/base.rb', line 158 def identity=(value) @raw_socket.identity = value; end |
#initialize(context, handler) ⇒ Object
45 46 47 48 49 50 51 52 53 54 |
# File 'lib/zm/sockets/base.rb', line 45 def initialize context, handler @state = :init @context = context @bindings = [] @connections = [] @handler = handler @raw_socket = allocate_socket @context attach @handler end |
#inspect ⇒ Object
196 197 198 |
# File 'lib/zm/sockets/base.rb', line 196 def inspect "kind [#{@kind}] poll options [#{@poll_options}] state [#{@state}]" end |
#resume_read ⇒ Object
Used by the reactor. Never called by user code.
FIXME: need to rework all of this rc
stuff. The underlying lib returns nil when a NOBLOCK socket gets EAGAIN. It returns true when a message was successfully dequeued. The use of rc here is really ugly and wrong.
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/zm/sockets/base.rb', line 166 def resume_read rc = 0 # loop and deliver all messages until the socket returns EAGAIN while 0 == rc = [] rc = #puts "resume_read: rc1 [#{rc}], more_parts? [#{@raw_socket.more_parts?}]" while 0 == rc && @raw_socket.more_parts? #puts "get next part" rc = #puts "resume_read: rc2 [#{rc}]" end #puts "no more parts, ready to deliver" # only deliver the messages when rc is 0; otherwise, we # may have gotten EAGAIN and no message was read; # don't deliver empty messages deliver , rc if 0 == rc end end |
#resume_write ⇒ Object
Used by the reactor. Never called by user code.
191 192 193 194 |
# File 'lib/zm/sockets/base.rb', line 191 def resume_write @state = :ready @handler.on_writable self end |
#send_message(message, multipart = false) ⇒ Object
Called to send a ZMQ::Message that was populated with data.
Returns true on success, false otherwise.
May raise a ZMQ::SocketError.
104 105 106 107 108 109 110 111 |
# File 'lib/zm/sockets/base.rb', line 104 def , multipart = false begin queued = @raw_socket.send , ZMQ::NOBLOCK | (multipart ? ZMQ::SNDMORE : 0) rescue ZMQ::ZeroMQError => e queued = false end queued end |
#send_message_string(message, multipart = false) ⇒ Object
Convenience method to send a string on the socket. It handles the creation of a ZMQ::Message and populates it appropriately.
Returns true on success, false otherwise.
May raise a ZMQ::SocketError.
120 121 122 123 |
# File 'lib/zm/sockets/base.rb', line 120 def , multipart = false queued = @raw_socket.send_string , ZMQ::NOBLOCK | (multipart ? ZMQ::SNDMORE : 0) queued end |
#send_messages(messages) ⇒ Object
Convenience method for sending a multi-part message. The messages
argument must respond to :size, :at and :last (like an Array).
May raise a ZMQ::SocketError.
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/zm/sockets/base.rb', line 131 def rc = true i = 0 size = .size # loop through all messages but the last while rc && size > 1 && i < size - 1 do rc = .at(i), true i += 1 end # FIXME: bug; if any of the message parts fail (rc != 0) we don't see that here; the # #send_message function should capture exceptions and turn them into integers for bubbling # send the last message without the multipart arg to flush # the message to the 0mq queue rc = .last if rc && size > 0 rc end |