Class: NATSD::Route
- Inherits:
-
EventMachine::Connection
- Object
- EventMachine::Connection
- NATSD::Route
- Includes:
- Connection
- Defined in:
- lib/nats/server/route.rb
Overview
Need to make this a class with EM > 1.0
Instance Attribute Summary collapse
-
#closing ⇒ Object
readonly
Returns the value of attribute closing.
-
#r_obj ⇒ Object
readonly
Returns the value of attribute r_obj.
-
#reconnecting ⇒ Object
(also: #reconnecting?)
readonly
Returns the value of attribute reconnecting.
-
#remote_rid ⇒ Object
readonly
Returns the value of attribute remote_rid.
-
#rid ⇒ Object
readonly
Returns the value of attribute rid.
Attributes included from Connection
#cid, #in_bytes, #in_msgs, #last_activity, #out_bytes, #out_msgs, #subscriptions, #writev_size
Instance Method Summary collapse
- #auth_ok?(user, pass) ⇒ Boolean
- #cancel_reconnect ⇒ Object
- #connect_auth_timeout ⇒ Object
- #connection_completed ⇒ Object
- #ctrace(*args) ⇒ Object
- #dec_connections ⇒ Object
- #inc_connections ⇒ Object
-
#initialize(route = nil) ⇒ Route
constructor
A new instance of Route.
- #is_route? ⇒ Boolean
- #post_init ⇒ Object
- #process_connect_route_config(config) ⇒ Object
- #process_info(info_json) ⇒ Object
- #receive_data(data) ⇒ Object
- #send_auth ⇒ Object
- #send_info ⇒ Object
-
#send_local_subs_to_route ⇒ Object
TODO: Make sure max_requested is also propogated on reconnect.
- #solicited? ⇒ Boolean
- #try_reconnect ⇒ Object
- #type ⇒ Object
- #unbind ⇒ Object
Methods included from Connection
#client_info, #connect_ssl_timeout, #debug_print_controlline_too_big, #debug_print_msg_too_big, #delete_subscriber, #error_close, #flush_data, #info, #max_connections_exceeded?, #process_connect_config, #process_unbind, #queue_data, #send_ping, #ssl_handshake_completed, #ssl_verify_peer, #strip_op
Constructor Details
#initialize(route = nil) ⇒ Route
Returns a new instance of Route.
12 13 14 |
# File 'lib/nats/server/route.rb', line 12 def initialize(route=nil) @r_obj = route end |
Instance Attribute Details
#closing ⇒ Object (readonly)
Returns the value of attribute closing.
8 9 10 |
# File 'lib/nats/server/route.rb', line 8 def closing @closing end |
#r_obj ⇒ Object (readonly)
Returns the value of attribute r_obj.
8 9 10 |
# File 'lib/nats/server/route.rb', line 8 def r_obj @r_obj end |
#reconnecting ⇒ Object (readonly) Also known as: reconnecting?
Returns the value of attribute reconnecting.
8 9 10 |
# File 'lib/nats/server/route.rb', line 8 def reconnecting @reconnecting end |
#remote_rid ⇒ Object (readonly)
Returns the value of attribute remote_rid.
8 9 10 |
# File 'lib/nats/server/route.rb', line 8 def remote_rid @remote_rid end |
#rid ⇒ Object (readonly)
Returns the value of attribute rid.
8 9 10 |
# File 'lib/nats/server/route.rb', line 8 def rid @rid end |
Instance Method Details
#auth_ok?(user, pass) ⇒ Boolean
230 231 232 |
# File 'lib/nats/server/route.rb', line 230 def auth_ok?(user, pass) Server.route_auth_ok?(user, pass) end |
#cancel_reconnect ⇒ Object
249 250 251 252 253 |
# File 'lib/nats/server/route.rb', line 249 def cancel_reconnect EM.cancel_timer(@reconnect_timer) if @reconnect_timer @reconnect_timer = nil @reconnecting = false end |
#connect_auth_timeout ⇒ Object
83 84 85 86 |
# File 'lib/nats/server/route.rb', line 83 def connect_auth_timeout error_close AUTH_REQUIRED debug "#{type} connection timeout due to lack of auth credentials", rid end |
#connection_completed ⇒ Object
20 21 22 23 24 25 26 27 28 |
# File 'lib/nats/server/route.rb', line 20 def connection_completed debug "Route connected", rid return unless reconnecting? # Kill reconnect if we got here from there cancel_reconnect @buf, @closing = nil, false post_init end |
#ctrace(*args) ⇒ Object
265 266 267 |
# File 'lib/nats/server/route.rb', line 265 def ctrace(*args) trace(args, "r: #{rid}") end |
#dec_connections ⇒ Object
239 240 241 242 |
# File 'lib/nats/server/route.rb', line 239 def dec_connections Server.num_routes -= 1 Server.remove_route(self) end |
#inc_connections ⇒ Object
234 235 236 237 |
# File 'lib/nats/server/route.rb', line 234 def inc_connections Server.num_routes += 1 Server.add_route(self) end |
#is_route? ⇒ Boolean
269 270 271 |
# File 'lib/nats/server/route.rb', line 269 def is_route? true end |
#post_init ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/nats/server/route.rb', line 30 def post_init @rid = Server.rid @subscriptions = {} @in_msgs = @out_msgs = @in_bytes = @out_bytes = 0 @writev_size = 0 @parse_state = AWAITING_CONTROL_LINE # Queue up auth if needed and we solicited the connection debug "Route connection created", peer_info, rid # queue up auth if needed and we solicited the connection if solicited? debug "Route sent authorization", rid send_auth else # FIXME, separate variables for timeout? @auth_pending = EM.add_timer(NATSD::Server.auth_timeout) { connect_auth_timeout } if Server.route_auth_required? end send_info @ping_timer = EM.add_periodic_timer(NATSD::Server.ping_interval) { send_ping } @pings_outstanding = 0 inc_connections send_local_subs_to_route end |
#process_connect_route_config(config) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/nats/server/route.rb', line 66 def process_connect_route_config(config) @verbose = config['verbose'] unless config['verbose'].nil? @pedantic = config['pedantic'] unless config['pedantic'].nil? return queue_data(OK) unless Server.route_auth_required? EM.cancel_timer(@auth_pending) if auth_ok?(config['user'], config['pass']) debug "Route received proper credentials", rid queue_data(OK) if @verbose @auth_pending = nil else error_close AUTH_FAILED debug "Authorization failed for #{type.downcase} connection", rid end end |
#process_info(info_json) ⇒ Object
224 225 226 227 228 |
# File 'lib/nats/server/route.rb', line 224 def process_info(info_json) info = JSON.parse(info_json) @remote_rid = info['server_id'] unless info['server_id'].nil? super(info_json) end |
#receive_data(data) ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/nats/server/route.rb', line 88 def receive_data(data) @buf = @buf ? @buf << data : data return close_connection if @buf =~ /(\006|\004)/ # ctrl+c or ctrl+d for telnet friendly while (@buf && !@closing) case @parse_state when AWAITING_CONTROL_LINE case @buf when MSG ctrace('MSG OP', strip_op($&)) if NATSD::Server.trace_flag? return connect_auth_timeout if @auth_pending @buf = $' @parse_state = AWAITING_MSG_PAYLOAD @msg_sub, @msg_sid, @msg_reply, @msg_size = $1, $2, $4, $5.to_i if (@msg_size > NATSD::Server.max_payload) debug_print_msg_too_big(@msg_size) error_close PAYLOAD_TOO_BIG end queue_data(INVALID_SUBJECT) if (@pedantic && !(@msg_sub =~ SUB_NO_WC)) when SUB_OP ctrace('SUB OP', strip_op($&)) if NATSD::Server.trace_flag? return connect_auth_timeout if @auth_pending @buf = $' sub, qgroup, sid = $1, $3, $4 return queue_data(INVALID_SUBJECT) if !($1 =~ SUB) return queue_data(INVALID_SID_TAKEN) if @subscriptions[sid] sub = Subscriber.new(self, sub, sid, qgroup, 0) @subscriptions[sid] = sub Server.subscribe(sub, is_route?) queue_data(OK) if @verbose when UNSUB_OP ctrace('UNSUB OP', strip_op($&)) if NATSD::Server.trace_flag? return connect_auth_timeout if @auth_pending @buf = $' sid, sub = $1, @subscriptions[$1] if sub # If we have set max_responses, we will unsubscribe once we have received # the appropriate amount of responses. sub.max_responses = ($2 && $3) ? $3.to_i : nil delete_subscriber(sub) unless (sub.max_responses && (sub.num_responses < sub.max_responses)) queue_data(OK) if @verbose else queue_data(INVALID_SID_NOEXIST) if @pedantic end when PING ctrace('PING OP') if NATSD::Server.trace_flag? @buf = $' queue_data(PONG_RESPONSE) flush_data when PONG ctrace('PONG OP') if NATSD::Server.trace_flag? @buf = $' @pings_outstanding -= 1 when CONNECT ctrace('CONNECT OP', strip_op($&)) if NATSD::Server.trace_flag? @buf = $' begin config = JSON.parse($1) process_connect_route_config(config) rescue => e queue_data(INVALID_CONFIG) log_error end when INFO_REQ ctrace('INFO_REQUEST OP') if NATSD::Server.trace_flag? return connect_auth_timeout if @auth_pending @buf = $' send_info when INFO ctrace('INFO OP', strip_op($&)) if NATSD::Server.trace_flag? return connect_auth_timeout if @auth_pending @buf = $' process_info($1) when ERR_RESP ctrace('-ERR', $1) if NATSD::Server.trace_flag? close_connection exit when OK_RESP ctrace('+OK') if NATSD::Server.trace_flag? @buf = $' when UNKNOWN ctrace('Unknown Op', strip_op($&)) if NATSD::Server.trace_flag? return connect_auth_timeout if @auth_pending @buf = $' queue_data(UNKNOWN_OP) else # If we are here we do not have a complete line yet that we understand. # If too big, cut the connection off. if @buf.bytesize > NATSD::Server.max_control_line debug_print_controlline_too_big(@buf.bytesize) close_connection end return end @buf = nil if (@buf && @buf.empty?) when AWAITING_MSG_PAYLOAD return unless (@buf.bytesize >= (@msg_size + CR_LF_SIZE)) msg = @buf.slice(0, @msg_size) ctrace('Processing routed msg', @msg_sub, @msg_reply, msg) if NATSD::Server.trace_flag? queue_data(OK) if @verbose # We deliver normal subscriptions like a client publish, which # eliminates the duplicate traversal over the route. However, # qgroups are sent individually per group for only the route # with the intended subscriber, since route interest is L2 # semantics, we deliver those direct. if (sub = Server.rsid_qsub(@msg_sid)) # Allows nil reply to not have extra space reply = @msg_reply + ' ' if @msg_reply Server.deliver_to_subscriber(sub, @msg_sub, reply, msg) else Server.route_to_subscribers(@msg_sub, @msg_reply, msg, is_route?) end @in_msgs += 1 @in_bytes += @msg_size @buf = @buf.slice((@msg_size + CR_LF_SIZE), @buf.bytesize) @msg_sub = @msg_size = @reply = nil @parse_state = AWAITING_CONTROL_LINE @buf = nil if (@buf && @buf.empty?) end end end |
#send_auth ⇒ Object
214 215 216 217 218 |
# File 'lib/nats/server/route.rb', line 214 def send_auth return unless r_obj[:uri].user cs = { :user => r_obj[:uri].user, :pass => r_obj[:uri].password } queue_data("CONNECT #{cs.to_json}#{CR_LF}") end |
#send_info ⇒ Object
220 221 222 |
# File 'lib/nats/server/route.rb', line 220 def send_info queue_data("INFO #{Server.route_info_string}#{CR_LF}") end |
#send_local_subs_to_route ⇒ Object
TODO: Make sure max_requested is also propogated on reconnect
57 58 59 60 61 62 63 64 |
# File 'lib/nats/server/route.rb', line 57 def send_local_subs_to_route ObjectSpace.each_object(NATSD::Connection) do |c| next if c.closing? || c.type != 'Client' c.subscriptions.each_value do |sub| queue_data(NATSD::Server.route_sub_proto(sub)) end end end |
#solicited? ⇒ Boolean
16 17 18 |
# File 'lib/nats/server/route.rb', line 16 def solicited? r_obj != nil end |
#try_reconnect ⇒ Object
244 245 246 247 |
# File 'lib/nats/server/route.rb', line 244 def try_reconnect debug "Trying to reconnect route", peer_info, rid EM.reconnect(r_obj[:uri].host, r_obj[:uri].port, self) end |
#type ⇒ Object
273 274 275 |
# File 'lib/nats/server/route.rb', line 273 def type 'Route' end |
#unbind ⇒ Object
255 256 257 258 259 260 261 262 263 |
# File 'lib/nats/server/route.rb', line 255 def unbind return if reconnecting? debug "Route connection closed", peer_info, rid process_unbind if solicited? @reconnecting = true @reconnect_timer = EM.add_periodic_timer(NATSD::DEFAULT_ROUTE_RECONNECT_INTERVAL) { try_reconnect } end end |