Class: Roby::DRoby::PlanRebuilder

Inherits:
Object
  • Object
show all
Defined in:
lib/roby/droby/plan_rebuilder.rb

Overview

This class rebuilds a plan structure from events saved by EventLogger

The data has to be fed cycle-by-cycle to the #process_one_cycle method

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(plan: RebuiltPlan.new) ⇒ PlanRebuilder

Returns a new instance of PlanRebuilder.



42
43
44
45
46
47
48
49
50
# File 'lib/roby/droby/plan_rebuilder.rb', line 42

# Sets up a rebuilder that replays logged events into +plan+.
#
# @param plan the plan object that gets filled in; a new RebuiltPlan is
#   created when none is given
def initialize(plan: RebuiltPlan.new)
    @stats = {}
    @plan = plan
    @scheduler_state = Schedulers::State.new

    # The marshaller is constructed with the object manager, so the manager
    # has to be created first.
    @object_manager = ObjectManager.new(DRobyID.allocate)
    @marshal = Marshal.new(object_manager, nil)

    clear_changes
end

Instance Attribute Details

#changesObject (readonly)

A hash that stores (at a high level) what changed since the last call to #clear_integrated

Don’t manipulate directly, but use the announce_* and has_*_changes? methods



32
33
34
# File 'lib/roby/droby/plan_rebuilder.rb', line 32

# Returns the hash of high-level change flags.
#
# The hash is (re)created by #clear_changes with the keys :state, :structure
# and :event_propagation.
#
# @return [Hash]
def changes
  @changes
end

#current_timeObject (readonly)

The time of the last processed log item



40
41
42
# File 'lib/roby/droby/plan_rebuilder.rb', line 40

# Returns the timestamp of the last log item handled by #process_one_event.
#
# @return [Time, nil] presumably nil until a first item has been processed
#   (no other assignment is visible in this section)
def current_time
  @current_time
end

#event_filtersObject (readonly)

A set of EventFilter objects that list the labelling objects / filters applied on the event stream



35
36
37
# File 'lib/roby/droby/plan_rebuilder.rb', line 35

# Returns the event filters applied on the event stream.
#
# NOTE(review): no assignment to @event_filters is visible in this section;
# confirm where (and whether) it is populated.
def event_filters
  @event_filters
end

#marshalObject (readonly)

The object that unmarshals the data



15
16
17
# File 'lib/roby/droby/plan_rebuilder.rb', line 15

# Returns the marshaller created in #initialize; #local_object delegates to it.
def marshal
  @marshal
end

#object_managerObject (readonly)

The object that does ID-to-object mapping



13
14
15
# File 'lib/roby/droby/plan_rebuilder.rb', line 13

# Returns the object manager created in #initialize, on which plans are
# registered and finalized objects deregistered.
def object_manager
  @object_manager
end

#planObject (readonly)

The Plan object into which we rebuild information



18
19
20
# File 'lib/roby/droby/plan_rebuilder.rb', line 18

# Returns the plan being rebuilt.
#
# This is the plan given to #initialize until #register_executable_plan
# replaces it with a new RebuiltPlan.
def plan
  @plan
end

#scheduler_stateObject (readonly)

The scheduler state for the current cycle



21
22
23
# File 'lib/roby/droby/plan_rebuilder.rb', line 21

# Returns the scheduler state accumulated for the current cycle.
#
# A new Schedulers::State is installed by #initialize, #clear and #cycle_end.
def scheduler_state
  @scheduler_state
end

#start_timeObject (readonly)

The time of the first processed cycle



38
39
40
# File 'lib/roby/droby/plan_rebuilder.rb', line 38

# Returns the start time of the first processed cycle.
#
# It is set once, in #cycle_end, from #cycle_start_time.
#
# @return [Time, nil]
def start_time
  @start_time
end

#stateObject (readonly)

A representation of the state for this execution cycle



26
27
28
# File 'lib/roby/droby/plan_rebuilder.rb', line 26

# Returns the state of the last completed cycle.
#
# #cycle_end extracts it from the :state entry of the cycle timings.
def state
  @state
end

#statsObject (readonly)

A hash representing the statistics for this execution cycle



24
25
26
# File 'lib/roby/droby/plan_rebuilder.rb', line 24

# Returns the statistics hash of the last completed cycle.
#
# It starts out empty and is replaced in #cycle_end by the cycle timings
# (minus their :state entry).
#
# @return [Hash]
def stats
  @stats
end

Class Method Details

.update_type(type) ⇒ Object



160
161
162
163
164
165
166
167
# File 'lib/roby/droby/plan_rebuilder.rb', line 160

# Generates the change-tracking method pair for the +type+ category:
#
# - announce_<type>_update sets the category's flag in @changes
# - has_<type>_updates? tells whether that flag is currently set
#
# @param type the key used in the @changes hash
def self.update_type(type)
    announcer = "announce_#{type}_update"
    predicate = "has_#{type}_updates?"

    define_method(announcer) { @changes[type] = true }
    define_method(predicate) { @changes[type] ? true : false }
end

Instance Method Details

#added_edge(time, parent, child, relations, info) ⇒ Object



202
203
204
205
206
207
208
209
210
# File 'lib/roby/droby/plan_rebuilder.rb', line 202

# Handles an edge addition: adds the parent -> child edge, with its info, to
# the relation graph the parent reports for the first listed relation.
#
# All arguments but +time+ are resolved through #local_object first.
#
# @return [Array] the resolved [parent, child, relation, info]
def added_edge(time, parent, child, relations, info)
    source   = local_object(parent)
    target   = local_object(child)
    relation = local_object(relations.first)
    payload  = local_object(info)

    source.relation_graph_for(relation).add_edge(source, target, payload)
    [source, target, relation, payload]
end

#analyze_stream(event_stream, until_cycle = nil) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/roby/droby/plan_rebuilder.rb', line 52

# Reads +event_stream+ and processes its content, recording a snapshot in
# +history+ for every chunk that is deemed interesting.
#
# If a block is given, its return value (and not the one of +process+)
# decides whether a snapshot is taken. #clear_integrated is called after
# each chunk, even if processing raised.
#
# The original loop condition required cycle_index == until_cycle to keep
# going, which stopped immediately whenever +until_cycle+ was given (the
# cycle index is nil before the first cycle). Reading now continues until
# the cycle index reaches +until_cycle+.
#
# NOTE(review): +process+, +history+ and +snapshot+ are not defined in the
# visible part of this class; confirm they are provided elsewhere.
#
# @param event_stream an object responding to #eof? and #read
# @param until_cycle [Integer, nil] last cycle index to process; when nil,
#   the stream is read until its end
# @return [nil]
def analyze_stream(event_stream, until_cycle = nil)
    until event_stream.eof?
        # cycle_index is nil until a first cycle has been processed, in which
        # case we must keep reading
        break if until_cycle && cycle_index && cycle_index >= until_cycle

        begin
            data = event_stream.read
            interesting = process(data)
            interesting = yield if block_given?

            if interesting
                # Reuse the relations of the previous snapshot when the
                # structure did not change
                reusable = !has_structure_updates? && !history.empty?
                relations = history.last.relations if reusable

                history << snapshot(relations)
            end
        ensure
            clear_integrated
        end
    end
end

#apply_at_cycle_endObject



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# File 'lib/roby/droby/plan_rebuilder.rb', line 285

# Applies the operations queued in plan.at_cycle_end: removes the queued
# free events and tasks from the plan, deregisters the queued objects from
# the object manager, then empties the queue.
#
# A structure update is announced if at least one free event or task was
# queued for removal.
def apply_at_cycle_end
    pending = plan.at_cycle_end

    pending.remove_free_event.each { |event| plan.remove_free_event(event) }
    pending.remove_task.each { |task| plan.remove_task(task) }
    pending.deregister_object.each { |object| object_manager.deregister_object(object) }

    structure_changed = !pending.remove_free_event.empty? || !pending.remove_task.empty?
    announce_structure_update if structure_changed

    pending.clear
end

#clearObject



99
100
101
102
103
# File 'lib/roby/droby/plan_rebuilder.rb', line 99

# Resets the rebuilder: clears the plan and the object manager, and starts
# over with a new scheduler state.
def clear
    [plan, object_manager].each(&:clear)
    @scheduler_state = Schedulers::State.new
end

#clear_changesObject



153
154
155
156
157
158
# File 'lib/roby/droby/plan_rebuilder.rb', line 153

# Resets every change flag (state, structure, event propagation) to false
# by installing a new @changes hash.
#
# @return [Hash] the new hash
def clear_changes
    @changes = { state: false, structure: false, event_propagation: false }
end

#clear_integratedObject



144
145
146
147
148
149
150
151
# File 'lib/roby/droby/plan_rebuilder.rb', line 144

# Resets the change flags and calls plan.clear_integrated.
#
# If the plan holds garbaged tasks or events at that point, structure and
# state updates are announced (after the flags were reset) before the plan
# is asked to clear them.
def clear_integrated
    clear_changes
    unless plan.garbaged_tasks.empty? && plan.garbaged_events.empty?
        announce_structure_update
        announce_state_update
    end
    plan.clear_integrated
end

#cycle_end(time, timings) ⇒ Object



402
403
404
405
406
407
408
409
410
# File 'lib/roby/droby/plan_rebuilder.rb', line 402

# Handles the end of a cycle.
#
# Archives the current scheduler state in plan.scheduler_states, stores the
# cycle's state and statistics, applies the operations queued for the end
# of the cycle and announces a state update.
#
# @param time timestamp of the log item (unused here)
# @param timings [Hash] the cycle statistics; its :state entry is removed
#   from the hash and stored separately
def cycle_end(time, timings)
    # Archive the scheduler state of the finished cycle, start a new one
    plan.scheduler_states << scheduler_state
    @scheduler_state = Schedulers::State.new

    # The state comes inside the timings hash; split the two apart
    @state = timings.delete(:state)
    @stats = timings

    # Only the first cycle defines the start time
    @start_time = cycle_start_time unless @start_time

    apply_at_cycle_end
    announce_state_update
end

#cycle_end_timeObject

The end time of the last processed cycle



84
85
86
# File 'lib/roby/droby/plan_rebuilder.rb', line 84

# The end time of the last processed cycle
#
# It is computed from the cycle statistics, as the :start timestamp plus
# the :end offset.
#
# Like #cycle_start_time, it returns nil when the needed statistics are not
# available (e.g. before a first cycle has been processed) instead of
# raising.
#
# @return [Time, nil]
def cycle_end_time
    return unless stats[:start] && stats[:end]

    Time.at(*stats[:start]) + stats[:end]
end

#cycle_indexObject

The cycle index of the last processed cycle



89
90
91
# File 'lib/roby/droby/plan_rebuilder.rb', line 89

# Returns the :cycle_index entry of the current cycle statistics.
#
# @return the index, or nil when the statistics have no such entry (which
#   is the case for the empty statistics set up by #initialize)
def cycle_index
    stats[:cycle_index]
end

#cycle_start_timeTime

The starting time of the last processed cycle

Returns:

  • (Time)


77
78
79
80
81
# File 'lib/roby/droby/plan_rebuilder.rb', line 77

# The starting time of the last processed cycle
#
# It is computed from the cycle statistics, as the :start timestamp plus
# the :actual_start offset.
#
# @return [Time, nil] nil if either statistic is missing
def cycle_start_time
    base, offset = stats.values_at(:start, :actual_start)
    return unless base && offset

    Time.at(*base) + offset
end

#event_status_change(time, event, status) ⇒ Object



245
246
247
248
249
250
251
252
253
254
# File 'lib/roby/droby/plan_rebuilder.rb', line 245

# Handles a status change of a free event: :permanent marks it as permanent
# in the plan, :normal removes that mark. Other statuses are ignored.
#
# @return the event, resolved through #local_object
def event_status_change(time, event, status)
    local_event = local_object(event)

    case status
    when :permanent then plan.add_permanent_event(local_event)
    when :normal    then plan.unmark_permanent_event(local_event)
    end

    local_event
end

#exception_notification(time, plan_id, mode, error, involved_objects) ⇒ Object



367
368
369
370
371
372
373
374
# File 'lib/roby/droby/plan_rebuilder.rb', line 367

# Records an exception notification in the propagated_exceptions list of
# the plan identified by +plan_id+, as [time, mode, error, involved_objects].
#
# +error+, +involved_objects+ and +plan_id+ are resolved through
# #local_object, in that order.
#
# @return [Array] the resolved [plan, error, involved_objects]
def exception_notification(time, plan_id, mode, error, involved_objects)
    local_error    = local_object(error)
    local_involved = local_object(involved_objects)
    target_plan    = local_object(plan_id)

    record = [time, mode, local_error, local_involved]
    target_plan.propagated_exceptions << record

    [target_plan, local_error, local_involved]
end

#finalized_event(time, plan_id, event) ⇒ Object



272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/roby/droby/plan_rebuilder.rb', line 272

# Handles the finalization of an event.
#
# The event gets its finalization time set and is queued for
# deregistration at the end of the cycle. In addition, if it is a root
# object that has not been garbaged, it is recorded in the plan's
# finalized_events and queued for removal from the plan.
#
# @return [Array] the resolved [plan, event]
def finalized_event(time, plan_id, event)
    owner     = local_object(plan_id)
    finalized = local_object(event)
    finalized.finalization_time = time

    queue = owner.at_cycle_end
    to_remove = !owner.garbaged_events.include?(finalized) && finalized.root_object?
    if to_remove
        owner.finalized_events << finalized
        queue.remove_free_event << finalized
    end
    queue.deregister_object << finalized

    [owner, finalized]
end

#finalized_task(time, plan_id, task) ⇒ Object



302
303
304
305
306
307
308
309
310
311
312
# File 'lib/roby/droby/plan_rebuilder.rb', line 302

# Handles the finalization of a task.
#
# The task gets its finalization time set and is queued for deregistration
# at the end of the cycle. Unless it has been garbaged, it is also recorded
# in the plan's finalized_tasks and queued for removal from the plan.
#
# @return [Array] the resolved [plan, task]
def finalized_task(time, plan_id, task)
    owner     = local_object(plan_id)
    finalized = local_object(task)
    finalized.finalization_time = time

    queue = owner.at_cycle_end
    if !owner.garbaged_tasks.include?(finalized)
        owner.finalized_tasks << finalized
        queue.remove_task << finalized
    end
    queue.deregister_object << finalized

    [owner, finalized]
end

#garbage_event(time, plan, event) ⇒ Object



265
266
267
268
269
270
# File 'lib/roby/droby/plan_rebuilder.rb', line 265

# Records an event as garbaged in the garbaged_events list of its plan.
#
# Both +plan+ and +event+ are resolved through #local_object.
#
# @return the resolved event
def garbage_event(time, plan, event)
    owner = local_object(plan)
    local_object(event).tap { |garbaged| owner.garbaged_events << garbaged }
end

#garbage_task(time, plan, task, can_finalize) ⇒ Object



256
257
258
259
260
261
262
263
# File 'lib/roby/droby/plan_rebuilder.rb', line 256

# Records a task as garbaged in the garbaged_tasks list of its plan, but
# only when +can_finalize+ is set.
#
# Both +plan+ and +task+ are resolved through #local_object.
#
# @return the resolved task
def garbage_task(time, plan, task, can_finalize)
    owner    = local_object(plan)
    garbaged = local_object(task)
    owner.garbaged_tasks << garbaged if can_finalize
    garbaged
end

#generator_emit_failed(time, generator, error) ⇒ Object



344
345
346
347
348
349
350
# File 'lib/roby/droby/plan_rebuilder.rb', line 344

# Records a failed emission as [time, generator, error] in the
# failed_emissions list of the generator's plan, and announces an event
# propagation update.
#
# @return [Array] the resolved [generator, error]
def generator_emit_failed(time, generator, error)
    source  = local_object(generator)
    failure = local_object(error)

    source.plan.failed_emissions << [time, source, failure]
    announce_event_propagation_update

    [source, failure]
end

#generator_fired(time, event) ⇒ Object



330
331
332
333
334
335
336
337
338
339
340
341
342
# File 'lib/roby/droby/plan_rebuilder.rb', line 330

# Handles the emission of an event.
#
# The event is appended to its generator's history, the generator's
# @emitted flag is set and, if the generator responds to #task, that task
# is notified through #fired_event. The emission is recorded as
# [time, event] in the emitted_events of the generator's plan and an event
# propagation update is announced.
#
# @return the resolved event
def generator_fired(time, event)
    fired  = local_object(event)
    source = fired.generator

    source.history << fired
    # Set the flag directly on the generator
    source.instance_variable_set(:@emitted, true)
    source.task.fired_event(fired) if source.respond_to?(:task)

    source.plan.emitted_events << [time, fired]
    announce_event_propagation_update
    fired
end

#generator_propagate_events(time, is_forwarding, events, generator) ⇒ Object



352
353
354
355
356
357
358
# File 'lib/roby/droby/plan_rebuilder.rb', line 352

# Records a propagation as [time, is_forwarding, events, generator] in the
# propagated_events list of the generator's plan, and announces an event
# propagation update.
#
# @return [Array] the resolved [events, generator]
def generator_propagate_events(time, is_forwarding, events, generator)
    sources = local_object(events)
    target  = local_object(generator)

    target.plan.propagated_events << [time, is_forwarding, sources, target]
    announce_event_propagation_update

    [sources, target]
end

#generator_unreachable(time, generator, reason) ⇒ Object



360
361
362
363
364
365
# File 'lib/roby/droby/plan_rebuilder.rb', line 360

# Marks a generator as unreachable, passing the resolved reason to its
# #mark_unreachable! method.
#
# @return [Array] the resolved [generator, reason]
def generator_unreachable(time, generator, reason)
    target = local_object(generator)
    cause  = local_object(reason)
    target.mark_unreachable!(cause)
    [target, cause]
end

#has_interesting_events?Boolean

True if something was recorded in the last played cycles that requires a snapshot to be created

Returns:

  • (Boolean)


95
96
97
# File 'lib/roby/droby/plan_rebuilder.rb', line 95

# Tells whether a structure update or an event propagation update has been
# announced since the change flags were last reset.
#
# State updates alone do not count.
def has_interesting_events?
    has_structure_updates? || has_event_propagation_updates?
end

#local_object(object) ⇒ Object



140
141
142
# File 'lib/roby/droby/plan_rebuilder.rb', line 140

# Resolves +object+ by delegating to the marshaller's #local_object.
#
# The log handlers of this class use it on the arguments they receive
# before applying them to the plan.
def local_object(object)
    marshal.local_object(object)
end

#merged_plan(time, plan_id, merged_plan) ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/roby/droby/plan_rebuilder.rb', line 187

# Handles the merge of a plan into the plan identified by +plan_id+, and
# stamps every task, free event and task event of the merged plan with
# +time+ as its addition time.
#
# @return [Array] the resolved [plan, merged_plan]
def merged_plan(time, plan_id, merged_plan)
    incoming = local_object(merged_plan)
    # The objects are listed before the merge is applied -- presumably
    # because merging moves them out of the incoming plan (TODO confirm)
    added = [incoming.tasks, incoming.free_events, incoming.task_events].flat_map(&:to_a)

    target = local_object(plan_id)
    target.merge(incoming)
    added.each { |object| object.addition_time = time }

    [target, incoming]
end

#pretty_print(pp) ⇒ Object



418
419
420
421
422
423
424
# File 'lib/roby/droby/plan_rebuilder.rb', line 418

# Pretty-prints the rebuilder: a header line naming the plan, followed by
# the object manager's own pretty-printed form, nested by two columns.
#
# @param pp the pretty-printer to write to
def pretty_print(pp)
    header = "Plan rebuilder state for #{plan}"
    pp.text(header)
    pp.nest(2) do
        pp.breakable
        object_manager.pretty_print(pp)
    end
end

#process_one_cycle(data) ⇒ Object

Processes one cycle worth of data coming from an EventStream, updating the plan

It returns true if there was something noteworthy in there, and false otherwise.



110
111
112
113
114
# File 'lib/roby/droby/plan_rebuilder.rb', line 110

# Processes one cycle worth of data, updating the plan.
#
# +data+ is a flat list in which every log item takes four consecutive
# entries: the handler name, the seconds and microseconds of its timestamp,
# and its arguments. Each item is passed to #process_one_event.
#
# NOTE(review): as written, the return value is whatever #each_slice
# returns, not a boolean telling whether something noteworthy happened;
# confirm what callers expect.
def process_one_cycle(data)
    data.each_slice(4) do |name, seconds, microseconds, payload|
        process_one_event(name, seconds, microseconds, payload)
    end
end

#process_one_event(m, sec, usec, args) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/roby/droby/plan_rebuilder.rb', line 116

# Processes a single log item.
#
# The item's timestamp becomes the current time, then the item is
# dispatched to the method called +m+, if this object responds to it, with
# the timestamp followed by +args+. Items without a matching method are
# ignored.
#
# @param m name of the handler method
# @param sec seconds part of the timestamp
# @param usec microseconds part of the timestamp
# @param args [Array] the handler's arguments
# @return [nil]
# @raise the exception raised by the handler, with the handler name and a
#   textual form of its arguments appended to the message. Interrupt is
#   passed through untouched.
def process_one_event(m, sec, usec, args)
    @current_time = timestamp = Time.at(sec, usec)

    begin
        send(m, timestamp, *args) if respond_to?(m)
    rescue Interrupt
        raise
    rescue Exception => e
        # Everything else is re-raised below, after adding the context of
        # the failing handler to the message
        rendered = args.map do |arg|
            case arg
            when NilClass then "nil"
            when Time then arg.to_hms
            else
                begin
                    arg.to_s
                rescue StandardError
                    "failed_to_s"
                end
            end
        end

        raise e, "#{e.message} while serving #{m}(#{rendered.join(', ')})", e.backtrace
    end
    nil
end

#register_executable_plan(time, plan_id) ⇒ Object



181
182
183
184
185
# File 'lib/roby/droby/plan_rebuilder.rb', line 181

# Handles the registration of an executable plan: replaces the rebuilt plan
# with a new RebuiltPlan and registers it on the object manager, with
# +plan_id+ given under the nil key.
#
# @return the new plan
def register_executable_plan(time, plan_id)
    fresh_plan = RebuiltPlan.new
    @plan = fresh_plan
    object_manager.register_object(plan, nil => plan_id)
    @plan
end

#removed_edge(time, parent, child, relations) ⇒ Object



222
223
224
225
226
227
228
229
# File 'lib/roby/droby/plan_rebuilder.rb', line 222

# Handles an edge removal: removes the parent -> child edge from the
# relation graph the parent reports for the first listed relation.
#
# All arguments but +time+ are resolved through #local_object first.
#
# @return [Array] the resolved [parent, child, relation]
def removed_edge(time, parent, child, relations)
    source   = local_object(parent)
    target   = local_object(child)
    relation = local_object(relations.first)

    source.relation_graph_for(relation).remove_edge(source, target)
    [source, target, relation]
end

#scheduler_report_action(time, msg, task, *args) ⇒ Object



395
396
397
398
399
400
# File 'lib/roby/droby/plan_rebuilder.rb', line 395

# Forwards a scheduler action report to the current scheduler state, after
# resolving the task and the extra arguments through #local_object.
#
# Note that, unlike #scheduler_report_holdoff, the resolved extra arguments
# are returned as a nested array.
#
# @return [Array] [msg, task, args]
def scheduler_report_action(time, msg, task, *args)
    local_task = local_object(task)
    local_args = local_object(args)
    scheduler_state.report_action(msg, local_task, *local_args)
    [msg, local_task, local_args]
end

#scheduler_report_holdoff(time, msg, task, *args) ⇒ Object



388
389
390
391
392
393
# File 'lib/roby/droby/plan_rebuilder.rb', line 388

# Forwards a scheduler holdoff report to the current scheduler state, after
# resolving the task and the extra arguments through #local_object.
#
# @return [Array] msg, the task and the extra arguments, as a flat array
def scheduler_report_holdoff(time, msg, task, *args)
    local_task = local_object(task)
    local_args = local_object(args)
    scheduler_state.report_holdoff(msg, local_task, *local_args)
    [msg, local_task, *local_args]
end

#scheduler_report_pending_non_executable_task(time, msg, *args) ⇒ Object



376
377
378
379
380
# File 'lib/roby/droby/plan_rebuilder.rb', line 376

# Forwards a "pending non-executable task" report to the current scheduler
# state, after resolving the extra arguments through #local_object.
#
# @return [Array] msg followed by the resolved arguments, as a flat array
def scheduler_report_pending_non_executable_task(time, msg, *args)
    local_args = local_object(args)
    scheduler_state.report_pending_non_executable_task(msg, *local_args)
    [msg, *local_args]
end

#scheduler_report_trigger(time, generator) ⇒ Object



382
383
384
385
386
# File 'lib/roby/droby/plan_rebuilder.rb', line 382

# Forwards a scheduler trigger report to the current scheduler state, after
# resolving the generator through #local_object.
#
# @return the resolved generator
def scheduler_report_trigger(time, generator)
    local_object(generator).tap { |triggered| scheduler_state.report_trigger(triggered) }
end

#task_arguments_updated(time, task, key, value) ⇒ Object



314
315
316
317
318
319
# File 'lib/roby/droby/plan_rebuilder.rb', line 314

# Handles the update of a task argument: force-merges the resolved value
# into the task's arguments, under +key+.
#
# Only the task and the value are resolved through #local_object; the key
# is used as-is.
#
# @return [Array] the resolved [task, value]
def task_arguments_updated(time, task, key, value)
    local_task  = local_object(task)
    local_value = local_object(value)
    local_task.arguments.force_merge!(key => local_value)
    [local_task, local_value]
end

#task_failed_to_start(time, task, reason) ⇒ Object



321
322
323
324
325
326
327
328
# File 'lib/roby/droby/plan_rebuilder.rb', line 321

# Handles a task that failed to start.
#
# The failure is recorded as [time, task, reason] in the failed_to_start
# list of the task's plan, the task itself is marked through
# #mark_failed_to_start, and an event propagation update is announced.
#
# @return [Array] the resolved [task, reason]
def task_failed_to_start(time, task, reason)
    failed_task = local_object(task)
    cause       = local_object(reason)

    failed_task.plan.failed_to_start << [time, failed_task, cause]
    failed_task.mark_failed_to_start(cause, time)
    announce_event_propagation_update

    [failed_task, cause]
end

#task_status_change(time, task, status) ⇒ Object



231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/roby/droby/plan_rebuilder.rb', line 231

# Handles a status change of a task:
#
# - :mission adds it to the plan as a mission task
# - :permanent adds it to the plan as a permanent task
# - :normal removes both its mission and permanent marks
#
# Other statuses are ignored.
#
# @return the task, resolved through #local_object
def task_status_change(time, task, status)
    local_task = local_object(task)

    case status
    when :mission   then plan.add_mission_task(local_task)
    when :permanent then plan.add_permanent_task(local_task)
    when :normal
        plan.unmark_mission_task(local_task)
        plan.unmark_permanent_task(local_task)
    end

    local_task
end

#timepoint(time) ⇒ Object



416
# File 'lib/roby/droby/plan_rebuilder.rb', line 416

# Handler for timepoint log items. The rebuilder does nothing with them:
# the item's arguments are accepted and discarded.
def timepoint(time, *)
end

#timepoint_group_end(time) ⇒ Object



414
# File 'lib/roby/droby/plan_rebuilder.rb', line 414

# Handler for timepoint group end log items. The rebuilder does nothing
# with them: the item's arguments are accepted and discarded.
def timepoint_group_end(time, *)
end

#timepoint_group_start(time) ⇒ Object



412
# File 'lib/roby/droby/plan_rebuilder.rb', line 412

# Handler for timepoint group start log items. The rebuilder does nothing
# with them: the item's arguments are accepted and discarded.
def timepoint_group_start(time, *)
end

#updated_edge_info(time, parent, child, relation, info) ⇒ Object



212
213
214
215
216
217
218
219
220
# File 'lib/roby/droby/plan_rebuilder.rb', line 212

# Handles the update of an edge's info: sets the new info on the
# parent -> child edge of the relation graph the parent reports for
# +relation+.
#
# All arguments but +time+ are resolved through #local_object first.
#
# @return [Array] the resolved [parent, child, relation, info]
def updated_edge_info(time, parent, child, relation, info)
    source  = local_object(parent)
    target  = local_object(child)
    graph_key = local_object(relation)
    payload = local_object(info)

    source.relation_graph_for(graph_key).set_edge_info(source, target, payload)
    [source, target, graph_key, payload]
end