Module: RSMP::Proxy::Modules::Acknowledgements

Included in:
RSMP::Proxy
Defined in:
lib/rsmp/proxy/modules/acknowledgements.rb

Overview

Message acknowledgement handling. Manages sending and receiving acks and nacks, and tracks which messages have been acknowledged.

Instance Method Summary collapse

Instance Method Details

#acknowledge(original) ⇒ Object

Raises:

  • (InvalidArgument)


7
8
9
10
11
12
13
14
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 7

# Builds a MessageAck for +original+, sends it, and then runs the
# first-ingoing-acknowledgement check for the original message.
#
# @param original [Object] the message being acknowledged
# @raise [InvalidArgument] if +original+ is nil or false
def acknowledge(original)
  raise InvalidArgument unless original

  reply = MessageAck.build_from(original)
  # attach a copy of the original to the ack
  reply.original = original.clone
  description = "for #{reply.original.type} #{original.m_id_short}"
  send_message reply, description
  check_ingoing_acknowledged original
end

#acknowledged_first_ingoing(message) ⇒ Object



77
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 77

# Hook called by check_ingoing_acknowledged the first time a given message
# type passes through it. Intentionally empty here.
#
# @param message [Object] the message that triggered the hook
# @return [nil]
def acknowledged_first_ingoing(message)
  # no-op
end

#acknowledged_first_outgoing(message) ⇒ Object



75
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 75

# Hook called by check_outgoing_acknowledged the first time a given message
# type passes through it. Intentionally empty here.
#
# @param message [Object] the message that triggered the hook
# @return [nil]
def acknowledged_first_outgoing(message)
  # no-op
end

#check_ack_timeout(now) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 39

# For every message still awaiting acknowledgement whose timestamp plus the
# configured acknowledgement timeout lies before +now+: logs an error, closes,
# and distributes a MissingAcknowledgment error.
#
# @param now [Object] the current time, compared against timestamp + timeout
def check_ack_timeout(now)
  timeout = @site_settings['timeouts']['acknowledgement']
  # A hash cannot be modified during iteration, so walk a copy of it
  @awaiting_acknowledgement.clone.each_value do |pending|
    deadline = pending.timestamp + timeout
    next unless now > deadline

    text = "No acknowledgements for #{pending.type} #{pending.m_id_short} within #{timeout} seconds"
    log text, level: :error
    begin
      close
    ensure
      # the error is distributed even when close raises
      distribute_error MissingAcknowledgment.new(text)
    end
  end
end

#check_ingoing_acknowledged(message) ⇒ Object



68
69
70
71
72
73
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 68

# Marks the type of +message+ as acknowledged in @ingoing_acknowledged and, the
# first time a type is seen, calls acknowledged_first_ingoing with the message.
#
# @param message [Object] message responding to #type
# @return [Object, nil] nil when the type was already marked, otherwise the hook's result
def check_ingoing_acknowledged(message)
  type = message.type
  return if @ingoing_acknowledged[type]

  @ingoing_acknowledged[type] = true
  acknowledged_first_ingoing message
end

#check_outgoing_acknowledged(message) ⇒ Object

TODO: this might be better handled by a proper event machine using e.g. the EventMachine gem



61
62
63
64
65
66
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 61

# Marks the type of +message+ as acknowledged in @outgoing_acknowledged and, the
# first time a type is seen, calls acknowledged_first_outgoing with the message.
#
# @param message [Object] message responding to #type
# @return [Object, nil] nil when the type was already marked, otherwise the hook's result
def check_outgoing_acknowledged(message)
  type = message.type
  return if @outgoing_acknowledged[type]

  @outgoing_acknowledged[type] = true
  acknowledged_first_outgoing message
end

#dont_acknowledge(original, prefix = nil, reason = nil, force: true) ⇒ Object

Raises:

  • (InvalidArgument)


16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 16

# Sends a MessageNotAck in reply to +original+.
#
# When a +reason+ is given, a warning is logged first, made of +prefix+ and
# +reason+ separated by a space. A nil +prefix+ is dropped so the log line
# does not start with a stray space.
#
# @param original [Object] the message being rejected
# @param prefix [String, nil] optional text placed before the reason in the log line
# @param reason [String, nil] why the message is rejected; 'Unknown reason' is sent when nil
# @param force [Boolean] passed through to send_message
# @raise [InvalidArgument] if +original+ is nil or false
def dont_acknowledge(original, prefix = nil, reason = nil, force: true)
  raise InvalidArgument unless original

  # compact removes a nil prefix; joining [nil, reason] would yield " reason"
  log [prefix, reason].compact.join(' '), message: original, level: :warning if reason

  message = MessageNotAck.new({ 'oMId' => original.m_id, 'rea' => reason || 'Unknown reason' })
  # attach a copy of the original to the nack
  message.original = original.clone
  send_message message, "for #{original.type} #{original.m_id_short}", force: force
end

#dont_expect_acknowledgement(message) ⇒ Object



35
36
37
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 35

# Removes the entry keyed by the 'oMId' attribute of +message+ from the set of
# messages awaiting acknowledgement.
#
# @param message [Object] ack/nack responding to #attribute
# @return [Object, nil] the removed entry, or nil when there was none
def dont_expect_acknowledgement(message)
  acknowledged_id = message.attribute('oMId')
  @awaiting_acknowledgement.delete(acknowledged_id)
end

#expect_acknowledgement(message) ⇒ Object



29
30
31
32
33
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 29

# Registers +message+ as awaiting acknowledgement, keyed by its m_id.
# MessageAck and MessageNotAck instances are never registered.
#
# @param message [Object] the message to track
# @return [Object, nil] the message when registered, nil when skipped
def expect_acknowledgement(message)
  case message
  when MessageAck, MessageNotAck
    nil
  else
    @awaiting_acknowledgement[message.m_id] = message
  end
end

#find_original_for_message(message) ⇒ Object



56
57
58
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 56

# Looks up the message awaiting acknowledgement whose key equals the 'oMId'
# attribute of +message+.
#
# @param message [Object] ack/nack responding to #attribute
# @return [Object, nil] the stored entry, or nil when none is stored under that id
def find_original_for_message(message)
  original_id = message.attribute('oMId')
  @awaiting_acknowledgement[original_id]
end

#log_acknowledgement_for_original(message, original) ⇒ Object



115
116
117
118
119
120
121
122
123
124
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 115

# Logs the receipt of an ack/nack for a known original message.
#
# A 'MessageNotAck' is logged at :warning level, with its 'rea' attribute
# appended when present; any other type is logged at :log level.
#
# @param message [Object] the received ack/nack
# @param original [Object] the message it refers to
def log_acknowledgement_for_original(message, original)
  text = "Received #{message.type} for #{original.type} #{message.attribute('oMId')[0..3]}"
  level = :log
  if message.type == 'MessageNotAck'
    level = :warning
    reason = message.attributes['rea']
    text = "#{text}: #{reason}" if reason
  end
  log text, message: message, level: level
end

#log_acknowledgement_for_unknown(message) ⇒ Object



126
127
128
129
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 126

# Logs a warning that an ack/nack was received for a message that is not known.
# The log line includes the first four characters of the 'oMId' attribute.
#
# @param message [Object] the received ack/nack
def log_acknowledgement_for_unknown(message)
  text = "Received #{message.type} for unknown message #{message.attribute('oMId')[0..3]}"
  log text, message: message, level: :warning
end

#process_ack(message) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 79

# Handles a received acknowledgement.
#
# When the acknowledged message is still awaited: stops awaiting it, links it
# to the ack, logs, runs the type-specific follow-up ('Version' or
# 'StatusSubscribe'), runs the first-outgoing check, stores the ack under the
# original's m_id and signals the acknowledgement condition. Otherwise the ack
# is only logged as referring to an unknown message.
#
# @param message [Object] the received ack
def process_ack(message)
  original = find_original_for_message message
  return log_acknowledgement_for_unknown(message) unless original

  dont_expect_acknowledgement message
  message.original = original
  log_acknowledgement_for_original message, original

  # follow-up that depends on what kind of message was acknowledged
  case original.type
  when 'Version' then version_acknowledged
  when 'StatusSubscribe' then status_subscribe_acknowledged(original)
  end

  check_outgoing_acknowledged original

  @acknowledgements[original.m_id] = message
  @acknowledgement_condition.signal message
end

#process_not_ack(message) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 102

# Handles a received negative acknowledgement.
#
# When the rejected message is still awaited: stops awaiting it, links it to
# the nack, logs, stores the nack under the original's m_id and signals the
# acknowledgement condition. Otherwise the nack is only logged as referring to
# an unknown message.
#
# @param message [Object] the received nack
def process_not_ack(message)
  original = find_original_for_message message
  return log_acknowledgement_for_unknown(message) unless original

  dont_expect_acknowledgement message
  message.original = original
  log_acknowledgement_for_original message, original

  @acknowledgements[original.m_id] = message
  @acknowledgement_condition.signal message
end

#status_subscribe_acknowledged(original) ⇒ Object



131
132
133
134
135
136
137
138
139
140
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 131

# Follow-up for an acknowledged StatusSubscribe: finds the component named by
# the 'cId' attribute and, when found, logs and passes the 'sS' list to the
# component's allow_repeat_updates.
#
# @param original [Object] the StatusSubscribe message that was acknowledged
# @return [Object, nil] nil when no component is found
def status_subscribe_acknowledged(original)
  target = find_component(original.attribute('cId'))
  return unless target

  short_id = Message.shorten_m_id(original.m_id)
  statuses = original.attributes['sS']
  text = "StatusSubscribe #{short_id} acknowledged, allowing repeated status values for #{statuses}"
  log text, level: :info
  target.allow_repeat_updates statuses
end