Class: CapistranoMulticonfigParallel::CelluloidManager

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Celluloid::Logger, Celluloid::Notifications
Defined in:
lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb

Overview

rubocop:disable ClassLength

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job_manager) ⇒ CelluloidManager

Returns a new instance of CelluloidManager.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 17

# Starts the Celluloid supervision group, creates the worker pool and prepares
# the (initially empty) bookkeeping tables used to track jobs, workers and
# task confirmations. Also asks the supervision group to supervise a terminal
# table actor and a websocket server.
#
# @param job_manager [Object] stored in @job_manager and consulted by other methods of this class
def initialize(job_manager)
  # start SupervisionGroup
  @worker_supervisor = Celluloid::SupervisionGroup.run!
  @job_manager = job_manager
  # set to true by #start_worker once as many workers are registered as the job manager has jobs
  @registration_complete = false
  # keep whatever Celluloid.boot returns
  @actor_system = Celluloid.boot
  @mutex = Mutex.new
  # pool of 10 CelluloidWorker actors, registered in the supervision group as :workers
  # http://rubydoc.info/gems/celluloid/Celluloid/SupervisionGroup/Member
  @workers = @worker_supervisor.pool(CapistranoMulticonfigParallel::CelluloidWorker, as: :workers, size: 10)
  # link the pool to the current actor
  Actor.current.link @workers
  # Get a handle on the PoolManager
  # http://rubydoc.info/gems/celluloid/Celluloid/PoolManager
  # @workers = workers_pool.actor
  @conditions = []
  @jobs = {} # job id => job hash (filled by #delegate / #generate_job_id)
  @job_to_worker = {} # job id => worker (filled by #start_worker)
  @worker_to_job = {} # worker mailbox address => job hash (filled by #start_worker)
  @job_to_condition = {} # job id => { task => { condition:, status: } } (filled by #setup_worker_conditions)

  # the terminal table receives the current actor as its argument
  @worker_supervisor.supervise_as(:terminal_server, CapistranoMulticonfigParallel::TerminalTable, Actor.current)
  # the websocket server receives the configured options plus an :enable_debug flag
  @worker_supervisor.supervise_as(:web_server, CelluloidPubsub::WebServer, self.class.websocket_config.merge(enable_debug: self.class.debug_websocket?))
end

Instance Attribute Details

#actor_system ⇒ Object

Returns the value of attribute actor_system.



12
13
14
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12

# Reader for the value obtained from +Celluloid.boot+ in the constructor.
#
# @return [Object] the stored @actor_system
def actor_system
  @actor_system
end

#job_to_condition ⇒ Object

Returns the value of attribute job_to_condition.



12
13
14
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12

# Reader for the confirmation table. #setup_worker_conditions stores, per job id,
# a hash mapping each confirmable task to +{ condition:, status: }+.
#
# @return [Hash] the stored @job_to_condition
def job_to_condition
  @job_to_condition
end

#job_to_worker ⇒ Object

Returns the value of attribute job_to_worker.



12
13
14
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12

# Reader for the table that #start_worker fills with +job id => worker+ entries.
#
# @return [Hash] the stored @job_to_worker
def job_to_worker
  @job_to_worker
end

#jobs ⇒ Object

Returns the value of attribute jobs.



12
13
14
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12

# Reader for the table of known jobs, keyed by job id (see #delegate and #generate_job_id).
#
# @return [Hash] the stored @jobs
def jobs
  @jobs
end

#mutex ⇒ Object

Returns the value of attribute mutex.



12
13
14
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12

# Reader for the Mutex created in the constructor.
# NOTE(review): none of the methods visible on this page lock it — confirm whether callers do.
#
# @return [Mutex] the stored @mutex
def mutex
  @mutex
end

#registration_complete ⇒ Object

Returns the value of attribute registration_complete.



12
13
14
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12

# Reader for the registration flag: +false+ after construction, switched to +true+ by
# #start_worker once the number of registered workers equals the job manager's job count.
#
# @return [Boolean] the stored @registration_complete
def registration_complete
  @registration_complete
end

#worker_supervisor ⇒ Object (readonly)

Returns the value of attribute worker_supervisor.



14
15
16
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 14

# Reader for the supervision group started in the constructor via
# +Celluloid::SupervisionGroup.run!+.
#
# @return [Object] the stored @worker_supervisor
def worker_supervisor
  @worker_supervisor
end

#worker_to_job ⇒ Object

Returns the value of attribute worker_to_job.



12
13
14
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12

# Reader for the table that #start_worker fills with +worker mailbox address => job+ entries;
# #worker_died removes the entry of a dead worker.
#
# @return [Hash] the stored @worker_to_job
def worker_to_job
  @worker_to_job
end

#workers ⇒ Object (readonly)

Returns the value of attribute workers.



14
15
16
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 14

# Reader for the worker pool created in the constructor (CelluloidWorker, size 10).
# #delegate sends jobs to it asynchronously.
#
# @return [Object] the stored @workers
def workers
  @workers
end

Class Method Details

.debug_enabled? ⇒ Boolean

Returns:

  • (Boolean)


41
42
43
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 41

# Predicate consulted by the instance methods before they emit +debug+ messages.
# It simply returns the class-level +debug_enabled+ value.
# NOTE(review): +debug_enabled+ is not defined in the visible part of this class — confirm where it comes from.
#
# @return [Boolean]
def self.debug_enabled?
  debug_enabled
end

.debug_websocket? ⇒ Boolean

Returns:

  • (Boolean)


45
46
47
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 45

# Reports whether the websocket server should be started with debugging enabled.
# The 'enable_debug' entry of the websocket configuration is compared as a string,
# so both +true+ and +'true'+ switch it on; anything else (including a missing key) does not.
#
# @return [Boolean]
def self.debug_websocket?
  debug_flag = websocket_config['enable_debug']
  debug_flag.to_s == 'true'
end

.websocket_config ⇒ Object



49
50
51
52
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 49

# Reads the +:websocket_server+ entry of the gem configuration.
#
# @return [Hash] the entry with its keys converted to strings, or an empty hash
#   when the entry is blank or is not a Hash
def self.websocket_config
  settings = CapistranoMulticonfigParallel.configuration[:websocket_server]
  return {} unless settings.present? && settings.is_a?(Hash)
  settings.stringify_keys
end

Instance Method Details

#apply_confirmation_for_worker(worker) ⇒ Object



123
124
125
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 123

# Decides whether task confirmations apply to the given worker: the worker must be
# alive and its +env_name+ must be listed in the configured +apply_stage_confirmation+.
# The configuration is only consulted for a live worker.
#
# @param worker [Object] worker responding to +alive?+ and +env_name+
# @return [Boolean]
def apply_confirmation_for_worker(worker)
  alive = worker.alive?
  return alive unless alive
  confirmed_stages = CapistranoMulticonfigParallel.configuration.apply_stage_confirmation
  confirmed_stages.include?(worker.env_name)
end

#apply_confirmations? ⇒ Boolean

Returns:

  • (Boolean)


111
112
113
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 111

# Tells whether task confirmation is switched on in the configuration.
# The +task_confirmation_active+ setting is compared case-insensitively against 'true'.
#
# @return [Boolean]
def apply_confirmations?
  setting = CapistranoMulticonfigParallel.configuration.task_confirmation_active
  setting.to_s.downcase == 'true'
end

#can_tag_staging? ⇒ Boolean

Returns:

  • (Boolean)


213
214
215
216
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 213

# Staging may be tagged only when the job manager allows it and none of the
# known jobs targets the 'production' environment.
#
# @return [Boolean]
def can_tag_staging?
  @job_manager.can_tag_staging? &&
    @jobs.none? { |_job_id, job| job['env'] == 'production' }
end

#confirm_task_approval(result, task, worker = nil) ⇒ Object



186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 186

# Asks the user to approve +task+ and, when the answer is 'y' (case-insensitive),
# publishes an approval event for that task to the worker of every known job.
#
# Does nothing when +result+ is blank. The prompt itself is delegated to
# #print_confirm_task_approvall, which stores the answer under
# +:apps_symlink_confirmation+.
#
# @param result [Object] value received from the task condition(s)
# @param task [String] name of the task awaiting approval
# @param worker [Object, nil] worker the confirmation is asked for; forwarded to
#   the prompt so the question can name the job
# @return [Object, nil] result of the parallel map, or nil when nothing was published
def confirm_task_approval(result, task, worker = nil)
  return unless result.present?
  # forward the worker as given (previously `worker = nil` here discarded it)
  print_confirm_task_approvall(result, task, worker)
  answer = fetch(:apps_symlink_confirmation)
  return if answer.blank? || answer.downcase != 'y'
  @jobs.pmap do |job_id, job|
    # block-local name: the parallel iterations must not share the method argument
    job_worker = get_worker_for_job(job_id)
    job_worker.publish_rake_event('approved' => 'yes',
                                  'action' => 'invoke',
                                  'job_id' => job['id'],
                                  'task' => task
                                 )
  end
end

#delegate(job) ⇒ Object

call to send an actor a job



64
65
66
67
68
69
70
71
72
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 64

# Hands a job over to the worker pool.
#
# The job hash is copied with string keys, gets a fresh id (unless it is the
# re-delegation of a job whose worker died, in which case its id is kept), is
# recorded in @jobs, has its id exposed through its 'env_options', and is then
# sent asynchronously to the pool together with the current actor.
#
# @param job [Hash] job description; must contain an 'env_options' hash
# @return [Object] whatever the asynchronous +work+ call returns
def delegate(job)
  normalized = job.stringify_keys
  normalized['id'] = generate_job_id(normalized) unless job_failed?(normalized)
  job_id = normalized['id']
  @jobs[job_id] = normalized
  normalized['env_options'][CapistranoMulticonfigParallel::ENV_KEY_JOB_ID] = job_id
  # start work and send it to the background
  @workers.async.work(normalized, Actor.current)
end

#dispatch_new_job(job) ⇒ Object



218
219
220
221
222
223
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 218

# Enriches a job with the additional environment options the job manager provides
# for its application and stage, then delegates it asynchronously.
# The job hash passed in is modified in place ('env_options' is replaced by the merged hash).
#
# @param job [Hash] job with 'app_name', 'stage' and 'env_options' entries
# @return [Object] whatever the asynchronous +delegate+ call returns
def dispatch_new_job(job)
  extra_env = @job_manager.get_app_additional_env_options(job['app_name'], job['stage'])
  job['env_options'] = job['env_options'].merge(extra_env)
  async.delegate(job)
end

#generate_job_id(job) ⇒ Object



54
55
56
57
58
59
60
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 54

# Assigns the next sequential id (current number of known jobs + 1) to the job,
# records the job in @jobs under that id and returns the id.
#
# @param job [Hash] job hash; its 'id' entry is overwritten
# @return [Integer] the id given to the job
def generate_job_id(job)
  next_id = @jobs.size.succ
  @jobs[next_id] = job
  job['id'] = next_id
end

#get_job_status(job) ⇒ Object

lookup status of job by asking actor running it



241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 241

# Looks up the status of a job by asking the worker that runs it.
#
# The worker is taken from @job_to_worker, the table #start_worker fills.
# (The previous implementation read @registered_jobs, which is never assigned
# in this class, so every lookup raised on nil.)
#
# NOTE(review): this still calls +status+ on the worker as before; other methods
# of this class read +worker_state+ — confirm the worker really exposes +status+.
#
# @param job [Hash, Integer, String, nil] a job hash (its 'id' is used) or a job id
# @return [Object, nil] the worker's status, or nil when +job+ is blank or no
#   worker is registered for it
def get_job_status(job)
  return nil unless job.present?
  job_id = job.is_a?(Hash) ? job.stringify_keys['id'] : job.to_i
  actor = @job_to_worker[job_id]
  actor ? actor.status : nil
end

#get_worker_for_job(job) ⇒ Object



200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 200

# Finds the worker registered for a job.
#
# @param job [Hash, Integer, String, nil] a job hash (its 'id' is used) or a job id,
#   which is converted with +to_i+
# @return [Object, nil] the worker from @job_to_worker, or nil when +job+ is blank
#   or nothing is registered under that id
def get_worker_for_job(job)
  return nil unless job.present?
  lookup_key = job.is_a?(Hash) ? job.stringify_keys['id'] : job.to_i
  @job_to_worker[lookup_key]
end

#job_failed?(job) ⇒ Boolean

Returns:

  • (Boolean)


256
257
258
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 256

# Tells whether the job is the re-delegation of a job whose worker died,
# i.e. its 'worker_action' entry equals 'worker_died'.
#
# @param job [Hash] string-keyed job hash
# @return [Boolean]
def job_failed?(job)
  job['worker_action'] == 'worker_died'
end

#mark_completed_remaining_tasks(worker) ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 136

# Marks every still-unconfirmed task confirmation of the worker's job as
# 'confirmed' and signals its condition with a pass-through proc.
# Does nothing unless confirmations apply to this worker and are enabled.
#
# @param worker [Object] worker whose +job_id+ selects the entry in @job_to_condition
# @return [Object, nil] the configured task list, or nil when skipped
def mark_completed_remaining_tasks(worker)
  return unless apply_confirmation_for_worker(worker) && apply_confirmations?
  CapistranoMulticonfigParallel.configuration.task_confirmations.each do |task|
    confirmation = @job_to_condition[worker.job_id][task]
    next if confirmation[:status] == 'confirmed'
    confirmation[:status] = 'confirmed'
    # the signalled value is a Proc (a placeholder result, not a real task result)
    confirmation[:condition].signal(proc { |sum| sum })
  end
end


175
176
177
178
179
180
181
182
183
184
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 175

# Asks the user whether the deployment should continue with +task+ and stores the
# answer under +:apps_symlink_confirmation+, then waits until that value is present.
# Nothing is asked when +result+ is a Proc.
#
# @param result [Object] value received from the task condition
# @param task [String] task name; shown upper-cased in the question
# @param worker [Object, nil] when present, its +job_id+ is mentioned in the question
# @return [nil]
def print_confirm_task_approvall(result, task, worker = nil)
  return if result.is_a?(Proc)
  prompt_parts = ["Do you want  to continue the deployment and execute #{task.upcase}"]
  prompt_parts << " for JOB #{worker.job_id}" if worker.present?
  prompt_parts << '?'
  set :apps_symlink_confirmation, CapistranoMulticonfigParallel.ask_confirm(prompt_parts.join, 'Y/N')
  # keep current thread alive
  sleep(0.1) until fetch(:apps_symlink_confirmation).present?
end

#process_job(job) ⇒ Object



225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 225

# Converts a stored job hash into the attribute hash handed on for execution,
# dropping every environment option whose value is blank.
#
# @param job [Hash] string-keyed job with 'id', 'app', 'env', 'action',
#   'env_options' and 'task_arguments' entries
# @return [Hash] hash with 'job_id', 'app_name', 'env_name', 'action_name',
#   'env_options' and 'task_arguments' keys
def process_job(job)
  filtered_env = job['env_options'].each_with_object({}) do |(name, value), kept|
    kept[name] = value if value.present?
  end
  {
    'job_id' => job['id'],
    'app_name' => job['app'],
    'env_name' => job['env'],
    'action_name' => job['action'],
    'env_options' => filtered_env,
    'task_arguments' => job['task_arguments']
  }
end

#process_jobs ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 96

# Waits until every registered worker has finished and then signals 'completed'
# on the job manager's condition.
#
# With synchronized confirmation, the workers are started here (in parallel)
# and the task confirmations are awaited first.
#
# The "all workers finished" check is re-evaluated on every poll. Previously it was
# computed once before the loop, so the wait never ended when any worker was still
# running at that moment.
#
# @return [Object] whatever signalling the job manager's condition returns
def process_jobs
  if syncronized_confirmation?
    @job_to_worker.pmap do |_job_id, worker|
      worker.async.start_task
    end
    wait_task_confirmations
  end
  loop do
    break if @job_to_worker.all? { |_job_id, worker| worker.alive? && worker.worker_state == 'finished' }
    sleep(0.1) # keep current thread alive
  end
  debug('all jobs have completed true') if self.class.debug_enabled?
  @job_manager.condition.signal('completed')
end

#register_worker_for_job(job, worker) ⇒ Object

Callback from an actor once it has received its job; the actor should do this as soon as possible.



76
77
78
79
80
81
82
83
84
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 76

# Callback used by a worker once it has received its job.
# A job that carries an id gets its worker started; a job without an id is
# delegated again so that it receives one.
#
# @param job [Hash] job hash (keys are converted to strings)
# @param worker [Object] the worker that received the job
# @return [Object] result of #start_worker or #delegate
def register_worker_for_job(job, worker)
  job = job.stringify_keys
  return start_worker(job, worker) unless job['id'].blank?
  debug("job id not found. delegating again the job #{job.inspect}") if self.class.debug_enabled?
  delegate(job)
end

#setup_worker_conditions(worker) ⇒ Object



127
128
129
130
131
132
133
134
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 127

# Creates one Celluloid condition per configured confirmable task for the worker's
# job and stores them, with status 'unconfirmed', in @job_to_condition under the
# worker's job id. Skipped unless confirmations apply to this worker and are enabled.
#
# @param worker [Object] worker whose +job_id+ keys the stored conditions
# @return [Hash, nil] the stored task => { condition:, status: } hash, or nil when skipped
def setup_worker_conditions(worker)
  return unless apply_confirmation_for_worker(worker) && apply_confirmations?
  tasks = CapistranoMulticonfigParallel.configuration.task_confirmations
  conditions_by_task = tasks.each_with_object({}) do |task, conditions|
    conditions[task] = { condition: Celluloid::Condition.new, status: 'unconfirmed' }
  end
  @job_to_condition[worker.job_id] = conditions_by_task
end

#start_worker(job, worker) ⇒ Object



86
87
88
89
90
91
92
93
94
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 86

# Registers a worker for a job and, unless confirmation is synchronized, starts its task.
#
# The worker receives the job id if it has none yet, both lookup tables are updated,
# the worker is linked to the current actor, and the registration flag is raised once
# as many workers are registered as the job manager has jobs.
#
# @param job [Hash] string-keyed job hash with an 'id'
# @param worker [Object] the worker that received the job
# @return [true, nil] true when registration became complete with this call
def start_worker(job, worker)
  job_id = job['id']
  worker.job_id = job_id if worker.job_id.blank?
  @job_to_worker[job_id] = worker
  @worker_to_job[worker.mailbox.address] = job
  debug("worker #{worker.job_id} registed into manager") if self.class.debug_enabled?
  Actor.current.link worker
  # with synchronized confirmation the workers are started later, by #process_jobs
  worker.async.start_task unless syncronized_confirmation?
  all_registered = @job_manager.jobs.size == @job_to_worker.size
  @registration_complete = true if all_registered
end

#syncronization_required? ⇒ Boolean

Returns:

  • (Boolean)


114
115
116
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 114

# Tells whether synchronized confirmation is requested in the configuration.
# The +syncronize_confirmation+ setting is compared case-insensitively against 'true'.
#
# @return [Boolean]
def syncronization_required?
  setting = CapistranoMulticonfigParallel.configuration.syncronize_confirmation
  setting.to_s.downcase == 'true'
end

#syncronized_confirmation? ⇒ Boolean

Returns:

  • (Boolean)


118
119
120
121
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 118

# Decides whether confirmations are handled for all workers together.
#
# Requires synchronization to be configured. It then holds when the job manager
# does not execute deploy stages, or when it does but staging cannot be tagged
# and the confirmation applies to all workers.
#
# @return [Boolean]
def syncronized_confirmation?
  return false unless syncronization_required?
  return true unless @job_manager.executes_deploy_stages?
  !@job_manager.can_tag_staging? && @job_manager.confirmation_applies_to_all_workers?
end

#wait_condition_for_task(job_id, task) ⇒ Object



156
157
158
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 156

# Waits on the condition stored for the given job and task.
#
# @param job_id [Object] key into @job_to_condition
# @param task [String] task whose condition should be awaited
# @return [Object] the value returned by the condition's +wait+
def wait_condition_for_task(job_id, task)
  task_entry = @job_to_condition[job_id][task]
  task_entry[:condition].wait
end

#wait_task_confirmations ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 160

# For every configured confirmable task, waits (in parallel) for the condition of each
# known job and then asks for a single approval covering all of them.
#
# Skipped unless confirmations are enabled, the job manager's stage is listed in
# +apply_stage_confirmation+, and confirmation is synchronized.
#
# @return [Object, nil] the configured task list, or nil when skipped
def wait_task_confirmations
  stage_apply = CapistranoMulticonfigParallel.configuration.apply_stage_confirmation.include?(@job_manager.stage)
  return unless apply_confirmations? && stage_apply && syncronized_confirmation?
  CapistranoMulticonfigParallel.configuration.task_confirmations.each do |task|
    results = []
    @jobs.pmap { |job_id, _job| results << wait_condition_for_task(job_id, task) }
    # approval is requested only once a result arrived for every job
    confirm_task_approval(results, task) if results.size == @jobs.size
  end
end

#wait_task_confirmations_worker(worker) ⇒ Object



148
149
150
151
152
153
154
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 148

# Per-worker variant of the confirmation wait: for every configured confirmable task,
# waits for the worker's condition and asks for approval when a result is present.
#
# Skipped unless confirmations are enabled and apply to this worker, and also
# skipped when confirmation is synchronized.
#
# @param worker [Object] worker whose +job_id+ selects the conditions to wait on
# @return [Object, nil] the configured task list, or nil when skipped
def wait_task_confirmations_worker(worker)
  return unless apply_confirmations? && apply_confirmation_for_worker(worker)
  return if syncronized_confirmation?
  CapistranoMulticonfigParallel.configuration.task_confirmations.each do |task|
    result = wait_condition_for_task(worker.job_id, task)
    next unless result.present?
    confirm_task_approval(result, task, worker)
  end
end

#worker_died(worker, reason) ⇒ Object



260
261
262
263
264
265
266
267
268
269
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 260

# Handles the death of a linked worker: forgets the worker's job and, when that
# job was a deploy that has not already been re-delegated after a failure,
# delegates it again as a 'deploy:rollback'.
#
# The rollback override is merged under the string key 'action', matching the
# string-keyed job hash, instead of a symbol key that only took effect through
# key collision when the job was stringified again. The "restarting" debug line
# is emitted only when a job is actually re-delegated.
#
# NOTE(review): the guard below reads job['action_name'], whereas #process_job reads
# the action of a stored job from job['action'] — confirm which key the jobs
# stored in @worker_to_job really carry.
#
# @param worker [Object] the dead worker; its mailbox address identifies the job
# @param reason [Object] reason reported for the death (only logged)
# @return [Object, nil] result of #delegate, or nil when nothing is re-delegated
def worker_died(worker, reason)
  debug("worker with mailbox #{worker.mailbox.inspect} died  for reason:  #{reason}") if self.class.debug_enabled?
  job = @worker_to_job.delete(worker.mailbox.address)
  return if job.blank? || job_failed?(job)
  return unless job['action_name'] == 'deploy'
  debug "restarting #{job} on new worker" if self.class.debug_enabled?
  delegate(job.merge('action' => 'deploy:rollback', 'worker_action' => 'worker_died'))
end