Class: Dynflow::Persistence
- Inherits:
-
Object
- Object
- Dynflow::Persistence
- Includes:
- Algebrick::TypeCheck
- Defined in:
- lib/dynflow/persistence.rb
Instance Attribute Summary collapse
-
#adapter ⇒ Object
readonly
Returns the value of attribute adapter.
Instance Method Summary collapse
- #current_backup_dir ⇒ Object
- #delete_delayed_plans(filters, batch_size = 1000) ⇒ Object
- #delete_execution_plans(filters, batch_size = 1000, enforce_backup_dir = nil) ⇒ Object
- #delete_output_chunks(execution_plan_id, action_id) ⇒ Object
- #find_execution_plan_counts(options) ⇒ Object
- #find_execution_plan_statuses(options) ⇒ Object
- #find_execution_plans(options) ⇒ Object
- #find_old_execution_plans(age) ⇒ Object
- #find_past_delayed_plans(time) ⇒ Object
-
#initialize(world, persistence_adapter, options = {}) ⇒ Persistence
constructor
A new instance of Persistence.
- #load_action(step) ⇒ Object
- #load_action_for_presentation(execution_plan, action_id, step = nil) ⇒ Object
- #load_actions(execution_plan, action_ids) ⇒ Object
- #load_actions_attributes(execution_plan_id, attributes) ⇒ Object
- #load_delayed_plan(execution_plan_id) ⇒ Object
- #load_execution_plan(id) ⇒ Object
- #load_output_chunks(execution_plan_id, action_id) ⇒ Object
- #load_step(execution_plan_id, step_id, world) ⇒ Object
- #load_steps(execution_plan_id, world) ⇒ Object
- #prune_envelopes(receiver_ids) ⇒ Object
- #prune_undeliverable_envelopes ⇒ Object
- #pull_envelopes(world_id) ⇒ Object
- #push_envelope(envelope) ⇒ Object
- #save_action(execution_plan_id, action) ⇒ Object
- #save_delayed_plan(delayed_plan) ⇒ Object
- #save_execution_plan(execution_plan) ⇒ Object
- #save_output_chunks(execution_plan_id, action_id, chunks) ⇒ Object
- #save_step(step, conditions = {}) ⇒ Object
- #set_delayed_plan_frozen(execution_plan_id, frozen = true, new_start_at = nil) ⇒ Object
Constructor Details
#initialize(world, persistence_adapter, options = {}) ⇒ Persistence
12 13 14 15 16 17 18 |
# File 'lib/dynflow/persistence.rb', line 12 def initialize(world, persistence_adapter, = {}) @world = world @adapter = persistence_adapter @adapter.register_world(world) @backup_deleted_plans = .fetch(:backup_deleted_plans, false) @backup_dir = .fetch(:backup_dir, './backup') end |
Instance Attribute Details
#adapter ⇒ Object (readonly)
Returns the value of attribute adapter.
10 11 12 |
# File 'lib/dynflow/persistence.rb', line 10 def adapter @adapter end |
Instance Method Details
#current_backup_dir ⇒ Object
82 83 84 |
# File 'lib/dynflow/persistence.rb', line 82 def current_backup_dir @backup_deleted_plans ? File.join(@backup_dir, Date.today.strftime('%Y%m%d')) : nil end |
#delete_delayed_plans(filters, batch_size = 1000) ⇒ Object
107 108 109 |
# File 'lib/dynflow/persistence.rb', line 107 def delete_delayed_plans(filters, batch_size = 1000) adapter.delete_delayed_plans(filters, batch_size) end |
#delete_execution_plans(filters, batch_size = 1000, enforce_backup_dir = nil) ⇒ Object
77 78 79 80 |
# File 'lib/dynflow/persistence.rb', line 77 def delete_execution_plans(filters, batch_size = 1000, enforce_backup_dir = nil) backup_dir = enforce_backup_dir || current_backup_dir adapter.delete_execution_plans(filters, batch_size, backup_dir) end |
#delete_output_chunks(execution_plan_id, action_id) ⇒ Object
59 60 61 |
# File 'lib/dynflow/persistence.rb', line 59 def delete_output_chunks(execution_plan_id, action_id) adapter.delete_output_chunks(execution_plan_id, action_id) end |
#find_execution_plan_counts(options) ⇒ Object
73 74 75 |
# File 'lib/dynflow/persistence.rb', line 73 def find_execution_plan_counts() adapter.find_execution_plan_counts() end |
#find_execution_plan_statuses(options) ⇒ Object
69 70 71 |
# File 'lib/dynflow/persistence.rb', line 69 def find_execution_plan_statuses() adapter.find_execution_plan_statuses() end |
#find_execution_plans(options) ⇒ Object
63 64 65 66 67 |
# File 'lib/dynflow/persistence.rb', line 63 def find_execution_plans() adapter.find_execution_plans().map do |execution_plan_hash| ExecutionPlan.new_from_hash(execution_plan_hash, @world) end end |
#find_old_execution_plans(age) ⇒ Object
95 96 97 98 99 |
# File 'lib/dynflow/persistence.rb', line 95 def find_old_execution_plans(age) adapter.find_old_execution_plans(age).map do |plan| ExecutionPlan.new_from_hash(plan, @world) end end |
#find_past_delayed_plans(time) ⇒ Object
101 102 103 104 105 |
# File 'lib/dynflow/persistence.rb', line 101 def find_past_delayed_plans(time) adapter.find_past_delayed_plans(time).map do |plan| DelayedPlan.new_from_hash(@world, plan) end end |
#load_action(step) ⇒ Object
20 21 22 23 24 25 |
# File 'lib/dynflow/persistence.rb', line 20 def load_action(step) attributes = adapter. load_action(step.execution_plan_id, step.action_id). update(step: step, phase: step.phase) return Action.from_hash(attributes, step.world) end |
#load_action_for_presentation(execution_plan, action_id, step = nil) ⇒ Object
34 35 36 37 38 39 |
# File 'lib/dynflow/persistence.rb', line 34 def load_action_for_presentation(execution_plan, action_id, step = nil) attributes = adapter.load_action(execution_plan.id, action_id) Action.from_hash(attributes.update(phase: Action::Present, execution_plan: execution_plan, step: step), @world).tap do |present_action| @world.middleware.execute(:present, present_action) {} end end |
#load_actions(execution_plan, action_ids) ⇒ Object
27 28 29 30 31 32 |
# File 'lib/dynflow/persistence.rb', line 27 def load_actions(execution_plan, action_ids) attributes = adapter.load_actions(execution_plan.id, action_ids) attributes.map do |action| Action.from_hash(action.merge(phase: Action::Present, execution_plan: execution_plan), @world) end end |
#load_actions_attributes(execution_plan_id, attributes) ⇒ Object
41 42 43 |
# File 'lib/dynflow/persistence.rb', line 41 def load_actions_attributes(execution_plan_id, attributes) adapter.load_actions_attributes(execution_plan_id, attributes).reject(&:empty?) end |
#load_delayed_plan(execution_plan_id) ⇒ Object
122 123 124 125 126 |
# File 'lib/dynflow/persistence.rb', line 122 def load_delayed_plan(execution_plan_id) hash = adapter.load_delayed_plan(execution_plan_id) return nil unless hash DelayedPlan.new_from_hash(@world, hash) end |
#load_execution_plan(id) ⇒ Object
86 87 88 89 |
# File 'lib/dynflow/persistence.rb', line 86 def load_execution_plan(id) execution_plan_hash = adapter.load_execution_plan(id) ExecutionPlan.new_from_hash(execution_plan_hash, @world) end |
#load_output_chunks(execution_plan_id, action_id) ⇒ Object
55 56 57 |
# File 'lib/dynflow/persistence.rb', line 55 def load_output_chunks(execution_plan_id, action_id) adapter.load_output_chunks(execution_plan_id, action_id) end |
#load_step(execution_plan_id, step_id, world) ⇒ Object
128 129 130 131 |
# File 'lib/dynflow/persistence.rb', line 128 def load_step(execution_plan_id, step_id, world) step_hash = adapter.load_step(execution_plan_id, step_id) ExecutionPlan::Steps::Abstract.from_hash(step_hash, execution_plan_id, world) end |
#load_steps(execution_plan_id, world) ⇒ Object
133 134 135 136 137 |
# File 'lib/dynflow/persistence.rb', line 133 def load_steps(execution_plan_id, world) adapter.load_steps(execution_plan_id).map do |step_hash| ExecutionPlan::Steps::Abstract.from_hash(step_hash, execution_plan_id, world) end end |
#prune_envelopes(receiver_ids) ⇒ Object
156 157 158 |
# File 'lib/dynflow/persistence.rb', line 156 def prune_envelopes(receiver_ids) adapter.prune_envelopes(receiver_ids) end |
#prune_undeliverable_envelopes ⇒ Object
160 161 162 |
# File 'lib/dynflow/persistence.rb', line 160 def prune_undeliverable_envelopes adapter.prune_undeliverable_envelopes end |
#pull_envelopes(world_id) ⇒ Object
148 149 150 151 152 153 154 |
# File 'lib/dynflow/persistence.rb', line 148 def pull_envelopes(world_id) adapter.pull_envelopes(world_id).map do |data| envelope = Dynflow.serializer.load(data) Type! envelope, Dispatcher::Envelope envelope end end |
#push_envelope(envelope) ⇒ Object
143 144 145 146 |
# File 'lib/dynflow/persistence.rb', line 143 def push_envelope(envelope) Type! envelope, Dispatcher::Envelope adapter.push_envelope(Dynflow.serializer.dump(envelope)) end |
#save_action(execution_plan_id, action) ⇒ Object
45 46 47 |
# File 'lib/dynflow/persistence.rb', line 45 def save_action(execution_plan_id, action) adapter.save_action(execution_plan_id, action.id, action.to_hash) end |
#save_delayed_plan(delayed_plan) ⇒ Object
111 112 113 |
# File 'lib/dynflow/persistence.rb', line 111 def save_delayed_plan(delayed_plan) adapter.save_delayed_plan(delayed_plan.execution_plan_uuid, delayed_plan.to_hash) end |
#save_execution_plan(execution_plan) ⇒ Object
91 92 93 |
# File 'lib/dynflow/persistence.rb', line 91 def save_execution_plan(execution_plan) adapter.save_execution_plan(execution_plan.id, execution_plan.to_hash) end |
#save_output_chunks(execution_plan_id, action_id, chunks) ⇒ Object
49 50 51 52 53 |
# File 'lib/dynflow/persistence.rb', line 49 def save_output_chunks(execution_plan_id, action_id, chunks) return if chunks.empty? adapter.save_output_chunks(execution_plan_id, action_id, chunks) end |
#save_step(step, conditions = {}) ⇒ Object
139 140 141 |
# File 'lib/dynflow/persistence.rb', line 139 def save_step(step, conditions = {}) adapter.save_step(step.execution_plan_id, step.id, step.to_hash, conditions) end |
#set_delayed_plan_frozen(execution_plan_id, frozen = true, new_start_at = nil) ⇒ Object
115 116 117 118 119 120 |
# File 'lib/dynflow/persistence.rb', line 115 def set_delayed_plan_frozen(execution_plan_id, frozen = true, new_start_at = nil) plan = load_delayed_plan(execution_plan_id) plan.frozen = frozen plan.start_at = new_start_at if new_start_at save_delayed_plan(plan) end |