Module: RSMP::Proxy::Modules::Receive
- Included in:
- RSMP::Proxy
- Defined in:
- lib/rsmp/proxy/modules/receive.rb
Overview
Message processing functionality. Handles receiving and processing incoming messages.
Instance Method Summary
- #expect_version_message(message) ⇒ Object
- #handle_fatal_error(message, error) ⇒ Object
- #handle_invalid_message(message, error) ⇒ Object
- #handle_invalid_packet(json, error) ⇒ Object
- #handle_malformed_message(attributes, error) ⇒ Object
- #handle_schema_error(message, error) ⇒ Object
- #process_deferred ⇒ Object
- #process_message(message) ⇒ Object
- #process_packet(json) ⇒ Object
- #should_validate_ingoing_message?(message) ⇒ Boolean
- #verify_sequence(message) ⇒ Object
- #will_not_handle(message) ⇒ Object
Instance Method Details
#expect_version_message(message) ⇒ Object
111 112 113 114 115 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 111
#
# Guards the handshake: returns nil when +message+ is a Version, MessageAck
# or MessageNotAck, and raises for any other message.
#
# NOTE(review): the method name and the `message` receivers were missing from
# the extracted listing; they are restored from the signature shown for this
# method — confirm against the source file.
#
# @param message [Object] the incoming message
# @return [nil]
# @raise [HandshakeError] if the message is none of the three accepted types
def expect_version_message(message)
  return if message.is_a?(Version) || message.is_a?(MessageAck) || message.is_a?(MessageNotAck)

  raise HandshakeError, 'Version must be received first'
end
#handle_fatal_error(message, error) ⇒ Object
56 57 58 59 60 61 62 63 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 56
#
# Reports a fatal error for +message+: distributes the error, sends a
# negative acknowledgement carrying the error text, then closes.
#
# NOTE(review): every `message` token was missing from the extracted listing.
# The parameter, `error.message`, `message.type`, the `message:` value and the
# final `message` return line are restored from the signature and the listed
# 8-line span — confirm against the source file.
#
# @param message [Object] the rejected message; must respond to #type
# @param error [Exception] the error that caused the rejection
# @return [Object] the message that was passed in
def handle_fatal_error(message, error)
  reason = error.message
  str = "Rejected #{message.type},"
  # Re-wrap the error so the distributed exception carries the summary text.
  distribute_error error.exception(str), message: message
  dont_acknowledge message, str, reason
  close
  message
end
#handle_invalid_message(message, error) ⇒ Object
48 49 50 51 52 53 54 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 48
#
# Reports an invalid +message+: distributes the error (including the message
# JSON in its text) and sends a negative acknowledgement with the reason.
#
# NOTE(review): every `message` token was missing from the extracted listing.
# The method name, parameter, `error.message`, `message.type`, `message.json`
# and the final `message` return line are restored from the signature and the
# listed 7-line span — confirm against the source file.
#
# @param message [Object] the invalid message; must respond to #type and #json
# @param error [Exception] the validation error
# @return [Object] the message that was passed in
def handle_invalid_message(message, error)
  reason = error.message.to_s
  str = "Received invalid #{message.type},"
  distribute_error error.exception("#{str} #{message.json}"), message: message
  dont_acknowledge message, str, reason
  message
end
#handle_invalid_packet(json, error) ⇒ Object
25 26 27 28 29 30 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 25
#
# Reports a packet that could not be parsed as JSON: distributes the error and
# logs a warning. No acknowledgement is sent from here.
#
# NOTE(review): `error.message` is restored; the extracted listing showed only
# `error.` — confirm against the source file.
#
# @param json [String] the raw packet; only its size is reported
# @param error [Exception] the parse error
# @return [nil]
def handle_invalid_packet(json, error)
  str = "Received invalid package, must be valid JSON but got #{json.size} bytes: #{error.message}"
  distribute_error error.exception(str)
  log str, level: :warning
  nil
end
#handle_malformed_message(attributes, error) ⇒ Object
32 33 34 35 36 37 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 32
#
# Reports a message whose attributes could not be turned into a message
# object: distributes the error and logs a warning with a Malformed wrapper
# built from the raw attributes. No acknowledgement is sent from here.
#
# NOTE(review): the method name and `error.message` are restored from the
# signature; the extracted listing showed `def (attributes, error)` and
# `error.` — confirm against the source file.
#
# @param attributes [Object] the parsed attributes, passed to Malformed.new
# @param error [Exception] the error raised while building the message
# @return [nil]
def handle_malformed_message(attributes, error)
  str = "Received malformed message, #{error.message}"
  distribute_error error.exception(str)
  log str, message: Malformed.new(attributes), level: :warning
  nil
end
#handle_schema_error(message, error) ⇒ Object
39 40 41 42 43 44 45 46 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 39
#
# Reports a schema validation failure for +message+: distributes the error and
# sends a negative acknowledgement whose reason lists each schema entry from
# the error followed by the error text.
#
# NOTE(review): every `message` token was missing from the extracted listing.
# The parameter, `error.message`, `message.type`, the `message:` value and the
# final `message` return line are restored from the signature and the listed
# 8-line span — confirm against the source file.
#
# @param message [Object] the invalid message; must respond to #type
# @param error [Exception] must respond to #schemas, yielding entries that
#   respond to #first and #last
# @return [Object] the message that was passed in
def handle_schema_error(message, error)
  schemas_string = error.schemas.map { |schema| "#{schema.first}: #{schema.last}" }.join(', ')
  reason = "schema errors (#{schemas_string}): #{error.message}"
  str = "Received invalid #{message.type}"
  distribute_error error.exception(str), message: message
  dont_acknowledge message, str, reason
  message
end
#process_deferred ⇒ Object
17 18 19 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 17
#
# Delegates to @node.process_deferred and returns its result.
def process_deferred
  @node.process_deferred
end
#process_message(message) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 90
#
# Dispatches +message+ to a handler chosen by its class. Messages that are not
# MessageAck, MessageNotAck, Version or RSMP::Watchdog get a negative
# acknowledgement naming their type.
#
# NOTE(review): the method name, the `case` subject and the argument of each
# `process_*` call were missing from the extracted listing; restored as
# `message` from the signature — confirm against the source file.
#
# @param message [Object] the incoming message; must respond to #type in the
#   fallback branch
# @return [Object] whatever the selected handler returns
def process_message(message)
  case message
  when MessageAck
    process_ack message
  when MessageNotAck
    process_not_ack message
  when Version
    process_version message
  when RSMP::Watchdog
    process_watchdog message
  else
    dont_acknowledge message, 'Received', "unknown message (#{message.type})"
  end
end
#process_packet(json) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 65
#
# Entry point for one raw packet: parses the JSON into attributes, builds a
# message, validates it against the schemas unless validation is skipped for
# its type, checks handshake ordering, then distributes and processes it with
# distribution deferred. Each known error class is routed to its handler.
# @node.clear_deferred always runs, whether or not an error occurred.
#
# NOTE(review): every token containing `message` was missing from the
# extracted listing. The `message =` assignment, the calls to
# should_validate_ingoing_message?, process_message, handle_malformed_message
# and handle_invalid_message, and the final `message` return line are restored
# from the sibling signatures and the listed 24-line span — confirm against
# the source file.
#
# @param json [String] the raw packet
# @return [Object, nil] the built message on success; otherwise the result of
#   the matching error handler
def process_packet(json)
  attributes = Message.parse_attributes json
  message = Message.build attributes, json
  message.validate(schemas) if should_validate_ingoing_message?(message)
  verify_sequence message
  with_deferred_distribution do
    distribute message
    process_message message
  end
  process_deferred
  message
rescue InvalidPacket => e
  handle_invalid_packet(json, e)
rescue MalformedMessage => e
  handle_malformed_message(attributes, e)
rescue SchemaError, RSMP::Schema::Error => e
  handle_schema_error(message, e)
rescue InvalidMessage => e
  handle_invalid_message(message, e)
rescue FatalError => e
  handle_fatal_error(message, e)
ensure
  @node.clear_deferred
end
#should_validate_ingoing_message?(message) ⇒ Boolean
7 8 9 10 11 12 13 14 15 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 7
#
# Decides whether an incoming message should be schema-validated. Validation
# is on unless @site_settings has a 'skip_validation' entry that includes the
# message's unqualified class name.
#
# NOTE(review): the method name and the `message` receiver of `.class` were
# missing from the extracted listing; restored from the signature — confirm
# against the source file.
#
# @param message [Object] the incoming message
# @return [Boolean] true if the message should be validated
def should_validate_ingoing_message?(message)
  return true unless @site_settings

  skip = @site_settings['skip_validation']
  return true unless skip

  # Compare on the class name without its namespace, e.g. "Watchdog".
  klass = message.class.name.split('::').last
  !skip.include?(klass)
end
#verify_sequence(message) ⇒ Object
21 22 23 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 21
#
# Checks message ordering: while @version_determined is not set, the message
# is passed to expect_version_message. Does nothing once it is set.
#
# NOTE(review): the parameter and the `expect_version_message(message)` call
# were missing from the extracted listing, which showed `() unless
# @version_determined`; restored from the sibling signature — confirm against
# the source file.
#
# @param message [Object] the incoming message
# @return [nil, Object] nil when the version is already determined, otherwise
#   the result of expect_version_message
def verify_sequence(message)
  expect_version_message(message) unless @version_determined
end
#will_not_handle(message) ⇒ Object
105 106 107 108 109 |
# File 'lib/rsmp/proxy/modules/receive.rb', line 105
#
# Refuses a message this kind of proxy does not handle: logs a warning and
# sends a negative acknowledgement whose reason names the receiver's
# downcased class name.
#
# NOTE(review): the parameter and the `message` arguments to `log` and
# `dont_acknowledge` were missing from the extracted listing; restored from
# the signature — confirm against the source file.
#
# @param message [Object] the ignored message; must respond to #type
# @return [Object] whatever dont_acknowledge returns
def will_not_handle(message)
  # `reason` is a fresh local here, so `||=` always assigns.
  reason ||= "since we're a #{self.class.name.downcase}"
  log "Ignoring #{message.type}, #{reason}", message: message, level: :warning
  dont_acknowledge message, nil, reason
end