Class: CapistranoMulticonfigParallel::CelluloidManager

Inherits:
Object
  • Object
show all
Includes:
BaseActorHelper
Defined in:
lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb

Overview

manager class that handles workers

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from BaseActorHelper

included

Constructor Details

#initialize(job_manager) ⇒ CelluloidManager

Returns a new instance of CelluloidManager.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 14

def initialize(job_manager)
  @job_manager = job_manager
  @registration_complete = false
  return if configuration.multi_secvential.to_s.downcase == 'true'
  # start SupervisionGroup
  @worker_supervisor = setup_supervision_group

  # Get a handle on the SupervisionGroup::Member
  @mutex = Mutex.new
  # http://rubydoc.info/gems/celluloid/Celluloid/SupervisionGroup/Member
  @workers = setup_pool_of_actor(@worker_supervisor, actor_name: :workers, type: CapistranoMulticonfigParallel::CelluloidWorker, size: 10)
  #@workers = Celluloid::Actor[:workers].pool
  Actor.current.link @workers
  setup_actor_supervision(@worker_supervisor, actor_name: :terminal_server, type: CapistranoMulticonfigParallel::TerminalTable, args: [Actor.current, @job_manager, configuration.fetch(:terminal, {})])
  setup_actor_supervision(@worker_supervisor, actor_name: :web_server, type: CapistranoMulticonfigParallel::WebServer, args: websocket_config)
  # Get a handle on the PoolManager
  # http://rubydoc.info/gems/celluloid/Celluloid/PoolManager
  # @workers = workers_pool.actor

  @stderr_buffer = StringIO.new
  @conditions = []
  @jobs = {}
  @job_to_worker = {}
  @worker_to_job = {}
  @job_to_condition = {}
end

Instance Attribute Details

#bundler_workersObject (readonly)

Returns the value of attribute bundler_workers.



11
12
13
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 11

def bundler_workers
  @bundler_workers
end

#job_to_conditionObject

Returns the value of attribute job_to_condition.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9

def job_to_condition
  @job_to_condition
end

#job_to_workerObject

Returns the value of attribute job_to_worker.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9

def job_to_worker
  @job_to_worker
end

#jobsObject

Returns the value of attribute jobs.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9

def jobs
  @jobs
end

#mutexObject

Returns the value of attribute mutex.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9

def mutex
  @mutex
end

#registration_completeObject

Returns the value of attribute registration_complete.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9

def registration_complete
  @registration_complete
end

#stderr_bufferObject

Returns the value of attribute stderr_buffer.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9

def stderr_buffer
  @stderr_buffer
end

#worker_supervisorObject (readonly)

Returns the value of attribute worker_supervisor.



11
12
13
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 11

def worker_supervisor
  @worker_supervisor
end

#worker_to_jobObject

Returns the value of attribute worker_to_job.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9

def worker_to_job
  @worker_to_job
end

#workersObject (readonly)

Returns the value of attribute workers.



11
12
13
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 11

def workers
  @workers
end

#workers_terminatedObject

Returns the value of attribute workers_terminated.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9

def workers_terminated
  @workers_terminated
end

Instance Method Details

#all_workers_finished?Boolean

Returns:

  • (Boolean)


69
70
71
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 69

def all_workers_finished?
  @jobs.all? { |_job_id, job| job.work_done?   }
end

#apply_confirmation_for_job(job) ⇒ Object



115
116
117
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 115

def apply_confirmation_for_job(job)
  configuration.apply_stage_confirmation.include?(job.stage) && apply_confirmations?
end

#apply_confirmations?Boolean

Returns:

  • (Boolean)


106
107
108
109
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 106

def apply_confirmations?
  confirmations = configuration.task_confirmations
  confirmations.is_a?(Array) && confirmations.present?
end

#can_tag_staging?Boolean

Returns:

  • (Boolean)


211
212
213
214
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 211

def can_tag_staging?
  @job_manager.can_tag_staging? && @job_manager.tag_staging_exists? &&
  @jobs.find { |_job_id, job| job.stage == 'production' }.blank?
end

#check_workers_done?Boolean

Returns:

  • (Boolean)


91
92
93
94
95
96
97
98
99
100
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 91

def check_workers_done?
  Thread.new do
    loop do
      if Actor.current.alive? && all_workers_finished?
        @workers_terminated.signal('completed')
        break
      end
    end
  end
end

#confirm_task_approval(result, task, processed_job = nil) ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 183

def confirm_task_approval(result, task, processed_job = nil)
  return unless result.present?
  result = print_confirm_task_approvall(result, task, processed_job)
  return unless action_confirmed?(result)
  @jobs.pmap do |job_id, job|
    worker = get_worker_for_job(job_id)
    if worker.alive?
      worker.publish_rake_event('approved' => 'yes',
      'action' => 'invoke',
      'job_id' => job.id,
      'task' => task
      )
    end
  end
end

#delegate_job(job, old_job = "") ⇒ Object

call to send an actor a job



50
51
52
53
54
55
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 50

def delegate_job(job, old_job = "")
  @jobs[job.id] = job
  # debug(@jobs)
  # start work and send it to the background
  @workers.work(job, Actor.current, old_job)
end

#dispatch_new_job(job, options = {}) ⇒ Object



216
217
218
219
220
221
222
223
224
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 216

def dispatch_new_job(job, options = {})
  return unless job.is_a?(CapistranoMulticonfigParallel::Job)
  options.stringify_keys! if options.present?
  env_opts = options['skip_env_options'].present? ? {} : @job_manager.get_app_additional_env_options(job.app, job.stage)
  new_job_options = job.options.except!('id', 'status', 'exit_status').merge('env_options' => job.env_options.merge(env_opts))
  new_job = CapistranoMulticonfigParallel::Job.new(@job_manager, new_job_options.merge(options))
  log_to_file("Trying to DiSPATCH new JOB #{new_job.inspect}")
  async.delegate_job(new_job, job) unless job.rolling_back?
end

#get_job_status(job) ⇒ Object

lookup status of job by asking actor running it



227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 227

def get_job_status(job)
  status = nil
  if job.present?
    if job.is_a?(CapistranoMulticonfigParallel::Job)
      actor = @job_to_worker[job.id]
      status = actor.job_status
    else
      actor = @job_to_worker[job]
      status = actor.job_status
    end
  end
  status
end

#get_worker_for_job(job) ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 199

def get_worker_for_job(job)
  if job.present?
    if job.is_a?(CapistranoMulticonfigParallel::Job)
      @job_to_worker[job.id]
    else
      @job_to_worker[job]
    end
  else
    return nil
  end
end

#mark_completed_remaining_tasks(job) ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 128

def mark_completed_remaining_tasks(job)
  return unless apply_confirmation_for_job(job)
  configuration.task_confirmations.each_with_index do |task, _index|
    fake_result = proc { |sum| sum }
    task_confirmation = @job_to_condition[job.id][task]
    next unless task_confirmation[:status] != 'confirmed'
    log_to_file("worker #{job.id} with action #{job.action} status #{job.status} and exit status #{job.exit_status} tries to mark fake the task #{task} with status #{task_confirmation[:status]}")
    task_confirmation[:status] = 'confirmed'
    task_confirmation[:condition].signal(fake_result)
  end
end


167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 167

def print_confirm_task_approvall(result, task, job)
  return if result.is_a?(Proc)
  message = "Do you want  to continue the deployment and execute #{task.upcase}"
  message += " for JOB #{job.id}" if job.present?
  message += '?'
  if Celluloid::Actor[:terminal_server].present? && Celluloid::Actor[:terminal_server].alive?
    apps_symlink_confirmation = Celluloid::Actor[:terminal_server].show_confirmation(message, 'Y/N')
    until apps_symlink_confirmation.present?
      sleep(0.1) # keep current thread alive
    end
    apps_symlink_confirmation
  else
    'y'
  end
end

#process_jobsObject



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 73

def process_jobs
  @workers_terminated = Celluloid::Condition.new
  if syncronized_confirmation?
    @job_to_worker.pmap do |_job_id, worker|
      worker.async.start_task
    end
    wait_task_confirmations
  end
  terminal_show
  async.check_workers_done?
  condition = @workers_terminated.wait
  until condition.present?
    sleep(0.1) # keep current thread alive
  end
  log_to_file("all jobs have completed #{condition}")
  terminal_show
end

#register_worker_for_job(job, worker) ⇒ Object

call back from actor once it has received it’s job actor should do this asap



59
60
61
62
63
64
65
66
67
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 59

def register_worker_for_job(job, worker)
  @job_to_worker[job.id] = worker
  @worker_to_job[worker.mailbox.address] = job
  log_to_file("worker #{worker.job_id} registed into manager")
  Actor.current.link worker
  worker.async.start_task if !syncronized_confirmation? || job.rolling_back?
  return unless syncronized_confirmation?
  @registration_complete = true if @job_manager.jobs.size == @jobs.size
end

#setup_worker_conditions(job) ⇒ Object



119
120
121
122
123
124
125
126
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 119

def setup_worker_conditions(job)
  return unless apply_confirmation_for_job(job)
  hash_conditions = {}
  configuration.task_confirmations.each do |task|
    hash_conditions[task] = { condition: Celluloid::Condition.new, status: 'unconfirmed' }
  end
  @job_to_condition[job.id] = hash_conditions
end

#start_bundler_supervision_if_neededObject



41
42
43
44
45
46
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 41

def start_bundler_supervision_if_needed
  return if configuration.check_app_bundler_dependencies.to_s.downcase != 'true'
  @bundler_workers = setup_pool_of_actor(@worker_supervisor, actor_name: :bundler_workers, type: CapistranoMulticonfigParallel::BundlerWorker, size: 10)
  Actor.current.link @bundler_workers
  setup_actor_supervision(@worker_supervisor, actor_name: :bundler_terminal_server, type: CapistranoMulticonfigParallel::BundlerTerminalTable, args: [Actor.current, @job_manager, configuration.fetch(:terminal, {})])
end

#syncronized_confirmation?Boolean

Returns:

  • (Boolean)


111
112
113
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 111

def syncronized_confirmation?
  !can_tag_staging?
end

#terminal_showObject



102
103
104
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 102

def terminal_show
  Celluloid::Actor[:terminal_server].async.notify_time_change(CapistranoMulticonfigParallel::TerminalTable.topic, type: 'output') if Celluloid::Actor[:terminal_server].alive?
end

#wait_condition_for_task(job_id, task) ⇒ Object



148
149
150
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 148

def wait_condition_for_task(job_id, task)
  @job_to_condition[job_id][task][:condition].wait
end

#wait_task_confirmationsObject



152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 152

def wait_task_confirmations
  stage_apply = configuration.apply_stage_confirmation.include?(@job_manager.stage)
  return if !stage_apply || !syncronized_confirmation?
  configuration.task_confirmations.each_with_index do |task, _index|
    results = []
    @jobs.pmap do |job_id, _job|
      result = wait_condition_for_task(job_id, task)
      results << result
    end
    if results.size == @jobs.size && !all_workers_finished?
      confirm_task_approval(results, task)
    end
  end
end

#wait_task_confirmations_worker(job) ⇒ Object



140
141
142
143
144
145
146
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 140

def wait_task_confirmations_worker(job)
  return if !apply_confirmation_for_job(job) || !syncronized_confirmation?
  configuration.task_confirmations.each_with_index do |task, _index|
    result = wait_condition_for_task(job.id, task)
    confirm_task_approval(result, task, job)
  end
end

#worker_died(worker, reason) ⇒ Object



241
242
243
244
245
246
247
248
249
250
251
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 241

def worker_died(worker, reason)
  job = @worker_to_job[worker.mailbox.address]
  mailbox = worker.mailbox
  log_to_file("worker_died: worker job #{job.inspect} with mailbox #{mailbox.inspect} and #{mailbox.address.inspect} died  for reason:  #{reason}")
  return true if job.blank? || job.rolling_back? || job.action != 'deploy'
  #job.rollback_changes_to_application
  @worker_to_job.delete(mailbox.address)
  log_to_file("RESTARTING: worker job #{job.inspect} with mailbox #{mailbox.inspect} and #{mailbox.address.inspect} died  for reason:  #{reason}")

  dispatch_new_job(job, skip_env_options: true, action: 'deploy:rollback')
end