Class: CapistranoMulticonfigParallel::CelluloidManager
- Inherits:
  Object
  - Object
  - CapistranoMulticonfigParallel::CelluloidManager
- Includes:
- Celluloid, Celluloid::Logger, Celluloid::Notifications
- Defined in:
- lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb
Instance Attribute Summary
- #actor_system ⇒ Object
  Returns the value of attribute actor_system.
- #job_to_condition ⇒ Object
  Returns the value of attribute job_to_condition.
- #job_to_worker ⇒ Object
  Returns the value of attribute job_to_worker.
- #jobs ⇒ Object
  Returns the value of attribute jobs.
- #mutex ⇒ Object
  Returns the value of attribute mutex.
- #registration_complete ⇒ Object
  Returns the value of attribute registration_complete.
- #worker_supervisor ⇒ Object (readonly)
  Returns the value of attribute worker_supervisor.
- #worker_to_job ⇒ Object
  Returns the value of attribute worker_to_job.
- #workers ⇒ Object (readonly)
  Returns the value of attribute workers.
Class Method Summary
- .debug_enabled? ⇒ Boolean
- .debug_websocket? ⇒ Boolean
- .websocket_config ⇒ Object
Instance Method Summary
- #confirm_task_approval(results, task) ⇒ Object
- #delegate(job) ⇒ Object
  Call to send an actor a job.
- #generate_job_id(job) ⇒ Object
- #get_job_status(job) ⇒ Object
  Look up the status of a job by asking the actor running it.
- #get_worker_for_job(job) ⇒ Object
- #initialize(job_manager) ⇒ CelluloidManager (constructor)
  A new instance of CelluloidManager.
- #process_jobs(&block) ⇒ Object
- #register_worker_for_job(job, worker) ⇒ Object
  Callback from an actor once it has received its job; the actor should do this as soon as possible.
- #wait_task_confirmations ⇒ Object
- #worker_died(worker, reason) ⇒ Object
Constructor Details
#initialize(job_manager) ⇒ CelluloidManager
Returns a new instance of CelluloidManager.
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 17

def initialize(job_manager)
  # start SupervisionGroup
  @worker_supervisor = Celluloid::SupervisionGroup.run!
  @job_manager = job_manager
  @registration_complete = false
  # Get a handle on the SupervisionGroup::Member
  @actor_system = Celluloid.boot
  @mutex = Mutex.new
  # http://rubydoc.info/gems/celluloid/Celluloid/SupervisionGroup/Member
  @workers = @worker_supervisor.pool(CapistranoMulticonfigParallel::CelluloidWorker, as: :workers, size: 10)
  # Get a handle on the PoolManager
  # http://rubydoc.info/gems/celluloid/Celluloid/PoolManager
  # @workers = workers_pool.actor
  @conditions = []
  @jobs = {}
  @job_to_worker = {}
  @worker_to_job = {}
  @job_to_condition = {}

  @worker_supervisor.supervise_as(:terminal_server, CapistranoMulticonfigParallel::TerminalTable, Actor.current)
  @worker_supervisor.supervise_as(:web_server, CelluloidPubsub::WebServer, self.class.websocket_config.merge(enable_debug: self.class.debug_websocket?))
end
Instance Attribute Details
#actor_system ⇒ Object
Returns the value of attribute actor_system.
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12

def actor_system
  @actor_system
end
#job_to_condition ⇒ Object
Returns the value of attribute job_to_condition.
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12

def job_to_condition
  @job_to_condition
end
#job_to_worker ⇒ Object
Returns the value of attribute job_to_worker.
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12

def job_to_worker
  @job_to_worker
end
#jobs ⇒ Object
Returns the value of attribute jobs.
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12

def jobs
  @jobs
end
#mutex ⇒ Object
Returns the value of attribute mutex.
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12

def mutex
  @mutex
end
#registration_complete ⇒ Object
Returns the value of attribute registration_complete.
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12

def registration_complete
  @registration_complete
end
#worker_supervisor ⇒ Object (readonly)
Returns the value of attribute worker_supervisor.
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 14

def worker_supervisor
  @worker_supervisor
end
#worker_to_job ⇒ Object
Returns the value of attribute worker_to_job.
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 12

def worker_to_job
  @worker_to_job
end
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 14

def workers
  @workers
end
Class Method Details
.debug_enabled? ⇒ Boolean
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 40

def self.debug_enabled?
  debug_enabled
end
.debug_websocket? ⇒ Boolean
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 44

def self.debug_websocket?
  websocket_config['enable_debug'].to_s == 'true'
end
.websocket_config ⇒ Object
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 48

def self.websocket_config
  config = CapistranoMulticonfigParallel.configuration[:websocket_server]
  config.present? && config.is_a?(Hash) ? config.stringify_keys : {}
end
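The two websocket helpers above normalize the `:websocket_server` setting to a string-keyed hash and then compare `enable_debug` as a string. A minimal plain-Ruby sketch of that idea follows. It assumes no ActiveSupport, so `transform_keys(&:to_s)` stands in for `stringify_keys` and an explicit emptiness check stands in for `present?`. The method names `normalize_websocket_config` and `websocket_debug?` are hypothetical and not part of the gem.

```ruby
# Hypothetical stand-ins for .websocket_config and .debug_websocket?;
# plain Ruby, no ActiveSupport required.
def normalize_websocket_config(config)
  # Anything that is not a non-empty Hash falls back to an empty Hash.
  return {} unless config.is_a?(Hash) && !config.empty?

  config.transform_keys(&:to_s)
end

def websocket_debug?(config)
  # Compare as a string so that true and 'true' are both accepted.
  normalize_websocket_config(config)['enable_debug'].to_s == 'true'
end

puts websocket_debug?({ enable_debug: true })       # symbol key, boolean value
puts websocket_debug?({ 'enable_debug' => 'true' }) # string key, string value
puts websocket_debug?(nil)                          # missing configuration
```

Comparing via `to_s` keeps the check tolerant of values that arrive either as booleans or as strings from a configuration file.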
Instance Method Details
#confirm_task_approval(results, task) ⇒ Object
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 121

def confirm_task_approval(results, task)
  return unless results.present?
  set :apps_symlink_confirmation, CapistranoMulticonfigParallel.ask_confirm("Do you want to continue the deployment and execute #{task}?", 'Y/N')
  until fetch(:apps_symlink_confirmation).present?
    sleep(0.1) # keep current thread alive
  end
  return if fetch(:apps_symlink_confirmation).blank? || fetch(:apps_symlink_confirmation).downcase != 'y'
  @jobs.pmap do |job_id, job|
    worker = get_worker_for_job(job_id)
    worker.publish_rake_event('approved' => 'yes',
                              'action' => 'invoke',
                              'job_id' => job['id'],
                              'task' => task
                             )
  end
end
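The guard in `#confirm_task_approval` only lets the deployment continue when the stored answer is non-blank and equals `y` case-insensitively. A small sketch of that predicate in plain Ruby, assuming no ActiveSupport (so `strip.empty?` approximates `blank?`); `approved?` is a hypothetical helper name:

```ruby
# Hypothetical helper mirroring the approval guard in #confirm_task_approval.
def approved?(answer)
  text = answer.to_s
  return false if text.strip.empty? # roughly what ActiveSupport's blank? checks

  text.downcase == 'y'
end

%w[y Y n yes].each { |answer| puts "#{answer.inspect}: #{approved?(answer)}" }
```

Note that, like the original guard, this accepts only the single letter and not longer answers such as `yes`.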
#delegate(job) ⇒ Object
call to send an actor a job
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 63

def delegate(job)
  job = job.stringify_keys
  job['id'] = generate_job_id(job) if job['worker_action'] != 'worker_died'
  @jobs[job['id']] = job
  job['env_options'][CapistranoMulticonfigParallel::ENV_KEY_JOB_ID] = job['id']
  # debug(@jobs)
  # start work and send it to the background
  @workers.async.work(job, Actor.current)
end
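The bookkeeping in `#delegate` can be tried out without Celluloid. The sketch below is an approximation under stated assumptions: `JobRegistry` is a hypothetical class, `JOB_ID_KEY` is a placeholder because the real value of `ENV_KEY_JOB_ID` is not shown on this page, and an array records dispatched jobs in place of `@workers.async.work`. Unlike the original, it returns the job id so the result is easy to inspect.

```ruby
# Hypothetical, Celluloid-free stand-in for the bookkeeping in #delegate.
class JobRegistry
  # Assumed placeholder; the real ENV_KEY_JOB_ID value is not shown here.
  JOB_ID_KEY = 'JOB_ID'

  attr_reader :jobs, :dispatched

  def initialize
    @jobs = {}
    @dispatched = [] # stands in for @workers.async.work(job, Actor.current)
  end

  def delegate(job)
    job = job.transform_keys(&:to_s)
    # Jobs re-delegated after a worker death keep their existing id.
    job['id'] = @jobs.size + 1 if job['worker_action'] != 'worker_died'
    @jobs[job['id']] = job
    job['env_options'][JOB_ID_KEY] = job['id']
    @dispatched << job
    job['id']
  end
end

registry = JobRegistry.new
registry.delegate({ worker_action: 'deploy', env_options: {} })
registry.delegate({ worker_action: 'deploy', env_options: {} })
puts registry.jobs.keys.inspect
```

Because ids are derived from the current size of the job table, they stay unique only as long as entries are never removed from it.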
#generate_job_id(job) ⇒ Object
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 53

def generate_job_id(job)
  primary_key = @jobs.size + 1
  job['id'] = primary_key
  @jobs[primary_key] = job
  @jobs[primary_key]
  job['id']
end
#get_job_status(job) ⇒ Object
lookup status of job by asking actor running it
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 152

def get_job_status(job)
  status = nil
  if job.present?
    if job.is_a?(Hash)
      job = job.stringify_keys
      actor = @registered_jobs[job['id']]
      status = actor.status
    else
      actor = @registered_jobs[job.to_i]
      status = actor.status
    end
  end
  status
end
#get_worker_for_job(job) ⇒ Object
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 138

def get_worker_for_job(job)
  if job.present?
    if job.is_a?(Hash)
      job = job.stringify_keys
      @job_to_worker[job['id']]
    else
      @job_to_worker[job.to_i]
    end
  else
    return nil
  end
end
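`#get_worker_for_job` accepts either a job hash or a bare id. A compact plain-Ruby sketch of the same lookup, assuming no ActiveSupport; `worker_for` and the `WORKERS` table are hypothetical names used only for illustration:

```ruby
# Hypothetical lookup helper: accepts a job Hash (symbol or string keys)
# or a bare id, and returns nil for nil or empty input.
def worker_for(job_to_worker, job)
  return nil if job.nil? || (job.respond_to?(:empty?) && job.empty?)

  id = job.is_a?(Hash) ? job.transform_keys(&:to_s)['id'] : job.to_i
  job_to_worker[id]
end

# Symbols stand in for worker actors in this sketch.
WORKERS = { 1 => :worker_a, 2 => :worker_b }.freeze

puts worker_for(WORKERS, { id: 1 }).inspect
puts worker_for(WORKERS, '2').inspect
puts worker_for(WORKERS, nil).inspect
```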
#process_jobs(&block) ⇒ Object
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 92

def process_jobs(&block)
  @job_to_worker.pmap do |_job_id, worker|
    worker.async.start_task
  end
  if block_given?
    block.call
  else
    wait_task_confirmations
  end
  results2 = []
  @job_to_condition.pmap do |_job_id, hash|
    results2 << hash[:last_condition].wait
  end
  @job_manager.condition.signal(results2) if results2.size == @jobs.size
end
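The final step of `#process_jobs` waits on one condition per job and only signals the job manager once every job has reported. The sketch below illustrates that pattern with the standard library only. It is an analogy rather than Celluloid's API: each `Queue` stands in for a `Celluloid::Condition` (`pop` blocks like `wait`, `push` plays the role of `signal`), the waits run sequentially instead of through `pmap`, and `collect_results` is a hypothetical helper.

```ruby
# Each Queue stands in for a Celluloid::Condition in this sketch.
def collect_results(conditions, expected_size)
  # Block on every job's queue in turn until it delivers a result.
  results = conditions.map { |_job_id, queue| queue.pop }
  # Mirror the guard above: only report when every job produced a result.
  results.size == expected_size ? results : nil
end

conditions = { 1 => Queue.new, 2 => Queue.new }
producers = conditions.map do |job_id, queue|
  Thread.new { queue.push("job #{job_id} finished") }
end

results = collect_results(conditions, conditions.size)
producers.each(&:join)
puts results.inspect
```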
#register_worker_for_job(job, worker) ⇒ Object
Callback from an actor once it has received its job; the actor should do this as soon as possible.
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 75

def register_worker_for_job(job, worker)
  job = job.stringify_keys
  if job['id'].blank?
    debug("job id not found. delegating again the job #{job.inspect}") if self.class.debug_enabled?
    delegate(job)
  else
    worker.job_id = job['id'] if worker.job_id.blank?
    @job_to_worker[job['id']] = worker
    @worker_to_job[worker.mailbox.address] = job
    debug("worker #{worker.job_id} registed into manager") if self.class.debug_enabled?
    Actor.current.link worker
    if @job_manager.jobs.size == @job_to_worker.size
      @registration_complete = true
    end
  end
end
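The registration step maintains two lookup tables (job id to worker, worker address to job) and flips a completion flag once every expected job has a worker. A Celluloid-free sketch of that bookkeeping follows; `SketchWorker`, `Registration`, and the plain string addresses are hypothetical, and actor linking and the re-delegation branch are left out.

```ruby
# Hypothetical worker record; the real workers are Celluloid actors.
SketchWorker = Struct.new(:address, :job_id)

# Hypothetical sketch of the two lookup tables kept by the manager.
class Registration
  attr_reader :job_to_worker, :worker_to_job

  def initialize(expected_jobs)
    @expected_jobs = expected_jobs
    @job_to_worker = {}
    @worker_to_job = {}
  end

  # Records the pairing in both directions and reports whether
  # every expected job now has a worker.
  def register(job, worker)
    worker.job_id ||= job['id']
    @job_to_worker[job['id']] = worker
    @worker_to_job[worker.address] = job
    complete?
  end

  def complete?
    @job_to_worker.size == @expected_jobs
  end
end

registration = Registration.new(2)
puts registration.register({ 'id' => 1 }, SketchWorker.new('addr-1', nil))
puts registration.register({ 'id' => 2 }, SketchWorker.new('addr-2', nil))
```

Keeping the reverse table keyed by worker address is what lets a later failure handler find the job that belonged to a dead worker.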
#wait_task_confirmations ⇒ Object
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 108

def wait_task_confirmations
  return unless CapistranoMulticonfigParallel.configuration.task_confirmation_active
  CapistranoMulticonfigParallel.configuration.task_confirmations.each_with_index do |task, index|
    results = []
    @jobs.pmap do |job_id, _job|
      results << @job_to_condition[job_id][:first_condition][index].wait
    end
    if results.size == @jobs.size
      confirm_task_approval(results, task)
    end
  end
end
#worker_died(worker, reason) ⇒ Object
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_manager.rb', line 167

def worker_died(worker, reason)
  debug("worker with mailbox #{worker.mailbox.inspect} died for reason: #{reason}") if self.class.debug_enabled?
  job = @worker_to_job[worker.mailbox.address]
  @worker_to_job.delete(worker.mailbox.address)
  debug "restarting #{job} on new worker" if self.class.debug_enabled?
  return if job.blank? || job['worker_action'] == 'worker_died'
  return unless job['worker_action'] == 'deploy'
  job = job.merge(:action => 'deploy:rollback', 'worker_action' => 'worker_died')
  delegate(job)
end
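The guards in `#worker_died` mean that only a failed `deploy` job is re-delegated, and it comes back as a rollback job marked `worker_died` so that a second failure does not trigger another retry. The pure function below sketches that decision in plain Ruby. `rollback_job_for` is a hypothetical name, and it writes the `'action'` key as a string directly, which is an assumption about the effective result once `#delegate` stringifies the keys.

```ruby
# Hypothetical pure function mirroring the guards in #worker_died.
# Returns the job to re-delegate, or nil when nothing should be retried.
def rollback_job_for(job)
  return nil if job.nil? || job.empty?
  return nil if job['worker_action'] == 'worker_died' # already a rollback job
  return nil unless job['worker_action'] == 'deploy'

  job.merge('action' => 'deploy:rollback', 'worker_action' => 'worker_died')
end

failed = { 'id' => 1, 'action' => 'deploy', 'worker_action' => 'deploy' }
puts rollback_job_for(failed).inspect
puts rollback_job_for(rollback_job_for(failed)).inspect
```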