Class: OnStomp::Connections::Base
- Inherits: Object
  - Object
    - OnStomp::Connections::Base
- Includes: Interfaces::ConnectionEvents
- Defined in: lib/onstomp/connections/base.rb
Overview
Common behavior for all connections.
Defined Under Namespace
Modules: BlockingRead, BlockingWrite, NonblockingRead, NonblockingWrite
Constant Summary
- MAX_BYTES_PER_WRITE = 1024 * 8
  The approximate maximum number of bytes to write per call to #io_process_write.
- MAX_BYTES_PER_READ = 1024 * 4
  The maximum number of bytes to read per call to #io_process_read.
Instance Attribute Summary
- #client ⇒ Object (readonly)
  Returns the value of attribute client.
- #last_received_at ⇒ Object (readonly)
  Returns the value of attribute last_received_at.
- #last_transmitted_at ⇒ Object (readonly)
  Returns the value of attribute last_transmitted_at.
- #read_timeout ⇒ Object
  Returns the value of attribute read_timeout.
- #socket ⇒ Object (readonly)
  Returns the value of attribute socket.
- #version ⇒ Object (readonly)
  Returns the value of attribute version.
- #write_timeout ⇒ Object
  Returns the value of attribute write_timeout.
Instance Method Summary
- #close(blocking = false) ⇒ Object
  Closes the #socket.
- #configure(connected, con_cbs) ⇒ Object
  Performs any necessary configuration of the connection from the CONNECTED frame sent by the broker and a Hash of pending callbacks.
- #connect(client, *headers) ⇒ Object
  Exchanges the CONNECT/CONNECTED frame handshake with the broker and returns the version detected along with the received CONNECTED frame.
- #connected? ⇒ true, false
  Returns true if the socket has not been closed, false otherwise.
- #duration_since_received ⇒ Fixnum?
  Number of milliseconds since data was last received from the broker, or nil if no data has been received when the method is called.
- #duration_since_transmitted ⇒ Fixnum?
  Number of milliseconds since data was last transmitted to the broker, or nil if no data has been transmitted when the method is called.
- #flush_write_buffer ⇒ Object
  Flushes the write buffer by invoking #io_process_write until the buffer is empty.
- #initialize(socket, client) ⇒ Base (constructor)
- #io_process(&cb) ⇒ Object
  Makes a single call to #io_process_write and a single call to #io_process_read.
- #io_process_read(connecting = false) ⇒ Object
  Reads serialized frame data from the socket if we're connected and the socket is ready for reading.
- #io_process_write ⇒ Object
  Writes serialized frame data to the socket if the write buffer is not empty and the socket is ready for writing.
- #method_missing(meth, *args, &block) ⇒ Object
  Checks if the missing method ends with '_frame', and if so raises an UnsupportedCommandError exception.
- #push_write_buffer(data, frame) ⇒ Object
  Adds a data and frame pair to the end of the write buffer.
- #shift_write_buffer ⇒ Object
  Removes the first data and frame pair from the write buffer.
- #unshift_write_buffer(data, frame) ⇒ Object
  Adds the remains of a data and frame pair to the head of the write buffer.
- #write_frame_nonblock(frame) ⇒ Object
  Serializes the given frame and adds the data to the connection's internal write buffer.
Methods included from Interfaces::ConnectionEvents
#install_bindings_from_client, #trigger_connection_event
Methods included from Interfaces::EventManager
#bind_event, #event_callbacks, included, #trigger_event
Constructor Details
#initialize(socket, client) ⇒ Base
    # File 'lib/onstomp/connections/base.rb', line 22
    def initialize socket, client
      @socket = socket
      @write_mutex = Mutex.new
      @closing = false
      @write_buffer = []
      @read_buffer = []
      @client = client
      @connection_up = false
      self.read_timeout = 120
      self.write_timeout = nil
      setup_non_blocking_methods
    end
Dynamic Method Handling
This class handles dynamic methods through the method_missing method.
#method_missing(meth, *args, &block) ⇒ Object
Checks if the missing method ends with '_frame', and if so raises an UnsupportedCommandError exception.
    # File 'lib/onstomp/connections/base.rb', line 121
    def method_missing meth, *args, &block
      if meth.to_s =~ /^(.*)_frame$/
        raise OnStomp::UnsupportedCommandError, $1.upcase
      else
        super
      end
    end
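The dispatch pattern above can be tried in isolation. The following is a minimal sketch, not the library's code: `SketchConnection`, the top-level `UnsupportedCommandError`, and the `unsupported_command` helper are hypothetical stand-ins introduced only for illustration.

```ruby
# Hypothetical stand-in for OnStomp::UnsupportedCommandError.
class UnsupportedCommandError < StandardError; end

# Hypothetical connection that defines no *_frame builders of its own.
class SketchConnection
  # Any unknown method named <command>_frame is reported as an
  # unsupported command; every other unknown method falls through
  # to the default NoMethodError via super.
  def method_missing(meth, *args, &block)
    if meth.to_s =~ /^(.*)_frame$/
      raise UnsupportedCommandError, $1.upcase
    else
      super
    end
  end
end

# Returns the upcased command name when the call is rejected as an
# unsupported command, or nil when the call succeeds.
def unsupported_command(conn, meth)
  conn.public_send(meth)
  nil
rescue UnsupportedCommandError => e
  e.message
end
```

Under these assumptions, calling `unsupported_command(SketchConnection.new, :nack_frame)` reports the command name derived from the method name, while a method without the `_frame` suffix still raises `NoMethodError`.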
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
    # File 'lib/onstomp/connections/base.rb', line 6
    def client
      @client
    end
#last_received_at ⇒ Object (readonly)
Returns the value of attribute last_received_at.
    # File 'lib/onstomp/connections/base.rb', line 7
    def last_received_at
      @last_received_at
    end
#last_transmitted_at ⇒ Object (readonly)
Returns the value of attribute last_transmitted_at.
    # File 'lib/onstomp/connections/base.rb', line 7
    def last_transmitted_at
      @last_transmitted_at
    end
#read_timeout ⇒ Object
Returns the value of attribute read_timeout.
    # File 'lib/onstomp/connections/base.rb', line 8
    def read_timeout
      @read_timeout
    end
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
    # File 'lib/onstomp/connections/base.rb', line 6
    def socket
      @socket
    end
#version ⇒ Object (readonly)
Returns the value of attribute version.
    # File 'lib/onstomp/connections/base.rb', line 6
    def version
      @version
    end
#write_timeout ⇒ Object
Returns the value of attribute write_timeout.
    # File 'lib/onstomp/connections/base.rb', line 8
    def write_timeout
      @write_timeout
    end
Instance Method Details
#close(blocking = false) ⇒ Object
Closes the #socket. If blocking is true, any buffered data is written and the socket is closed before this method returns; otherwise the socket will remain open until #io_process_write has finished writing all of its buffered data. Once this method has been invoked, #write_frame_nonblock will not enqueue any additional frames for writing.
    # File 'lib/onstomp/connections/base.rb', line 85
    def close blocking=false
      @write_mutex.synchronize { @closing = true }
      if blocking
        io_process_write until @write_buffer.empty?
        socket.close
      end
    end
#configure(connected, con_cbs) ⇒ Object
Performs any necessary configuration of the connection from the CONNECTED frame sent by the broker and a Hash of pending callbacks. This method is called after the protocol negotiation has taken place between client and broker, and the connection that receives it will be the connection used by the client for the duration of the session.
    # File 'lib/onstomp/connections/base.rb', line 68
    def configure connected, con_cbs
      @version = connected.header?(:version) ? connected[:version] : '1.0'
      install_bindings_from_client con_cbs
    end
#connect(client, *headers) ⇒ Object
Exchanges the CONNECT/CONNECTED frame handshake with the broker and returns the version detected along with the received CONNECTED frame. The supplied list of headers will be merged into the CONNECT frame sent to the broker.
    # File 'lib/onstomp/connections/base.rb', line 98
    def connect client, *headers
      # I really don't care for this. A core part of the CONNECT/CONNECTED
      # exchange can only be accomplished through subclasses.
      write_frame_nonblock connect_frame(*headers)
      client_con = nil
      until client_con
        io_process_write { |f| client_con ||= f }
      end
      update_last_received
      broker_con = nil
      until broker_con
        io_process_read(true) { |f| broker_con ||= f }
      end
      raise OnStomp::ConnectFailedError if broker_con.command != 'CONNECTED'
      vers = broker_con.header?(:version) ? broker_con[:version] : '1.0'
      raise OnStomp::UnsupportedProtocolVersionError, vers unless client.versions.include?(vers)
      @connection_up = true
      [ vers, broker_con ]
    end
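The validation at the end of the handshake (command check, default version, supported-version check) can be sketched without a socket. Everything named below is a hypothetical stand-in: `SketchFrame` models only the `command`, `header?` and `[]` calls used above, the two error classes replace the `OnStomp::` ones, and `check_handshake` is an illustrative helper rather than part of the library.

```ruby
# Hypothetical stand-ins for the OnStomp error classes.
class ConnectFailedError < StandardError; end
class UnsupportedProtocolVersionError < StandardError; end

# Hypothetical frame exposing only what the handshake check needs.
class SketchFrame
  attr_reader :command, :headers

  def initialize(command, headers = {})
    @command = command
    @headers = headers
  end

  # True when the header is present and non-empty.
  def header?(name)
    !@headers[name].to_s.empty?
  end

  def [](name)
    @headers[name]
  end
end

# Applies the same three rules as the end of #connect: the reply must be
# a CONNECTED frame, a missing version header is treated as '1.0', and
# the resulting version must be one the client supports.
def check_handshake(broker_frame, supported_versions)
  raise ConnectFailedError if broker_frame.command != 'CONNECTED'
  vers = broker_frame.header?(:version) ? broker_frame[:version] : '1.0'
  raise UnsupportedProtocolVersionError, vers unless supported_versions.include?(vers)
  [vers, broker_frame]
end
```

Defaulting to '1.0' matches the documented behavior of both #connect and #configure when the broker sends no version header.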
#connected? ⇒ true, false
Returns true if the socket has not been closed, false otherwise.
    # File 'lib/onstomp/connections/base.rb', line 75
    def connected?
      !socket.closed?
    end
#duration_since_received ⇒ Fixnum?
Number of milliseconds since data was last received from the broker, or nil if no data has been received when the method is called.
    # File 'lib/onstomp/connections/base.rb', line 139
    def duration_since_received
      last_received_at && ((Time.now.to_f - last_received_at) * 1000)
    end
#duration_since_transmitted ⇒ Fixnum?
Number of milliseconds since data was last transmitted to the broker, or nil if no data has been transmitted when the method is called.
    # File 'lib/onstomp/connections/base.rb', line 132
    def duration_since_transmitted
      last_transmitted_at && ((Time.now.to_f - last_transmitted_at) * 1000)
    end
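Both duration methods use the same arithmetic: a timestamp in seconds is subtracted from the current time and scaled to milliseconds, with nil short-circuiting the expression. A minimal sketch follows, assuming the timestamps are Float seconds as produced by `Time.now.to_f`; the `duration_since` helper and its injectable `now` parameter are hypothetical and exist only so the result can be checked deterministically.

```ruby
# Returns the elapsed time in milliseconds since last_at, or nil when
# last_at is nil (no activity recorded yet). Because the inputs are
# Floats, the result in this sketch is a Float as well.
def duration_since(last_at, now = Time.now.to_f)
  last_at && ((now - last_at) * 1000)
end

# Half a second of elapsed time, expressed in milliseconds.
elapsed_ms = duration_since(10.0, 10.5)
```

A caller comparing the result against a heartbeat interval would need to handle the nil case first, since nil means no data has moved yet rather than zero elapsed time.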
#flush_write_buffer ⇒ Object
Flushes the write buffer by invoking #io_process_write until the buffer is empty.
    # File 'lib/onstomp/connections/base.rb', line 145
    def flush_write_buffer
      io_process_write until @write_buffer.empty?
    end
#io_process(&cb) ⇒ Object
Makes a single call to #io_process_write and a single call to #io_process_read.
    # File 'lib/onstomp/connections/base.rb', line 151
    def io_process &cb
      io_process_write &cb
      io_process_read &cb
      if @connection_up && !connected?
        triggered_close 'connection timed out', :died
      end
    end
#io_process_read(connecting = false) ⇒ Object
Reads serialized frame data from the socket if we're connected and the socket is ready for reading. The received data will be pushed to the end of a read buffer, which is then sent to the connection's serializer for processing.
    # File 'lib/onstomp/connections/base.rb', line 236
    def io_process_read(connecting=false)
      if ready_for_read?
        begin
          if data = read_nonblock
            @read_buffer << data
            update_last_received
            serializer.bytes_to_frame(@read_buffer) do |frame|
              yield frame if block_given?
              client.dispatch_received frame
            end
          end
        rescue Errno::EINTR, Errno::EAGAIN, Errno::EWOULDBLOCK
          # do not
        rescue EOFError
          triggered_close $!.message
          raise if connecting
        rescue Exception
          # TODO: Fix this potential race condition the right way.
          # This is the problematic area! If the user (or failover library)
          # try to reconnect the Client when the connection is closed, the
          # exception won't be raised until the IO Processing thread has
          # already been joined to the main thread. Thus, the connection gets
          # re-established, the "dying" thread re-enters here, and immediately
          # raises the exception that terminated it.
          triggered_close $!.message, :terminated
          raise
        end
      end
      if connecting && read_timeout_exceeded?
        triggered_close 'read blocked', :blocked
        raise OnStomp::ConnectionTimeoutError
      end
    end
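The read path appends each chunk to a buffer and lets the serializer pull out whatever complete frames have arrived. The sketch below illustrates that idea with a deliberately simplified stand-in for the serializer: the hypothetical `drain_frames` helper treats every NUL byte as a frame terminator and yields raw frame text. It ignores content-length bodies, which may legitimately contain NUL bytes, so it is not a substitute for the library's parser.

```ruby
# Joins the buffered chunks, yields each complete NUL-terminated frame
# as a raw string, and leaves any trailing partial frame in the buffer
# so that the next read can complete it.
def drain_frames(read_buffer)
  data = read_buffer.join
  read_buffer.clear
  while (idx = data.index("\0"))
    yield data[0...idx]
    data = data[(idx + 1)..-1]
  end
  read_buffer << data unless data.empty?
end

# Two chunks as they might arrive from successive non-blocking reads:
# the first frame spans both chunks, the second is still incomplete.
buffer = ["MESSAGE\n\nhel", "lo\0RECEIPT\n"]
frames = []
drain_frames(buffer) { |raw| frames << raw }
```

After this pass, `frames` holds the one complete frame and `buffer` still holds the partial RECEIPT data, which mirrors how a frame split across reads is only dispatched once its final bytes arrive.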
#io_process_write ⇒ Object
Writes serialized frame data to the socket if the write buffer is not empty and socket is ready for writing. Once a complete frame has been written, this method will call OnStomp::Client#dispatch_transmitted to notify the client that the frame has been sent to the broker. If a complete frame cannot be written without blocking, the unsent data is sent to the head of the write buffer to be processed first the next time this method is invoked.
    # File 'lib/onstomp/connections/base.rb', line 196
    def io_process_write
      if ready_for_write?
        to_shift = @write_buffer.length / 3
        written = 0
        while written < MAX_BYTES_PER_WRITE
          data, frame = shift_write_buffer
          break unless data && connected?
          begin
            w = write_nonblock data
          rescue Errno::EINTR, Errno::EAGAIN, Errno::EWOULDBLOCK
            # writing will either block, or cannot otherwise be completed,
            # put data back and try again some other day
            unshift_write_buffer data, frame
            break
          rescue Exception
            triggered_close $!.message, :terminated
            raise
          end
          written += w
          update_last_write_activity
          update_last_transmitted
          if w < data.length
            unshift_write_buffer data[w..-1], frame
          else
            yield frame if block_given?
            client.dispatch_transmitted frame
          end
        end
      elsif write_timeout_exceeded?
        triggered_close 'write blocked', :blocked
      end
      if @write_buffer.empty? && @closing
        triggered_close 'client disconnected'
      end
    end
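The partial-write handling described above (write what the socket accepts, put the remainder back at the head of the buffer, stop once the per-call byte budget is spent) can be sketched with a fake socket. `ShortWriteSocket` and `process_write` are hypothetical names used only here; the sketch measures data with `bytesize` rather than `length`, and it omits the timeout, error and close handling of the real method.

```ruby
# Hypothetical socket that accepts at most `limit` bytes per call,
# imitating a short non-blocking write.
class ShortWriteSocket
  attr_reader :received

  def initialize(limit)
    @limit = limit
    @received = String.new
  end

  # Returns the number of bytes actually accepted.
  def write_nonblock(data)
    chunk = data.byteslice(0, @limit)
    @received << chunk
    chunk.bytesize
  end
end

# One write pass over an array of [data, frame] pairs. Returns the
# frames that were written completely during this pass.
def process_write(socket, buffer, max_bytes = 1024 * 8)
  completed = []
  written = 0
  while written < max_bytes
    data, frame = buffer.shift
    break unless data
    w = socket.write_nonblock(data)
    written += w
    if w < data.bytesize
      # Short write: the unsent tail goes back to the head of the buffer.
      buffer.unshift([data.byteslice(w..-1), frame])
    else
      completed << frame
    end
  end
  completed
end

socket = ShortWriteSocket.new(4)
buffer = [['hello world', :first], ['bye', :second]]
first_pass = process_write(socket, buffer, 8)
second_pass = process_write(socket, buffer, 8)
```

With a budget of 8 bytes and a socket that takes 4 bytes at a time, the first pass completes no frame and leaves the tail of the first frame at the head of the buffer; the second pass finishes both frames in their original order.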
#push_write_buffer(data, frame) ⇒ Object
Adds a data and frame pair to the end of the write buffer.
    # File 'lib/onstomp/connections/base.rb', line 170
    def push_write_buffer data, frame
      @write_mutex.synchronize {
        update_last_write_activity if @write_buffer.empty?
        @write_buffer << [data, frame] unless @closing
      }
    end
#shift_write_buffer ⇒ Object
Removes the first data and frame pair from the write buffer.
    # File 'lib/onstomp/connections/base.rb', line 179
    def shift_write_buffer
      @write_mutex.synchronize { @write_buffer.shift }
    end
#unshift_write_buffer(data, frame) ⇒ Object
Adds the remains of a data and frame pair to the head of the write buffer.
    # File 'lib/onstomp/connections/base.rb', line 185
    def unshift_write_buffer data, frame
      @write_mutex.synchronize { @write_buffer.unshift [data, frame] }
    end
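Taken together, the push, shift and unshift methods form a small mutex-guarded queue in which new entries are refused once the connection is closing, while a partially written entry can still be returned to the head. The following is a minimal sketch of that structure; `SketchWriteBuffer` and its method names are hypothetical and it leaves out the write-activity bookkeeping.

```ruby
# Hypothetical stand-alone version of the connection's write buffer.
class SketchWriteBuffer
  def initialize
    @mutex = Mutex.new
    @entries = []
    @closing = false
  end

  # Appends a pair unless the buffer has been marked as closing.
  def push(data, frame)
    @mutex.synchronize { @entries << [data, frame] unless @closing }
  end

  # Removes and returns the oldest pair, or nil when empty.
  def shift
    @mutex.synchronize { @entries.shift }
  end

  # Returns an unsent remainder to the head so it is written first.
  # This is allowed even while closing, so pending data can drain.
  def unshift(data, frame)
    @mutex.synchronize { @entries.unshift([data, frame]) }
  end

  def close!
    @mutex.synchronize { @closing = true }
  end

  def empty?
    @mutex.synchronize { @entries.empty? }
  end
end
```

Guarding every operation with the same mutex lets one thread enqueue frames while another drains them, which appears to be the reason the documented methods all synchronize on @write_mutex.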
#write_frame_nonblock(frame) ⇒ Object
Serializes the given frame and adds the data to the connection's internal write buffer.
    # File 'lib/onstomp/connections/base.rb', line 162
    def write_frame_nonblock frame
      ser = serializer.frame_to_bytes frame
      push_write_buffer ser, frame
    end