Class: CapistranoMulticonfigParallel::CelluloidManager

Inherits:
Object
  • Object
show all
Includes:
BaseActorHelper
Defined in:
lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb

Overview

manager class that handles workers

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from BaseActorHelper

included

Constructor Details

#initialize(job_manager) ⇒ CelluloidManager

Returns a new instance of CelluloidManager.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 14

def initialize(job_manager)
  @job_manager = job_manager
  @registration_complete = false
  return if configuration.multi_secvential.to_s.downcase == 'true'
  # start SupervisionGroup
  @worker_supervisor = setup_supervision_group

  # Get a handle on the SupervisionGroup::Member
  @mutex = Mutex.new
  # http://rubydoc.info/gems/celluloid/Celluloid/SupervisionGroup/Member
  @workers = setup_pool_of_actor(@worker_supervisor, actor_name: :workers, type: CapistranoMulticonfigParallel::CelluloidWorker, size: 10)
  #@workers = Celluloid::Actor[:workers].pool
  Actor.current.link @workers
  setup_actor_supervision(@worker_supervisor, actor_name: :terminal_server, type: CapistranoMulticonfigParallel::TerminalTable, args: [Actor.current, @job_manager, configuration.fetch(:terminal, {})])
  setup_actor_supervision(@worker_supervisor, actor_name: :web_server, type: CapistranoMulticonfigParallel::WebServer, args: websocket_config)
  # Get a handle on the PoolManager
  # http://rubydoc.info/gems/celluloid/Celluloid/PoolManager
  # @workers = workers_pool.actor

  @stderr_buffer = StringIO.new
  @conditions = []
  @jobs = {}
  @job_to_worker = {}
  @worker_to_job = {}
  @job_to_condition = {}
end

Instance Attribute Details

#job_to_conditionObject

Returns the value of attribute job_to_condition.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9

def job_to_condition
  @job_to_condition
end

#job_to_workerObject

Returns the value of attribute job_to_worker.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9

def job_to_worker
  @job_to_worker
end

#jobsObject

Returns the value of attribute jobs.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9

def jobs
  @jobs
end

#mutexObject

Returns the value of attribute mutex.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9

def mutex
  @mutex
end

#registration_completeObject

Returns the value of attribute registration_complete.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9

def registration_complete
  @registration_complete
end

#stderr_bufferObject

Returns the value of attribute stderr_buffer.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9

def stderr_buffer
  @stderr_buffer
end

#worker_supervisorObject (readonly)

Returns the value of attribute worker_supervisor.



11
12
13
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 11

def worker_supervisor
  @worker_supervisor
end

#worker_to_jobObject

Returns the value of attribute worker_to_job.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9

def worker_to_job
  @worker_to_job
end

#workersObject (readonly)

Returns the value of attribute workers.



11
12
13
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 11

def workers
  @workers
end

#workers_terminatedObject

Returns the value of attribute workers_terminated.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9

def workers_terminated
  @workers_terminated
end

Instance Method Details

#all_workers_finished?Boolean

Returns:

  • (Boolean)


62
63
64
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 62

def all_workers_finished?
  @jobs.all? { |_job_id, job| job.work_done?   }
end

#apply_confirmation_for_job(job) ⇒ Object



108
109
110
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 108

def apply_confirmation_for_job(job)
  configuration.apply_stage_confirmation.include?(job.stage) && apply_confirmations?
end

#apply_confirmations?Boolean

Returns:

  • (Boolean)


99
100
101
102
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 99

def apply_confirmations?
  confirmations = configuration.task_confirmations
  confirmations.is_a?(Array) && confirmations.present?
end

#can_tag_staging?Boolean

Returns:

  • (Boolean)


204
205
206
207
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 204

def can_tag_staging?
  @job_manager.can_tag_staging? && @job_manager.tag_staging_exists? &&
  @jobs.find { |_job_id, job| job.stage == 'production' }.blank?
end

#check_workers_done?Boolean

Returns:

  • (Boolean)


84
85
86
87
88
89
90
91
92
93
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 84

def check_workers_done?
  Thread.new do
    loop do
      if Actor.current.alive? && all_workers_finished?
        @workers_terminated.signal('completed')
        break
      end
    end
  end
end

#confirm_task_approval(result, task, processed_job = nil) ⇒ Object



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 176

def confirm_task_approval(result, task, processed_job = nil)
  return unless result.present?
  result = print_confirm_task_approvall(result, task, processed_job)
  return unless action_confirmed?(result)
  @jobs.pmap do |job_id, job|
    worker = get_worker_for_job(job_id)
    if worker.alive?
      worker.publish_rake_event('approved' => 'yes',
      'action' => 'invoke',
      'job_id' => job.id,
      'task' => task
      )
    end
  end
end

#delegate_job(job, old_job = "") ⇒ Object

call to send an actor a job



43
44
45
46
47
48
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 43

def delegate_job(job, old_job = "")
  @jobs[job.id] = job
  # debug(@jobs)
  # start work and send it to the background
  @workers.work(job, Actor.current, old_job)
end

#dispatch_new_job(job, options = {}) ⇒ Object



209
210
211
212
213
214
215
216
217
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 209

def dispatch_new_job(job, options = {})
  return unless job.is_a?(CapistranoMulticonfigParallel::Job)
  options.stringify_keys! if options.present?
  env_opts = options['skip_env_options'].present? ? {} : @job_manager.get_app_additional_env_options(job.app, job.stage)
  new_job_options = job.options.except!('id', 'status', 'exit_status').merge('env_options' => job.env_options.merge(env_opts))
  new_job = CapistranoMulticonfigParallel::Job.new(@job_manager, new_job_options.merge(options))
  log_to_file("Trying to DiSPATCH new JOB #{new_job.inspect}")
  async.delegate_job(new_job, job) unless job.rolling_back?
end

#get_job_status(job) ⇒ Object

lookup status of job by asking actor running it



220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 220

def get_job_status(job)
  status = nil
  if job.present?
    if job.is_a?(CapistranoMulticonfigParallel::Job)
      actor = @job_to_worker[job.id]
      status = actor.job_status
    else
      actor = @job_to_worker[job]
      status = actor.job_status
    end
  end
  status
end

#get_worker_for_job(job) ⇒ Object



192
193
194
195
196
197
198
199
200
201
202
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 192

def get_worker_for_job(job)
  if job.present?
    if job.is_a?(CapistranoMulticonfigParallel::Job)
      @job_to_worker[job.id]
    else
      @job_to_worker[job]
    end
  else
    return nil
  end
end

#mark_completed_remaining_tasks(job) ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 121

def mark_completed_remaining_tasks(job)
  return unless apply_confirmation_for_job(job)
  configuration.task_confirmations.each_with_index do |task, _index|
    fake_result = proc { |sum| sum }
    task_confirmation = @job_to_condition[job.id][task]
    next unless task_confirmation[:status] != 'confirmed'
    log_to_file("worker #{job.id} with action #{job.action} status #{job.status} and exit status #{job.exit_status} tries to mark fake the task #{task} with status #{task_confirmation[:status]}")
    task_confirmation[:status] = 'confirmed'
    task_confirmation[:condition].signal(fake_result)
  end
end


160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 160

def print_confirm_task_approvall(result, task, job)
  return if result.is_a?(Proc)
  message = "Do you want  to continue the deployment and execute #{task.upcase}"
  message += " for JOB #{job.id}" if job.present?
  message += '?'
  if Celluloid::Actor[:terminal_server].present? && Celluloid::Actor[:terminal_server].alive?
    apps_symlink_confirmation = Celluloid::Actor[:terminal_server].show_confirmation(message, 'Y/N')
    until apps_symlink_confirmation.present?
      sleep(0.1) # keep current thread alive
    end
    apps_symlink_confirmation
  else
    'y'
  end
end

#process_jobsObject



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 66

def process_jobs
  @workers_terminated = Celluloid::Condition.new
  if syncronized_confirmation?
    @job_to_worker.pmap do |_job_id, worker|
      worker.async.start_task
    end
    wait_task_confirmations
  end
  terminal_show
  async.check_workers_done?
  condition = @workers_terminated.wait
  until condition.present?
    sleep(0.1) # keep current thread alive
  end
  log_to_file("all jobs have completed #{condition}")
  terminal_show
end

#register_worker_for_job(job, worker) ⇒ Object

call back from actor once it has received it’s job actor should do this asap



52
53
54
55
56
57
58
59
60
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 52

def register_worker_for_job(job, worker)
  @job_to_worker[job.id] = worker
  @worker_to_job[worker.mailbox.address] = job
  log_to_file("worker #{worker.job_id} registed into manager")
  Actor.current.link worker
  worker.async.start_task if !syncronized_confirmation? || job.rolling_back?
  return unless syncronized_confirmation?
  @registration_complete = true if @job_manager.jobs.size == @jobs.size
end

#setup_worker_conditions(job) ⇒ Object



112
113
114
115
116
117
118
119
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 112

def setup_worker_conditions(job)
  return unless apply_confirmation_for_job(job)
  hash_conditions = {}
  configuration.task_confirmations.each do |task|
    hash_conditions[task] = { condition: Celluloid::Condition.new, status: 'unconfirmed' }
  end
  @job_to_condition[job.id] = hash_conditions
end

#syncronized_confirmation?Boolean

Returns:

  • (Boolean)


104
105
106
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 104

def syncronized_confirmation?
  !can_tag_staging?
end

#terminal_showObject



95
96
97
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 95

def terminal_show
  Celluloid::Actor[:terminal_server].async.notify_time_change(CapistranoMulticonfigParallel::TerminalTable.topic, type: 'output') if Celluloid::Actor[:terminal_server].alive?
end

#wait_condition_for_task(job_id, task) ⇒ Object



141
142
143
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 141

def wait_condition_for_task(job_id, task)
  @job_to_condition[job_id][task][:condition].wait
end

#wait_task_confirmationsObject



145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 145

def wait_task_confirmations
  stage_apply = configuration.apply_stage_confirmation.include?(@job_manager.stage)
  return if !stage_apply || !syncronized_confirmation?
  configuration.task_confirmations.each_with_index do |task, _index|
    results = []
    @jobs.pmap do |job_id, _job|
      result = wait_condition_for_task(job_id, task)
      results << result
    end
    if results.size == @jobs.size && !all_workers_finished?
      confirm_task_approval(results, task)
    end
  end
end

#wait_task_confirmations_worker(job) ⇒ Object



133
134
135
136
137
138
139
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 133

def wait_task_confirmations_worker(job)
  return if !apply_confirmation_for_job(job) || !syncronized_confirmation?
  configuration.task_confirmations.each_with_index do |task, _index|
    result = wait_condition_for_task(job.id, task)
    confirm_task_approval(result, task, job)
  end
end

#worker_died(worker, reason) ⇒ Object



234
235
236
237
238
239
240
241
242
243
244
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 234

def worker_died(worker, reason)
  job = @worker_to_job[worker.mailbox.address]
  mailbox = worker.mailbox
  log_to_file("worker_died: worker job #{job.inspect} with mailbox #{mailbox.inspect} and #{mailbox.address.inspect} died  for reason:  #{reason}")
  return true if job.blank? || job.rolling_back? || job.action != 'deploy'
  #job.rollback_changes_to_application
  @worker_to_job.delete(mailbox.address)
  log_to_file("RESTARTING: worker job #{job.inspect} with mailbox #{mailbox.inspect} and #{mailbox.address.inspect} died  for reason:  #{reason}")

  dispatch_new_job(job, skip_env_options: true, action: 'deploy:rollback')
end