Module: RSMP::Proxy::Modules::Receive

Included in:
RSMP::Proxy
Defined in:
lib/rsmp/proxy/modules/receive.rb

Overview

Message processing functionality. Handles receiving and processing incoming messages.

Instance Method Summary

Instance Method Details

#expect_version_message(message) ⇒ Object

Raises:

  • (HandshakeError)



111
112
113
114
115
# File 'lib/rsmp/proxy/modules/receive.rb', line 111

# Checks that a message is of a type accepted before a version has been
# agreed on: Version, MessageAck or MessageNotAck.
#
# @param message [Object] the incoming message to check
# @return [nil] when the message is one of the accepted types
# @raise [HandshakeError] for any other message
def expect_version_message(message)
  case message
  when Version, MessageAck, MessageNotAck
    nil
  else
    raise HandshakeError, 'Version must be received first'
  end
end

#handle_fatal_error(message, error) ⇒ Object



56
57
58
59
60
61
62
63
# File 'lib/rsmp/proxy/modules/receive.rb', line 56

# Handles a fatal error raised while processing a message: the error is
# distributed, the message is not acknowledged, and the connection is closed.
#
# @param message [Object] the message being processed; must respond to #type
# @param error [Exception] the error that was raised
# @return [Object] the message that was passed in
def handle_fatal_error(message, error)
  reason = error.message
  prefix = "Rejected #{message.type},"
  # Exception#exception(string) yields an exception of the same class whose
  # message is the given string.
  relabelled = error.exception(prefix)
  distribute_error(relabelled, message: message)
  dont_acknowledge(message, prefix, reason)
  close
  message
end

#handle_invalid_message(message, error) ⇒ Object



48
49
50
51
52
53
54
# File 'lib/rsmp/proxy/modules/receive.rb', line 48

# Handles a message that was found to be invalid: the error is distributed
# with the message JSON appended to its text, and the message is not
# acknowledged.
#
# @param message [Object] the offending message; must respond to #type and #json
# @param error [Exception] the error that was raised
# @return [Object] the message that was passed in
def handle_invalid_message(message, error)
  reason = error.message.to_s
  prefix = "Received invalid #{message.type},"
  detailed = error.exception("#{prefix} #{message.json}")
  distribute_error(detailed, message: message)
  dont_acknowledge(message, prefix, reason)
  message
end

#handle_invalid_packet(json, error) ⇒ Object



25
26
27
28
29
30
# File 'lib/rsmp/proxy/modules/receive.rb', line 25

# Handles a packet that is not valid JSON: the error is distributed with a
# descriptive message and the same text is logged as a warning.
#
# @param json [String] the raw packet text that was rejected
# @param error [Exception] the error that was raised
# @return [nil]
def handle_invalid_packet(json, error)
  # bytesize rather than size: the text reports bytes, and String#size counts
  # characters, which under-reports multi-byte input.
  str = "Received invalid package, must be valid JSON but got #{json.bytesize} bytes: #{error.message}"
  distribute_error error.exception(str)
  log str, level: :warning
  nil
end

#handle_malformed_message(attributes, error) ⇒ Object



32
33
34
35
36
37
# File 'lib/rsmp/proxy/modules/receive.rb', line 32

# Handles a malformed message: the error is distributed with a descriptive
# message, and a warning is logged with a Malformed object built from the
# attributes.
#
# @param attributes [Object] the attributes passed on to Malformed.new
# @param error [Exception] the error that was raised
# @return [nil]
def handle_malformed_message(attributes, error)
  text = "Received malformed message, #{error.message}"
  distribute_error(error.exception(text))
  placeholder = Malformed.new(attributes)
  log(text, message: placeholder, level: :warning)
  nil
end

#handle_schema_error(message, error) ⇒ Object



39
40
41
42
43
44
45
46
# File 'lib/rsmp/proxy/modules/receive.rb', line 39

# Handles a schema validation failure: the error is distributed and the
# message is not acknowledged, with a reason that lists the schemas reported
# by the error.
#
# @param message [Object] the offending message; must respond to #type
# @param error [Exception] the error; must respond to #schemas, whose entries
#   respond to #first and #last
# @return [Object] the message that was passed in
def handle_schema_error(message, error)
  labels = error.schemas.map do |entry|
    "#{entry.first}: #{entry.last}"
  end
  listing = labels.join(', ')
  reason = "schema errors (#{listing}): #{error.message}"
  prefix = "Received invalid #{message.type}"
  distribute_error(error.exception(prefix), message: message)
  dont_acknowledge(message, prefix, reason)
  message
end

#process_deferred ⇒ Object



17
18
19
# File 'lib/rsmp/proxy/modules/receive.rb', line 17

# Delegates to @node.process_deferred and returns its result.
def process_deferred
  @node.process_deferred
end

#process_message(message) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/rsmp/proxy/modules/receive.rb', line 90

# Dispatches a message to the handler matching its class. Messages of any
# other class are not acknowledged, with a reason naming the message type.
#
# @param message [Object] the message to process; must respond to #type when
#   it is not one of the handled classes
# @return [Object] whatever the selected handler returns
def process_message(message)
  case message
  when MessageAck     then process_ack(message)
  when MessageNotAck  then process_not_ack(message)
  when Version        then process_version(message)
  when RSMP::Watchdog then process_watchdog(message)
  else dont_acknowledge(message, 'Received', "unknown message (#{message.type})")
  end
end

#process_packet(json) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/rsmp/proxy/modules/receive.rb', line 65

# Processes one incoming packet: parses its attributes, builds a message,
# optionally validates it, checks sequencing, then distributes and handles it.
#
# Each rescued error class is routed to its matching handle_* method, and the
# value that handler returns becomes this method's return value.
#
# @param json [Object] the raw packet; passed to Message.parse_attributes and
#   Message.build (presumably a JSON string — confirm against the caller)
# @return [Object, nil] the built message on success; otherwise the result of
#   the error handler that was invoked
def process_packet(json)
  attributes = Message.parse_attributes json
  message = Message.build attributes, json
  # Validation only runs when should_validate_ingoing_message? allows it.
  message.validate(schemas) if should_validate_ingoing_message?(message)
  verify_sequence message
  with_deferred_distribution do
    distribute message
    process_message message
  end
  process_deferred
  message
# NOTE(review): in the clauses below, `attributes` and `message` are still nil
# if the error was raised before the corresponding assignment completed —
# confirm each error class can only originate after the value it is paired with
# has been set.
rescue InvalidPacket => e
  handle_invalid_packet(json, e)
rescue MalformedMessage => e
  handle_malformed_message(attributes, e)
rescue SchemaError, RSMP::Schema::Error => e
  handle_schema_error(message, e)
rescue InvalidMessage => e
  handle_invalid_message(message, e)
rescue FatalError => e
  handle_fatal_error(message, e)
ensure
  # Runs on every exit path, whether or not an error was rescued above.
  @node.clear_deferred
end

#should_validate_ingoing_message?(message) ⇒ Boolean

Returns:

  • (Boolean)


7
8
9
10
11
12
13
14
15
# File 'lib/rsmp/proxy/modules/receive.rb', line 7

# Decides whether an incoming message should be validated.
#
# Validation is on unless @site_settings['skip_validation'] is set and
# includes the unqualified class name of the message.
#
# @param message [Object] the incoming message
# @return [Boolean] true when the message should be validated
def should_validate_ingoing_message?(message)
  skipped = @site_settings && @site_settings['skip_validation']
  return true unless skipped

  # Only the last segment of the class name is compared, e.g. "Bar" for Foo::Bar.
  short_name = message.class.name.split('::').last
  !skipped.include?(short_name)
end

#verify_sequence(message) ⇒ Object



21
22
23
# File 'lib/rsmp/proxy/modules/receive.rb', line 21

# Checks the message with expect_version_message until @version_determined
# is set; after that no check is made.
#
# @param message [Object] the incoming message
# @return [Object, nil] nil once @version_determined is set, otherwise the
#   result of expect_version_message
def verify_sequence(message)
  return if @version_determined

  expect_version_message(message)
end

#will_not_handle(message) ⇒ Object



105
106
107
108
109
# File 'lib/rsmp/proxy/modules/receive.rb', line 105

# Logs a warning that a message is being ignored and declines to acknowledge
# it.
#
# The original `reason ||= ...` operated on a local that was never assigned,
# so the default was always used. `reason` is now an optional parameter, so a
# caller can supply its own explanation while existing one-argument calls
# behave exactly as before.
#
# @param message [Object] the message being ignored; must respond to #type
# @param reason [String, nil] why the message is ignored; defaults to a text
#   naming the downcased class of the receiver
# @return [Object] whatever dont_acknowledge returns
def will_not_handle(message, reason = nil)
  reason ||= "since we're a #{self.class.name.downcase}"
  log "Ignoring #{message.type}, #{reason}", message: message, level: :warning
  dont_acknowledge message, nil, reason
end