Class: CapistranoMulticonfigParallel::CelluloidWorker
- Inherits:
-
Object
- Object
- CapistranoMulticonfigParallel::CelluloidWorker
- Includes:
- BaseActorHelper
- Defined in:
- lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb
Overview
Worker that spawns a child process to execute a Capistrano job and monitors that process.
Constant Summary collapse
- ATTRIBUTE_LIST =
[
  :job,
  :manager,
  :job_id,
  :app_name,
  :env_name,
  :action_name,
  :env_options,
  :machine,
  :socket_connection,
  :task_argv,
  :rake_tasks,
  :current_task_number, # tracking tasks
  :successfull_subscription,
  :subscription_channel,
  :publisher_channel, # for subscriptions and publishing events
  :job_termination_condition,
  :invocation_chain,
  :filename,
  :worker_log,
  :exit_status,
  :old_job
]
Instance Attribute Summary collapse
-
#job ⇒ Hash
Options used for executing capistrano task.
-
#manager ⇒ CapistranoMulticonfigParallel::CelluloidManager
The instance of the manager that delegated the job to this worker.
Instance Method Summary collapse
- #check_child_proces ⇒ Object
- #check_gitflow ⇒ Object
- #execute_after_succesfull_subscription ⇒ Object
- #execute_deploy ⇒ Object
- #executed_task?(task) ⇒ Boolean
- #finish_worker(exit_status) ⇒ Object
- #handle_subscription(message) ⇒ Object
-
#initialize(*args) ⇒ CelluloidWorker
constructor
A new instance of CelluloidWorker.
- #invocation_chain ⇒ Object
- #notify_finished(exit_status, _runner_status) ⇒ Object
- #on_close(code, reason) ⇒ Object
- #on_message(message) ⇒ Object
- #publish_rake_event(data) ⇒ Object
- #rake_tasks ⇒ Object
- #save_tasks_to_be_executed(message) ⇒ Object
- #send_msg(channel, message = nil) ⇒ Object
- #start_task ⇒ Object
- #task_approval(message) ⇒ Object
- #update_machine_state(name, options = {}) ⇒ Object
- #work(job, manager, old_job) ⇒ Object
- #worker_state ⇒ Object
Methods included from BaseActorHelper
Constructor Details
#initialize(*args) ⇒ CelluloidWorker
Returns a new instance of CelluloidWorker.
34 35 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 34 def initialize(*args) end |
Instance Attribute Details
#job ⇒ Hash
Returns options used for executing capistrano task.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 20
# Worker that spawns a child process in order to execute a capistrano job and
# monitors that process.
#
# NOTE(review): this listing had lost several identifiers (the `message` and
# `options` parameters, the `on_message` method name, a few locals and the
# three predicate calls in #handle_subscription). Parameter and method names
# were restored from the method signatures in the summary; the local names and
# the predicate names are assumptions - confirm against the repository.
class CelluloidWorker
  include CapistranoMulticonfigParallel::BaseActorHelper

  ATTRIBUTE_LIST = [
    :job,
    :manager,
    :job_id,
    :app_name,
    :env_name,
    :action_name,
    :env_options,
    :machine,
    :socket_connection,
    :task_argv,
    :rake_tasks,
    :current_task_number, # tracking tasks
    :successfull_subscription,
    :subscription_channel,
    :publisher_channel, # for subscriptions and publishing events
    :job_termination_condition,
    :invocation_chain,
    :filename,
    :worker_log,
    :exit_status,
    :old_job
  ].freeze

  # attr_accessor defines both reader and writer, so a separate attr_reader
  # call over the same list is not needed (it only redefined the readers).
  attr_accessor(*CapistranoMulticonfigParallel::CelluloidWorker::ATTRIBUTE_LIST)

  # Empty constructor; state is assigned by #work.
  def initialize(*args)
  end

  # Stores the job, manager and previous job, builds the subscription channel
  # name and the state machine, then registers this actor with the manager.
  def work(job, manager, old_job)
    @job = job
    @old_job = old_job
    @job_id = job.id
    @worker_state = job.status
    @manager = manager
    @job_confirmation_conditions = []
    log_to_file("worker #{@job_id} received #{job.inspect} and #{old_job.inspect}")
    @subscription_channel = "#{CapistranoSentinel::RequestHooks::PUBLISHER_PREFIX}#{@job_id}"
    @machine = CapistranoMulticonfigParallel::StateMachine.new(@job, Actor.current)
    @manager.setup_worker_conditions(@job)
    manager.register_worker_for_job(job, Actor.current)
  end

  # Returns the coloured state string: the machine state (green) while the job
  # is not 'dead' and the actor is alive, otherwise marks the job 'dead' and
  # returns 'DEAD' (red).
  def worker_state
    if job.status.to_s.downcase != 'dead' && Actor.current.alive?
      @machine.state.to_s.green
    else
      job.status = 'dead'
      job.status.upcase.red
    end
  end

  # Records this job on the previous job (when one was given) and opens the
  # pubsub client subscribed to the worker's channel.
  def start_task
    log_to_file("exec worker #{@job_id} starts task and subscribes to #{@subscription_channel}")
    if @old_job.present? && @old_job.is_a?(CapistranoMulticonfigParallel::Job)
      @old_job.new_jobs_dispatched << @job.id
    end
    @socket_connection = CelluloidPubsub::Client.new(actor: Actor.current, enable_debug: debug_websocket?, channel: subscription_channel, log_file_path: websocket_config.fetch('log_file_path', nil))
  end

  # Publishes +data+ on the job's subscription-prefixed channel.
  def publish_rake_event(data)
    log_to_file("worker #{@job_id} rties to publish into channel #{CapistranoSentinel::RequestHooks::SUBSCRIPTION_PREFIX}#{@job_id} data #{data.inspect}")
    @socket_connection.publish("#{CapistranoSentinel::RequestHooks::SUBSCRIPTION_PREFIX}#{@job_id}", data)
  end

  # Callback for incoming pubsub messages: the subscription acknowledgement
  # starts the deploy, everything else goes to #handle_subscription.
  def on_message(message)
    log_to_file("worker #{@job_id} received: #{message.inspect}")
    if @socket_connection.succesfull_subscription?(message)
      @successfull_subscription = true
      execute_after_succesfull_subscription
    else
      handle_subscription(message)
    end
  end

  # Starts the deploy asynchronously and asks the manager (asynchronously) to
  # wait for this job's task confirmations.
  def execute_after_succesfull_subscription
    async.execute_deploy
    @manager.async.wait_task_confirmations_worker(@job)
  end

  # Lazily initialised list of task names seen so far.
  def rake_tasks
    @rake_tasks ||= []
  end

  # Lazily initialised invocation chain.
  def invocation_chain
    @invocation_chain ||= []
  end

  # Creates the child process runner and hands it the job's deploy command.
  def execute_deploy
    log_to_file("invocation chain #{@job_id} is : #{@rake_tasks.inspect}")
    check_child_proces
    command = job.fetch_deploy_command
    log_to_file("worker #{@job_id} executes: #{command}")
    @child_process.async.work(@job, command, actor: Actor.current, silent: true, process_sync: :async, runner_status_klass: CapistranoMulticonfigParallel::ChildProcessStatus)
  end

  # Creates a new ProcessRunner, links it to the current actor and returns it.
  def check_child_proces
    @child_process = CapistranoMulticonfigParallel::ProcessRunner.new
    Actor.current.link @child_process
    @child_process
  end

  # Callback for a closed websocket connection; only logs.
  def on_close(code, reason)
    log_to_file("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}")
  end

  # After the gitflow staging tag task has run on a 'staging' job, flags the
  # job and asks the manager to dispatch a follow-up job for 'production'.
  def check_gitflow
    return if @job.stage != 'staging' || !@manager.can_tag_staging? || !executed_task?(CapistranoMulticonfigParallel::GITFLOW_TAG_STAGING_TASK)
    # NOTE(review): was a bare call on the worker; #notify_finished calls this
    # on @job and #finish_worker reads the flag from @job, so it is sent to
    # the job here as well - confirm BaseActorHelper does not provide it.
    @job.mark_for_dispatching_new_job
    @manager.dispatch_new_job(@job, stage: 'production')
  end

  # Routes a non-acknowledgement message to the matching handler.
  # NOTE(review): the three predicate names below were missing from the
  # listing and are assumed - verify against the helper modules.
  def handle_subscription(message)
    if message_is_about_a_task?(message)
      check_gitflow
      save_tasks_to_be_executed(message)
      async.update_machine_state(message['task']) # if message['action'] == 'invoke'
      log_to_file("worker #{@job_id} state is #{@machine.state}")
      task_approval(message)
    elsif message_is_for_stdout?(message)
      result = Celluloid::Actor[:terminal_server].show_confirmation(message['question'], message['default'])
      publish_rake_event(message.merge('action' => 'stdin', 'result' => result, 'client_action' => 'stdin'))
    elsif message_from_bundler?(message)
      #gem_messsage = job.gem_specs.find{|spec| message['task'].include?(spec.name) }
      # if gem_messsage.present?
      #   async.update_machine_state("insta")
      # else
      async.update_machine_state(message['task'])
      #end
    else
      log_to_file(message, job_id: @job_id)
    end
  end

  # True when +task+ (compared as a string) is in #rake_tasks.
  def executed_task?(task)
    rake_tasks.present? && rake_tasks.index(task.to_s).present?
  end

  # Signals the waiting confirmation condition for an invoked task that
  # requires confirmation; otherwise approves the task immediately.
  def task_approval(message)
    job_conditions = @manager.job_to_condition[@job_id]
    log_to_file("worker #{@job_id} checks if task : #{message['task'].inspect} is included in #{configuration.task_confirmations.inspect}")
    if job_conditions.present? && configuration.task_confirmations.include?(message['task']) && message['action'] == 'invoke'
      log_to_file("worker #{@job_id} signals approval for task : #{message['task'].inspect}")
      task_confirmation = job_conditions[message['task']]
      task_confirmation[:status] = 'confirmed'
      task_confirmation[:condition].signal(message['task'])
    else
      publish_rake_event(message.merge('approved' => 'yes'))
    end
  end

  # Appends the message's task to both task lists unless it repeats the last
  # recorded entry of the respective list.
  def save_tasks_to_be_executed(message)
    log_to_file("worler #{@job_id} current invocation chain : #{rake_tasks.inspect}")
    rake_tasks << message['task'] if rake_tasks.last != message['task']
    invocation_chain << message['task'] if invocation_chain.last != message['task']
  end

  # Moves the state machine to +name+ and raises TaskFailed when the job
  # reports failure afterwards.
  def update_machine_state(name, options = {})
    log_to_file("worker #{@job_id} triest to transition from #{@machine.state} to #{name}") unless options[:bundler]
    @machine.go_to_transition(name.to_s, options)
    error_message = "worker #{@job_id} task #{name} failed "
    raise(CapistranoMulticonfigParallel::TaskFailed.new(error_message), error_message) if job.failed? # force worker to rollback
  end

  # Publishes +message+ on +channel+, always tagged with the job id: a Hash is
  # merged over { job_id: }, anything else is wrapped under :message.
  def send_msg(channel, message = nil)
    payload = message.present? && message.is_a?(Hash) ? { job_id: @job_id }.merge(message) : { job_id: @job_id, message: message }
    log_to_file("worker #{@job_id} triest to send to #{channel} #{payload}")
    publish channel, payload
  end

  # Marks remaining tasks completed, moves to FINISHED on exit status 0 and
  # signals the manager once all workers are done.
  def finish_worker(exit_status)
    log_to_file("worker #{job_id} tries to terminate with exit_status #{exit_status}")
    @manager.mark_completed_remaining_tasks(@job) if Actor.current.alive?
    update_machine_state('FINISHED') if exit_status == 0
    @manager.workers_terminated.signal('completed') if !@job.marked_for_dispatching_new_job? && @manager.present? && @manager.alive? && @manager.all_workers_finished?
  end

  # Called with the child's exit status; records it, finishes the worker and
  # raises TaskFailed for a non-zero status.
  def notify_finished(exit_status, _runner_status)
    @job.mark_for_dispatching_new_job if exit_status != 0
    @job.exit_status = exit_status
    finish_worker(exit_status)
    return if exit_status == 0
    error_message = "worker #{@job_id} task failed with exit status #{exit_status.inspect} "
    raise(CapistranoMulticonfigParallel::TaskFailed.new(error_message), error_message)
  end

  # def inspect
  #   to_s
  # end
  #
  # def to_s
  #   "#<#{self.class}(#{Actor.current.mailbox.address.inspect}) alive>"
  # rescue
  #   "#<#{self.class}(#{Actor.current.mailbox.address.inspect}) dead>"
  # end
end
#manager ⇒ CapistranoMulticonfigParallel::CelluloidManager
Returns the instance of the manager that delegated the job to this worker.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 20
# Worker that spawns a child process in order to execute a capistrano job and
# monitors that process.
#
# NOTE(review): this listing had lost several identifiers (the `message` and
# `options` parameters, the `on_message` method name, a few locals and the
# three predicate calls in #handle_subscription). Parameter and method names
# were restored from the method signatures in the summary; the local names and
# the predicate names are assumptions - confirm against the repository.
class CelluloidWorker
  include CapistranoMulticonfigParallel::BaseActorHelper

  ATTRIBUTE_LIST = [
    :job,
    :manager,
    :job_id,
    :app_name,
    :env_name,
    :action_name,
    :env_options,
    :machine,
    :socket_connection,
    :task_argv,
    :rake_tasks,
    :current_task_number, # tracking tasks
    :successfull_subscription,
    :subscription_channel,
    :publisher_channel, # for subscriptions and publishing events
    :job_termination_condition,
    :invocation_chain,
    :filename,
    :worker_log,
    :exit_status,
    :old_job
  ].freeze

  # attr_accessor defines both reader and writer, so a separate attr_reader
  # call over the same list is not needed (it only redefined the readers).
  attr_accessor(*CapistranoMulticonfigParallel::CelluloidWorker::ATTRIBUTE_LIST)

  # Empty constructor; state is assigned by #work.
  def initialize(*args)
  end

  # Stores the job, manager and previous job, builds the subscription channel
  # name and the state machine, then registers this actor with the manager.
  def work(job, manager, old_job)
    @job = job
    @old_job = old_job
    @job_id = job.id
    @worker_state = job.status
    @manager = manager
    @job_confirmation_conditions = []
    log_to_file("worker #{@job_id} received #{job.inspect} and #{old_job.inspect}")
    @subscription_channel = "#{CapistranoSentinel::RequestHooks::PUBLISHER_PREFIX}#{@job_id}"
    @machine = CapistranoMulticonfigParallel::StateMachine.new(@job, Actor.current)
    @manager.setup_worker_conditions(@job)
    manager.register_worker_for_job(job, Actor.current)
  end

  # Returns the coloured state string: the machine state (green) while the job
  # is not 'dead' and the actor is alive, otherwise marks the job 'dead' and
  # returns 'DEAD' (red).
  def worker_state
    if job.status.to_s.downcase != 'dead' && Actor.current.alive?
      @machine.state.to_s.green
    else
      job.status = 'dead'
      job.status.upcase.red
    end
  end

  # Records this job on the previous job (when one was given) and opens the
  # pubsub client subscribed to the worker's channel.
  def start_task
    log_to_file("exec worker #{@job_id} starts task and subscribes to #{@subscription_channel}")
    if @old_job.present? && @old_job.is_a?(CapistranoMulticonfigParallel::Job)
      @old_job.new_jobs_dispatched << @job.id
    end
    @socket_connection = CelluloidPubsub::Client.new(actor: Actor.current, enable_debug: debug_websocket?, channel: subscription_channel, log_file_path: websocket_config.fetch('log_file_path', nil))
  end

  # Publishes +data+ on the job's subscription-prefixed channel.
  def publish_rake_event(data)
    log_to_file("worker #{@job_id} rties to publish into channel #{CapistranoSentinel::RequestHooks::SUBSCRIPTION_PREFIX}#{@job_id} data #{data.inspect}")
    @socket_connection.publish("#{CapistranoSentinel::RequestHooks::SUBSCRIPTION_PREFIX}#{@job_id}", data)
  end

  # Callback for incoming pubsub messages: the subscription acknowledgement
  # starts the deploy, everything else goes to #handle_subscription.
  def on_message(message)
    log_to_file("worker #{@job_id} received: #{message.inspect}")
    if @socket_connection.succesfull_subscription?(message)
      @successfull_subscription = true
      execute_after_succesfull_subscription
    else
      handle_subscription(message)
    end
  end

  # Starts the deploy asynchronously and asks the manager (asynchronously) to
  # wait for this job's task confirmations.
  def execute_after_succesfull_subscription
    async.execute_deploy
    @manager.async.wait_task_confirmations_worker(@job)
  end

  # Lazily initialised list of task names seen so far.
  def rake_tasks
    @rake_tasks ||= []
  end

  # Lazily initialised invocation chain.
  def invocation_chain
    @invocation_chain ||= []
  end

  # Creates the child process runner and hands it the job's deploy command.
  def execute_deploy
    log_to_file("invocation chain #{@job_id} is : #{@rake_tasks.inspect}")
    check_child_proces
    command = job.fetch_deploy_command
    log_to_file("worker #{@job_id} executes: #{command}")
    @child_process.async.work(@job, command, actor: Actor.current, silent: true, process_sync: :async, runner_status_klass: CapistranoMulticonfigParallel::ChildProcessStatus)
  end

  # Creates a new ProcessRunner, links it to the current actor and returns it.
  def check_child_proces
    @child_process = CapistranoMulticonfigParallel::ProcessRunner.new
    Actor.current.link @child_process
    @child_process
  end

  # Callback for a closed websocket connection; only logs.
  def on_close(code, reason)
    log_to_file("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}")
  end

  # After the gitflow staging tag task has run on a 'staging' job, flags the
  # job and asks the manager to dispatch a follow-up job for 'production'.
  def check_gitflow
    return if @job.stage != 'staging' || !@manager.can_tag_staging? || !executed_task?(CapistranoMulticonfigParallel::GITFLOW_TAG_STAGING_TASK)
    # NOTE(review): was a bare call on the worker; #notify_finished calls this
    # on @job and #finish_worker reads the flag from @job, so it is sent to
    # the job here as well - confirm BaseActorHelper does not provide it.
    @job.mark_for_dispatching_new_job
    @manager.dispatch_new_job(@job, stage: 'production')
  end

  # Routes a non-acknowledgement message to the matching handler.
  # NOTE(review): the three predicate names below were missing from the
  # listing and are assumed - verify against the helper modules.
  def handle_subscription(message)
    if message_is_about_a_task?(message)
      check_gitflow
      save_tasks_to_be_executed(message)
      async.update_machine_state(message['task']) # if message['action'] == 'invoke'
      log_to_file("worker #{@job_id} state is #{@machine.state}")
      task_approval(message)
    elsif message_is_for_stdout?(message)
      result = Celluloid::Actor[:terminal_server].show_confirmation(message['question'], message['default'])
      publish_rake_event(message.merge('action' => 'stdin', 'result' => result, 'client_action' => 'stdin'))
    elsif message_from_bundler?(message)
      #gem_messsage = job.gem_specs.find{|spec| message['task'].include?(spec.name) }
      # if gem_messsage.present?
      #   async.update_machine_state("insta")
      # else
      async.update_machine_state(message['task'])
      #end
    else
      log_to_file(message, job_id: @job_id)
    end
  end

  # True when +task+ (compared as a string) is in #rake_tasks.
  def executed_task?(task)
    rake_tasks.present? && rake_tasks.index(task.to_s).present?
  end

  # Signals the waiting confirmation condition for an invoked task that
  # requires confirmation; otherwise approves the task immediately.
  def task_approval(message)
    job_conditions = @manager.job_to_condition[@job_id]
    log_to_file("worker #{@job_id} checks if task : #{message['task'].inspect} is included in #{configuration.task_confirmations.inspect}")
    if job_conditions.present? && configuration.task_confirmations.include?(message['task']) && message['action'] == 'invoke'
      log_to_file("worker #{@job_id} signals approval for task : #{message['task'].inspect}")
      task_confirmation = job_conditions[message['task']]
      task_confirmation[:status] = 'confirmed'
      task_confirmation[:condition].signal(message['task'])
    else
      publish_rake_event(message.merge('approved' => 'yes'))
    end
  end

  # Appends the message's task to both task lists unless it repeats the last
  # recorded entry of the respective list.
  def save_tasks_to_be_executed(message)
    log_to_file("worler #{@job_id} current invocation chain : #{rake_tasks.inspect}")
    rake_tasks << message['task'] if rake_tasks.last != message['task']
    invocation_chain << message['task'] if invocation_chain.last != message['task']
  end

  # Moves the state machine to +name+ and raises TaskFailed when the job
  # reports failure afterwards.
  def update_machine_state(name, options = {})
    log_to_file("worker #{@job_id} triest to transition from #{@machine.state} to #{name}") unless options[:bundler]
    @machine.go_to_transition(name.to_s, options)
    error_message = "worker #{@job_id} task #{name} failed "
    raise(CapistranoMulticonfigParallel::TaskFailed.new(error_message), error_message) if job.failed? # force worker to rollback
  end

  # Publishes +message+ on +channel+, always tagged with the job id: a Hash is
  # merged over { job_id: }, anything else is wrapped under :message.
  def send_msg(channel, message = nil)
    payload = message.present? && message.is_a?(Hash) ? { job_id: @job_id }.merge(message) : { job_id: @job_id, message: message }
    log_to_file("worker #{@job_id} triest to send to #{channel} #{payload}")
    publish channel, payload
  end

  # Marks remaining tasks completed, moves to FINISHED on exit status 0 and
  # signals the manager once all workers are done.
  def finish_worker(exit_status)
    log_to_file("worker #{job_id} tries to terminate with exit_status #{exit_status}")
    @manager.mark_completed_remaining_tasks(@job) if Actor.current.alive?
    update_machine_state('FINISHED') if exit_status == 0
    @manager.workers_terminated.signal('completed') if !@job.marked_for_dispatching_new_job? && @manager.present? && @manager.alive? && @manager.all_workers_finished?
  end

  # Called with the child's exit status; records it, finishes the worker and
  # raises TaskFailed for a non-zero status.
  def notify_finished(exit_status, _runner_status)
    @job.mark_for_dispatching_new_job if exit_status != 0
    @job.exit_status = exit_status
    finish_worker(exit_status)
    return if exit_status == 0
    error_message = "worker #{@job_id} task failed with exit status #{exit_status.inspect} "
    raise(CapistranoMulticonfigParallel::TaskFailed.new(error_message), error_message)
  end

  # def inspect
  #   to_s
  # end
  #
  # def to_s
  #   "#<#{self.class}(#{Actor.current.mailbox.address.inspect}) alive>"
  # rescue
  #   "#<#{self.class}(#{Actor.current.mailbox.address.inspect}) dead>"
  # end
end
Instance Method Details
#check_child_proces ⇒ Object
105 106 107 108 109 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 105
# Builds a fresh ProcessRunner, remembers it in @child_process, links it to
# the current actor and returns it.
def check_child_proces
  runner = CapistranoMulticonfigParallel::ProcessRunner.new
  @child_process = runner
  Actor.current.link(runner)
  runner
end
#check_gitflow ⇒ Object
115 116 117 118 119 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 115
# Once the gitflow staging tag task has been seen on a 'staging' job (and the
# manager allows tagging staging), flags the job as dispatching a new job and
# asks the manager to dispatch a follow-up job for the 'production' stage.
def check_gitflow
  return unless @job.stage == 'staging'
  return unless @manager.can_tag_staging?
  return unless executed_task?(CapistranoMulticonfigParallel::GITFLOW_TAG_STAGING_TASK)

  # NOTE(review): this used to be a bare call on the worker. #notify_finished
  # sends the same message to @job and #finish_worker reads
  # marked_for_dispatching_new_job? from @job, so the flag is set on the job
  # here too - confirm BaseActorHelper does not define a worker-level version.
  @job.mark_for_dispatching_new_job
  @manager.dispatch_new_job(@job, stage: 'production')
end
#execute_after_succesfull_subscription ⇒ Object
84 85 86 87 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 84
# Runs after the subscription was acknowledged: starts the deploy
# asynchronously, then asynchronously asks the manager to wait for this job's
# task confirmations.
def execute_after_succesfull_subscription
  async.execute_deploy
  manager_proxy = @manager.async
  manager_proxy.wait_task_confirmations_worker(@job)
end
#execute_deploy ⇒ Object
97 98 99 100 101 102 103 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 97
# Creates the child process runner and asynchronously hands it the job
# together with the job's deploy command.
def execute_deploy
  log_to_file("invocation chain #{@job_id} is : #{@rake_tasks.inspect}")
  check_child_proces
  deploy_command = job.fetch_deploy_command
  log_to_file("worker #{@job_id} executes: #{deploy_command}")
  runner_options = {
    actor: Actor.current,
    silent: true,
    process_sync: :async,
    runner_status_klass: CapistranoMulticonfigParallel::ChildProcessStatus
  }
  @child_process.async.work(@job, deploy_command, **runner_options)
end
#executed_task?(task) ⇒ Boolean
145 146 147 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 145
# Tells whether +task+ (compared by its string form) has been recorded in
# #rake_tasks.
#
# @param task [#to_s] task name to look for
# @return [Boolean]
def executed_task?(task)
  wanted = task.to_s
  rake_tasks.include?(wanted)
end
#finish_worker(exit_status) ⇒ Object
181 182 183 184 185 186 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 181
# Wraps up the worker: tells the manager to mark the job's remaining tasks as
# completed (while the actor is alive), moves the state machine to FINISHED on
# exit status 0, and signals 'completed' on the manager's workers_terminated
# condition once the job is not flagged for dispatching a new job and the
# manager reports all workers finished.
def finish_worker(exit_status)
  log_to_file("worker #{job_id} tries to terminate with exit_status #{exit_status}")

  if Actor.current.alive?
    @manager.mark_completed_remaining_tasks(@job)
  end

  if exit_status == 0
    update_machine_state('FINISHED')
  end

  return if @job.marked_for_dispatching_new_job?
  return unless @manager.present? && @manager.alive? && @manager.all_workers_finished?

  @manager.workers_terminated.signal('completed')
end
#handle_subscription(message) ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 121
# Routes a message received on the subscription channel:
# * task messages are recorded, drive the state machine and go through
#   #task_approval;
# * stdout messages ask the terminal server for a confirmation and publish the
#   answer back with action 'stdin';
# * bundler messages only drive the state machine;
# * anything else is logged.
#
# NOTE(review): the listing had lost the `message` parameter (restored from
# the documented signature `handle_subscription(message)`) and the three
# predicate calls in the conditions. The predicate names used below are
# assumptions - verify them against the helper modules before relying on this.
#
# @param message [Hash] decoded message; read with string keys such as 'task'
def handle_subscription(message)
  if message_is_about_a_task?(message)
    check_gitflow
    save_tasks_to_be_executed(message)
    async.update_machine_state(message['task']) # if message['action'] == 'invoke'
    log_to_file("worker #{@job_id} state is #{@machine.state}")
    task_approval(message)
  elsif message_is_for_stdout?(message)
    result = Celluloid::Actor[:terminal_server].show_confirmation(message['question'], message['default'])
    publish_rake_event(message.merge('action' => 'stdin', 'result' => result, 'client_action' => 'stdin'))
  elsif message_from_bundler?(message)
    #gem_messsage = job.gem_specs.find{|spec| message['task'].include?(spec.name) }
    # if gem_messsage.present?
    #   async.update_machine_state("insta")
    # else
    async.update_machine_state(message['task'])
    #end
  else
    log_to_file(message, job_id: @job_id)
  end
end
#invocation_chain ⇒ Object
93 94 95 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 93
# Returns the invocation chain, creating an empty array on first access.
#
# @return [Array]
def invocation_chain
  @invocation_chain = [] unless @invocation_chain
  @invocation_chain
end
#notify_finished(exit_status, _runner_status) ⇒ Object
189 190 191 192 193 194 195 196 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 189
# Records the exit status on the job and finishes the worker. A non-zero
# status additionally flags the job for dispatching a new job beforehand and
# raises TaskFailed afterwards.
#
# The listing had lost the local that holds the error text; it is restored
# here as `error_message` (the name itself is not observable by callers).
#
# @param exit_status [Integer] exit status of the child process
# @param _runner_status [Object] unused
# @raise [CapistranoMulticonfigParallel::TaskFailed] when exit_status is not 0
def notify_finished(exit_status, _runner_status)
  @job.mark_for_dispatching_new_job if exit_status != 0
  @job.exit_status = exit_status
  finish_worker(exit_status)
  return if exit_status == 0

  error_message = "worker #{@job_id} task failed with exit status #{exit_status.inspect} "
  raise(CapistranoMulticonfigParallel::TaskFailed.new(error_message), error_message)
end
#on_close(code, reason) ⇒ Object
111 112 113 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 111
# Callback for a closed websocket connection; writes the close code and
# reason to the log and does nothing else.
def on_close(code, reason)
  close_details = "#{code.inspect}, #{reason.inspect}"
  log_to_file("worker #{@job_id} websocket connection closed: #{close_details}")
end
#on_message(message) ⇒ Object
74 75 76 77 78 79 80 81 82 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 74
# Callback for every message arriving on the pubsub connection. The
# subscription acknowledgement marks the subscription as successful and kicks
# off the deploy; any other message is passed to #handle_subscription.
#
# The listing had lost the method name and its parameter (`def ()`); both are
# restored from the documented signature `on_message(message)`.
#
# @param message [Object] the received message
def on_message(message)
  log_to_file("worker #{@job_id} received: #{message.inspect}")
  if @socket_connection.succesfull_subscription?(message)
    @successfull_subscription = true
    execute_after_succesfull_subscription
  else
    handle_subscription(message)
  end
end
#publish_rake_event(data) ⇒ Object
69 70 71 72 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 69
# Publishes +data+ on the channel named by the subscription prefix followed
# by the job id, logging the attempt first.
def publish_rake_event(data)
  target_channel = "#{CapistranoSentinel::RequestHooks::SUBSCRIPTION_PREFIX}#{@job_id}"
  log_to_file("worker #{@job_id} rties to publish into channel #{target_channel} data #{data.inspect}")
  @socket_connection.publish(target_channel, data)
end
#rake_tasks ⇒ Object
89 90 91 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 89
# Returns the list of recorded task names, creating an empty array on first
# access.
#
# @return [Array]
def rake_tasks
  @rake_tasks = [] unless @rake_tasks
  @rake_tasks
end
#save_tasks_to_be_executed(message) ⇒ Object
162 163 164 165 166 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 162
# Appends the message's 'task' value to #rake_tasks and to #invocation_chain,
# skipping a list whose last entry already equals that task.
#
# The listing had lost the `message` parameter and receiver; they are restored
# from the documented signature `save_tasks_to_be_executed(message)`.
#
# @param message [Hash] message with a 'task' key
def save_tasks_to_be_executed(message)
  log_to_file("worler #{@job_id} current invocation chain : #{rake_tasks.inspect}")
  task = message['task']
  rake_tasks << task if rake_tasks.last != task
  invocation_chain << task if invocation_chain.last != task
end
#send_msg(channel, message = nil) ⇒ Object
175 176 177 178 179 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 175
# Publishes a payload tagged with this worker's job id on +channel+.
# A non-blank Hash is merged over { job_id: @job_id }; any other value
# (including nil) is wrapped as { job_id: @job_id, message: value }.
#
# The listing had lost the `message` parameter (restored from the documented
# signature `send_msg(channel, message = nil)`) and the local holding the
# payload, which is named `payload` here.
#
# @param channel [Object] channel passed through to `publish`
# @param message [Hash, Object, nil] content to send
def send_msg(channel, message = nil)
  payload =
    if message.present? && message.is_a?(Hash)
      { job_id: @job_id }.merge(message)
    else
      { job_id: @job_id, message: message }
    end
  log_to_file("worker #{@job_id} triest to send to #{channel} #{payload}")
  publish channel, payload
end
#start_task ⇒ Object
61 62 63 64 65 66 67 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 61
# Registers this job's id on the previous job (when a previous Job was given)
# and creates the pubsub client subscribed to the worker's channel.
def start_task
  log_to_file("exec worker #{@job_id} starts task and subscribes to #{@subscription_channel}")

  previous_job_given = @old_job.present? && @old_job.is_a?(CapistranoMulticonfigParallel::Job)
  @old_job.new_jobs_dispatched << @job.id if previous_job_given

  client_options = {
    actor: Actor.current,
    enable_debug: debug_websocket?,
    channel: subscription_channel,
    log_file_path: websocket_config.fetch('log_file_path', nil)
  }
  @socket_connection = CelluloidPubsub::Client.new(**client_options)
end
#task_approval(message) ⇒ Object
149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 149
# Decides how a task gets approved. When the manager holds conditions for this
# job, the task is listed in configuration.task_confirmations and the action
# is 'invoke', the task's confirmation entry is marked 'confirmed' and its
# condition is signalled. In every other case the message is published back
# with 'approved' => 'yes'.
#
# The listing had lost the `message` parameter and receiver; they are restored
# from the documented signature `task_approval(message)`.
#
# @param message [Hash] message with 'task' and 'action' keys
def task_approval(message)
  task = message['task']
  job_conditions = @manager.job_to_condition[@job_id]
  log_to_file("worker #{@job_id} checks if task : #{task.inspect} is included in #{configuration.task_confirmations.inspect}")
  if job_conditions.present? && configuration.task_confirmations.include?(task) && message['action'] == 'invoke'
    log_to_file("worker #{@job_id} signals approval for task : #{task.inspect}")
    task_confirmation = job_conditions[task]
    task_confirmation[:status] = 'confirmed'
    task_confirmation[:condition].signal(task)
  else
    publish_rake_event(message.merge('approved' => 'yes'))
  end
end
#update_machine_state(name, options = {}) ⇒ Object
168 169 170 171 172 173 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 168
# Moves the state machine to the transition named +name+ and raises
# TaskFailed when the job reports failure afterwards. The transition attempt
# is logged unless options[:bundler] is truthy.
#
# The listing had lost the `options` parameter (restored from the documented
# signature `update_machine_state(name, options = {})`) and the local holding
# the error text, named `error_message` here.
#
# @param name [#to_s] target state / transition name
# @param options [Hash] passed through to the state machine
# @raise [CapistranoMulticonfigParallel::TaskFailed] when job.failed? is true
def update_machine_state(name, options = {})
  log_to_file("worker #{@job_id} triest to transition from #{@machine.state} to #{name}") unless options[:bundler]
  @machine.go_to_transition(name.to_s, options)
  error_message = "worker #{@job_id} task #{name} failed "
  raise(CapistranoMulticonfigParallel::TaskFailed.new(error_message), error_message) if job.failed? # force worker to rollback
end
#work(job, manager, old_job) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 37
# Prepares the worker for +job+: stores the job, the previous job and the
# manager, derives the subscription channel name, creates the state machine,
# lets the manager set up the job's conditions and registers this actor as the
# worker for the job.
def work(job, manager, old_job)
  @job, @old_job = job, old_job
  @job_id = job.id
  @worker_state = job.status
  @manager = manager
  @job_confirmation_conditions = []

  log_to_file("worker #{@job_id} received #{job.inspect} and #{old_job.inspect}")

  publisher_prefix = CapistranoSentinel::RequestHooks::PUBLISHER_PREFIX
  @subscription_channel = "#{publisher_prefix}#{@job_id}"
  @machine = CapistranoMulticonfigParallel::StateMachine.new(@job, Actor.current)

  @manager.setup_worker_conditions(@job)
  manager.register_worker_for_job(job, Actor.current)
end
#worker_state ⇒ Object
52 53 54 55 56 57 58 59 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 52
# Returns the worker's state as a coloured string: the state machine's state
# in green while the job is not 'dead' and the actor is alive; otherwise the
# job status is set to 'dead' and 'DEAD' is returned in red.
def worker_state
  still_running = job.status.to_s.downcase != 'dead' && Actor.current.alive?
  return @machine.state.to_s.green if still_running

  job.status = 'dead'
  job.status.upcase.red
end