Class: CapistranoMulticonfigParallel::CelluloidWorker
- Inherits:
-
Object
- Object
- CapistranoMulticonfigParallel::CelluloidWorker
- Includes:
- Celluloid, Celluloid::Logger, Celluloid::Notifications
- Defined in:
- lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb
Overview
rubocop:disable ClassLength worker that will spawn a child process in order to execute a capistrano job and monitor that process
Defined Under Namespace
Classes: TaskFailed
Instance Attribute Summary collapse
-
#action_name ⇒ Object
Returns the value of attribute action_name.
-
#app_name ⇒ Object
Returns the value of attribute app_name.
-
#client ⇒ Object
Returns the value of attribute client.
-
#current_task_number ⇒ Object
Returns the value of attribute current_task_number.
-
#env_name ⇒ Object
Returns the value of attribute env_name.
-
#env_options ⇒ Object
Returns the value of attribute env_options.
-
#execute_deploy ⇒ Object
Returns the value of attribute execute_deploy.
-
#executed_dry_run ⇒ Object
Returns the value of attribute executed_dry_run.
-
#job ⇒ Hash
Options used for executing capistrano task.
-
#job_id ⇒ Object
Returns the value of attribute job_id.
-
#last_manager_condition ⇒ Object
Returns the value of attribute last_manager_condition.
-
#machine ⇒ Object
Returns the value of attribute machine.
-
#manager ⇒ CapistranoMulticonfigParallel::CelluloidManager
The instance of the manager that delegated the job to this worker.
-
#manager_condition ⇒ Object
Returns the value of attribute manager_condition.
-
#publisher_channel ⇒ Object
Returns the value of attribute publisher_channel.
-
#rake_tasks ⇒ Object
Returns the value of attribute rake_tasks.
-
#subscription_channel ⇒ Object
Returns the value of attribute subscription_channel.
-
#successfull_subscription ⇒ Object
Returns the value of attribute successfull_subscription.
-
#task_argv ⇒ Object
Returns the value of attribute task_argv.
-
#task_confirmations ⇒ Object
Returns the value of attribute task_confirmations.
Instance Method Summary collapse
- #construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition) ⇒ Object
- #crashed? ⇒ Boolean
- #debug_enabled? ⇒ Boolean
- #execute_after_succesfull_subscription ⇒ Object
- #executes_deploy? ⇒ Boolean
- #handle_subscription(message) ⇒ Object
- #message_is_about_a_task?(message) ⇒ Boolean
- #need_confirmation_for_tasks? ⇒ Boolean
- #notify_finished(exit_status) ⇒ Object
- #on_close(code, reason) ⇒ Object
- #on_message(message) ⇒ Object
- #process_job(job) ⇒ Object
- #publish_rake_event(data) ⇒ Object
- #rake_actor_id(data) ⇒ Object
- #save_tasks_to_be_executed(message) ⇒ Object
- #send_msg(channel, message = nil) ⇒ Object
- #setup_command_line(*options) ⇒ Object
- #setup_task_arguments ⇒ Object
- #setup_worker_condition ⇒ Object
- #start_task ⇒ Object
- #task_approval(message) ⇒ Object
- #update_machine_state(name) ⇒ Object
- #work(job, manager) ⇒ Object
Instance Attribute Details
#action_name ⇒ Object
Returns the value of attribute action_name.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def action_name @action_name end |
#app_name ⇒ Object
Returns the value of attribute app_name.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def app_name @app_name end |
#client ⇒ Object
Returns the value of attribute client.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def client @client end |
#current_task_number ⇒ Object
Returns the value of attribute current_task_number.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def current_task_number @current_task_number end |
#env_name ⇒ Object
Returns the value of attribute env_name.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def env_name @env_name end |
#env_options ⇒ Object
Returns the value of attribute env_options.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def end |
#execute_deploy ⇒ Object
Returns the value of attribute execute_deploy.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def execute_deploy @execute_deploy end |
#executed_dry_run ⇒ Object
Returns the value of attribute executed_dry_run.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def executed_dry_run @executed_dry_run end |
#job ⇒ Hash
Returns options used for executing capistrano task.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 19 class CelluloidWorker include Celluloid include Celluloid::Notifications include Celluloid::Logger class TaskFailed < StandardError; end attr_accessor :job, :manager, :job_id, :app_name, :env_name, :action_name, :env_options, :machine, :client, :task_argv, :execute_deploy, :executed_dry_run, :rake_tasks, :current_task_number, # tracking tasks :successfull_subscription, :subscription_channel, :publisher_channel, # for subscriptions and publishing events :task_confirmations, :manager_condition, :last_manager_condition # for task conifirmations from manager def work(job, manager) @job = job @manager = manager process_job(job) if job.present? debug("worker #{@job_id} received #{job.inspect}") if debug_enabled? @subscription_channel = "worker_#{@job_id}" @machine = CapistranoMulticonfigParallel::StateMachine.new(job, Actor.current) setup_worker_condition manager.register_worker_for_job(job, Actor.current) end def debug_enabled? @manager.class.debug_enabled? end def start_task debug("exec worker #{@job_id} starts task with #{@job.inspect}") if debug_enabled? @task_confirmations = CapistranoMulticonfigParallel.configuration.task_confirmations @client = CelluloidPubsub::Client.connect(actor: Actor.current, enable_debug: @manager.class.debug_websocket?) do |ws| ws.subscribe(@subscription_channel) end end def publish_rake_event(data) @client.publish(rake_actor_id(data), data) end def rake_actor_id(data) data['action'].present? && data['action'] == 'count' ? "rake_worker_#{@job_id}_count" : "rake_worker_#{@job_id}" end def () debug("worker #{@job_id} received: #{message.inspect}") if debug_enabled? if @client.succesfull_subscription?() @successfull_subscription = true execute_after_succesfull_subscription else handle_subscription() end end def execute_after_succesfull_subscription setup_task_arguments if (@action_name == 'deploy' || @action_name == 'deploy:rollback') && CapistranoMulticonfigParallel.show_task_progress @executed_dry_run = true @rake_tasks = [] @task_argv << '--dry-run' @task_argv << 'count_rake=true' @child_process = CapistranoMulticonfigParallel::ChildProcess.new Actor.current.link @child_process debug("worker #{@job_id} executes: bundle exec multi_cap #{@task_argv.join(' ')}") if debug_enabled? @child_process.async.work("bundle exec multi_cap #{@task_argv.join(' ')}", actor: Actor.current, dry_run: true) else async.execute_deploy end end def rake_tasks @rake_tasks ||= [] end def execute_deploy @execute_deploy = true debug("invocation chain #{@job_id} is : #{@rake_tasks.inspect}") if debug_enabled? && CapistranoMulticonfigParallel.show_task_progress if !defined?(@child_process) || @child_process.nil? @child_process = CapistranoMulticonfigParallel::ChildProcess.new Actor.current.link @child_process else @client.unsubscribe("rake_worker_#{@job_id}_count") @child_process.exit_status = nil end setup_task_arguments debug("worker #{@job_id} executes: bundle exec multi_cap #{@task_argv.join(' ')}") if debug_enabled? @child_process.async.work("bundle exec multi_cap #{@task_argv.join(' ')}", actor: Actor.current, silent: true) end def on_close(code, reason) debug("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}") if debug_enabled? end def handle_subscription() if () save_tasks_to_be_executed() update_machine_state(['task']) # if message['action'] == 'invoke' debug("worker #{@job_id} state is #{@machine.state}") if debug_enabled? task_approval() else debug("worker #{@job_id} could not handle #{message}") if debug_enabled? end end def () .present? && .is_a?(Hash) && ['action'].present? && ['job_id'].present? && ['task'].present? end def task_approval() if @task_confirmations.include?(['task']) && ['action'] == 'invoke' @manager_condition[['task']].call(['task']) else publish_rake_event(.merge('approved' => 'yes')) end end def save_tasks_to_be_executed() return unless ['action'] == 'count' debug("worler #{@job_id} current invocation chain : #{@rake_tasks.inspect}") if debug_enabled? @rake_tasks = [] if @rake_tasks.blank? @rake_tasks << ['task'] if @rake_tasks.last != ['task'] end def update_machine_state(name) debug("worker #{@job_id} triest to transition from #{@machine.state} to #{name}") if debug_enabled? @machine.transitions.on(name.to_s, @machine.state => name.to_s) @machine.go_to_transition(name.to_s) raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed, "task #{@action} failed ") if name == 'deploy:failed' # force worker to rollback end def setup_command_line(*) @task_argv = [] .each do |option| @task_argv << option end @task_argv end def setup_task_arguments # stage = "#{@app_name}:#{@env_name} #{@action_name}" stage = @app_name.present? ? "#{@app_name}:#{@env_name}" : "#{@env_name}" = ["#{stage}"] << "#{@action_name}[#{@task_arguments.join(',')}]" .each do |key, value| << "#{key}=#{value}" if value.present? end << '--trace' if debug_enabled? setup_command_line(*) end def send_msg(channel, = nil) publish channel, .present? && .is_a?(Hash) ? { job_id: @job_id }.merge() : { job_id: @job_id, time: Time.now } end def process_job(job) @job_id = job['id'] @app_name = job['app'] @env_name = job['env'] @action_name = job['action'] = {} job['env_options'].each do |key, value| [key] = value if value.present? end @task_arguments = job['task_arguments'] end def need_confirmation_for_tasks? executes_deploy? && CapistranoMulticonfigParallel.configuration.task_confirmation_active end def executes_deploy? (@action_name == 'deploy' || @action_name == 'deploy:rollback') end def setup_worker_condition job_termination_condition = Celluloid::Condition.new job_confirmation_conditions = [] CapistranoMulticonfigParallel.configuration.task_confirmations.each do |_task| if need_confirmation_for_tasks? job_confirmation_conditions << Celluloid::Condition.new else job_confirmation_conditions << proc { |sum| sum } end end @manager.job_to_condition[@job_id] = { first_condition: job_confirmation_conditions, last_condition: job_termination_condition } construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition) end def construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition) hash_conditions = {} CapistranoMulticonfigParallel.configuration.task_confirmations.each_with_index do |task, index| blk = lambda do |sum| need_confirmation_for_tasks? ? job_confirmation_conditions[index].signal(sum) : job_confirmation_conditions[index].call(sum) end hash_conditions[task] = blk end blk_termination = lambda do |sum| job_termination_condition.signal(sum) end @manager_condition = hash_conditions @last_manager_condition = blk_termination end def crashed? @action_name == 'deploy:rollback' end def notify_finished(exit_status) return unless @execute_deploy if exit_status.exitstatus != 0 debug("worker #{job_id} tries to terminate") terminate else update_machine_state('FINISHED') debug("worker #{job_id} notifies manager has finished") @last_manager_condition.call('yes') end end end |
#job_id ⇒ Object
Returns the value of attribute job_id.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def job_id @job_id end |
#last_manager_condition ⇒ Object
Returns the value of attribute last_manager_condition.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def last_manager_condition @last_manager_condition end |
#machine ⇒ Object
Returns the value of attribute machine.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def machine @machine end |
#manager ⇒ CapistranoMulticonfigParallel::CelluloidManager
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 19 class CelluloidWorker include Celluloid include Celluloid::Notifications include Celluloid::Logger class TaskFailed < StandardError; end attr_accessor :job, :manager, :job_id, :app_name, :env_name, :action_name, :env_options, :machine, :client, :task_argv, :execute_deploy, :executed_dry_run, :rake_tasks, :current_task_number, # tracking tasks :successfull_subscription, :subscription_channel, :publisher_channel, # for subscriptions and publishing events :task_confirmations, :manager_condition, :last_manager_condition # for task conifirmations from manager def work(job, manager) @job = job @manager = manager process_job(job) if job.present? debug("worker #{@job_id} received #{job.inspect}") if debug_enabled? @subscription_channel = "worker_#{@job_id}" @machine = CapistranoMulticonfigParallel::StateMachine.new(job, Actor.current) setup_worker_condition manager.register_worker_for_job(job, Actor.current) end def debug_enabled? @manager.class.debug_enabled? end def start_task debug("exec worker #{@job_id} starts task with #{@job.inspect}") if debug_enabled? @task_confirmations = CapistranoMulticonfigParallel.configuration.task_confirmations @client = CelluloidPubsub::Client.connect(actor: Actor.current, enable_debug: @manager.class.debug_websocket?) do |ws| ws.subscribe(@subscription_channel) end end def publish_rake_event(data) @client.publish(rake_actor_id(data), data) end def rake_actor_id(data) data['action'].present? && data['action'] == 'count' ? "rake_worker_#{@job_id}_count" : "rake_worker_#{@job_id}" end def () debug("worker #{@job_id} received: #{message.inspect}") if debug_enabled? if @client.succesfull_subscription?() @successfull_subscription = true execute_after_succesfull_subscription else handle_subscription() end end def execute_after_succesfull_subscription setup_task_arguments if (@action_name == 'deploy' || @action_name == 'deploy:rollback') && CapistranoMulticonfigParallel.show_task_progress @executed_dry_run = true @rake_tasks = [] @task_argv << '--dry-run' @task_argv << 'count_rake=true' @child_process = CapistranoMulticonfigParallel::ChildProcess.new Actor.current.link @child_process debug("worker #{@job_id} executes: bundle exec multi_cap #{@task_argv.join(' ')}") if debug_enabled? @child_process.async.work("bundle exec multi_cap #{@task_argv.join(' ')}", actor: Actor.current, dry_run: true) else async.execute_deploy end end def rake_tasks @rake_tasks ||= [] end def execute_deploy @execute_deploy = true debug("invocation chain #{@job_id} is : #{@rake_tasks.inspect}") if debug_enabled? && CapistranoMulticonfigParallel.show_task_progress if !defined?(@child_process) || @child_process.nil? @child_process = CapistranoMulticonfigParallel::ChildProcess.new Actor.current.link @child_process else @client.unsubscribe("rake_worker_#{@job_id}_count") @child_process.exit_status = nil end setup_task_arguments debug("worker #{@job_id} executes: bundle exec multi_cap #{@task_argv.join(' ')}") if debug_enabled? @child_process.async.work("bundle exec multi_cap #{@task_argv.join(' ')}", actor: Actor.current, silent: true) end def on_close(code, reason) debug("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}") if debug_enabled? end def handle_subscription() if () save_tasks_to_be_executed() update_machine_state(['task']) # if message['action'] == 'invoke' debug("worker #{@job_id} state is #{@machine.state}") if debug_enabled? task_approval() else debug("worker #{@job_id} could not handle #{message}") if debug_enabled? end end def () .present? && .is_a?(Hash) && ['action'].present? && ['job_id'].present? && ['task'].present? end def task_approval() if @task_confirmations.include?(['task']) && ['action'] == 'invoke' @manager_condition[['task']].call(['task']) else publish_rake_event(.merge('approved' => 'yes')) end end def save_tasks_to_be_executed() return unless ['action'] == 'count' debug("worler #{@job_id} current invocation chain : #{@rake_tasks.inspect}") if debug_enabled? @rake_tasks = [] if @rake_tasks.blank? @rake_tasks << ['task'] if @rake_tasks.last != ['task'] end def update_machine_state(name) debug("worker #{@job_id} triest to transition from #{@machine.state} to #{name}") if debug_enabled? @machine.transitions.on(name.to_s, @machine.state => name.to_s) @machine.go_to_transition(name.to_s) raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed, "task #{@action} failed ") if name == 'deploy:failed' # force worker to rollback end def setup_command_line(*) @task_argv = [] .each do |option| @task_argv << option end @task_argv end def setup_task_arguments # stage = "#{@app_name}:#{@env_name} #{@action_name}" stage = @app_name.present? ? "#{@app_name}:#{@env_name}" : "#{@env_name}" = ["#{stage}"] << "#{@action_name}[#{@task_arguments.join(',')}]" .each do |key, value| << "#{key}=#{value}" if value.present? end << '--trace' if debug_enabled? setup_command_line(*) end def send_msg(channel, = nil) publish channel, .present? && .is_a?(Hash) ? { job_id: @job_id }.merge() : { job_id: @job_id, time: Time.now } end def process_job(job) @job_id = job['id'] @app_name = job['app'] @env_name = job['env'] @action_name = job['action'] = {} job['env_options'].each do |key, value| [key] = value if value.present? end @task_arguments = job['task_arguments'] end def need_confirmation_for_tasks? executes_deploy? && CapistranoMulticonfigParallel.configuration.task_confirmation_active end def executes_deploy? (@action_name == 'deploy' || @action_name == 'deploy:rollback') end def setup_worker_condition job_termination_condition = Celluloid::Condition.new job_confirmation_conditions = [] CapistranoMulticonfigParallel.configuration.task_confirmations.each do |_task| if need_confirmation_for_tasks? job_confirmation_conditions << Celluloid::Condition.new else job_confirmation_conditions << proc { |sum| sum } end end @manager.job_to_condition[@job_id] = { first_condition: job_confirmation_conditions, last_condition: job_termination_condition } construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition) end def construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition) hash_conditions = {} CapistranoMulticonfigParallel.configuration.task_confirmations.each_with_index do |task, index| blk = lambda do |sum| need_confirmation_for_tasks? ? job_confirmation_conditions[index].signal(sum) : job_confirmation_conditions[index].call(sum) end hash_conditions[task] = blk end blk_termination = lambda do |sum| job_termination_condition.signal(sum) end @manager_condition = hash_conditions @last_manager_condition = blk_termination end def crashed? @action_name == 'deploy:rollback' end def notify_finished(exit_status) return unless @execute_deploy if exit_status.exitstatus != 0 debug("worker #{job_id} tries to terminate") terminate else update_machine_state('FINISHED') debug("worker #{job_id} notifies manager has finished") @last_manager_condition.call('yes') end end end |
#manager_condition ⇒ Object
Returns the value of attribute manager_condition.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def manager_condition @manager_condition end |
#publisher_channel ⇒ Object
Returns the value of attribute publisher_channel.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def publisher_channel @publisher_channel end |
#rake_tasks ⇒ Object
Returns the value of attribute rake_tasks.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def rake_tasks @rake_tasks end |
#subscription_channel ⇒ Object
Returns the value of attribute subscription_channel.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def subscription_channel @subscription_channel end |
#successfull_subscription ⇒ Object
Returns the value of attribute successfull_subscription.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def successfull_subscription @successfull_subscription end |
#task_argv ⇒ Object
Returns the value of attribute task_argv.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def task_argv @task_argv end |
#task_confirmations ⇒ Object
Returns the value of attribute task_confirmations.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def task_confirmations @task_confirmations end |
Instance Method Details
#construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition) ⇒ Object
207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 207 def construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition) hash_conditions = {} CapistranoMulticonfigParallel.configuration.task_confirmations.each_with_index do |task, index| blk = lambda do |sum| need_confirmation_for_tasks? ? job_confirmation_conditions[index].signal(sum) : job_confirmation_conditions[index].call(sum) end hash_conditions[task] = blk end blk_termination = lambda do |sum| job_termination_condition.signal(sum) end @manager_condition = hash_conditions @last_manager_condition = blk_termination end |
#crashed? ⇒ Boolean
222 223 224 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 222 def crashed? @action_name == 'deploy:rollback' end |
#debug_enabled? ⇒ Boolean
42 43 44 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 42 def debug_enabled? @manager.class.debug_enabled? end |
#execute_after_succesfull_subscription ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 73 def execute_after_succesfull_subscription setup_task_arguments if (@action_name == 'deploy' || @action_name == 'deploy:rollback') && CapistranoMulticonfigParallel.show_task_progress @executed_dry_run = true @rake_tasks = [] @task_argv << '--dry-run' @task_argv << 'count_rake=true' @child_process = CapistranoMulticonfigParallel::ChildProcess.new Actor.current.link @child_process debug("worker #{@job_id} executes: bundle exec multi_cap #{@task_argv.join(' ')}") if debug_enabled? @child_process.async.work("bundle exec multi_cap #{@task_argv.join(' ')}", actor: Actor.current, dry_run: true) else async.execute_deploy end end |
#executes_deploy? ⇒ Boolean
189 190 191 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 189 def executes_deploy? (@action_name == 'deploy' || @action_name == 'deploy:rollback') end |
#handle_subscription(message) ⇒ Object
112 113 114 115 116 117 118 119 120 121 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 112 def handle_subscription() if () save_tasks_to_be_executed() update_machine_state(['task']) # if message['action'] == 'invoke' debug("worker #{@job_id} state is #{@machine.state}") if debug_enabled? task_approval() else debug("worker #{@job_id} could not handle #{message}") if debug_enabled? end end |
#message_is_about_a_task?(message) ⇒ Boolean
123 124 125 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 123 def () .present? && .is_a?(Hash) && ['action'].present? && ['job_id'].present? && ['task'].present? end |
#need_confirmation_for_tasks? ⇒ Boolean
185 186 187 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 185 def need_confirmation_for_tasks? executes_deploy? && CapistranoMulticonfigParallel.configuration.task_confirmation_active end |
#notify_finished(exit_status) ⇒ Object
226 227 228 229 230 231 232 233 234 235 236 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 226 def notify_finished(exit_status) return unless @execute_deploy if exit_status.exitstatus != 0 debug("worker #{job_id} tries to terminate") terminate else update_machine_state('FINISHED') debug("worker #{job_id} notifies manager has finished") @last_manager_condition.call('yes') end end |
#on_close(code, reason) ⇒ Object
108 109 110 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 108 def on_close(code, reason) debug("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}") if debug_enabled? end |
#on_message(message) ⇒ Object
63 64 65 66 67 68 69 70 71 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 63 def () debug("worker #{@job_id} received: #{message.inspect}") if debug_enabled? if @client.succesfull_subscription?() @successfull_subscription = true execute_after_succesfull_subscription else handle_subscription() end end |
#process_job(job) ⇒ Object
173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 173 def process_job(job) @job_id = job['id'] @app_name = job['app'] @env_name = job['env'] @action_name = job['action'] = {} job['env_options'].each do |key, value| [key] = value if value.present? end @task_arguments = job['task_arguments'] end |
#publish_rake_event(data) ⇒ Object
55 56 57 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 55 def publish_rake_event(data) @client.publish(rake_actor_id(data), data) end |
#rake_actor_id(data) ⇒ Object
59 60 61 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 59 def rake_actor_id(data) data['action'].present? && data['action'] == 'count' ? "rake_worker_#{@job_id}_count" : "rake_worker_#{@job_id}" end |
#save_tasks_to_be_executed(message) ⇒ Object
135 136 137 138 139 140 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 135 def save_tasks_to_be_executed() return unless ['action'] == 'count' debug("worler #{@job_id} current invocation chain : #{@rake_tasks.inspect}") if debug_enabled? @rake_tasks = [] if @rake_tasks.blank? @rake_tasks << ['task'] if @rake_tasks.last != ['task'] end |
#send_msg(channel, message = nil) ⇒ Object
169 170 171 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 169 def send_msg(channel, = nil) publish channel, .present? && .is_a?(Hash) ? { job_id: @job_id }.merge() : { job_id: @job_id, time: Time.now } end |
#setup_command_line(*options) ⇒ Object
149 150 151 152 153 154 155 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 149 def setup_command_line(*) @task_argv = [] .each do |option| @task_argv << option end @task_argv end |
#setup_task_arguments ⇒ Object
157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 157 def setup_task_arguments # stage = "#{@app_name}:#{@env_name} #{@action_name}" stage = @app_name.present? ? "#{@app_name}:#{@env_name}" : "#{@env_name}" = ["#{stage}"] << "#{@action_name}[#{@task_arguments.join(',')}]" .each do |key, value| << "#{key}=#{value}" if value.present? end << '--trace' if debug_enabled? setup_command_line(*) end |
#setup_worker_condition ⇒ Object
193 194 195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 193 def setup_worker_condition job_termination_condition = Celluloid::Condition.new job_confirmation_conditions = [] CapistranoMulticonfigParallel.configuration.task_confirmations.each do |_task| if need_confirmation_for_tasks? job_confirmation_conditions << Celluloid::Condition.new else job_confirmation_conditions << proc { |sum| sum } end end @manager.job_to_condition[@job_id] = { first_condition: job_confirmation_conditions, last_condition: job_termination_condition } construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition) end |
#start_task ⇒ Object
47 48 49 50 51 52 53 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 47 def start_task debug("exec worker #{@job_id} starts task with #{@job.inspect}") if debug_enabled? @task_confirmations = CapistranoMulticonfigParallel.configuration.task_confirmations @client = CelluloidPubsub::Client.connect(actor: Actor.current, enable_debug: @manager.class.debug_websocket?) do |ws| ws.subscribe(@subscription_channel) end end |
#task_approval(message) ⇒ Object
127 128 129 130 131 132 133 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 127 def task_approval() if @task_confirmations.include?(['task']) && ['action'] == 'invoke' @manager_condition[['task']].call(['task']) else publish_rake_event(.merge('approved' => 'yes')) end end |
#update_machine_state(name) ⇒ Object
142 143 144 145 146 147 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 142 def update_machine_state(name) debug("worker #{@job_id} triest to transition from #{@machine.state} to #{name}") if debug_enabled? @machine.transitions.on(name.to_s, @machine.state => name.to_s) @machine.go_to_transition(name.to_s) raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed, "task #{@action} failed ") if name == 'deploy:failed' # force worker to rollback end |
#work(job, manager) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 30 def work(job, manager) @job = job @manager = manager process_job(job) if job.present? debug("worker #{@job_id} received #{job.inspect}") if debug_enabled? @subscription_channel = "worker_#{@job_id}" @machine = CapistranoMulticonfigParallel::StateMachine.new(job, Actor.current) setup_worker_condition manager.register_worker_for_job(job, Actor.current) end |