Module: RSMP::Proxy::Modules::Acknowledgements
- Included in:
- RSMP::Proxy
- Defined in:
- lib/rsmp/proxy/modules/acknowledgements.rb
Overview
Message acknowledgement handling Manages sending/receiving acks and nacks, and tracking acknowledged messages
Instance Method Summary collapse
- #acknowledge(original) ⇒ Object
- #acknowledged_first_ingoing(message) ⇒ Object
- #acknowledged_first_outgoing(message) ⇒ Object
- #check_ack_timeout(now) ⇒ Object
- #check_ingoing_acknowledged(message) ⇒ Object
-
#check_outgoing_acknowledged(message) ⇒ Object
TODO: this might be better handled by a proper event machine using e.g.
- #dont_acknowledge(original, prefix = nil, reason = nil, force: true) ⇒ Object
- #dont_expect_acknowledgement(message) ⇒ Object
- #expect_acknowledgement(message) ⇒ Object
- #find_original_for_message(message) ⇒ Object
- #log_acknowledgement_for_original(message, original) ⇒ Object
- #log_acknowledgement_for_unknown(message) ⇒ Object
- #process_ack(message) ⇒ Object
- #process_not_ack(message) ⇒ Object
- #status_subscribe_acknowledged(original) ⇒ Object
Instance Method Details
#acknowledge(original) ⇒ Object
7 8 9 10 11 12 13 14 |
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 7 def acknowledge(original) raise InvalidArgument unless original ack = MessageAck.build_from(original) ack.original = original.clone ack, "for #{ack.original.type} #{original.m_id_short}" check_ingoing_acknowledged original end |
#acknowledged_first_ingoing(message) ⇒ Object
77 |
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 77 def acknowledged_first_ingoing(); end |
#acknowledged_first_outgoing(message) ⇒ Object
75 |
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 75 def acknowledged_first_outgoing(); end |
#check_ack_timeout(now) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 39 def check_ack_timeout(now) timeout = @site_settings['timeouts']['acknowledgement'] # hash cannot be modify during iteration, so clone it @awaiting_acknowledgement.clone.each_pair do |_m_id, | latest = . + timeout next unless now > latest str = "No acknowledgements for #{message.type} #{message.m_id_short} within #{timeout} seconds" log str, level: :error begin close ensure distribute_error MissingAcknowledgment.new(str) end end end |
#check_ingoing_acknowledged(message) ⇒ Object
68 69 70 71 72 73 |
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 68 def check_ingoing_acknowledged() return if @ingoing_acknowledged[.type] @ingoing_acknowledged[.type] = true acknowledged_first_ingoing end |
#check_outgoing_acknowledged(message) ⇒ Object
TODO: this might be better handled by a proper event machine using e.g. the EventMachine gem
61 62 63 64 65 66 |
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 61 def check_outgoing_acknowledged() return if @outgoing_acknowledged[.type] @outgoing_acknowledged[.type] = true acknowledged_first_outgoing end |
#dont_acknowledge(original, prefix = nil, reason = nil, force: true) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 16 def dont_acknowledge(original, prefix = nil, reason = nil, force: true) raise InvalidArgument unless original str = [prefix, reason].join(' ') log str, message: original, level: :warning if reason = MessageNotAck.new({ 'oMId' => original.m_id, 'rea' => reason || 'Unknown reason' }) .original = original.clone , "for #{original.type} #{original.m_id_short}", force: force end |
#dont_expect_acknowledgement(message) ⇒ Object
35 36 37 |
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 35 def dont_expect_acknowledgement() @awaiting_acknowledgement.delete .attribute('oMId') end |
#expect_acknowledgement(message) ⇒ Object
29 30 31 32 33 |
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 29 def expect_acknowledgement() return if .is_a?(MessageAck) || .is_a?(MessageNotAck) @awaiting_acknowledgement[.m_id] = end |
#find_original_for_message(message) ⇒ Object
56 57 58 |
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 56 def () @awaiting_acknowledgement[.attribute('oMId')] end |
#log_acknowledgement_for_original(message, original) ⇒ Object
115 116 117 118 119 120 121 122 123 124 |
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 115 def log_acknowledgement_for_original(, original) str = "Received #{message.type} for #{original.type} #{message.attribute('oMId')[0..3]}" if .type == 'MessageNotAck' reason = .attributes['rea'] str = "#{str}: #{reason}" if reason log str, message: , level: :warning else log str, message: , level: :log end end |
#log_acknowledgement_for_unknown(message) ⇒ Object
126 127 128 129 |
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 126 def log_acknowledgement_for_unknown() log "Received #{message.type} for unknown message #{message.attribute('oMId')[0..3]}", message: , level: :warning end |
#process_ack(message) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 79 def process_ack() original = if original dont_expect_acknowledgement .original = original log_acknowledgement_for_original , original case original.type when 'Version' version_acknowledged when 'StatusSubscribe' status_subscribe_acknowledged original end check_outgoing_acknowledged original @acknowledgements[original.m_id] = @acknowledgement_condition.signal else log_acknowledgement_for_unknown end end |
#process_not_ack(message) ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 102 def process_not_ack() original = if original dont_expect_acknowledgement .original = original log_acknowledgement_for_original , original @acknowledgements[original.m_id] = @acknowledgement_condition.signal else log_acknowledgement_for_unknown end end |
#status_subscribe_acknowledged(original) ⇒ Object
131 132 133 134 135 136 137 138 139 140 |
# File 'lib/rsmp/proxy/modules/acknowledgements.rb', line 131 def status_subscribe_acknowledged(original) component = find_component original.attribute('cId') return unless component short = Message.shorten_m_id original.m_id subscribe_list = original.attributes['sS'] log "StatusSubscribe #{short} acknowledged, allowing repeated status values for #{subscribe_list}", level: :info component.allow_repeat_updates subscribe_list end |