Class: CapistranoMulticonfigParallel::CelluloidManager
- Inherits:
-
Object
- Object
- CapistranoMulticonfigParallel::CelluloidManager
- Includes:
- BaseActorHelper
- Defined in:
- lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb
Overview
manager class that handles workers
Instance Attribute Summary collapse
-
#bundler_workers ⇒ Object
readonly
Returns the value of attribute bundler_workers.
-
#job_to_condition ⇒ Object
Returns the value of attribute job_to_condition.
-
#job_to_worker ⇒ Object
Returns the value of attribute job_to_worker.
-
#jobs ⇒ Object
Returns the value of attribute jobs.
-
#mutex ⇒ Object
Returns the value of attribute mutex.
-
#registration_complete ⇒ Object
Returns the value of attribute registration_complete.
-
#stderr_buffer ⇒ Object
Returns the value of attribute stderr_buffer.
-
#worker_supervisor ⇒ Object
readonly
Returns the value of attribute worker_supervisor.
-
#worker_to_job ⇒ Object
Returns the value of attribute worker_to_job.
-
#workers ⇒ Object
readonly
Returns the value of attribute workers.
-
#workers_terminated ⇒ Object
Returns the value of attribute workers_terminated.
Instance Method Summary collapse
- #all_workers_finished? ⇒ Boolean
- #apply_confirmation_for_job(job) ⇒ Object
- #apply_confirmations? ⇒ Boolean
- #can_tag_staging? ⇒ Boolean
- #check_workers_done? ⇒ Boolean
- #confirm_task_approval(result, task, processed_job = nil) ⇒ Object
-
#delegate_job(job, old_job = "") ⇒ Object
call to send an actor a job.
- #dispatch_new_job(job, options = {}) ⇒ Object
-
#get_job_status(job) ⇒ Object
lookup status of job by asking actor running it.
- #get_worker_for_job(job) ⇒ Object
-
#initialize(job_manager) ⇒ CelluloidManager
constructor
A new instance of CelluloidManager.
- #mark_completed_remaining_tasks(job) ⇒ Object
- #print_confirm_task_approvall(result, task, job) ⇒ Object
- #process_jobs ⇒ Object
-
#register_worker_for_job(job, worker) ⇒ Object
Callback from an actor once it has received its job; the actor should do this ASAP.
- #setup_worker_conditions(job) ⇒ Object
- #start_bundler_supervision_if_needed ⇒ Object
- #syncronized_confirmation? ⇒ Boolean
- #terminal_show ⇒ Object
- #wait_condition_for_task(job_id, task) ⇒ Object
- #wait_task_confirmations ⇒ Object
- #wait_task_confirmations_worker(job) ⇒ Object
- #worker_died(worker, reason) ⇒ Object
Methods included from BaseActorHelper
Constructor Details
#initialize(job_manager) ⇒ CelluloidManager
Returns a new instance of CelluloidManager.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 14
# Builds the manager around the given job manager.
#
# When `configuration.multi_secvential` is the string 'true' (case-insensitive)
# the constructor stops right after storing the job manager and resetting the
# registration flag; none of the supervision or bookkeeping state is created.
#
# @param job_manager [Object] stored in @job_manager and passed to the terminal actor
def initialize(job_manager)
  @job_manager = job_manager
  @registration_complete = false
  sequential_mode = configuration.multi_secvential.to_s.downcase == 'true'
  return if sequential_mode

  # Supervision group used for every actor set up below.
  @worker_supervisor = setup_supervision_group
  @mutex = Mutex.new

  # Worker pool requested with size 10, then linked to the current actor.
  @workers = setup_pool_of_actor(
    @worker_supervisor,
    actor_name: :workers,
    type: CapistranoMulticonfigParallel::CelluloidWorker,
    size: 10
  )
  Actor.current.link @workers

  setup_actor_supervision(
    @worker_supervisor,
    actor_name: :terminal_server,
    type: CapistranoMulticonfigParallel::TerminalTable,
    args: [Actor.current, @job_manager, configuration.fetch(:terminal, {})]
  )
  setup_actor_supervision(
    @worker_supervisor,
    actor_name: :web_server,
    type: CapistranoMulticonfigParallel::WebServer,
    args: websocket_config
  )

  # Bookkeeping state, all starting empty.
  @stderr_buffer = StringIO.new
  @conditions = []
  @jobs = {}
  @job_to_worker = {}
  @worker_to_job = {}
  @job_to_condition = {}
end
Instance Attribute Details
#bundler_workers ⇒ Object (readonly)
Returns the value of attribute bundler_workers.
11 12 13 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 11
# Read-only accessor.
#
# @return [Object, nil] the current value of @bundler_workers
def bundler_workers
  @bundler_workers
end
#job_to_condition ⇒ Object
Returns the value of attribute job_to_condition.
9 10 11 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9
# Accessor (reader half).
#
# @return [Object, nil] the current value of @job_to_condition
def job_to_condition
  @job_to_condition
end
#job_to_worker ⇒ Object
Returns the value of attribute job_to_worker.
9 10 11 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9
# Accessor (reader half).
#
# @return [Object, nil] the current value of @job_to_worker
def job_to_worker
  @job_to_worker
end
#jobs ⇒ Object
Returns the value of attribute jobs.
9 10 11 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9
# Accessor (reader half).
#
# @return [Object, nil] the current value of @jobs
def jobs
  @jobs
end
#mutex ⇒ Object
Returns the value of attribute mutex.
9 10 11 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9
# Accessor (reader half).
#
# @return [Object, nil] the current value of @mutex
def mutex
  @mutex
end
#registration_complete ⇒ Object
Returns the value of attribute registration_complete.
9 10 11 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9
# Accessor (reader half).
#
# @return [Object, nil] the current value of @registration_complete
def registration_complete
  @registration_complete
end
#stderr_buffer ⇒ Object
Returns the value of attribute stderr_buffer.
9 10 11 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9
# Accessor (reader half).
#
# @return [Object, nil] the current value of @stderr_buffer
def stderr_buffer
  @stderr_buffer
end
#worker_supervisor ⇒ Object (readonly)
Returns the value of attribute worker_supervisor.
11 12 13 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 11
# Read-only accessor.
#
# @return [Object, nil] the current value of @worker_supervisor
def worker_supervisor
  @worker_supervisor
end
#worker_to_job ⇒ Object
Returns the value of attribute worker_to_job.
9 10 11 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9
# Accessor (reader half).
#
# @return [Object, nil] the current value of @worker_to_job
def worker_to_job
  @worker_to_job
end
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
11 12 13 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 11
# Read-only accessor.
#
# @return [Object, nil] the current value of @workers
def workers
  @workers
end
#workers_terminated ⇒ Object
Returns the value of attribute workers_terminated.
9 10 11 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 9
# Accessor (reader half).
#
# @return [Object, nil] the current value of @workers_terminated
def workers_terminated
  @workers_terminated
end
Instance Method Details
#all_workers_finished? ⇒ Boolean
69 70 71 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 69
# Tells whether every job tracked in @jobs reports `work_done?`.
#
# @return [Boolean] true when all jobs are done (also true when @jobs is empty)
def all_workers_finished?
  @jobs.values.all?(&:work_done?)
end
#apply_confirmation_for_job(job) ⇒ Object
115 116 117 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 115
# Decides whether task confirmations apply to the given job: the job's stage
# must be listed in `configuration.apply_stage_confirmation` and
# `apply_confirmations?` must hold.
#
# @param job [#stage] job whose stage is checked
# @return [Boolean]
def apply_confirmation_for_job(job)
  stage_needs_confirmation = configuration.apply_stage_confirmation.include?(job.stage)
  stage_needs_confirmation && apply_confirmations?
end
#apply_confirmations? ⇒ Boolean
106 107 108 109 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 106
# Tells whether `configuration.task_confirmations` is an Array for which
# `present?` holds.
#
# @return [Boolean]
def apply_confirmations?
  task_list = configuration.task_confirmations
  return false unless task_list.is_a?(Array)

  task_list.present?
end
#can_tag_staging? ⇒ Boolean
211 212 213 214 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 211
# True when the job manager reports both `can_tag_staging?` and
# `tag_staging_exists?`, and no tracked job has the stage 'production'.
#
# @return [Boolean, nil] a falsy answer from the job manager is returned as-is
def can_tag_staging?
  manager_allows = @job_manager.can_tag_staging? && @job_manager.tag_staging_exists?
  manager_allows && @jobs.none? { |_job_id, job| job.stage == 'production' }
end
#check_workers_done? ⇒ Boolean
91 92 93 94 95 96 97 98 99 100 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 91
# Starts a background thread that polls until the current actor is alive and
# `all_workers_finished?` holds, then signals @workers_terminated with
# 'completed' and stops.
#
# @return [Thread] the polling thread
def check_workers_done?
  Thread.new do
    loop do
      if Actor.current.alive? && all_workers_finished?
        @workers_terminated.signal('completed')
        break
      end
      # Pause between polls; without it the loop spins a CPU core at full
      # speed for as long as the jobs are still running.
      sleep(0.1)
    end
  end
end
#confirm_task_approval(result, task, processed_job = nil) ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 183
# Asks for approval of `task` (via `print_confirm_task_approvall`) and, when the
# answer passes `action_confirmed?`, publishes an 'approved' rake event to the
# worker of every tracked job.
#
# @param result [Object] value received from the task condition; nothing happens when blank
# @param task [String] task awaiting confirmation
# @param processed_job [Object, nil] job mentioned in the confirmation prompt, if any
# @return [Object, nil] nil when nothing was confirmed, otherwise the `pmap` result
def confirm_task_approval(result, task, processed_job = nil)
  return unless result.present?
  result = print_confirm_task_approvall(result, task, processed_job)
  return unless action_confirmed?(result)
  @jobs.pmap do |job_id, job|
    worker = get_worker_for_job(job_id)
    # A job may be tracked before any worker registered for it; skip it
    # instead of failing with NoMethodError on nil.
    next if worker.nil? || !worker.alive?
    worker.publish_rake_event('approved' => 'yes',
                              'action' => 'invoke',
                              'job_id' => job.id,
                              'task' => task)
  end
end
#delegate_job(job, old_job = "") ⇒ Object
call to send an actor a job
50 51 52 53 54 55 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 50
# Records the job under its id in @jobs and hands it to the worker pool,
# together with the current actor and the job it replaces (if any).
#
# @param job [#id] job to run
# @param old_job [Object] previous job, defaults to an empty string
# @return [Object] whatever `@workers.work` returns
def delegate_job(job, old_job = "")
  @jobs.store(job.id, job)
  manager = Actor.current
  @workers.work(job, manager, old_job)
end
#dispatch_new_job(job, options = {}) ⇒ Object
216 217 218 219 220 221 222 223 224 |
# Builds a new CapistranoMulticonfigParallel::Job derived from `job` and, unless
# `job.rolling_back?`, passes it to `async.delegate_job` together with the
# original job. Returns early when `job` is not a CapistranoMulticonfigParallel::Job.
# NOTE(review): this listing appears to have lost several identifiers during
# extraction (e.g. the second parameter, documented elsewhere on this page as
# `options = {}`, and the receivers/method names around `job..` and
# `@job_manager.(`). It is not valid Ruby as shown; confirm against the
# original source file before reusing it.
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 216 def dispatch_new_job(job, = {}) return unless job.is_a?(CapistranoMulticonfigParallel::Job) .stringify_keys! if .present? env_opts = ['skip_env_options'].present? ? {} : @job_manager.(job.app, job.stage) = job..except!('id', 'status', 'exit_status').merge('env_options' => job..merge(env_opts)) new_job = CapistranoMulticonfigParallel::Job.new(@job_manager, .merge()) log_to_file("Trying to DiSPATCH new JOB #{new_job.inspect}") async.delegate_job(new_job, job) unless job.rolling_back? end |
#get_job_status(job) ⇒ Object
lookup status of job by asking actor running it
227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 227
# Looks up the status of a job by asking the worker registered for it.
#
# @param job [CapistranoMulticonfigParallel::Job, Object] a job, or the key
#   under which its worker is stored in @job_to_worker
# @return [Object, nil] the worker's `job_status`; nil when `job` is blank or
#   no worker is registered for it
def get_job_status(job)
  return nil unless job.present?

  job_id = job.is_a?(CapistranoMulticonfigParallel::Job) ? job.id : job
  actor = @job_to_worker[job_id]
  # No worker registered yet: report "no status" rather than raising
  # NoMethodError on nil.
  actor.nil? ? nil : actor.job_status
end
#get_worker_for_job(job) ⇒ Object
199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 199
# Finds the worker stored in @job_to_worker for a job.
#
# @param job [CapistranoMulticonfigParallel::Job, Object] a job (looked up by
#   its id) or the lookup key itself
# @return [Object, nil] the worker, or nil when `job` is blank or unknown
def get_worker_for_job(job)
  return nil unless job.present?

  lookup_key = job.is_a?(CapistranoMulticonfigParallel::Job) ? job.id : job
  @job_to_worker[lookup_key]
end
#mark_completed_remaining_tasks(job) ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 128
# For a job that is subject to confirmations, marks every task confirmation
# that is not yet 'confirmed' as confirmed and signals its condition with a
# placeholder proc, logging each one.
#
# @param job [Object] job whose pending confirmations are released
# @return [Object, nil] nil when confirmations do not apply to the job
def mark_completed_remaining_tasks(job)
  return unless apply_confirmation_for_job(job)

  configuration.task_confirmations.each do |task|
    fake_result = proc { |sum| sum }
    confirmation = @job_to_condition[job.id][task]
    next if confirmation[:status] == 'confirmed'

    log_to_file("worker #{job.id} with action #{job.action} status #{job.status} and exit status #{job.exit_status} tries to mark fake the task #{task} with status #{confirmation[:status]}")
    confirmation[:status] = 'confirmed'
    confirmation[:condition].signal(fake_result)
  end
end
#print_confirm_task_approvall(result, task, job) ⇒ Object
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 167
# Asks the terminal actor whether the deployment should continue with `task`.
#
# @param result [Object] value received from the task condition; a Proc means
#   there is nothing to ask
# @param task [String] task name, shown upper-cased in the prompt
# @param job [#id, nil] job mentioned in the prompt when present
# @return [Object, nil] nil when `result` is a Proc; the terminal actor's answer
#   when that actor is registered and alive; otherwise 'y'
def print_confirm_task_approvall(result, task, job)
  return if result.is_a?(Proc)

  # The name of this local was missing from the extracted listing; restored.
  message = "Do you want to continue the deployment and execute #{task.upcase}"
  message += " for JOB #{job.id}" if job.present?
  message += '?'
  if Celluloid::Actor[:terminal_server].present? && Celluloid::Actor[:terminal_server].alive?
    apps_symlink_confirmation = Celluloid::Actor[:terminal_server].show_confirmation(message, 'Y/N')
    until apps_symlink_confirmation.present?
      sleep(0.1) # keep current thread alive
    end
    apps_symlink_confirmation
  else
    'y'
  end
end
#process_jobs ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 73
# Runs the registered jobs to completion: creates the termination condition,
# starts the workers and collects confirmations when confirmations are
# synchronized, then waits for @workers_terminated before logging and
# refreshing the terminal.
def process_jobs
  @workers_terminated = Celluloid::Condition.new
  if syncronized_confirmation?
    @job_to_worker.pmap { |_job_id, worker| worker.async.start_task }
    wait_task_confirmations
  end
  terminal_show
  async.check_workers_done?
  completion = @workers_terminated.wait
  sleep(0.1) until completion.present? # keep current thread alive
  log_to_file("all jobs have completed #{completion}")
  terminal_show
end
#register_worker_for_job(job, worker) ⇒ Object
Callback from an actor once it has received its job; the actor should do this ASAP
59 60 61 62 63 64 65 66 67 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 59
# Callback used by a worker once it has received its job. Records the
# job/worker pairing in both directions, links the worker to the current
# actor, and starts the task right away when confirmations are not
# synchronized or the job is rolling back. With synchronized confirmations it
# also flags registration as complete once @jobs has as many entries as the
# job manager's job list.
#
# @param job [Object] the job the worker received
# @param worker [Object] the worker registering itself
def register_worker_for_job(job, worker)
  @job_to_worker[job.id] = worker
  @worker_to_job[worker.mailbox.address] = job
  log_to_file("worker #{worker.job_id} registed into manager")
  Actor.current.link worker

  start_immediately = !syncronized_confirmation? || job.rolling_back?
  worker.async.start_task if start_immediately
  return unless syncronized_confirmation?

  everyone_registered = @job_manager.jobs.size == @jobs.size
  @registration_complete = true if everyone_registered
end
#setup_worker_conditions(job) ⇒ Object
119 120 121 122 123 124 125 126 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 119
# For a job subject to confirmations, creates one Celluloid::Condition per
# configured confirmation task (status 'unconfirmed') and stores the resulting
# hash in @job_to_condition under the job's id.
#
# @param job [Object] job to prepare conditions for
# @return [Hash, nil] the task => { condition:, status: } hash, or nil when
#   confirmations do not apply to the job
def setup_worker_conditions(job)
  return unless apply_confirmation_for_job(job)

  @job_to_condition[job.id] = configuration.task_confirmations.each_with_object({}) do |task, conditions|
    conditions[task] = { condition: Celluloid::Condition.new, status: 'unconfirmed' }
  end
end
#start_bundler_supervision_if_needed ⇒ Object
41 42 43 44 45 46 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 41
# Sets up the bundler worker pool and its terminal actor, but only when
# `configuration.check_app_bundler_dependencies` is the string 'true'
# (case-insensitive).
def start_bundler_supervision_if_needed
  return unless configuration.check_app_bundler_dependencies.to_s.downcase == 'true'

  @bundler_workers = setup_pool_of_actor(
    @worker_supervisor,
    actor_name: :bundler_workers,
    type: CapistranoMulticonfigParallel::BundlerWorker,
    size: 10
  )
  Actor.current.link @bundler_workers
  setup_actor_supervision(
    @worker_supervisor,
    actor_name: :bundler_terminal_server,
    type: CapistranoMulticonfigParallel::BundlerTerminalTable,
    args: [Actor.current, @job_manager, configuration.fetch(:terminal, {})]
  )
end
#syncronized_confirmation? ⇒ Boolean
111 112 113 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 111
# Confirmations are synchronized exactly when staging cannot be tagged.
#
# @return [Boolean] the negation of `can_tag_staging?`
def syncronized_confirmation?
  can_tag_staging? ? false : true
end
#terminal_show ⇒ Object
102 103 104 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 102
# Asynchronously notifies the :terminal_server actor of an 'output' change on
# the TerminalTable topic, provided that actor is alive.
def terminal_show
  return unless Celluloid::Actor[:terminal_server].alive?

  topic = CapistranoMulticonfigParallel::TerminalTable.topic
  Celluloid::Actor[:terminal_server].async.notify_time_change(topic, type: 'output')
end
#wait_condition_for_task(job_id, task) ⇒ Object
148 149 150 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 148
# Waits on the condition stored for the given job id and task.
#
# @param job_id [Object] key into @job_to_condition
# @param task [Object] task key within that job's conditions
# @return [Object] whatever the condition's `wait` returns
def wait_condition_for_task(job_id, task)
  task_entry = @job_to_condition[job_id][task]
  task_entry[:condition].wait
end
#wait_task_confirmations ⇒ Object
152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 152
# For each configured confirmation task, waits for every tracked job's
# condition and then asks for approval once all results are in and work is
# still outstanding. Does nothing unless the job manager's stage is listed in
# `configuration.apply_stage_confirmation` and confirmations are synchronized.
def wait_task_confirmations
  stage_apply = configuration.apply_stage_confirmation.include?(@job_manager.stage)
  return unless stage_apply && syncronized_confirmation?

  configuration.task_confirmations.each do |task|
    results = []
    @jobs.pmap { |job_id, _job| results << wait_condition_for_task(job_id, task) }
    everyone_answered = results.size == @jobs.size
    confirm_task_approval(results, task) if everyone_answered && !all_workers_finished?
  end
end
#wait_task_confirmations_worker(job) ⇒ Object
140 141 142 143 144 145 146 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 140
# Per-job variant of the confirmation wait: for each configured confirmation
# task, waits on the job's condition and then asks for approval for that job.
# Does nothing unless confirmations apply to the job and are synchronized.
#
# @param job [Object] job whose confirmations are awaited
def wait_task_confirmations_worker(job)
  return unless apply_confirmation_for_job(job) && syncronized_confirmation?

  configuration.task_confirmations.each do |task|
    outcome = wait_condition_for_task(job.id, task)
    confirm_task_approval(outcome, task, job)
  end
end
#worker_died(worker, reason) ⇒ Object
241 242 243 244 245 246 247 248 249 250 251 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 241
# Handles the death of a worker: logs it and, when the worker was running a
# 'deploy' job that is not already rolling back, forgets the worker's mailbox
# and dispatches a new job with action 'deploy:rollback'.
#
# @param worker [Object] the worker that died
# @param reason [Object] reason reported for the death
# @return [Object] true when no rollback is dispatched, otherwise the result
#   of `dispatch_new_job`
def worker_died(worker, reason)
  job = @worker_to_job[worker.mailbox.address]
  mailbox = worker.mailbox
  log_to_file("worker_died: worker job #{job.inspect} with mailbox #{mailbox.inspect} and #{mailbox.address.inspect} died for reason: #{reason}")
  no_rollback_needed = job.blank? || job.rolling_back? || job.action != 'deploy'
  return true if no_rollback_needed

  @worker_to_job.delete(mailbox.address)
  log_to_file("RESTARTING: worker job #{job.inspect} with mailbox #{mailbox.inspect} and #{mailbox.address.inspect} died for reason: #{reason}")
  dispatch_new_job(job, skip_env_options: true, action: 'deploy:rollback')
end