Module: Fix::Engine::Connection
- Includes:
- Logger
- Included in:
- ClientConnection, ServerConnection
- Defined in:
- lib/fix/engine/connection.rb
Overview
The client connection handling logic and method overrides
Constant Summary collapse
- TEST_REQ_GRACE_TIME =
Grace time before we disconnect a client that doesn’t reply to a test request
15
Instance Attribute Summary collapse
-
#comp_id ⇒ Object
Returns the value of attribute comp_id.
-
#hrtbt_int ⇒ Object
Returns the value of attribute hrtbt_int.
-
#ip ⇒ Object
Returns the value of attribute ip.
-
#last_request_at ⇒ Object
Returns the value of attribute last_request_at.
-
#msg_buf ⇒ Object
Returns the value of attribute msg_buf.
-
#port ⇒ Object
Returns the value of attribute port.
-
#target_comp_id ⇒ Object
Returns the value of attribute target_comp_id.
Instance Method Summary collapse
-
#keep_alive ⇒ Object
Keeps the connection alive by sending regular heartbeats, and test request messages whenever the connection has been idl’ing for too long.
-
#kill! ⇒ Object
Kills the connection after sending a logout message, if applicable.
-
#peer ⇒ Object
The way we refer to our connection peer in various logs and messages.
-
#peer_error(error_msg, msg_seq_num) ⇒ Object
Notifies the connected peer it fucked up somehow and kill the connection.
-
#post_init ⇒ Object
Initialize the messages array, our comp_id, and the expected message sequence number.
-
#process_msg(msg) ⇒ Object
Maintains the message sequence consistency before handing off the message to
#handle_msg. -
#receive_data(data) ⇒ Object
Run when a client has sent a chunk of data, it gets appended to a buffer and a parsing attempt is made at the buffered data.
-
#run_message_handler(msg) ⇒ Object
Runs the defined message handler for the message’s class.
-
#send_heartbeat(test_req_id = nil) ⇒ Object
Sends a heartbeat message with an optional
test_req_idparameter. -
#send_msg(msg) ⇒ Object
Sends a
Fix::Protocol::Messageto the connected peer. -
#send_test_request ⇒ Object
Sends a test request and expects an answer before
TEST_REQ_GRACE_TIME. -
#set_heartbeat_interval(interval) ⇒ Object
Sets the heartbeat interval and schedules the keep alive call.
-
#unbind ⇒ Object
Cleans up after we’re done.
Methods included from Logger
Instance Attribute Details
#comp_id ⇒ Object
Returns the value of attribute comp_id.
20 21 22 |
# File 'lib/fix/engine/connection.rb', line 20 def comp_id @comp_id end |
#hrtbt_int ⇒ Object
Returns the value of attribute hrtbt_int.
20 21 22 |
# File 'lib/fix/engine/connection.rb', line 20 def hrtbt_int @hrtbt_int end |
#ip ⇒ Object
Returns the value of attribute ip.
20 21 22 |
# File 'lib/fix/engine/connection.rb', line 20 def ip @ip end |
#last_request_at ⇒ Object
Returns the value of attribute last_request_at.
20 21 22 |
# File 'lib/fix/engine/connection.rb', line 20 def last_request_at @last_request_at end |
#msg_buf ⇒ Object
Returns the value of attribute msg_buf.
20 21 22 |
# File 'lib/fix/engine/connection.rb', line 20 def msg_buf @msg_buf end |
#port ⇒ Object
Returns the value of attribute port.
20 21 22 |
# File 'lib/fix/engine/connection.rb', line 20 def port @port end |
#target_comp_id ⇒ Object
Returns the value of attribute target_comp_id.
20 21 22 |
# File 'lib/fix/engine/connection.rb', line 20 def target_comp_id @target_comp_id end |
Instance Method Details
#keep_alive ⇒ Object
Keeps the connection alive by sending regular heartbeats, and test request messages whenever the connection has been idl’ing for too long
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/fix/engine/connection.rb', line 56 def keep_alive @last_send_at ||= 0 @last_request_at ||= 0 @hrtbt_int ||= 0 # Send a regular heartbeat when we don't send anything down the line for a while if @hrtbt_int > 0 && (@last_send_at < (Time.now.to_i - @hrtbt_int)) send_heartbeat end # Trigger a test req message when we haven't received anything for a while if !@pending_test_req_id && (last_request_at < (Time.now.to_i - @hrtbt_int)) send_test_request end end |
#kill! ⇒ Object
Kills the connection after sending a logout message, if applicable
125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/fix/engine/connection.rb', line 125 def kill! if @target_comp_id log("Logging out #{peer}") logout = FP::Messages::Logout.new logout.text = 'Bye!' send_msg(logout) end close_connection_after_writing end |
#peer ⇒ Object
The way we refer to our connection peer in various logs and messages
35 36 37 |
# File 'lib/fix/engine/connection.rb', line 35 def peer "server" end |
#peer_error(error_msg, msg_seq_num) ⇒ Object
Notifies the connected peer it fucked up somehow and kill the connection
152 153 154 155 156 157 158 159 160 161 |
# File 'lib/fix/engine/connection.rb', line 152 def peer_error(error_msg, msg_seq_num) log("Notifying #{peer} of error: <#{error_msg}> and terminating") rjct = FP::Messages::Reject.new rjct.text = error_msg rjct.ref_seq_num = msg_seq_num send_msg(rjct) kill! end |
#post_init ⇒ Object
Initialize the messages array, our comp_id, and the expected message sequence number
25 26 27 28 29 30 |
# File 'lib/fix/engine/connection.rb', line 25 def post_init @expected_seq_num = 1 # The sent messages = [] end |
#process_msg(msg) ⇒ Object
Maintains the message sequence consistency before handing off the message to #handle_msg
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/fix/engine/connection.rb', line 166 def process_msg(msg) @recv_seq_num = msg.msg_seq_num log("Received a <#{msg.class}> from #{peer} with sequence number <#{msg.msg_seq_num}>") # If sequence number == expected, then process it normally if (@expected_seq_num == @recv_seq_num) if @comp_id && msg.target_comp_id != @comp_id @target_comp_id = msg.sender_comp_id # Whoops, incorrect COMP_ID received, kill it with fire if (msg.target_comp_id != @comp_id) peer_error("Incorrect TARGET_COMP_ID in message, expected <#{@comp_id}>, got <#{msg.target_comp_id}>", msg.header.msg_seq_num) end else if msg.is_a?(FP::Messages::Heartbeat) # If we were expecting an answer to a test request we can sign it off and # cancel the scheduled connection termination if @pending_test_req_id && msg.test_req_id && (@pending_test_req_id == msg.test_req_id) @pending_test_req_id = nil end elsif msg.is_a?(FP::Messages::TestRequest) # Answer test requests with a matching heartbeat hb = FP::Messages::Heartbeat.new hb.test_req_id = msg.test_req_id send_msg(hb) elsif msg.is_a?(FP::Messages::ResendRequest) # Re-send requested message range [msg.begin_seq_no - 1, (msg.end_seq_no.zero? ? .length : (msg.end_seq_no - msg.begin_seq_no + 1))].each do |m| log("Re-sending <#{m.class}> to <#{ip}:#{port}> with sequence number <#{m.msg_seq_num}>") send_data(m.dump) @last_send_at = Time.now.to_i end elsif msg.is_a?(FP::Message) (msg) end end @expected_seq_num += 1 elsif (@expected_seq_num > @recv_seq_num) log("Ignoring message <#{msg}> with stale sequence number <#{msg.msg_seq_num}>, expecting <#{@expected_seq_num}>") elsif (@expected_seq_num < @recv_seq_num) && @target_comp_id # Request missing range when detect a gap rr = FP::Messages::ResendRequest.new rr.begin_seq_no = @expected_seq_num send_msg(rr) end self.last_request_at = Time.now.to_i end |
#receive_data(data) ⇒ Object
Run when a client has sent a chunk of data, it gets appended to a buffer and a parsing attempt is made at the buffered data
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 |
# File 'lib/fix/engine/connection.rb', line 239 def receive_data(data) @buf ||= MessageBuffer.new do |parsed| if (parsed.class == FP::ParseFailure) || !parsed.errors.count.zero? peer_error("#{parsed.message} -- #{parsed.errors.join(", ")}", @expected_seq_num) log("Failed to parse message <#{parsed.message}>") parsed.errors.each { |err| log(" >>> #{err}") } else process_msg(parsed) end end begin @buf.add_data(data) rescue log("Raised exception by #{peer} when parsing data <#{@buf.msg_buf.gsub(/\x01/, '|')}>, terminating.") log($!. + $!.backtrace.join("\n")) kill! end end |
#run_message_handler(msg) ⇒ Object
Runs the defined message handler for the message’s class
228 229 230 231 |
# File 'lib/fix/engine/connection.rb', line 228 def (msg) m = "on_#{msg.class.to_s.split('::').last.gsub(/(.)([A-Z])/, '\1_\2').downcase}".to_sym send(m, msg) if respond_to?(m) end |
#send_heartbeat(test_req_id = nil) ⇒ Object
Sends a heartbeat message with an optional test_req_id parameter
91 92 93 94 95 |
# File 'lib/fix/engine/connection.rb', line 91 def send_heartbeat(test_req_id = nil) msg = FP::Messages::Heartbeat.new test_req_id && msg.test_req_id = test_req_id send_msg(msg) end |
#send_msg(msg) ⇒ Object
Sends a Fix::Protocol::Message to the connected peer
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/fix/engine/connection.rb', line 102 def send_msg(msg) @send_seq_num ||= 1 msg.msg_seq_num = @send_seq_num msg.sender_comp_id = @comp_id msg.target_comp_id = @target_comp_id log("Sending <#{msg.class}> to #{peer} with sequence number <#{msg.msg_seq_num}>") if msg.valid? [msg.msg_seq_num] = msg send_data(msg.dump) @send_seq_num += 1 @last_send_at = Time.now.to_i else log(msg.errors.join(', ')) raise "Tried to send invalid message! <#{msg.errors.join(', ')}>" end end |
#send_test_request ⇒ Object
Sends a test request and expects an answer before TEST_REQ_GRACE_TIME
75 76 77 78 79 80 81 82 83 84 |
# File 'lib/fix/engine/connection.rb', line 75 def send_test_request tr = FP::Messages::TestRequest.new tr.test_req_id = SecureRandom.hex(6) send_msg(tr) @pending_test_req_id = tr.test_req_id EM.add_timer(TEST_REQ_GRACE_TIME) do @pending_test_req_id && kill! end end |
#set_heartbeat_interval(interval) ⇒ Object
Sets the heartbeat interval and schedules the keep alive call
44 45 46 47 48 49 50 |
# File 'lib/fix/engine/connection.rb', line 44 def set_heartbeat_interval(interval) @hrtbt_int && raise("Can't set heartbeat interval twice") @hrtbt_int = interval log("Heartbeat interval for #{peer} : <#{hrtbt_int}s>") @keep_alive_timer = EM.add_periodic_timer(1) { keep_alive } end |
#unbind ⇒ Object
Cleans up after we’re done
141 142 143 144 |
# File 'lib/fix/engine/connection.rb', line 141 def unbind log("Terminating connection to #{peer}") @keep_alive_timer && @keep_alive_timer.cancel end |