Class: CapistranoMulticonfigParallel::CelluloidWorker
- Inherits:
-
Object
- Object
- CapistranoMulticonfigParallel::CelluloidWorker
- Includes:
- Celluloid, Celluloid::Logger, Celluloid::Notifications
- Defined in:
- lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb
Overview
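Worker that spawns a child process in order to execute a Capistrano job and monitors that process. (The `rubocop:disable ClassLength` text that precedes this description in the source comment is a linter directive, not part of the overview.)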
Defined Under Namespace
Classes: TaskFailed
Instance Attribute Summary collapse
-
#action_name ⇒ Object
Returns the value of attribute action_name.
-
#app_name ⇒ Object
Returns the value of attribute app_name.
-
#client ⇒ Object
Returns the value of attribute client.
-
#current_task_number ⇒ Object
Returns the value of attribute current_task_number.
-
#env_name ⇒ Object
Returns the value of attribute env_name.
-
#env_options ⇒ Object
Returns the value of attribute env_options.
-
#execute_deploy ⇒ Object
Returns the value of attribute execute_deploy.
-
#executed_dry_run ⇒ Object
Returns the value of attribute executed_dry_run.
-
#job ⇒ Hash
Options used for executing capistrano task.
-
#job_id ⇒ Object
Returns the value of attribute job_id.
-
#last_manager_condition ⇒ Object
Returns the value of attribute last_manager_condition.
-
#machine ⇒ Object
Returns the value of attribute machine.
-
#manager ⇒ CapistranoMulticonfigParallel::CelluloidManager
The instance of the manager that delegated the job to this worker.
-
#manager_condition ⇒ Object
Returns the value of attribute manager_condition.
-
#publisher_channel ⇒ Object
Returns the value of attribute publisher_channel.
-
#rake_tasks ⇒ Object
Returns the value of attribute rake_tasks.
-
#subscription_channel ⇒ Object
Returns the value of attribute subscription_channel.
-
#successfull_subscription ⇒ Object
Returns the value of attribute successfull_subscription.
-
#task_argv ⇒ Object
Returns the value of attribute task_argv.
-
#task_confirmations ⇒ Object
Returns the value of attribute task_confirmations.
Instance Method Summary collapse
- #construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition) ⇒ Object
- #crashed? ⇒ Boolean
- #debug_enabled? ⇒ Boolean
- #execute_after_succesfull_subscription ⇒ Object
- #executes_deploy? ⇒ Boolean
- #handle_subscription(message) ⇒ Object
- #message_is_about_a_task?(message) ⇒ Boolean
- #need_confirmation_for_tasks? ⇒ Boolean
- #notify_finished(exit_status) ⇒ Object
- #on_close(code, reason) ⇒ Object
- #on_message(message) ⇒ Object
- #process_job(job) ⇒ Object
- #publish_rake_event(data) ⇒ Object
- #rake_actor_id(data) ⇒ Object
- #save_tasks_to_be_executed(message) ⇒ Object
- #send_msg(channel, message = nil) ⇒ Object
- #setup_command_line(*options) ⇒ Object
- #setup_task_arguments ⇒ Object
- #setup_worker_condition ⇒ Object
- #start_task ⇒ Object
- #task_approval(message) ⇒ Object
- #update_machine_state(name) ⇒ Object
- #work(job, manager) ⇒ Object
Instance Attribute Details
#action_name ⇒ Object
Returns the value of attribute action_name.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def action_name @action_name end |
#app_name ⇒ Object
Returns the value of attribute app_name.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def app_name @app_name end |
#client ⇒ Object
Returns the value of attribute client.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def client @client end |
#current_task_number ⇒ Object
Returns the value of attribute current_task_number.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def current_task_number @current_task_number end |
#env_name ⇒ Object
Returns the value of attribute env_name.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def env_name @env_name end |
#env_options ⇒ Object
Returns the value of attribute env_options.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def env_options @env_options end |
#execute_deploy ⇒ Object
Returns the value of attribute execute_deploy.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def execute_deploy @execute_deploy end |
#executed_dry_run ⇒ Object
Returns the value of attribute executed_dry_run.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def executed_dry_run @executed_dry_run end |
#job ⇒ Hash
Returns options used for executing capistrano task.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 19 class CelluloidWorker include Celluloid include Celluloid::Notifications include Celluloid::Logger class TaskFailed < StandardError; end attr_accessor :job, :manager, :job_id, :app_name, :env_name, :action_name, :env_options, :machine, :client, :task_argv, :execute_deploy, :executed_dry_run, :rake_tasks, :current_task_number, # tracking tasks :successfull_subscription, :subscription_channel, :publisher_channel, # for subscriptions and publishing events :task_confirmations, :manager_condition, :last_manager_condition # for task conifirmations from manager def work(job, manager) @job = job @manager = manager process_job(job) if job.present? debug("worker #{@job_id} received #{job.inspect}") if debug_enabled? @subscription_channel = "worker_#{@job_id}" @machine = CapistranoMulticonfigParallel::StateMachine.new(job, Actor.current) setup_worker_condition manager.register_worker_for_job(job, Actor.current) end def debug_enabled? @manager.class.debug_enabled? end def start_task debug("exec worker #{@job_id} starts task with #{@job.inspect}") if debug_enabled? @task_confirmations = CapistranoMulticonfigParallel.configuration.task_confirmations @client = CelluloidPubsub::Client.connect(actor: Actor.current, enable_debug: @manager.class.debug_websocket?) do |ws| ws.subscribe(@subscription_channel) end end def publish_rake_event(data) @client.publish(rake_actor_id(data), data) end def rake_actor_id(data) data['action'].present? && data['action'] == 'count' ? "rake_worker_#{@job_id}_count" : "rake_worker_#{@job_id}" end def on_message(message) debug("worker #{@job_id} received: #{message.inspect}") if debug_enabled? 
if @client.succesfull_subscription?(message) @successfull_subscription = true execute_after_succesfull_subscription else handle_subscription(message) end end def execute_after_succesfull_subscription setup_task_arguments if (@action_name == 'deploy' || @action_name == 'deploy:rollback') && CapistranoMulticonfigParallel.show_task_progress @executed_dry_run = true @rake_tasks = [] @task_argv << '--dry-run' @task_argv << 'count_rake=true' @child_process = CapistranoMulticonfigParallel::ChildProcess.new Actor.current.link @child_process debug("worker #{@job_id} executes: bundle exec multi_cap #{@task_argv.join(' ')}") if debug_enabled? @child_process.async.work("bundle exec multi_cap #{@task_argv.join(' ')}", actor: Actor.current, dry_run: true) else async.execute_deploy end end def rake_tasks @rake_tasks ||= [] end def execute_deploy @execute_deploy = true debug("invocation chain #{@job_id} is : #{@rake_tasks.inspect}") if debug_enabled? && CapistranoMulticonfigParallel.show_task_progress if !defined?(@child_process) || @child_process.nil? @child_process = CapistranoMulticonfigParallel::ChildProcess.new Actor.current.link @child_process else @client.unsubscribe("rake_worker_#{@job_id}_count") @child_process.exit_status = nil end setup_task_arguments debug("worker #{@job_id} executes: bundle exec multi_cap #{@task_argv.join(' ')}") if debug_enabled? @child_process.async.work("bundle exec multi_cap #{@task_argv.join(' ')}", actor: Actor.current, silent: true) end def on_close(code, reason) debug("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}") if debug_enabled? end def handle_subscription(message) if message_is_about_a_task?(message) save_tasks_to_be_executed(message) update_machine_state(message['task']) # if message['action'] == 'invoke' debug("worker #{@job_id} state is #{@machine.state}") if debug_enabled? task_approval(message) else debug("worker #{@job_id} could not handle #{message}") if debug_enabled? end end def message_is_about_a_task?(message) message.present? && message.is_a?(Hash) && message['action'].present? && message['job_id'].present? 
&& message['task'].present? end def task_approval(message) if @task_confirmations.include?(message['task']) && message['action'] == 'invoke' @manager_condition[message['task']].call(message['task']) else publish_rake_event(message.merge('approved' => 'yes')) end end def save_tasks_to_be_executed(message) return unless message['action'] == 'count' debug("worler #{@job_id} current invocation chain : #{@rake_tasks.inspect}") if debug_enabled? @rake_tasks = [] if @rake_tasks.blank? @rake_tasks << message['task'] if @rake_tasks.last != message['task'] end def update_machine_state(name) debug("worker #{@job_id} triest to transition from #{@machine.state} to #{name}") if debug_enabled? @machine.transitions.on(name.to_s, @machine.state => name.to_s) @machine.go_to_transition(name.to_s) raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed, "task #{@action} failed ") if name == 'deploy:failed' # force worker to rollback end def setup_command_line(*options) @task_argv = [] options.each do |option| @task_argv << option end @task_argv end def setup_task_arguments # stage = "#{@app_name}:#{@env_name} #{@action_name}" stage = @app_name.present? ? "#{@app_name}:#{@env_name}" : "#{@env_name}" array_options = ["#{stage}"] array_options << "#{@action_name}[#{@task_arguments.join(',')}]" @env_options.each do |key, value| array_options << "#{key}=#{value}" if value.present? end array_options << '--trace' if debug_enabled? setup_command_line(*array_options) end def send_msg(channel, message = nil) publish channel, message.present? && message.is_a?(Hash) ? { job_id: @job_id }.merge(message) : { job_id: @job_id, time: Time.now } end def process_job(job) @job_id = job['id'] @app_name = job['app'] @env_name = job['env'] @action_name = job['action'] @env_options = {} job['env_options'].each do |key, value| @env_options[key] = value if value.present? end @task_arguments = job['task_arguments'] end def need_confirmation_for_tasks? executes_deploy? && CapistranoMulticonfigParallel.configuration.task_confirmation_active end def executes_deploy? 
(@action_name == 'deploy' || @action_name == 'deploy:rollback') end def setup_worker_condition job_termination_condition = Celluloid::Condition.new job_confirmation_conditions = [] CapistranoMulticonfigParallel.configuration.task_confirmations.each do |_task| if need_confirmation_for_tasks? job_confirmation_conditions << Celluloid::Condition.new else job_confirmation_conditions << proc { |sum| sum } end end @manager.job_to_condition[@job_id] = { first_condition: job_confirmation_conditions, last_condition: job_termination_condition } construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition) end def construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition) hash_conditions = {} CapistranoMulticonfigParallel.configuration.task_confirmations.each_with_index do |task, index| blk = lambda do |sum| need_confirmation_for_tasks? ? job_confirmation_conditions[index].signal(sum) : job_confirmation_conditions[index].call(sum) end hash_conditions[task] = blk end blk_termination = lambda do |sum| job_termination_condition.signal(sum) end @manager_condition = hash_conditions @last_manager_condition = blk_termination end def crashed? @action_name == 'deploy:rollback' end def notify_finished(exit_status) return unless @execute_deploy if exit_status.exitstatus != 0 debug("worker #{job_id} tries to terminate") terminate else update_machine_state('FINISHED') debug("worker #{job_id} notifies manager has finished") @last_manager_condition.call('yes') end end end |
#job_id ⇒ Object
Returns the value of attribute job_id.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def job_id @job_id end |
#last_manager_condition ⇒ Object
Returns the value of attribute last_manager_condition.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def last_manager_condition @last_manager_condition end |
#machine ⇒ Object
Returns the value of attribute machine.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def machine @machine end |
#manager ⇒ CapistranoMulticonfigParallel::CelluloidManager
Returns the instance of the manager that delegated the job to this worker.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 19 class CelluloidWorker include Celluloid include Celluloid::Notifications include Celluloid::Logger class TaskFailed < StandardError; end attr_accessor :job, :manager, :job_id, :app_name, :env_name, :action_name, :env_options, :machine, :client, :task_argv, :execute_deploy, :executed_dry_run, :rake_tasks, :current_task_number, # tracking tasks :successfull_subscription, :subscription_channel, :publisher_channel, # for subscriptions and publishing events :task_confirmations, :manager_condition, :last_manager_condition # for task conifirmations from manager def work(job, manager) @job = job @manager = manager process_job(job) if job.present? debug("worker #{@job_id} received #{job.inspect}") if debug_enabled? @subscription_channel = "worker_#{@job_id}" @machine = CapistranoMulticonfigParallel::StateMachine.new(job, Actor.current) setup_worker_condition manager.register_worker_for_job(job, Actor.current) end def debug_enabled? @manager.class.debug_enabled? end def start_task debug("exec worker #{@job_id} starts task with #{@job.inspect}") if debug_enabled? @task_confirmations = CapistranoMulticonfigParallel.configuration.task_confirmations @client = CelluloidPubsub::Client.connect(actor: Actor.current, enable_debug: @manager.class.debug_websocket?) do |ws| ws.subscribe(@subscription_channel) end end def publish_rake_event(data) @client.publish(rake_actor_id(data), data) end def rake_actor_id(data) data['action'].present? && data['action'] == 'count' ? "rake_worker_#{@job_id}_count" : "rake_worker_#{@job_id}" end def on_message(message) debug("worker #{@job_id} received: #{message.inspect}") if debug_enabled? 
if @client.succesfull_subscription?(message) @successfull_subscription = true execute_after_succesfull_subscription else handle_subscription(message) end end def execute_after_succesfull_subscription setup_task_arguments if (@action_name == 'deploy' || @action_name == 'deploy:rollback') && CapistranoMulticonfigParallel.show_task_progress @executed_dry_run = true @rake_tasks = [] @task_argv << '--dry-run' @task_argv << 'count_rake=true' @child_process = CapistranoMulticonfigParallel::ChildProcess.new Actor.current.link @child_process debug("worker #{@job_id} executes: bundle exec multi_cap #{@task_argv.join(' ')}") if debug_enabled? @child_process.async.work("bundle exec multi_cap #{@task_argv.join(' ')}", actor: Actor.current, dry_run: true) else async.execute_deploy end end def rake_tasks @rake_tasks ||= [] end def execute_deploy @execute_deploy = true debug("invocation chain #{@job_id} is : #{@rake_tasks.inspect}") if debug_enabled? && CapistranoMulticonfigParallel.show_task_progress if !defined?(@child_process) || @child_process.nil? @child_process = CapistranoMulticonfigParallel::ChildProcess.new Actor.current.link @child_process else @client.unsubscribe("rake_worker_#{@job_id}_count") @child_process.exit_status = nil end setup_task_arguments debug("worker #{@job_id} executes: bundle exec multi_cap #{@task_argv.join(' ')}") if debug_enabled? @child_process.async.work("bundle exec multi_cap #{@task_argv.join(' ')}", actor: Actor.current, silent: true) end def on_close(code, reason) debug("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}") if debug_enabled? end def handle_subscription(message) if message_is_about_a_task?(message) save_tasks_to_be_executed(message) update_machine_state(message['task']) # if message['action'] == 'invoke' debug("worker #{@job_id} state is #{@machine.state}") if debug_enabled? task_approval(message) else debug("worker #{@job_id} could not handle #{message}") if debug_enabled? end end def message_is_about_a_task?(message) message.present? && message.is_a?(Hash) && message['action'].present? && message['job_id'].present? 
&& message['task'].present? end def task_approval(message) if @task_confirmations.include?(message['task']) && message['action'] == 'invoke' @manager_condition[message['task']].call(message['task']) else publish_rake_event(message.merge('approved' => 'yes')) end end def save_tasks_to_be_executed(message) return unless message['action'] == 'count' debug("worler #{@job_id} current invocation chain : #{@rake_tasks.inspect}") if debug_enabled? @rake_tasks = [] if @rake_tasks.blank? @rake_tasks << message['task'] if @rake_tasks.last != message['task'] end def update_machine_state(name) debug("worker #{@job_id} triest to transition from #{@machine.state} to #{name}") if debug_enabled? @machine.transitions.on(name.to_s, @machine.state => name.to_s) @machine.go_to_transition(name.to_s) raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed, "task #{@action} failed ") if name == 'deploy:failed' # force worker to rollback end def setup_command_line(*options) @task_argv = [] options.each do |option| @task_argv << option end @task_argv end def setup_task_arguments # stage = "#{@app_name}:#{@env_name} #{@action_name}" stage = @app_name.present? ? "#{@app_name}:#{@env_name}" : "#{@env_name}" array_options = ["#{stage}"] array_options << "#{@action_name}[#{@task_arguments.join(',')}]" @env_options.each do |key, value| array_options << "#{key}=#{value}" if value.present? end array_options << '--trace' if debug_enabled? setup_command_line(*array_options) end def send_msg(channel, message = nil) publish channel, message.present? && message.is_a?(Hash) ? { job_id: @job_id }.merge(message) : { job_id: @job_id, time: Time.now } end def process_job(job) @job_id = job['id'] @app_name = job['app'] @env_name = job['env'] @action_name = job['action'] @env_options = {} job['env_options'].each do |key, value| @env_options[key] = value if value.present? end @task_arguments = job['task_arguments'] end def need_confirmation_for_tasks? executes_deploy? && CapistranoMulticonfigParallel.configuration.task_confirmation_active end def executes_deploy? 
(@action_name == 'deploy' || @action_name == 'deploy:rollback') end def setup_worker_condition job_termination_condition = Celluloid::Condition.new job_confirmation_conditions = [] CapistranoMulticonfigParallel.configuration.task_confirmations.each do |_task| if need_confirmation_for_tasks? job_confirmation_conditions << Celluloid::Condition.new else job_confirmation_conditions << proc { |sum| sum } end end @manager.job_to_condition[@job_id] = { first_condition: job_confirmation_conditions, last_condition: job_termination_condition } construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition) end def construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition) hash_conditions = {} CapistranoMulticonfigParallel.configuration.task_confirmations.each_with_index do |task, index| blk = lambda do |sum| need_confirmation_for_tasks? ? job_confirmation_conditions[index].signal(sum) : job_confirmation_conditions[index].call(sum) end hash_conditions[task] = blk end blk_termination = lambda do |sum| job_termination_condition.signal(sum) end @manager_condition = hash_conditions @last_manager_condition = blk_termination end def crashed? @action_name == 'deploy:rollback' end def notify_finished(exit_status) return unless @execute_deploy if exit_status.exitstatus != 0 debug("worker #{job_id} tries to terminate") terminate else update_machine_state('FINISHED') debug("worker #{job_id} notifies manager has finished") @last_manager_condition.call('yes') end end end |
#manager_condition ⇒ Object
Returns the value of attribute manager_condition.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def manager_condition @manager_condition end |
#publisher_channel ⇒ Object
Returns the value of attribute publisher_channel.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def publisher_channel @publisher_channel end |
#rake_tasks ⇒ Object
Returns the value of attribute rake_tasks.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def rake_tasks @rake_tasks end |
#subscription_channel ⇒ Object
Returns the value of attribute subscription_channel.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def subscription_channel @subscription_channel end |
#successfull_subscription ⇒ Object
Returns the value of attribute successfull_subscription.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def successfull_subscription @successfull_subscription end |
#task_argv ⇒ Object
Returns the value of attribute task_argv.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def task_argv @task_argv end |
#task_confirmations ⇒ Object
Returns the value of attribute task_confirmations.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def task_confirmations @task_confirmations end |
Instance Method Details
#construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition) ⇒ Object
206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 206 def construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition) hash_conditions = {} CapistranoMulticonfigParallel.configuration.task_confirmations.each_with_index do |task, index| blk = lambda do |sum| need_confirmation_for_tasks? ? job_confirmation_conditions[index].signal(sum) : job_confirmation_conditions[index].call(sum) end hash_conditions[task] = blk end blk_termination = lambda do |sum| job_termination_condition.signal(sum) end @manager_condition = hash_conditions @last_manager_condition = blk_termination end |
#crashed? ⇒ Boolean
221 222 223 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 221 def crashed? @action_name == 'deploy:rollback' end |
#debug_enabled? ⇒ Boolean
42 43 44 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 42 def debug_enabled? @manager.class.debug_enabled? end |
#execute_after_succesfull_subscription ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 72 def execute_after_succesfull_subscription setup_task_arguments if (@action_name == 'deploy' || @action_name == 'deploy:rollback') && CapistranoMulticonfigParallel.show_task_progress @executed_dry_run = true @rake_tasks = [] @task_argv << '--dry-run' @task_argv << 'count_rake=true' @child_process = CapistranoMulticonfigParallel::ChildProcess.new Actor.current.link @child_process debug("worker #{@job_id} executes: bundle exec multi_cap #{@task_argv.join(' ')}") if debug_enabled? @child_process.async.work("bundle exec multi_cap #{@task_argv.join(' ')}", actor: Actor.current, dry_run: true) else async.execute_deploy end end |
#executes_deploy? ⇒ Boolean
188 189 190 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 188 def executes_deploy? (@action_name == 'deploy' || @action_name == 'deploy:rollback') end |
#handle_subscription(message) ⇒ Object
111 112 113 114 115 116 117 118 119 120 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 111 def handle_subscription(message) if message_is_about_a_task?(message) save_tasks_to_be_executed(message) update_machine_state(message['task']) # if message['action'] == 'invoke' debug("worker #{@job_id} state is #{@machine.state}") if debug_enabled? task_approval(message) else debug("worker #{@job_id} could not handle #{message}") if debug_enabled? end end |
#message_is_about_a_task?(message) ⇒ Boolean
122 123 124 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 122 def message_is_about_a_task?(message) message.present? && message.is_a?(Hash) && message['action'].present? && message['job_id'].present? && message['task'].present? end |
#need_confirmation_for_tasks? ⇒ Boolean
184 185 186 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 184 def need_confirmation_for_tasks? executes_deploy? && CapistranoMulticonfigParallel.configuration.task_confirmation_active end |
#notify_finished(exit_status) ⇒ Object
225 226 227 228 229 230 231 232 233 234 235 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 225 def notify_finished(exit_status) return unless @execute_deploy if exit_status.exitstatus != 0 debug("worker #{job_id} tries to terminate") terminate else update_machine_state('FINISHED') debug("worker #{job_id} notifies manager has finished") @last_manager_condition.call('yes') end end |
#on_close(code, reason) ⇒ Object
107 108 109 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 107 def on_close(code, reason) debug("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}") if debug_enabled? end |
#on_message(message) ⇒ Object
62 63 64 65 66 67 68 69 70 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 62 def on_message(message) debug("worker #{@job_id} received: #{message.inspect}") if debug_enabled? if @client.succesfull_subscription?(message) @successfull_subscription = true execute_after_succesfull_subscription else handle_subscription(message) end end |
#process_job(job) ⇒ Object
172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 172 def process_job(job) @job_id = job['id'] @app_name = job['app'] @env_name = job['env'] @action_name = job['action'] @env_options = {} job['env_options'].each do |key, value| @env_options[key] = value if value.present? end @task_arguments = job['task_arguments'] end |
#publish_rake_event(data) ⇒ Object
54 55 56 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 54 def publish_rake_event(data) @client.publish(rake_actor_id(data), data) end |
#rake_actor_id(data) ⇒ Object
58 59 60 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 58 def rake_actor_id(data) data['action'].present? && data['action'] == 'count' ? "rake_worker_#{@job_id}_count" : "rake_worker_#{@job_id}" end |
#save_tasks_to_be_executed(message) ⇒ Object
134 135 136 137 138 139 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 134 def save_tasks_to_be_executed(message) return unless message['action'] == 'count' debug("worler #{@job_id} current invocation chain : #{@rake_tasks.inspect}") if debug_enabled? @rake_tasks = [] if @rake_tasks.blank? @rake_tasks << message['task'] if @rake_tasks.last != message['task'] end |
#send_msg(channel, message = nil) ⇒ Object
168 169 170 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 168 def send_msg(channel, message = nil) publish channel, message.present? && message.is_a?(Hash) ? { job_id: @job_id }.merge(message) : { job_id: @job_id, time: Time.now } end |
#setup_command_line(*options) ⇒ Object
148 149 150 151 152 153 154 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 148 def setup_command_line(*options) @task_argv = [] options.each do |option| @task_argv << option end @task_argv end |
#setup_task_arguments ⇒ Object
156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 156 def setup_task_arguments # stage = "#{@app_name}:#{@env_name} #{@action_name}" stage = @app_name.present? ? "#{@app_name}:#{@env_name}" : "#{@env_name}" array_options = ["#{stage}"] array_options << "#{@action_name}[#{@task_arguments.join(',')}]" @env_options.each do |key, value| array_options << "#{key}=#{value}" if value.present? end array_options << '--trace' if debug_enabled? setup_command_line(*array_options) end |
#setup_worker_condition ⇒ Object
192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 192 def setup_worker_condition job_termination_condition = Celluloid::Condition.new job_confirmation_conditions = [] CapistranoMulticonfigParallel.configuration.task_confirmations.each do |_task| if need_confirmation_for_tasks? job_confirmation_conditions << Celluloid::Condition.new else job_confirmation_conditions << proc { |sum| sum } end end @manager.job_to_condition[@job_id] = { first_condition: job_confirmation_conditions, last_condition: job_termination_condition } construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition) end |
#start_task ⇒ Object
46 47 48 49 50 51 52 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 46 def start_task debug("exec worker #{@job_id} starts task with #{@job.inspect}") if debug_enabled? @task_confirmations = CapistranoMulticonfigParallel.configuration.task_confirmations @client = CelluloidPubsub::Client.connect(actor: Actor.current, enable_debug: @manager.class.debug_websocket?) do |ws| ws.subscribe(@subscription_channel) end end |
#task_approval(message) ⇒ Object
126 127 128 129 130 131 132 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 126 def task_approval(message) if @task_confirmations.include?(message['task']) && message['action'] == 'invoke' @manager_condition[message['task']].call(message['task']) else publish_rake_event(message.merge('approved' => 'yes')) end end |
#update_machine_state(name) ⇒ Object
141 142 143 144 145 146 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 141 def update_machine_state(name) debug("worker #{@job_id} triest to transition from #{@machine.state} to #{name}") if debug_enabled? @machine.transitions.on(name.to_s, @machine.state => name.to_s) @machine.go_to_transition(name.to_s) raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed, "task #{@action} failed ") if name == 'deploy:failed' # force worker to rollback end |
#work(job, manager) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 30 def work(job, manager) @job = job @manager = manager process_job(job) if job.present? debug("worker #{@job_id} received #{job.inspect}") if debug_enabled? @subscription_channel = "worker_#{@job_id}" @machine = CapistranoMulticonfigParallel::StateMachine.new(job, Actor.current) setup_worker_condition manager.register_worker_for_job(job, Actor.current) end |