Class: RSMP::Collector
- Inherits:
  - Object
  - RSMP::Collector
- Defined in:
  - lib/rsmp/collect/collector.rb
  - lib/rsmp/collect/collector/status.rb
  - lib/rsmp/collect/collector/logging.rb
  - lib/rsmp/collect/collector/reporting.rb
Overview
Collects messages from a distributor. Can filter by message type, component and direction. Wakes up the waiting task once the desired number of messages has been collected.
Direct Known Subclasses
AckCollector, AggregatedStatusCollector, AlarmCollector, StateCollector
Defined Under Namespace
Modules: Logging, Reporting, Status
Instance Attribute Summary
- #condition ⇒ Object (readonly)
  Returns the value of attribute condition.
- #error ⇒ Object (readonly)
  Returns the value of attribute error.
- #m_id ⇒ Object (readonly)
  Returns the value of attribute m_id.
- #messages ⇒ Object (readonly)
  Returns the value of attribute messages.
- #status ⇒ Object (readonly)
  Returns the value of attribute status.
- #task ⇒ Object (readonly)
  Returns the value of attribute task.
Attributes included from Logging
Instance Method Summary
- #acceptable?(message) ⇒ Boolean
  Check a message against our match criteria. Returns true if there’s a match, false if not.
- #cancel(error = nil) ⇒ Object
  Abort collection.
- #collect ⇒ Object
  Collect messages. Returns once all messages have been collected, or the timeout is reached.
- #collect! ⇒ Object
  Collect messages, then call #ok!, which raises the stored error (for example after a timeout) and otherwise returns the collector.
- #complete ⇒ Object
  Called when we’re done collecting.
- #describe ⇒ Object
- #do_stop ⇒ Object
  Remove ourself as a receiver, so we don’t receive message notifications anymore, and wake up the async condition.
- #done? ⇒ Boolean
  Have we collected the required number of messages?
- #incomplete ⇒ Object
  Called when we received a message, but are not done yet.
- #initialize(distributor, options = {}) ⇒ Collector (constructor)
  A new instance of Collector.
- #inspect ⇒ Object
  Inspect formatter that shows the messages we have collected.
- #keep(message) ⇒ Object
  Store a message in the result array.
- #make_title(title) ⇒ Object
- #ok! ⇒ Object
  If an error caused collection to abort, raise it. Otherwise return self, so this can be tacked on to calls that return a collector.
- #perform_match(message) ⇒ Object
  Match message against our collection criteria.
- #receive(message) ⇒ Object
  Handle a message. Returns the collector status after processing it.
- #receive_disconnect(error, _options) ⇒ Object
  Cancel if we received a notification about a disconnect.
- #receive_error(error, options = {}) ⇒ Object
  Handle upstream error.
- #receive_schema_error(error, options) ⇒ Object
  Cancel if we received a schema error for a message type we’re collecting.
- #reject_not_ack(message) ⇒ Object
  Check if we received a NotAck related to the initiating request, identified by @m_id.
- #reset ⇒ Object
- #start(&block) ⇒ Object
  Start collection and return immediately. You can later use wait() to wait for completion.
- #use_task(task) ⇒ Object
- #wait ⇒ Object
  If collection is not active, return status immediately.
- #wait! ⇒ Object
  If collection is not active, raise an error.
Methods included from Logging
#author, #initialize_logging, #log
Methods included from Reporting
#describe_matcher, #describe_num_and_type, #describe_progress, #describe_types, #identifier
Methods included from Status
#cancelled?, #collecting?, #ingoing?, #ok?, #outgoing?, #ready?, #timeout?
Methods included from Receiver
#accept_message?, #handle_message, #initialize_receiver, #reject_message?, #start_receiving, #stop_receiving
Methods included from Inspect
Constructor Details
#initialize(distributor, options = {}) ⇒ Collector
Returns a new instance of Collector.
# File 'lib/rsmp/collect/collector.rb', line 13

def initialize(distributor, options = {})
  initialize_receiver distributor, filter: options[:filter]
  @options = {
    cancel: {
      schema_error: true,
      disconnect: false
    }
  }.deep_merge options
  @timeout = options[:timeout]
  @num = options[:num]
  @m_id = options[:m_id]
  @condition = Async::Notification.new
  make_title options[:title]

  if task
    @task = task
  elsif distributor.respond_to? 'task'
    # if distributor is a Proxy, or some other object that implements task(),
    # then try to get the task that way
    @task = distributor.task
  end
  reset
end
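The constructor layers the caller's options over a default `cancel` configuration using `deep_merge`. As a rough illustration of what such a merge does, here is a stand-alone sketch. The helper `deep_merge_hashes` and the constant `DEFAULT_COLLECTOR_OPTIONS` are hypothetical names introduced for this example; the gem's own `deep_merge` implementation is not shown on this page and may differ in detail.

```ruby
# Hypothetical stand-in for a Hash deep merge: nested hashes are merged
# recursively, and for any other value the one from `overrides` wins.
def deep_merge_hashes(defaults, overrides)
  defaults.merge(overrides) do |_key, old_value, new_value|
    if old_value.is_a?(Hash) && new_value.is_a?(Hash)
      deep_merge_hashes(old_value, new_value)
    else
      new_value
    end
  end
end

# Defaults copied from the constructor listing.
DEFAULT_COLLECTOR_OPTIONS = { cancel: { schema_error: true, disconnect: false } }.freeze

# The caller overrides one nested key and adds two top-level keys.
# The untouched nested default (schema_error: true) is preserved.
merged = deep_merge_hashes(
  DEFAULT_COLLECTOR_OPTIONS,
  { cancel: { disconnect: true }, num: 2, timeout: 5 }
)
p merged
```

Under this reading, passing `cancel: { disconnect: true }` would switch on cancellation on disconnect without turning off cancellation on schema errors.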
Instance Attribute Details
#condition ⇒ Object (readonly)
Returns the value of attribute condition.
# File 'lib/rsmp/collect/collector.rb', line 11

def condition
  @condition
end
#error ⇒ Object (readonly)
Returns the value of attribute error.
# File 'lib/rsmp/collect/collector.rb', line 11

def error
  @error
end
#m_id ⇒ Object (readonly)
Returns the value of attribute m_id.
# File 'lib/rsmp/collect/collector.rb', line 11

def m_id
  @m_id
end
#messages ⇒ Object (readonly)
Returns the value of attribute messages.
# File 'lib/rsmp/collect/collector.rb', line 11

def messages
  @messages
end
#status ⇒ Object (readonly)
Returns the value of attribute status.
# File 'lib/rsmp/collect/collector.rb', line 11

def status
  @status
end
#task ⇒ Object (readonly)
Returns the value of attribute task.
# File 'lib/rsmp/collect/collector.rb', line 11

def task
  @task
end
Instance Method Details
#acceptable?(message) ⇒ Boolean
Check a message against our match criteria. Returns true if there’s a match, false if not.
# File 'lib/rsmp/collect/collector.rb', line 246

def acceptable?(message)
  @filter.nil? || @filter.accept?(message)
end
#cancel(error = nil) ⇒ Object
Abort collection.
# File 'lib/rsmp/collect/collector.rb', line 233

def cancel(error = nil)
  @error = error
  @status = :cancelled
  do_stop
end
#collect ⇒ Object
Collect messages. Returns once all messages have been collected, or the timeout is reached.
# File 'lib/rsmp/collect/collector.rb', line 72

def collect(&)
  start(&)
  wait
  @status
ensure
  @distributor&.remove_receiver self
end
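The `ensure` clause in `collect` means the collector is removed from the distributor even if waiting raises. The sketch below shows that register, run, always unregister pattern in isolation. `SketchDistributor`, `SketchReceiver` and the `distribute` method are hypothetical stand-ins written for this example; they are not the rsmp classes and leave out filtering, status and timeouts.

```ruby
# Hypothetical distributor: keeps a list of receivers and forwards items to them.
class SketchDistributor
  attr_reader :receivers

  def initialize
    @receivers = []
  end

  def add_receiver(receiver)
    @receivers << receiver unless @receivers.include?(receiver)
  end

  def remove_receiver(receiver)
    @receivers.delete(receiver)
  end

  def distribute(item)
    # Iterate over a copy, since a receiver may unregister while handling an item.
    @receivers.dup.each { |receiver| receiver.receive(item) }
  end
end

# Hypothetical receiver that registers itself, runs a block, and always unregisters.
class SketchReceiver
  attr_reader :items

  def initialize(distributor)
    @distributor = distributor
    @items = []
  end

  def receive(item)
    @items << item
  end

  def collect
    @distributor.add_receiver self
    yield
    @items
  ensure
    # Runs on normal return and when the block raises.
    @distributor.remove_receiver self
  end
end
```

A caller would wrap the code that produces traffic in the block, for example `receiver.collect { distributor.distribute(:a) }`, and can rely on the receiver list being clean afterwards.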
#collect! ⇒ Object
Collect messages, then call #ok!, which raises the stored error (for example after a timeout) and otherwise returns the collector.
# File 'lib/rsmp/collect/collector.rb', line 82

def collect!(&)
  collect(&)
  ok!
end
#complete ⇒ Object
Called when we’re done collecting. Removes ourself as a receiver, so we don’t receive message notifications anymore.
# File 'lib/rsmp/collect/collector.rb', line 182

def complete
  @status = :ok
  do_stop
  log_complete
end
#describe ⇒ Object
# File 'lib/rsmp/collect/collector.rb', line 158

def describe; end
#do_stop ⇒ Object
Remove ourself as a receiver, so we don’t receive message notifications anymore, and wake up the async condition.
# File 'lib/rsmp/collect/collector.rb', line 195

def do_stop
  @distributor.remove_receiver self
  @condition.signal
end
#done? ⇒ Boolean
Have we collected the required number of messages?
# File 'lib/rsmp/collect/collector.rb', line 176

def done?
  @num && @messages.size >= @num
end
#incomplete ⇒ Object
Called when we received a message, but are not done yet.
# File 'lib/rsmp/collect/collector.rb', line 189

def incomplete
  log_incomplete
end
#inspect ⇒ Object
Inspect formatter that shows the messages we have collected.
# File 'lib/rsmp/collect/collector.rb', line 58

def inspect
  "#<#{self.class.name}:#{object_id}, #{inspector(:@messages)}>"
end
#keep(message) ⇒ Object
Store a message in the result array.
# File 'lib/rsmp/collect/collector.rb', line 240

def keep(message)
  @messages << message
end
#make_title(title) ⇒ Object
# File 'lib/rsmp/collect/collector.rb', line 37

def make_title(title)
  @title = if title
             title
           elsif @filter
             [@filter.type].flatten.join('/')
           else
             ''
           end
end
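`make_title` has no description, but the listing suggests a simple precedence: an explicit title wins, otherwise the filter's type or types are joined with a slash, otherwise the title is empty. The sketch below restates that as a free function. `title_for` is a hypothetical name, and it takes the type value directly instead of a filter object, which is a simplification made for this example.

```ruby
# Hypothetical free-function version of the title logic.
# `filter_types` may be nil, a single type name, or an array of type names.
def title_for(title, filter_types)
  if title
    title
  elsif filter_types
    # Wrapping and flattening lets a single name and an array be handled alike.
    [filter_types].flatten.join('/')
  else
    ''
  end
end

puts title_for('my request', 'StatusUpdate')
puts title_for(nil, 'StatusUpdate')
puts title_for(nil, %w[StatusUpdate StatusResponse])
```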
#ok! ⇒ Object
If an error caused collection to abort, raise it. Otherwise return self, so this can be tacked on to calls that return a collector.
# File 'lib/rsmp/collect/collector.rb', line 64

def ok!
  raise @error if @error

  self
end
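Because `ok!` returns self when no error is stored, it can be appended to any call that hands back a collector. The following sketch models only that part. `SketchResult` is a hypothetical class for this example and has none of the collector's other behaviour.

```ruby
# Hypothetical stand-in that models only the error / ok! part of a collector.
class SketchResult
  attr_reader :error

  def initialize(error = nil)
    @error = error
  end

  # Same shape as the ok! listing: raise a stored error, otherwise return self.
  def ok!
    raise @error if @error

    self
  end
end

# Chaining works because ok! returns the object it was called on.
result = SketchResult.new.ok!
puts result.error.inspect

# A stored error surfaces as an exception at the point where ok! is called.
begin
  SketchResult.new(RuntimeError.new('timed out')).ok!
rescue RuntimeError => e
  puts "raised: #{e.message}"
end
```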
#perform_match(message) ⇒ Object
Match message against our collection criteria.
# File 'lib/rsmp/collect/collector.rb', line 161

def perform_match(message)
  return false if reject_not_ack(message)
  return false unless acceptable?(message)

  if @block
    status = [@block.call(message)].flatten
    return unless collecting?

    keep message if status.include?(:keep)
  else
    keep message
  end
end
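When a block was given to `start`, `perform_match` keeps a message only if the block's result, wrapped in an array and flattened, includes `:keep`. Without a block every accepted message is kept. A minimal sketch of just that rule follows. `keep_matching` is a hypothetical helper for this example, and it skips the NotAck check, the filter check and the `collecting?` check that the real method performs.

```ruby
# Hypothetical helper mirroring the block branch of perform_match.
# Returns the items that would be kept.
def keep_matching(items, &block)
  items.select do |item|
    if block
      # The block may return a single symbol, an array of symbols, or nil.
      [block.call(item)].flatten.include?(:keep)
    else
      true
    end
  end
end

p keep_matching([1, 2, 3, 4]) { |n| :keep if n.even? }
p keep_matching([1, 2, 3]) { |n| n > 1 ? %i[keep other] : nil }
p keep_matching([1, 2])
```

Wrapping the result in an array is what allows a block to return either a bare `:keep` or a list of several symbols.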
#receive(message) ⇒ Object
Handle a message. Returns the collector status after processing it.
# File 'lib/rsmp/collect/collector.rb', line 142

def receive(message)
  raise ArgumentError unless message
  unless ready? || collecting?
    raise "can't process message when status is :#{@status}, title: #{@title}, desc: #{describe}"
  end

  if perform_match message
    if done?
      complete
    else
      incomplete
    end
  end
  @status
end
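Taken together, `reset`, `start`, `receive` and `done?` describe a small state machine: `:ready`, then `:collecting`, then `:ok` once enough messages have arrived, after which further messages are refused. The sketch below reproduces only those transitions. `SketchCollector` is a hypothetical class for this example; it keeps every message and has no distributor, filter, block or timeout.

```ruby
# Hypothetical, heavily simplified model of the collector's status transitions.
class SketchCollector
  attr_reader :messages, :status

  def initialize(num:)
    @num = num
    @messages = []
    @status = :ready
  end

  def start
    raise "can't start unless ready (currently #{@status})" unless @status == :ready

    @status = :collecting
  end

  def receive(message)
    raise ArgumentError unless message
    raise "can't process message when status is :#{@status}" unless %i[ready collecting].include?(@status)

    @messages << message
    # Once the requested number of messages is reached, collection is complete.
    @status = :ok if @messages.size >= @num
    @status
  end
end

collector = SketchCollector.new(num: 2)
collector.start
puts collector.receive('first')
puts collector.receive('second')
```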
#receive_disconnect(error, _options) ⇒ Object
Cancel if we received a notification about a disconnect.
# File 'lib/rsmp/collect/collector.rb', line 225

def receive_disconnect(error, _options)
  return unless @options.dig(:cancel, :disconnect)

  @distributor.log "#{identifier}: cancelled due to a connection error: #{error}", level: :debug
  cancel error
end
#receive_error(error, options = {}) ⇒ Object
Handle upstream error.
# File 'lib/rsmp/collect/collector.rb', line 201

def receive_error(error, options = {})
  case error
  when RSMP::SchemaError
    receive_schema_error error, options
  when RSMP::DisconnectError
    receive_disconnect error, options
  end
end
#receive_schema_error(error, options) ⇒ Object
Cancel if we received a schema error for a message type we’re collecting.
# File 'lib/rsmp/collect/collector.rb', line 211

def receive_schema_error(error, options)
  return unless @options.dig(:cancel, :schema_error)

  message = options[:message]
  return unless message

  klass = message.class.name.split('::').last
  return unless @filter&.type.nil? || [@filter&.type].flatten.include?(klass)

  @distributor.log "#{identifier}: cancelled due to schema error in #{klass} #{message.m_id_short}", level: :debug
  cancel error
end
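The type check in `receive_schema_error` compares the message's unqualified class name with the filter's type or types, and treats a missing type as matching everything. A minimal sketch of that comparison is shown here. `SketchProtocol::StatusUpdate` and `schema_error_relevant?` are hypothetical names for this example, and the filter is reduced to the bare type value.

```ruby
# Hypothetical namespace and message class, standing in for real message classes.
module SketchProtocol
  class StatusUpdate; end
end

# True when a schema error for `message` concerns a type being collected:
# either no type filter is set, or the message's unqualified class name
# is among the filtered types.
def schema_error_relevant?(message, filter_type)
  klass = message.class.name.split('::').last
  filter_type.nil? || [filter_type].flatten.include?(klass)
end

message = SketchProtocol::StatusUpdate.new
p schema_error_relevant?(message, nil)
p schema_error_relevant?(message, 'StatusUpdate')
p schema_error_relevant?(message, %w[Alarm StatusResponse])
```

Splitting on `::` and taking the last part is what lets a namespaced class be matched against a plain type name.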
#reject_not_ack(message) ⇒ Object
Check if we received a NotAck related to the initiating request, identified by @m_id.
# File 'lib/rsmp/collect/collector.rb', line 129

def reject_not_ack(message)
  return unless @m_id
  return unless message.is_a?(MessageNotAck)
  return unless message.attribute('oMId') == @m_id

  m_id_short = RSMP::Message.shorten_m_id @m_id, 8
  cancel RSMP::MessageRejected.new("#{@title} #{m_id_short} was rejected with '#{message.attribute('rea')}'")
  @distributor.log "#{identifier}: cancelled due to a NotAck", level: :debug
  true
end
#reset ⇒ Object
# File 'lib/rsmp/collect/collector.rb', line 51

def reset
  @messages = []
  @error = nil
  @status = :ready
end
#start(&block) ⇒ Object
Start collection and return immediately. You can later use wait() to wait for completion.
# File 'lib/rsmp/collect/collector.rb', line 116

def start(&block)
  raise "Can't start collectimng unless ready (currently #{@status})" unless ready?

  @block = block
  raise ArgumentError, 'Num, timeout or block must be provided' unless @num || @timeout || @block

  reset
  @status = :collecting
  log_start
  @distributor&.add_receiver self
end
#use_task(task) ⇒ Object
# File 'lib/rsmp/collect/collector.rb', line 47

def use_task(task)
  @task = task
end
#wait ⇒ Object
If collection is not active, return status immediately. Otherwise wait until the desired messages have been collected, or timeout is reached.
# File 'lib/rsmp/collect/collector.rb', line 90

def wait
  if collecting?
    if @timeout
      @task.with_timeout(@timeout) { @condition.wait }
    else
      @condition.wait
    end
  end
  @status
rescue Async::TimeoutError
  @error = RSMP::TimeoutError.new describe_progress
  @status = :timeout
end
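`wait` turns a low-level timeout exception into a `:timeout` status plus a stored, more descriptive error, rather than letting the exception escape. The sketch below shows the same translation using Ruby's standard `Timeout` module instead of Async tasks, which is a deliberate substitution to keep the example free of dependencies. `SketchTimeoutError` and `wait_with_timeout` are hypothetical names for this example.

```ruby
require 'timeout'

# Hypothetical error class standing in for a library-specific timeout error.
class SketchTimeoutError < StandardError; end

# Runs the block with a time limit and returns [status, error]:
# [:ok, nil] if the block finished in time, [:timeout, error] otherwise.
def wait_with_timeout(seconds)
  Timeout.timeout(seconds) { yield }
  [:ok, nil]
rescue Timeout::Error
  # Translate the low-level timeout into a status and a descriptive error.
  [:timeout, SketchTimeoutError.new("did not complete within #{seconds}s")]
end

status, error = wait_with_timeout(0.05) { sleep 1 }
puts status
puts error.message
```

Callers that prefer an exception can raise the returned error themselves, which is roughly the division of labour between `wait` and `wait!`.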
#wait! ⇒ Object
If collection is not active, raise an error. Otherwise wait until the desired messages have been collected. If timeout is reached, an exception is raised.
# File 'lib/rsmp/collect/collector.rb', line 107

def wait!
  wait
  raise @error if timeout?
end