Class: Asynchronic::Process

Inherits:
Object
  • Object
show all
Defined in:
lib/asynchronic/process.rb

Constant Summary collapse

STATUSES =
[:pending, :queued, :running, :waiting, :completed, :aborted]
TIME_TRACKING_MAP =
{
  pending:   :created_at,
  queued:    :queued_at,
  running:   :started_at,
  completed: :finalized_at,
  aborted:   :finalized_at
}
ATTRIBUTE_NAMES =
[:type, :name, :queue, :status, :dependencies, :data, :result, :error, :connection_name] | TIME_TRACKING_MAP.values.uniq
AUTOMATIC_ABORTED_ERROR_MESSAGE =
'Automatic aborted before execution'
CANCELED_ERROR_MESSAGE =
'Canceled'
DEAD_ERROR_MESSAGE =
'Process connection broken'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(environment, id, &block) ⇒ Process

Returns a new instance of Process.



46
47
48
49
50
# File 'lib/asynchronic/process.rb', line 46

def initialize(environment, id, &block)
  @environment = environment
  @id = DataStore::Key[id]
  instance_eval(&block) if block_given?
end

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id.



20
21
22
# File 'lib/asynchronic/process.rb', line 20

def id
  @id
end

Class Method Details

.all(environment) ⇒ Object



38
39
40
41
42
43
44
# File 'lib/asynchronic/process.rb', line 38

def self.all(environment)
  environment.data_store.keys
                        .select { |k| k.sections.count == 2 && k.match(/created_at$/) }
                        .sort_by { |k| environment.data_store[k] }
                        .reverse
                        .map { |k| Process.new environment, k.remove_last }
end

.create(environment, type, params = {}) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/asynchronic/process.rb', line 22

def self.create(environment, type, params={})
  id = params.delete(:id) || SecureRandom.uuid

  Asynchronic.logger.debug('Asynchronic') { "Created process #{type} - #{id} - #{params}" }

  new(environment, id) do
    self.type = type
    self.name = (params.delete(:alias) || type).to_s
    self.queue = params.delete(:queue) || type.queue || parent_queue
    self.dependencies = Array(params.delete(:dependencies)) | Array(params.delete(:dependency)) | infer_dependencies(params)
    self.params = params
    self.data = {}
    pending!
  end
end

Instance Method Details

#[](process_name) ⇒ Object



106
107
108
# File 'lib/asynchronic/process.rb', line 106

def [](process_name)
  processes.detect { |p| p.name == process_name.to_s }
end

#abort_if_deadObject



80
81
82
# File 'lib/asynchronic/process.rb', line 80

def abort_if_dead
  abort! DEAD_ERROR_MESSAGE if dead?
end

#cancel!Object



72
73
74
# File 'lib/asynchronic/process.rb', line 72

def cancel!
  abort! CANCELED_ERROR_MESSAGE
end

#dead?Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/asynchronic/process.rb', line 76

def dead?
  (running? && !connected?) || (!finalized? && processes.any?(&:dead?))
end

#dependenciesObject



134
135
136
137
138
139
140
141
142
# File 'lib/asynchronic/process.rb', line 134

def dependencies
  return [] if parent.nil? || data_store[:dependencies].empty?

  parent_processes = parent.processes.each_with_object({}) do |process, hash|
    hash[process.name] = process
  end

  data_store[:dependencies].map { |d| parent_processes[d.to_s] }
end

#destroyObject



84
85
86
# File 'lib/asynchronic/process.rb', line 84

def destroy
  data_store.delete_cascade
end

#enqueueObject



144
145
146
147
# File 'lib/asynchronic/process.rb', line 144

def enqueue
  queued!
  environment.enqueue id, queue
end

#executeObject



149
150
151
152
153
154
# File 'lib/asynchronic/process.rb', line 149

def execute
  run
  Asynchronic.retry_execution(self.class, 'wakeup') do
    wakeup
  end
end

#finalized?Boolean

Returns:

  • (Boolean)


68
69
70
# File 'lib/asynchronic/process.rb', line 68

def finalized?
  completed? || aborted?
end

#full_statusObject



88
89
90
91
92
# File 'lib/asynchronic/process.rb', line 88

def full_status
  processes.each_with_object(name => status) do |process, hash|
    hash.update(process.full_status)
  end
end

#get(key) ⇒ Object



172
173
174
# File 'lib/asynchronic/process.rb', line 172

def get(key)
  self.data[key]
end

#jobObject



102
103
104
# File 'lib/asynchronic/process.rb', line 102

def job
  @job ||= type.new self
end

#nest(type, params = {}) ⇒ Object



168
169
170
# File 'lib/asynchronic/process.rb', line 168

def nest(type, params={})
  self.class.create environment, type, params.merge(id: id[:processes][processes.count])
end

#paramsObject



94
95
96
# File 'lib/asynchronic/process.rb', line 94

def params
  data_store.scoped(:params).no_lazy.readonly
end

#parentObject



118
119
120
# File 'lib/asynchronic/process.rb', line 118

def parent
  Process.new environment, id.remove_last(2) if id.nested?
end

#processesObject



110
111
112
113
114
115
116
# File 'lib/asynchronic/process.rb', line 110

def processes
  data_store.scoped(:processes)
            .keys
            .select { |k| k.sections.count == 2 && k.match(/\|name$/) }
            .sort
            .map { |k| Process.new environment, id[:processes][k.remove_last] }
end

#ready?Boolean

Returns:

  • (Boolean)


64
65
66
# File 'lib/asynchronic/process.rb', line 64

def ready?
  pending? && dependencies.all?(&:completed?)
end

#real_errorObject



126
127
128
129
130
131
132
# File 'lib/asynchronic/process.rb', line 126

def real_error
  return nil if !aborted?

  first_aborted_child = processes.select(&:aborted?).sort_by(&:finalized_at).first

  first_aborted_child ? first_aborted_child.real_error : error.message
end

#resultObject



98
99
100
# File 'lib/asynchronic/process.rb', line 98

def result
  data_store.lazy[:result]
end

#rootObject



122
123
124
# File 'lib/asynchronic/process.rb', line 122

def root
  id.nested? ? Process.new(environment, id.sections.first) : self
end

#set(key, value) ⇒ Object



176
177
178
# File 'lib/asynchronic/process.rb', line 176

def set(key, value)
  self.data = self.data.merge key => value
end

#wakeupObject



156
157
158
159
160
161
162
163
164
165
166
# File 'lib/asynchronic/process.rb', line 156

def wakeup
  Asynchronic.logger.info('Asynchronic') { "Wakeup started #{type} (#{id})" }
  if environment.queue_engine.asynchronic?
    data_store.synchronize(id) { wakeup_children }
  else
    wakeup_children
  end
  Asynchronic.logger.info('Asynchronic') { "Wakeup finalized #{type} (#{id})" }

  parent.wakeup if parent && finalized?
end