Class: CapistranoMulticonfigParallel::CelluloidManager
- Inherits:
-
Object
- Object
- CapistranoMulticonfigParallel::CelluloidManager
- Includes:
- Celluloid, Celluloid::Logger, Celluloid::Notifications
- Defined in:
- lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb
Overview
rubocop:disable ClassLength
Instance Attribute Summary collapse
-
#actor_system ⇒ Object
Returns the value of attribute actor_system.
-
#job_to_condition ⇒ Object
Returns the value of attribute job_to_condition.
-
#job_to_worker ⇒ Object
Returns the value of attribute job_to_worker.
-
#jobs ⇒ Object
Returns the value of attribute jobs.
-
#mutex ⇒ Object
Returns the value of attribute mutex.
-
#registration_complete ⇒ Object
Returns the value of attribute registration_complete.
-
#worker_supervisor ⇒ Object
readonly
Returns the value of attribute worker_supervisor.
-
#worker_to_job ⇒ Object
Returns the value of attribute worker_to_job.
-
#workers ⇒ Object
readonly
Returns the value of attribute workers.
Class Method Summary collapse
Instance Method Summary collapse
- #apply_confirmation_for_worker(worker) ⇒ Object
- #apply_confirmations? ⇒ Boolean
- #can_tag_staging? ⇒ Boolean
- #confirm_task_approval(result, task, worker = nil) ⇒ Object
-
#delegate(job) ⇒ Object
call to send an actor a job.
- #dispatch_new_job(job) ⇒ Object
- #generate_job_id(job) ⇒ Object
-
#get_job_status(job) ⇒ Object
lookup status of job by asking actor running it.
- #get_worker_for_job(job) ⇒ Object
-
#initialize(job_manager) ⇒ CelluloidManager
constructor
A new instance of CelluloidManager.
- #job_failed?(job) ⇒ Boolean
- #mark_completed_remaining_tasks(worker) ⇒ Object
- #print_confirm_task_approvall(result, task, worker = nil) ⇒ Object
- #process_job(job) ⇒ Object
- #process_jobs ⇒ Object
-
#register_worker_for_job(job, worker) ⇒ Object
Callback from an actor once it has received its job; the actor should do this as soon as possible.
- #setup_worker_conditions(worker) ⇒ Object
- #start_worker(job, worker) ⇒ Object
- #syncronization_required? ⇒ Boolean
- #syncronized_confirmation? ⇒ Boolean
- #wait_condition_for_task(job_id, task) ⇒ Object
- #wait_task_confirmations ⇒ Object
- #wait_task_confirmations_worker(worker) ⇒ Object
- #worker_died(worker, reason) ⇒ Object
Constructor Details
#initialize(job_manager) ⇒ CelluloidManager
Returns a new instance of CelluloidManager.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 17 def initialize(job_manager) # start SupervisionGroup @worker_supervisor = Celluloid::SupervisionGroup.run! @job_manager = job_manager @registration_complete = false # Get a handle on the SupervisionGroup::Member @actor_system = Celluloid.boot @mutex = Mutex.new # http://rubydoc.info/gems/celluloid/Celluloid/SupervisionGroup/Member @workers = @worker_supervisor.pool(CapistranoMulticonfigParallel::CelluloidWorker, as: :workers, size: 10) Actor.current.link @workers # Get a handle on the PoolManager # http://rubydoc.info/gems/celluloid/Celluloid/PoolManager # @workers = workers_pool.actor @conditions = [] @jobs = {} @job_to_worker = {} @worker_to_job = {} @job_to_condition = {} @worker_supervisor.supervise_as(:terminal_server, CapistranoMulticonfigParallel::TerminalTable, Actor.current) @worker_supervisor.supervise_as(:web_server, CelluloidPubsub::WebServer, self.class.websocket_config.merge(enable_debug: self.class.debug_websocket?)) end |
Instance Attribute Details
#actor_system ⇒ Object
Returns the value of attribute actor_system.
12 13 14 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12
# Reader for the +actor_system+ attribute.
#
# @return [Object] the current value of @actor_system
def actor_system
  @actor_system
end
#job_to_condition ⇒ Object
Returns the value of attribute job_to_condition.
12 13 14 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12
# Reader for the +job_to_condition+ attribute.
#
# @return [Object] the current value of @job_to_condition
def job_to_condition
  @job_to_condition
end
#job_to_worker ⇒ Object
Returns the value of attribute job_to_worker.
12 13 14 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12
# Reader for the +job_to_worker+ attribute.
#
# @return [Object] the current value of @job_to_worker
def job_to_worker
  @job_to_worker
end
#jobs ⇒ Object
Returns the value of attribute jobs.
12 13 14 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12
# Reader for the +jobs+ attribute.
#
# @return [Object] the current value of @jobs
def jobs
  @jobs
end
#mutex ⇒ Object
Returns the value of attribute mutex.
12 13 14 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12
# Reader for the +mutex+ attribute.
#
# @return [Object] the current value of @mutex
def mutex
  @mutex
end
#registration_complete ⇒ Object
Returns the value of attribute registration_complete.
12 13 14 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12
# Reader for the +registration_complete+ attribute.
#
# @return [Object] the current value of @registration_complete
def registration_complete
  @registration_complete
end
#worker_supervisor ⇒ Object (readonly)
Returns the value of attribute worker_supervisor.
14 15 16 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 14
# Read-only accessor for the +worker_supervisor+ attribute.
#
# @return [Object] the current value of @worker_supervisor
def worker_supervisor
  @worker_supervisor
end
#worker_to_job ⇒ Object
Returns the value of attribute worker_to_job.
12 13 14 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12
# Reader for the +worker_to_job+ attribute.
#
# @return [Object] the current value of @worker_to_job
def worker_to_job
  @worker_to_job
end
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
14 15 16 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 14
# Read-only accessor for the +workers+ attribute.
#
# @return [Object] the current value of @workers
def workers
  @workers
end
Class Method Details
.debug_enabled? ⇒ Boolean
41 42 43 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 41
# Predicate wrapper around the class-level +debug_enabled+ value.
#
# @return [Object] whatever +debug_enabled+ returns
def self.debug_enabled?
  debug_enabled
end
.debug_websocket? ⇒ Boolean
45 46 47 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 45
# Tells whether the websocket configuration asks for debug output.
#
# @return [Boolean] true when the 'enable_debug' entry stringifies to 'true'
def self.debug_websocket?
  flag = websocket_config['enable_debug']
  flag.to_s == 'true'
end
.websocket_config ⇒ Object
49 50 51 52 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 49
# Reads the :websocket_server entry of the gem configuration.
#
# @return [Hash] the entry with stringified keys, or an empty hash when the
#   entry is blank or is not a Hash
def self.websocket_config
  settings = CapistranoMulticonfigParallel.configuration[:websocket_server]
  return {} unless settings.present? && settings.is_a?(Hash)
  settings.stringify_keys
end
Instance Method Details
#apply_confirmation_for_worker(worker) ⇒ Object
123 124 125 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 123
# Decides whether task confirmations apply to the given worker.
#
# @param worker [Object] must respond to alive? and env_name
# @return [Object] the falsy alive? result for a dead worker; otherwise whether
#   the configured apply_stage_confirmation list includes the worker's env_name
def apply_confirmation_for_worker(worker)
  alive = worker.alive?
  # the configuration is only consulted for a live worker
  return alive unless alive
  confirmed_stages = CapistranoMulticonfigParallel.configuration.apply_stage_confirmation
  confirmed_stages.include?(worker.env_name)
end
#apply_confirmations? ⇒ Boolean
111 112 113 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 111
# Tells whether task confirmations are switched on in the configuration.
#
# @return [Boolean] true when task_confirmation_active stringifies
#   (case-insensitively) to 'true'
def apply_confirmations?
  setting = CapistranoMulticonfigParallel.configuration.task_confirmation_active
  setting.to_s.downcase == 'true'
end
#can_tag_staging? ⇒ Boolean
213 214 215 216 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 213
# Tells whether staging can be tagged: the job manager must allow it and no
# registered job may target the 'production' env.
#
# @return [Object] the job manager's falsy answer, or true/false depending on
#   whether a production job exists
def can_tag_staging?
  allowed = @job_manager.can_tag_staging?
  return allowed unless allowed
  @jobs.none? { |_job_id, job| job['env'] == 'production' }
end
#confirm_task_approval(result, task, worker = nil) ⇒ Object
186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 186
# Asks the user to approve +task+ and, on a 'y' answer, publishes an approval
# event to the worker of every registered job.
#
# @param result [Object] value obtained from the task condition; nothing
#   happens when it is blank
# @param task [String] name of the task awaiting approval
# @param worker [Object, nil] worker the prompt refers to, if any
# @return [Object, nil] nil when nothing was approved, otherwise the pmap result
def confirm_task_approval(result, task, worker = nil)
  return unless result.present?
  # Forward the caller's worker. The call used to read `worker = nil`, which
  # reset the parameter and always passed nil, so the prompt never named the job.
  print_confirm_task_approvall(result, task, worker)
  answer = fetch(:apps_symlink_confirmation)
  return if answer.blank? || answer.downcase != 'y'
  @jobs.pmap do |job_id, job|
    # separate local so the +worker+ parameter is not overwritten
    job_worker = get_worker_for_job(job_id)
    job_worker.publish_rake_event('approved' => 'yes',
                                  'action' => 'invoke',
                                  'job_id' => job['id'],
                                  'task' => task)
  end
end
#delegate(job) ⇒ Object
call to send an actor a job
64 65 66 67 68 69 70 71 72 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 64
# Sends a job to the worker pool.
#
# The job gets string keys and, unless it is a re-delegated failed job, a
# freshly generated id. It is then stored in @jobs, its id is copied into its
# env options, and the pool is asked to work on it asynchronously.
#
# @param job [Hash] job description; must contain an 'env_options' hash
# @return [Object] whatever the asynchronous +work+ call returns
def delegate(job)
  payload = job.stringify_keys
  payload['id'] = generate_job_id(payload) unless job_failed?(payload)
  job_id = payload['id']
  # a failed job keeps its old id, so it is (re)stored explicitly here
  @jobs[job_id] = payload
  payload['env_options'][CapistranoMulticonfigParallel::ENV_KEY_JOB_ID] = job_id
  # start the work in the background
  @workers.async.work(payload, Actor.current)
end
#dispatch_new_job(job) ⇒ Object
218 219 220 221 222 223 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 218 def dispatch_new_job(job) original_env = job['env_options'] env_opts = @job_manager.(job['app_name'], job['stage']) job['env_options'] = original_env.merge(env_opts) async.delegate(job) end |
#generate_job_id(job) ⇒ Object
54 55 56 57 58 59 60 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 54
# Assigns the next sequential id to +job+ and registers it in @jobs.
#
# The id is the current number of stored jobs plus one.
#
# @param job [Hash] job to register; its 'id' entry is overwritten
# @return [Integer] the id that was assigned
def generate_job_id(job)
  primary_key = @jobs.size + 1
  job['id'] = primary_key
  @jobs[primary_key] = job
  primary_key
end
#get_job_status(job) ⇒ Object
lookup status of job by asking actor running it
241 242 243 244 245 246 247 248 249 250 251 252 253 254 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 241
# Looks up the status of a job by asking the worker registered for it.
#
# Workers are looked up in @job_to_worker, the table filled by #start_worker.
# The listing previously read @registered_jobs, which nothing in this class
# assigns, so every lookup failed on nil.
#
# @param job [Hash, #to_i] a job hash (its 'id' entry is used) or a job id
# @return [Object, nil] the worker's status, or nil when +job+ is blank or no
#   worker is registered for it
def get_job_status(job)
  return nil unless job.present?
  job_id = job.is_a?(Hash) ? job.stringify_keys['id'] : job.to_i
  actor = @job_to_worker[job_id]
  actor.nil? ? nil : actor.status
end
#get_worker_for_job(job) ⇒ Object
200 201 202 203 204 205 206 207 208 209 210 211 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 200
# Finds the worker registered for a job.
#
# @param job [Hash, #to_i] a job hash (its 'id' entry is used) or a job id
# @return [Object, nil] the worker stored in @job_to_worker, or nil when +job+
#   is blank or nothing is registered under that id
def get_worker_for_job(job)
  return nil unless job.present?
  job_id = job.is_a?(Hash) ? job.stringify_keys['id'] : job.to_i
  @job_to_worker[job_id]
end
#job_failed?(job) ⇒ Boolean
256 257 258 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 256
# Tells whether the job is marked as coming from a dead worker.
#
# @param job [Hash] job hash with string keys
# @return [Boolean] true when the 'worker_action' entry equals 'worker_died'
def job_failed?(job)
  # a blank entry can never equal 'worker_died', so the comparison alone decides
  job['worker_action'] == 'worker_died'
end
#mark_completed_remaining_tasks(worker) ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 136
# Marks every still-unconfirmed task confirmation of the worker's job as
# confirmed and signals its condition.
#
# Does nothing unless confirmations apply to this worker and are enabled.
#
# @param worker [Object] worker whose job_id selects the conditions to release
# @return [Object, nil] nil when skipped, otherwise the configured task list
def mark_completed_remaining_tasks(worker)
  return unless apply_confirmation_for_worker(worker) && apply_confirmations?
  CapistranoMulticonfigParallel.configuration.task_confirmations.each do |task|
    # waiters receive a Proc; #print_confirm_task_approvall returns early for Proc results
    placeholder = proc { |sum| sum }
    confirmation = @job_to_condition[worker.job_id][task]
    next if confirmation[:status] == 'confirmed'
    confirmation[:status] = 'confirmed'
    confirmation[:condition].signal(placeholder)
  end
end
#print_confirm_task_approvall(result, task, worker = nil) ⇒ Object
175 176 177 178 179 180 181 182 183 184 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 175
# Asks the user whether the deployment may continue with +task+ and stores the
# answer under :apps_symlink_confirmation, then waits until an answer is set.
#
# The local holding the prompt text was missing from this listing (the
# assignments had no left-hand side); it is restored here as +message+.
#
# @param result [Object] nothing is asked when this is a Proc
# @param task [String] task name, shown upper-cased in the prompt
# @param worker [Object, nil] when present, its job_id is added to the prompt
# @return [nil]
def print_confirm_task_approvall(result, task, worker = nil)
  return if result.is_a?(Proc)
  job_part = worker.present? ? " for JOB #{worker.job_id}" : ''
  message = "Do you want to continue the deployment and execute #{task.upcase}#{job_part}?"
  set :apps_symlink_confirmation, CapistranoMulticonfigParallel.ask_confirm(message, 'Y/N')
  until fetch(:apps_symlink_confirmation).present?
    sleep(0.1) # keep current thread alive
  end
end
#process_job(job) ⇒ Object
225 226 227 228 229 230 231 232 233 234 235 236 237 238 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 225
# Converts a stored job into a flat description hash.
#
# The local collecting the filtered env options was missing from this listing
# (assignments without a left-hand side); it is restored here as +env_options+.
#
# @param job [Hash] job with 'id', 'app', 'env', 'action', 'env_options' and
#   'task_arguments' entries
# @return [Hash] hash with 'job_id', 'app_name', 'env_name', 'action_name',
#   'env_options' (blank values removed) and 'task_arguments'
def process_job(job)
  env_options = job['env_options'].each_with_object({}) do |(key, value), kept|
    kept[key] = value if value.present?
  end
  {
    'job_id' => job['id'],
    'app_name' => job['app'],
    'env_name' => job['env'],
    'action_name' => job['action'],
    'env_options' => env_options,
    'task_arguments' => job['task_arguments']
  }
end
#process_jobs ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 96
# Starts the workers when confirmations are synchronized, then waits until
# every registered worker is alive and 'finished', and finally signals
# 'completed' on the job manager's condition.
#
# The finished check is re-evaluated on every poll. It used to be computed
# once before the loop, so the loop could never end unless all workers were
# already finished on entry.
#
# @return [Object] whatever signalling the job manager's condition returns
def process_jobs
  if syncronized_confirmation?
    @job_to_worker.pmap do |_job_id, worker|
      worker.async.start_task
    end
    wait_task_confirmations
  end
  finished = false
  until finished
    finished = @job_to_worker.all? { |_job_id, worker| worker.alive? && worker.worker_state == 'finished' }
    sleep(0.1) unless finished # keep current thread alive between polls
  end
  debug("all jobs have completed #{finished}") if self.class.debug_enabled?
  @job_manager.condition.signal('completed')
end
#register_worker_for_job(job, worker) ⇒ Object
Callback from an actor once it has received its job; the actor should do this as soon as possible.
76 77 78 79 80 81 82 83 84 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 76
# Callback used by a worker once it has received its job.
#
# A job that carries an id is registered and started; a job without an id is
# delegated again.
#
# @param job [Hash] the job the worker received
# @param worker [Object] the worker reporting in
# @return [Object] result of #start_worker or #delegate
def register_worker_for_job(job, worker)
  job = job.stringify_keys
  return start_worker(job, worker) unless job['id'].blank?
  debug("job id not found. delegating again the job #{job.inspect}") if self.class.debug_enabled?
  delegate(job)
end
#setup_worker_conditions(worker) ⇒ Object
127 128 129 130 131 132 133 134 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 127
# Creates one unconfirmed condition per configured confirmation task and
# stores them under the worker's job id in @job_to_condition.
#
# Does nothing unless confirmations apply to this worker and are enabled.
#
# @param worker [Object] worker whose job_id keys the conditions
# @return [Hash, nil] the task => { condition:, status: } hash, or nil when skipped
def setup_worker_conditions(worker)
  return unless apply_confirmation_for_worker(worker) && apply_confirmations?
  tasks = CapistranoMulticonfigParallel.configuration.task_confirmations
  conditions = tasks.each_with_object({}) do |task, by_task|
    by_task[task] = { condition: Celluloid::Condition.new, status: 'unconfirmed' }
  end
  @job_to_condition[worker.job_id] = conditions
end
#start_worker(job, worker) ⇒ Object
86 87 88 89 90 91 92 93 94 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 86
# Registers the worker for the job, links it to this actor and, unless
# confirmations are synchronized, starts its task asynchronously.
#
# @param job [Hash] job with an 'id' entry
# @param worker [Object] worker that will run the job
# @return [Object] result of the final conditional assignment (true once every
#   job of the job manager has a registered worker, otherwise nil)
def start_worker(job, worker)
  job_id = job['id']
  worker.job_id = job_id if worker.job_id.blank?
  @job_to_worker[job_id] = worker
  # keyed by mailbox address so #worker_died can find the job again
  @worker_to_job[worker.mailbox.address] = job
  debug("worker #{worker.job_id} registed into manager") if self.class.debug_enabled?
  Actor.current.link worker
  worker.async.start_task unless syncronized_confirmation?
  all_registered = @job_manager.jobs.size == @job_to_worker.size
  @registration_complete = true if all_registered
end
#syncronization_required? ⇒ Boolean
114 115 116 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 114
# Tells whether the configuration asks for synchronized confirmations.
#
# @return [Boolean] true when syncronize_confirmation stringifies
#   (case-insensitively) to 'true'
def syncronization_required?
  setting = CapistranoMulticonfigParallel.configuration.syncronize_confirmation
  setting.to_s.downcase == 'true'
end
#syncronized_confirmation? ⇒ Boolean
118 119 120 121 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 118
# Tells whether task confirmations are handled for all workers together.
#
# Synchronization must be required, and either no deploy stages are executed,
# or staging cannot be tagged and the confirmation applies to all workers.
#
# @return [Object] truthy when confirmations are synchronized
def syncronized_confirmation?
  syncronization_required? && (
    !@job_manager.executes_deploy_stages? ||
    (!@job_manager.can_tag_staging? && @job_manager.confirmation_applies_to_all_workers?)
  )
end
#wait_condition_for_task(job_id, task) ⇒ Object
156 157 158 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 156
# Waits on the condition stored for the given job and task.
#
# @param job_id [Object] key into @job_to_condition
# @param task [Object] task key within that job's conditions
# @return [Object] whatever the condition's +wait+ returns
def wait_condition_for_task(job_id, task)
  task_entry = @job_to_condition[job_id][task]
  task_entry[:condition].wait
end
#wait_task_confirmations ⇒ Object
160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 160
# For each configured confirmation task, waits for the condition of every job
# and then asks for approval once all jobs have reported.
#
# Does nothing unless confirmations are enabled, apply to the job manager's
# stage, and are synchronized.
#
# @return [Object, nil] nil when skipped, otherwise the configured task list
def wait_task_confirmations
  stage_apply = CapistranoMulticonfigParallel.configuration.apply_stage_confirmation.include?(@job_manager.stage)
  return unless apply_confirmations? && stage_apply && syncronized_confirmation?
  CapistranoMulticonfigParallel.configuration.task_confirmations.each do |task|
    results = []
    @jobs.pmap { |job_id, _job| results << wait_condition_for_task(job_id, task) }
    # only ask once every job has delivered a result for this task
    confirm_task_approval(results, task) if results.size == @jobs.size
  end
end
#wait_task_confirmations_worker(worker) ⇒ Object
148 149 150 151 152 153 154 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 148
# Per-worker variant of the confirmation wait: for each configured task, waits
# on the worker's condition and asks for approval when a result arrives.
#
# Does nothing unless confirmations are enabled and apply to this worker, or
# when confirmations are synchronized.
#
# @param worker [Object] worker whose job_id selects the conditions
# @return [Object, nil] nil when skipped, otherwise the configured task list
def wait_task_confirmations_worker(worker)
  return unless apply_confirmations? && apply_confirmation_for_worker(worker) && !syncronized_confirmation?
  CapistranoMulticonfigParallel.configuration.task_confirmations.each do |task|
    outcome = wait_condition_for_task(worker.job_id, task)
    confirm_task_approval(outcome, task, worker) if outcome.present?
  end
end
#worker_died(worker, reason) ⇒ Object
260 261 262 263 264 265 266 267 268 269 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 260
# Handles the death of a worker: forgets its job mapping and, for a deploy job
# that has not already failed once, delegates a rollback job.
#
# The "restarting" debug line is written only when a job is re-delegated; it
# used to be logged before the guards, even when nothing was restarted.
#
# @param worker [Object] the dead worker; its mailbox address identifies the job
# @param reason [Object] failure reason, used only for the debug log
# @return [Object, nil] nil when nothing is re-delegated, otherwise the result
#   of #delegate
def worker_died(worker, reason)
  debug("worker with mailbox #{worker.mailbox.inspect} died for reason: #{reason}") if self.class.debug_enabled?
  job = @worker_to_job.delete(worker.mailbox.address)
  return if job.blank? || job_failed?(job)
  # NOTE(review): #process_job builds 'action_name' from job['action']; confirm that
  # the jobs stored in @worker_to_job really carry an 'action_name' entry.
  return unless job['action_name'] == 'deploy'
  debug "restarting #{job} on new worker" if self.class.debug_enabled?
  # string key keeps the hash consistently string-keyed; #delegate stringifies keys anyway
  delegate(job.merge('action' => 'deploy:rollback', 'worker_action' => 'worker_died'))
end