Class: Asynchronic::Process
- Inherits:
-
Object
- Object
- Asynchronic::Process
- Defined in:
- lib/asynchronic/process.rb
Constant Summary collapse
- STATUSES =
[:pending, :queued, :running, :waiting, :completed, :aborted]
- TIME_TRACKING_MAP =
{ pending: :created_at, queued: :queued_at, running: :started_at, completed: :finalized_at, aborted: :finalized_at }
- ATTRIBUTE_NAMES =
[:type, :name, :queue, :status, :dependencies, :data, :result, :error, :connection_name] | TIME_TRACKING_MAP.values.uniq
- AUTOMATIC_ABORTED_ERROR_MESSAGE =
'Automatic aborted before execution'
- CANCELED_ERROR_MESSAGE =
'Canceled'
- DEAD_ERROR_MESSAGE =
'Process connection broken'
Instance Attribute Summary collapse
-
#id ⇒ Object
readonly
Returns the value of attribute id.
Class Method Summary collapse
Instance Method Summary collapse
- #[](process_name) ⇒ Object
- #abort_if_dead ⇒ Object
- #cancel! ⇒ Object
- #dead? ⇒ Boolean
- #dependencies ⇒ Object
- #destroy ⇒ Object
- #enqueue ⇒ Object
- #execute ⇒ Object
- #finalized? ⇒ Boolean
- #full_status ⇒ Object
- #get(key) ⇒ Object
-
#initialize(environment, id, &block) ⇒ Process
constructor
A new instance of Process.
- #job ⇒ Object
- #nest(type, params = {}) ⇒ Object
- #params ⇒ Object
- #parent ⇒ Object
- #processes ⇒ Object
- #ready? ⇒ Boolean
- #real_error ⇒ Object
- #result ⇒ Object
- #root ⇒ Object
- #set(key, value) ⇒ Object
- #wakeup ⇒ Object
Constructor Details
Instance Attribute Details
#id ⇒ Object (readonly)
Returns the value of attribute id.
20 21 22 |
# File 'lib/asynchronic/process.rb', line 20
# Read-only accessor: returns the value of the id attribute (@id).
def id
  @id
end
Class Method Details
.all(environment) ⇒ Object
38 39 40 41 42 43 44 |
# File 'lib/asynchronic/process.rb', line 38
# Lists the processes recorded in the environment's data store.
#
# Only keys made of exactly two sections whose text matches /created_at$/
# are considered. They are ordered by the value stored under each key,
# largest first (presumably creation timestamps, i.e. newest first -- confirm).
#
# @param environment [Object] must expose #data_store
# @return [Array<Process>] one Process per matching key, built from the key
#   with its last section removed
def self.all(environment)
  creation_keys = environment.data_store.keys.select do |key|
    key.sections.count == 2 && key.match(/created_at$/)
  end

  descending = creation_keys.sort_by { |key| environment.data_store[key] }.reverse

  descending.map { |key| Process.new(environment, key.remove_last) }
end
.create(environment, type, params = {}) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/asynchronic/process.rb', line 22
# Builds a new process of the given type and leaves it in pending state.
#
# The following keys are removed from +params+ (the hash passed in is
# mutated) and used as configuration; whatever is left is stored as the
# process params:
#   :id           - process id (defaults to SecureRandom.uuid)
#   :alias        - process name (defaults to the type, stringified)
#   :queue        - queue (falls back to type.queue, then parent_queue)
#   :dependencies - explicit dependencies
#   :dependency   - single explicit dependency
#
# @param environment [Object] forwarded to the constructor
# @param type [Object] job type; must respond to #queue
# @param params [Hash] options and job parameters (see above)
# @return [Process] whatever +new+ returns for the configured process
def self.create(environment, type, params={})
  id = params.delete(:id) || SecureRandom.uuid

  Asynchronic.logger.debug('Asynchronic') do
    "Created process #{type} - #{id} - #{params}"
  end

  new(environment, id) do
    self.type = type

    process_alias = params.delete(:alias) || type
    self.name = process_alias.to_s

    self.queue = params.delete(:queue) || type.queue || parent_queue

    # Order matters: the explicit keys are removed before the remaining
    # params are handed to infer_dependencies.
    listed = Array(params.delete(:dependencies))
    single = Array(params.delete(:dependency))
    self.dependencies = listed | single | infer_dependencies(params)

    self.params = params
    self.data = {}
    pending!
  end
end
Instance Method Details
#[](process_name) ⇒ Object
106 107 108 |
# File 'lib/asynchronic/process.rb', line 106
# Looks up a direct child process by name.
#
# @param process_name [#to_s] name to search for; compared as a String
# @return [Process, nil] first child whose name matches, or nil
def [](process_name)
  wanted = process_name.to_s
  processes.find { |child| child.name == wanted }
end
#abort_if_dead ⇒ Object
80 81 82 |
# File 'lib/asynchronic/process.rb', line 80
# Aborts the process with DEAD_ERROR_MESSAGE when #dead? reports true;
# otherwise does nothing and returns nil.
def abort_if_dead
  return unless dead?

  abort!(DEAD_ERROR_MESSAGE)
end
#cancel! ⇒ Object
72 73 74 |
# File 'lib/asynchronic/process.rb', line 72
# Aborts the process using CANCELED_ERROR_MESSAGE as the error.
def cancel!
  abort!(CANCELED_ERROR_MESSAGE)
end
#dead? ⇒ Boolean
76 77 78 |
# File 'lib/asynchronic/process.rb', line 76
# Tells whether this process, or any process nested under it, is dead.
#
# A process is dead when it is running but not connected. A process that is
# not finalized is also dead when at least one of its children is dead.
#
# @return [Boolean]
def dead?
  return true if running? && !connected?
  return false if finalized?

  processes.any? { |child| child.dead? }
end
#dependencies ⇒ Object
134 135 136 137 138 139 140 141 142 |
# File 'lib/asynchronic/process.rb', line 134
# Resolves the stored dependency names into sibling processes.
#
# Returns an empty array when there is no parent or no dependency is stored.
# Otherwise each stored name is looked up (as a String) among the parent's
# processes; a name with no matching sibling yields nil in that position.
#
# @return [Array<Process, nil>]
def dependencies
  if parent.nil? || data_store[:dependencies].empty?
    []
  else
    siblings_by_name = {}
    parent.processes.each { |sibling| siblings_by_name[sibling.name] = sibling }

    data_store[:dependencies].map { |dependency| siblings_by_name[dependency.to_s] }
  end
end
#destroy ⇒ Object
84 85 86 |
# File 'lib/asynchronic/process.rb', line 84
# Removes this process by calling delete_cascade on its data store.
def destroy
  store = data_store
  store.delete_cascade
end
#enqueue ⇒ Object
144 145 146 147 |
# File 'lib/asynchronic/process.rb', line 144
# Marks the process as queued and then hands its id and queue to the
# environment for enqueueing.
#
# @return [Object] whatever environment.enqueue returns
def enqueue
  queued!
  environment.enqueue(id, queue)
end
#execute ⇒ Object
149 150 151 152 153 154 |
# File 'lib/asynchronic/process.rb', line 149
# Runs the process and then wakes it up. The wakeup call is wrapped in
# Asynchronic.retry_execution, labelled with this class and 'wakeup'.
def execute
  run

  Asynchronic.retry_execution(self.class, 'wakeup') { wakeup }
end
#finalized? ⇒ Boolean
68 69 70 |
# File 'lib/asynchronic/process.rb', line 68
# A process is finalized once it is either completed or aborted.
#
# @return [Boolean]
def finalized?
  completed? || aborted?
end
#full_status ⇒ Object
88 89 90 91 92 |
# File 'lib/asynchronic/process.rb', line 88
# Collects the status of this process and of every process nested under it.
#
# @return [Hash] process name => status, starting with this process and
#   merged with the full_status of each child (later entries overwrite
#   earlier ones that share a name)
def full_status
  children = processes
  statuses = { name => status }

  children.each { |child| statuses.merge!(child.full_status) }

  statuses
end
#get(key) ⇒ Object
172 173 174 |
# File 'lib/asynchronic/process.rb', line 172
# Reads a single entry from the process data.
#
# @param key [Object] key to look up in data
# @return [Object] the value stored under +key+
def get(key)
  current = data
  current[key]
end
#job ⇒ Object
102 103 104 |
# File 'lib/asynchronic/process.rb', line 102
# Returns the job for this process: an instance of the process type built
# with this process as its only argument. The instance is memoized in @job.
def job
  @job = type.new(self) unless @job
  @job
end
#nest(type, params = {}) ⇒ Object
168 169 170 |
# File 'lib/asynchronic/process.rb', line 168
# Creates a child process under this one.
#
# The child id is derived from this process id, the :processes scope and the
# current number of children; it overrides any :id present in +params+.
#
# @param type [Object] job type of the child
# @param params [Hash] options forwarded to the class-level create
# @return [Process] the value returned by create
def nest(type, params={})
  child_id = id[:processes][processes.count]
  child_params = params.merge(id: child_id)

  self.class.create(environment, type, child_params)
end
#params ⇒ Object
94 95 96 |
# File 'lib/asynchronic/process.rb', line 94
# Returns the :params scope of the data store, after calling no_lazy and
# then readonly on it.
def params
  params_scope = data_store.scoped(:params)
  params_scope.no_lazy.readonly
end
#parent ⇒ Object
118 119 120 |
# File 'lib/asynchronic/process.rb', line 118
# Returns the process this one is nested in.
#
# @return [Process, nil] a new Process built from this id without its last
#   two sections, or nil when the id is not nested
def parent
  return nil unless id.nested?

  Process.new(environment, id.remove_last(2))
end
#processes ⇒ Object
110 111 112 113 114 115 116 |
# File 'lib/asynchronic/process.rb', line 110
# Lists the processes nested directly under this one.
#
# Children are discovered through the keys of the :processes scope: only
# two-section keys matching /\|name$/ are kept, in sorted key order.
#
# @return [Array<Process>] one Process per child, with an id built from this
#   id, the :processes scope and the key without its last section
def processes
  name_keys = data_store.scoped(:processes).keys.select do |key|
    key.sections.count == 2 && key.match(/\|name$/)
  end

  name_keys.sort.map do |key|
    Process.new(environment, id[:processes][key.remove_last])
  end
end
#ready? ⇒ Boolean
64 65 66 |
# File 'lib/asynchronic/process.rb', line 64
# A process is ready when it is still pending and every one of its
# dependencies has completed.
#
# @return [Boolean]
def ready?
  pending? && dependencies.all? { |dependency| dependency.completed? }
end
#real_error ⇒ Object
126 127 128 129 130 131 132 |
# File 'lib/asynchronic/process.rb', line 126
# Finds the error message that originally made this process abort.
#
# When aborted children exist, the one with the earliest finalized_at is
# followed recursively, so the result comes from the deepest process on
# that path. When no child is aborted, this process's own error message is
# returned.
#
# @return [String, nil] the originating error message, or nil when this
#   process is not aborted
def real_error
  return nil unless aborted?

  # min_by picks the earliest-finalized aborted child without a full sort.
  first_aborted_child = processes.select(&:aborted?).min_by(&:finalized_at)

  first_aborted_child ? first_aborted_child.real_error : error.message
end
#result ⇒ Object
98 99 100 |
# File 'lib/asynchronic/process.rb', line 98
# Returns the :result entry read through the lazy view of the data store.
def result
  lazy_store = data_store.lazy
  lazy_store[:result]
end
#root ⇒ Object
122 123 124 |
# File 'lib/asynchronic/process.rb', line 122
# Returns the top-level process of the tree this process belongs to.
#
# @return [Process] self when the id is not nested; otherwise a new Process
#   built from the first section of the id
def root
  return self unless id.nested?

  Process.new(environment, id.sections.first)
end
#set(key, value) ⇒ Object
176 177 178 |
# File 'lib/asynchronic/process.rb', line 176
# Stores a value in the process data by merging it into the current data
# and assigning the merged hash back.
#
# @param key [Object] key to write
# @param value [Object] value to store under +key+
# @return [Hash] the merged data
def set(key, value)
  updated = data.merge(key => value)
  self.data = updated
end
#wakeup ⇒ Object
156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/asynchronic/process.rb', line 156
# Wakes up the children of this process and, once this process is finalized,
# propagates the wakeup to its parent.
#
# When the queue engine reports itself as asynchronic, the children are woken
# inside data_store.synchronize(id); otherwise they are woken directly.
# Start and end are logged at info level.
#
# @return [Object, nil] the parent's wakeup result, or nil when there is no
#   parent or this process is not finalized
def wakeup
  Asynchronic.logger.info('Asynchronic') do
    "Wakeup started #{type} (#{id})"
  end

  if environment.queue_engine.asynchronic?
    data_store.synchronize(id) do
      wakeup_children
    end
  else
    wakeup_children
  end

  Asynchronic.logger.info('Asynchronic') do
    "Wakeup finalized #{type} (#{id})"
  end

  return unless parent && finalized?

  parent.wakeup
end