Class: Asynchronic::Process
- Inherits: Object
- Inheritance hierarchy: Object → Asynchronic::Process
- Defined in:
- lib/asynchronic/process.rb
Constant Summary collapse
- STATUSES =
[:pending, :queued, :running, :waiting, :completed, :aborted]
- TIME_TRACKING_MAP =
{ pending: :created_at, queued: :queued_at, running: :started_at, completed: :finalized_at, aborted: :finalized_at }
- ATTRIBUTE_NAMES =
[:type, :name, :queue, :status, :dependencies, :data, :result, :error] | TIME_TRACKING_MAP.values.uniq
Instance Attribute Summary collapse
-
#id ⇒ Object
readonly
Returns the value of attribute id.
Class Method Summary collapse
Instance Method Summary collapse
- #[](process_name) ⇒ Object
- #dependencies ⇒ Object
- #enqueue ⇒ Object
- #execute ⇒ Object
- #finalized? ⇒ Boolean
-
#initialize(environment, id, &block) ⇒ Process
constructor
A new instance of Process.
- #job ⇒ Object
- #nest(type, params = {}) ⇒ Object
- #params ⇒ Object
- #parent ⇒ Object
- #processes ⇒ Object
- #ready? ⇒ Boolean
- #result ⇒ Object
- #set(key, value) ⇒ Object
- #wakeup ⇒ Object
Constructor Details
Instance Attribute Details
#id ⇒ Object (readonly)
Returns the value of attribute id.
16 17 18 |
# File 'lib/asynchronic/process.rb', line 16
# Read-only accessor: returns the value stored in @id.
def id
  @id
end
Class Method Details
.all(environment) ⇒ Object
123 124 125 126 127 128 |
# File 'lib/asynchronic/process.rb', line 123
# Lists the processes recorded in the environment's data store.
#
# Keeps only the keys that have exactly two sections and match
# /created_at$/, orders them by their stored value in descending order
# (ascending sort, then reversed), and wraps each one in a Process whose id
# is the key with its last section removed.
#
# NOTE(review): presumably the stored value is a creation timestamp, so the
# result is newest-first - confirm against the data store contents.
def self.all(environment)
  creation_keys = environment.data_store.keys.select do |key|
    key.sections.count == 2 && key.match(/created_at$/)
  end

  newest_first = creation_keys.sort_by { |key| environment.data_store[key] }.reverse

  newest_first.map { |key| Process.new(environment, key.remove_last) }
end
.create(environment, type, params = {}) ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/asynchronic/process.rb', line 107
# Builds a process of the given type and marks it as pending.
#
# Recognised option keys (all removed from the params that end up stored):
#   :id            - identifier to use; a random UUID is generated otherwise
#   :alias         - process name; defaults to the type
#   :queue         - queue name; falls back to type.queue, then parent_queue
#   :dependencies,
#   :dependency    - merged (set union) with infer_dependencies(params)
#
# Whatever remains in params after those keys are consumed is assigned via
# self.params=.
#
# NOTE(review): the block handed to `new` assigns through self.type= etc., so
# it is presumably evaluated in the context of the new instance - confirm
# against the constructor.
def self.create(environment, type, params={})
  # Work on a shallow copy: the option keys are consumed with Hash#delete,
  # which would otherwise strip them from the caller's own hash (and raise
  # on a frozen one).
  params = params.dup
  id = params.delete(:id) || SecureRandom.uuid

  Asynchronic.logger.debug('Asynchronic') { "Created process #{type} - #{id} - #{params}" }

  new(environment, id) do
    self.type = type
    self.name = params.delete(:alias) || type
    self.queue = params.delete(:queue) || type.queue || parent_queue
    self.dependencies = Array(params.delete(:dependencies)) | Array(params.delete(:dependency)) | infer_dependencies(params)
    self.params = params
    self.data = {}
    pending!
  end
end
Instance Method Details
#[](process_name) ⇒ Object
56 57 58 |
# File 'lib/asynchronic/process.rb', line 56
# Looks up a nested process by name.
#
# Returns the first entry of #processes whose name equals process_name, or
# nil when none matches.
def [](process_name)
  processes.find { |child| child.name == process_name }
end
#dependencies ⇒ Object
70 71 72 73 |
# File 'lib/asynchronic/process.rb', line 70
# Resolves this process's dependencies to sibling processes.
#
# Returns an empty array when there is no parent. Otherwise each entry
# stored under :dependencies is looked up on the parent via parent[entry];
# an entry with no match yields nil in the result.
def dependencies
  # Resolve the parent once: it is loop-invariant, and every call to #parent
  # builds a fresh Process object.
  owner = parent
  return [] unless owner

  data_store[:dependencies].map { |dependency_name| owner[dependency_name] }
end
#enqueue ⇒ Object
75 76 77 78 |
# File 'lib/asynchronic/process.rb', line 75
# Calls queued! and then hands this process's id and queue to
# environment.enqueue, whose result is returned.
def enqueue
  queued!
  environment.enqueue(id, queue)
end
#execute ⇒ Object
80 81 82 83 84 85 |
# File 'lib/asynchronic/process.rb', line 80
# Runs the process, then performs #wakeup wrapped in
# Asynchronic.retry_execution (labelled with this class and 'wakeup').
#
# NOTE(review): presumably retry_execution re-invokes the block on failure -
# confirm against its definition.
def execute
  run
  Asynchronic.retry_execution(self.class, 'wakeup') { wakeup }
end
#finalized? ⇒ Boolean
40 41 42 |
# File 'lib/asynchronic/process.rb', line 40
# True when the process has reached a terminal status, i.e. it is either
# completed or aborted.
def finalized?
  completed? || aborted?
end
#job ⇒ Object
52 53 54 |
# File 'lib/asynchronic/process.rb', line 52
# Instantiates the process's type, passing this process as the sole
# constructor argument.
def job
  type.new(self)
end
#nest(type, params = {}) ⇒ Object
99 100 101 |
# File 'lib/asynchronic/process.rb', line 99
# Creates a child process of the given type under this process.
#
# The child's id is derived from this process's id, the :processes section
# and the current number of nested processes; it is merged into params as
# :id (replacing any :id the caller supplied) before delegating to
# self.class.create.
def nest(type, params={})
  child_id = id[:processes][processes.count]
  self.class.create(@environment, type, params.merge(id: child_id))
end
#params ⇒ Object
44 45 46 |
# File 'lib/asynchronic/process.rb', line 44
# Returns the :params scope of this process's data store, switched to
# no_lazy and then readonly.
def params
  params_scope = data_store.scoped(:params)
  params_scope.no_lazy.readonly
end
#parent ⇒ Object
66 67 68 |
# File 'lib/asynchronic/process.rb', line 66
# Returns the enclosing process, or nil when this process's id is not
# nested.
#
# The parent id is this id with its last two sections removed.
def parent
  return unless id.nested?

  Process.new(environment, id.remove_last(2))
end
#processes ⇒ Object
60 61 62 63 64 |
# File 'lib/asynchronic/process.rb', line 60
# Lists the processes nested directly under this one.
#
# Scans the keys of the :processes scope, keeps those with exactly two
# sections that match /name$/, sorts them, and builds a Process for each,
# using this id extended by :processes and the key minus its last section.
def processes
  name_keys = data_store.scoped(:processes).keys.select do |key|
    key.sections.count == 2 && key.match(/name$/)
  end

  name_keys.sort.map do |key|
    Process.new(environment, id[:processes][key.remove_last])
  end
end
#ready? ⇒ Boolean
36 37 38 |
# File 'lib/asynchronic/process.rb', line 36
# True when the process is still pending and every one of its dependencies
# is completed (trivially so when there are none).
def ready?
  pending? && dependencies.all? { |dependency| dependency.completed? }
end
#result ⇒ Object
48 49 50 |
# File 'lib/asynchronic/process.rb', line 48
# Reads the :result entry through the lazy view of this process's data
# store.
def result
  lazy_store = data_store.lazy
  lazy_store[:result]
end
#set(key, value) ⇒ Object
103 104 105 |
# File 'lib/asynchronic/process.rb', line 103
# Stores a single key/value pair in the process data.
#
# The current data is merged with { key => value } and the merged hash is
# written back through data=; an existing entry for key is overwritten.
def set(key, value)
  updated = self.data.merge(key => value)
  self.data = updated
end
#wakeup ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/asynchronic/process.rb', line 87
# Wakes up this process's children and, once this process is finalized,
# propagates the wakeup to its parent.
#
# When the queue engine reports asynchronic?, wakeup_children runs inside
# data_store.synchronize(id); otherwise it is called directly. Start and end
# are logged at info level.
#
# NOTE(review): synchronize(id) presumably serialises concurrent wakeups of
# the same process - confirm against the data store implementation.
def wakeup
  Asynchronic.logger.info('Asynchronic') { "Wakeup started #{type} (#{id})" }

  if environment.queue_engine.asynchronic?
    data_store.synchronize(id) do
      wakeup_children
    end
  else
    wakeup_children
  end

  Asynchronic.logger.info('Asynchronic') { "Wakeup finalized #{type} (#{id})" }

  # Only a finalized, nested process notifies its parent.
  return unless parent && finalized?

  parent.wakeup
end