Class: Diameter::Stack
- Inherits:
-
Object
- Object
- Diameter::Stack
- Includes:
- Internals
- Defined in:
- lib/diameter/stack.rb
Setup methods collapse
-
#add_handler(app_id, opts = {}) {|req, cxn| ... } ⇒ Object
Adds a handler for a specific Diameter application.
-
#initialize(host, realm, opts = {}) ⇒ Stack
constructor
Stack constructor.
-
#listen_for_tcp(port = 3868) ⇒ Object
Begins listening for inbound Diameter connections (making this a Diameter server instead of just a client).
-
#start ⇒ Object
Complete the stack initialization and begin reading from the TCP connections.
Peer connections and message sending collapse
-
#connect_to_peer(peer_uri, peer_host, realm) ⇒ Peer
Creates a Peer connection to a Diameter agent at the specific network location indicated by peer_uri.
-
#connect_to_realm(realm) ⇒ Peer
Looks up the given Diameter realm with DNS-SRV, and establishes a connection to one peer in that realm.
-
#peer_state(id) ⇒ Keyword
Retrieves the current state of a peer, defaulting to :CLOSED if the peer does not exist.
-
#send_answer(ans, original_cxn) ⇒ Object
Sends a Diameter answer.
-
#send_request(req, options = {}) ⇒ Object
Sends a Diameter request.
Instance Method Summary collapse
-
#close(connection) ⇒ Object
Closes the given connection, blanking out any internal data structures associated with it.
-
#handle_message(msg_bytes, cxn) ⇒ Object
Handles a Diameter request straight from a network connection.
-
#shutdown ⇒ Object
This shuts the stack down, closing all TCP connections and terminating any background threads still waiting for an answer.
Constructor Details
#initialize(host, realm, opts = {}) ⇒ Stack
The stack does not advertise any applications to peers by default - #add_handler must be called early on.
Stack constructor.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/diameter/stack.rb', line 28 def initialize(host, realm, opts={}) @local_host = host @local_realm = realm @auth_apps = [] @acct_apps = [] @pending_ete = {} @tcp_helper = TCPStackHelper.new(self) @peer_table = {} @handlers = {} @answer_timeout = opts.fetch(:timeout, 60) @threadpool = Concurrent::ThreadPoolExecutor.new( min_threads: 5, max_threads: 5, max_queue: 1, fallback_policy: :caller_runs ) @res = Dnsruby::Resolver.new Diameter.logger.log(Logger::INFO, 'Stack initialized') end |
Instance Method Details
#add_handler(app_id, opts = {}) {|req, cxn| ... } ⇒ Object
If you expect to only send requests for this application, not receive them, the block can be a no-op (e.g. ‘{ nil }`)
Adds a handler for a specific Diameter application.
91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/diameter/stack.rb', line 91 def add_handler(app_id, opts={}, &blk) vendor = opts.fetch(:vendor, 0) auth = opts.fetch(:auth, false) acct = opts.fetch(:acct, false) raise ArgumentError.new("Must specify at least one of auth or acct") unless auth or acct @acct_apps << [app_id, vendor] if acct @auth_apps << [app_id, vendor] if auth @handlers[app_id] = blk end |
#close(connection) ⇒ Object
Closes the given connection, blanking out any internal data structures associated with it.
Likely to be moved to the Peer object in a future release/
124 125 126 |
# File 'lib/diameter/stack.rb', line 124 def close(connection) @tcp_helper.close(connection) end |
#connect_to_peer(peer_uri, peer_host, realm) ⇒ Peer
Creates a Peer connection to a Diameter agent at the specific network location indicated by peer_uri.
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/diameter/stack.rb', line 161 def connect_to_peer(peer_uri, peer_host, realm) @peer_table[peer_host] = Peer.new(peer_host, realm) @peer_table[peer_host].state = :WAITING # Will move to :UP when the CEA is received uri = URI(peer_uri) cxn = @tcp_helper.setup_new_connection(uri.host, uri.port) @peer_table[peer_host].cxn = cxn avps = [AVP.create('Origin-Host', @local_host), AVP.create('Origin-Realm', @local_realm), AVP.create('Host-IP-Address', IPAddr.new('127.0.0.1')), AVP.create('Vendor-Id', 100), AVP.create('Product-Name', 'ruby-diameter') ] avps += app_avps cer_bytes = Message.new(version: 1, command_code: 257, app_id: 0, request: true, proxyable: false, retransmitted: false, error: false, avps: avps).to_wire @tcp_helper.send(cer_bytes, cxn) @peer_table[peer_host] end |
#connect_to_realm(realm) ⇒ Peer
Looks up the given Diameter realm with DNS-SRV, and establishes a connection to one peer in that realm.
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/diameter/stack.rb', line 135 def connect_to_realm(realm) possible_peers = [] @res.query("_diameter._tcp.#{realm}", "SRV").each_answer do |a| possible_peers << {name: a.target.to_s, port: a.port, priority: a.priority, weight: a.weight} end # Prefer the lowest priority and the highest weight possible_peers.sort!{ |a, b| (a[:priority] <=> b[:priority]) || (b[:weight] <=> a[:weight])} Diameter.logger.debug("Sorted list of peers for realm #{realm} is #{possible_peers.inspect}") primary = possible_peers[0] url = "aaa://#{primary[:name]}:#{primary[:port]}" Diameter.logger.info("Primary peer for realm #{realm} is #{primary[:name]}, (#{url})") connect_to_peer(url, primary[:name], realm) end |
#handle_message(msg_bytes, cxn) ⇒ Object
Handles a Diameter request straight from a network connection. Intended to be called by TCPStackHelper after it retrieves a message, not directly by users.
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 |
# File 'lib/diameter/stack.rb', line 282 def (msg_bytes, cxn) # Common processing - ensure that this message has come in on this # peer's expected connection, and update the last time we saw # activity on this peer msg = Message.from_bytes(msg_bytes) Diameter.logger.debug("Handling message #{msg}") peer = msg.avp_by_name('Origin-Host').octet_string if @peer_table[peer] @peer_table[peer].reset_timer unless @peer_table[peer].cxn == cxn Diameter.logger.log(Logger::WARN, "Ignoring message - claims to be from #{peer} but comes from #{cxn} not #{@peer_table[peer].cxn}") end end if msg.command_code == 257 && msg.answer handle_cea(msg, cxn) elsif msg.command_code == 257 && msg.request handle_cer(msg, cxn) elsif msg.command_code == 280 && msg.request handle_dwr(msg, cxn) elsif msg.command_code == 280 && msg.answer # No-op - we've already updated our timestamp elsif msg.answer handle_other_answer(msg, cxn) elsif @handlers.has_key? msg.app_id @handlers[msg.app_id].call(msg, cxn) else Diameter.logger.warn("Ignoring message from unrecognised application #{msg.app_id} (Command-Code #{msg.command_code})") end end |
#listen_for_tcp(port = 3868) ⇒ Object
Begins listening for inbound Diameter connections (making this a Diameter server instead of just a client).
63 64 65 |
# File 'lib/diameter/stack.rb', line 63 def listen_for_tcp(port=3868) @tcp_helper.setup_new_listen_connection("0.0.0.0", port) end |
#peer_state(id) ⇒ Keyword
Retrieves the current state of a peer, defaulting to :CLOSED if the peer does not exist.
268 269 270 271 272 273 274 |
# File 'lib/diameter/stack.rb', line 268 def peer_state(id) if !@peer_table.key? id :CLOSED else @peer_table[id].state end end |
#send_answer(ans, original_cxn) ⇒ Object
Sends a Diameter answer. This is sent over the same connection the request was received on (which needs to be passed into to this method).
This adds this stack’s Origin-Host and Origin-Realm AVPs, if those AVPs don’t already exist.
257 258 259 260 261 |
# File 'lib/diameter/stack.rb', line 257 def send_answer(ans, original_cxn) fail "Must pass an answer" unless ans.answer ans.add_origin_host_and_realm(@local_host, @local_realm) @tcp_helper.send(ans.to_wire, original_cxn) end |
#send_request(req, options = {}) ⇒ Object
Sends a Diameter request. This is routed to an appropriate peer based on the Destination-Host AVP (or, if that is absent, on the Destination-Realm AVP).
This adds this stack’s Origin-Host and Origin-Realm AVPs, if those AVPs don’t already exist.
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 |
# File 'lib/diameter/stack.rb', line 192 def send_request(req, ={}) fail "Must pass a request" unless req.request req.add_origin_host_and_realm(@local_host, @local_realm) peer = [:peer] if peer.nil? peer = if req['Destination-Host'] peer_identity = req['Destination-Host'].octet_string Diameter.logger.debug("Selecting peer by Destination-Host (#{peer_identity})") @peer_table[peer_identity] elsif req['Destination-Realm'] realm = req['Destination-Realm'].octet_string Diameter.logger.debug("Selecting peer by Destination-Realm (#{realm})") @peer_table.values.select { |p| p.realm == realm }.sample else fail "Request must have Destination-Host or Destination-Realm" end else Diameter.logger.debug("Peer selection forced to #{peer.identity}") end if peer.nil? Diameter.logger.warn("No peer is available to send message - cannot route") fail "No acceptable peer" elsif peer.state == :UP q = Queue.new @pending_ete[req.ete] = q @tcp_helper.send(req.to_wire, peer.cxn) =begin # Time this request out if no answer is received Diameter.logger.debug("Scheduling timeout for #{@answer_timeout}s time") Concurrent::timer(@answer_timeout) do Diameter.logger.debug("Timing out message with EtE #{req.ete}") q = @pending_ete.delete(req.ete) if q q.push(:timeout) end end =end p = Concurrent::Promise.execute(executor: @threadpool) { Diameter.logger.debug("Waiting for answer to message with EtE #{req.ete}, queue #{q}") val = q.pop Diameter.logger.debug("Promise fulfilled for message with EtE #{req.ete}") val } return p else Diameter.logger.warn("Peer #{peer.identity} is in state #{peer.state} - cannot route") end end |
#shutdown ⇒ Object
This shuts the stack down, closing all TCP connections and terminating any background threads still waiting for an answer.
108 109 110 111 112 113 114 115 116 |
# File 'lib/diameter/stack.rb', line 108 def shutdown @tcp_helper.shutdown @pending_ete.each do |ete, q| Diameter.logger.debug("Shutting down queue #{q} as no answer has been received with EtE #{ete}") q.push :shutdown end @threadpool.kill @threadpool.wait_for_termination(5) end |
#start ⇒ Object
Complete the stack initialization and begin reading from the TCP connections.
55 56 57 |
# File 'lib/diameter/stack.rb', line 55 def start @tcp_helper.start_main_loop end |