Class: CapistranoMulticonfigParallel::CelluloidWorker
- Inherits:
- Object (inheritance chain: Object > CapistranoMulticonfigParallel::CelluloidWorker)
- Includes:
- BaseActorHelper
- Defined in:
- lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb
Overview
Worker that spawns a child process to execute a Capistrano job and monitors that process.
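The spawn-and-monitor idea in the overview can be illustrated with a minimal plain-Ruby sketch. It is not the gem's implementation (which delegates to `CapistranoMulticonfigParallel::ChildProcess` inside a Celluloid actor); `run_and_monitor` is a hypothetical helper built on the standard library's `Open3`.

```ruby
require 'open3'
require 'rbconfig'

# Hypothetical helper (not part of the gem): runs a command in a child
# process, collects its combined stdout/stderr, and returns the exit code.
def run_and_monitor(*command)
  output = []
  status = nil
  Open3.popen2e(*command) do |stdin, stdout_and_stderr, wait_thread|
    stdin.close
    stdout_and_stderr.each_line { |line| output << line.chomp }
    status = wait_thread.value # Process::Status of the finished child
  end
  [status.exitstatus, output]
end

# Run a throwaway Ruby child that prints one line and exits with code 3.
exit_code, lines = run_and_monitor(RbConfig.ruby, '-e', 'puts "deploying"; exit 3')
puts "child exited with #{exit_code}, output: #{lines.inspect}"
```

A non-zero exit code is what the worker later turns into a `TaskFailed` error.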
Defined Under Namespace
Classes: TaskFailed
Instance Attribute Summary
- #action_name ⇒ Object
  Returns the value of attribute action_name.
- #app_name ⇒ Object
  Returns the value of attribute app_name.
- #client ⇒ Object
  Returns the value of attribute client.
- #current_task_number ⇒ Object
  Returns the value of attribute current_task_number.
- #env_name ⇒ Object
  Returns the value of attribute env_name.
- #env_options ⇒ Object
  Returns the value of attribute env_options.
- #exit_status ⇒ Object
  Returns the value of attribute exit_status.
- #filename ⇒ Object
  Returns the value of attribute filename.
- #invocation_chain ⇒ Object
  Returns the value of attribute invocation_chain.
- #job ⇒ Hash
  Options used for executing capistrano task.
- #job_id ⇒ Object
  Returns the value of attribute job_id.
- #job_termination_condition ⇒ Object
  Returns the value of attribute job_termination_condition.
- #machine ⇒ Object
  Returns the value of attribute machine.
- #manager ⇒ CapistranoMulticonfigParallel::CelluloidManager
  The instance of the manager that delegated the job to this worker.
- #publisher_channel ⇒ Object
  Returns the value of attribute publisher_channel.
- #rake_tasks ⇒ Object
  Returns the value of attribute rake_tasks.
- #subscription_channel ⇒ Object
  Returns the value of attribute subscription_channel.
- #successfull_subscription ⇒ Object
  Returns the value of attribute successfull_subscription.
- #task_argv ⇒ Object
  Returns the value of attribute task_argv.
- #worker_log ⇒ Object
  Returns the value of attribute worker_log.
- #worker_state ⇒ Object
  Returns the value of attribute worker_state.
Instance Method Summary
- #check_child_proces ⇒ Object
- #check_gitflow ⇒ Object
- #execute_after_succesfull_subscription ⇒ Object
- #execute_deploy ⇒ Object
- #executed_task?(task) ⇒ Boolean
- #finish_worker(exit_status) ⇒ Object
- #handle_subscription(message) ⇒ Object
- #notify_finished(exit_status) ⇒ Object
- #on_close(code, reason) ⇒ Object
- #on_message(message) ⇒ Object
- #publish_rake_event(data) ⇒ Object
- #rake_actor_id(_data) ⇒ Object
- #save_tasks_to_be_executed(message) ⇒ Object
- #send_msg(channel, message = nil) ⇒ Object
- #start_task ⇒ Object
- #task_approval(message) ⇒ Object
- #update_machine_state(name) ⇒ Object
- #work(job, manager) ⇒ Object
Methods included from BaseActorHelper
Instance Attribute Details
#action_name ⇒ Object
Returns the value of attribute action_name.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def action_name
      @action_name
    end
#app_name ⇒ Object
Returns the value of attribute app_name.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def app_name
      @app_name
    end
#client ⇒ Object
Returns the value of attribute client.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def client
      @client
    end
#current_task_number ⇒ Object
Returns the value of attribute current_task_number.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def current_task_number
      @current_task_number
    end
#env_name ⇒ Object
Returns the value of attribute env_name.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def env_name
      @env_name
    end
#env_options ⇒ Object
Returns the value of attribute env_options.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def env_options
      @env_options
    end
#exit_status ⇒ Object
Returns the value of attribute exit_status.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def exit_status
      @exit_status
    end
#filename ⇒ Object
Returns the value of attribute filename.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def filename
      @filename
    end
#invocation_chain ⇒ Object
Returns the value of attribute invocation_chain.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def invocation_chain
      @invocation_chain
    end
#job ⇒ Hash
Returns the options used to execute the Capistrano task.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 19
    class CelluloidWorker
      include CapistranoMulticonfigParallel::BaseActorHelper
      class TaskFailed < StandardError; end

      attr_accessor :job, :manager, :job_id, :app_name, :env_name, :action_name, :env_options, :machine, :client, :task_argv,
                    :rake_tasks, :current_task_number, # tracking tasks
                    :successfull_subscription, :subscription_channel, :publisher_channel, # for subscriptions and publishing events
                    :job_termination_condition, :worker_state, :invocation_chain, :filename, :worker_log, :exit_status

      def work(job, manager)
        @job = job
        @job_id = job.id
        @worker_state = job.status
        @manager = manager
        @job_confirmation_conditions = []
        log_to_file("worker #{@job_id} received #{job.inspect}")
        @subscription_channel = "worker_#{@job_id}"
        @machine = CapistranoMulticonfigParallel::StateMachine.new(@job, Actor.current)
        @manager.setup_worker_conditions(@job)
        manager.register_worker_for_job(job, Actor.current)
      end

      def worker_state
        if Actor.current.alive?
          @machine.state.to_s.green
        else
          job.status = 'dead'
          job.status.upcase.red
        end
      end

      def start_task
        log_to_file("exec worker #{@job_id} starts task")
        @client = CelluloidPubsub::Client.new(actor: Actor.current, enable_debug: debug_websocket?, channel: subscription_channel, log_file_path: websocket_config.fetch('log_file_path', nil))
      end

      def publish_rake_event(data)
        @client.publish(rake_actor_id(data), data)
      end

      def rake_actor_id(_data)
        "rake_worker_#{@job_id}"
      end

      def on_message(message)
        log_to_file("worker #{@job_id} received: #{message.inspect}")
        if @client.succesfull_subscription?(message)
          @successfull_subscription = true
          execute_after_succesfull_subscription
        else
          handle_subscription(message)
        end
      end

      def execute_after_succesfull_subscription
        async.execute_deploy
        @manager.async.wait_task_confirmations_worker(@job)
      end

      def rake_tasks
        @rake_tasks ||= []
      end

      def invocation_chain
        @invocation_chain ||= []
      end

      def execute_deploy
        log_to_file("invocation chain #{@job_id} is : #{@rake_tasks.inspect}")
        check_child_proces
        command = job.command.to_s
        log_to_file("worker #{@job_id} executes: #{command}")
        @child_process.async.work(@job, command, actor: Actor.current, silent: true)
      end

      def check_child_proces
        @child_process = CapistranoMulticonfigParallel::ChildProcess.new
        Actor.current.link @child_process
        @child_process
      end

      def on_close(code, reason)
        log_to_file("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}")
      end

      def check_gitflow
        return if @job.stage != 'staging' || !@manager.can_tag_staging? || !executed_task?(CapistranoMulticonfigParallel::GITFLOW_TAG_STAGING_TASK)
        @manager.dispatch_new_job(@job, stage: 'production')
      end

      def handle_subscription(message)
        if () # predicate on message; its name is missing from this listing
          check_gitflow
          save_tasks_to_be_executed(message)
          async.update_machine_state(message['task']) # if message['action'] == 'invoke'
          log_to_file("worker #{@job_id} state is #{@machine.state}")
          task_approval(message)
        elsif () # predicate on message; its name is missing from this listing
          result = Celluloid::Actor[:terminal_server].show_confirmation(message['question'], message['default'])
          publish_rake_event(message.merge('action' => 'stdin', 'result' => result, 'client_action' => 'stdin'))
        else
          log_to_file(message, @job_id)
        end
      end

      def executed_task?(task)
        rake_tasks.present? && rake_tasks.index(task.to_s).present?
      end

      def task_approval(message)
        job_conditions = @manager.job_to_condition[@job_id]
        if job_conditions.present? && configuration.task_confirmations.include?(message['task']) && message['action'] == 'invoke'
          task_confirmation = job_conditions[message['task']]
          task_confirmation[:status] = 'confirmed'
          task_confirmation[:condition].signal(message['task'])
        else
          publish_rake_event(message.merge('approved' => 'yes'))
        end
      end

      def save_tasks_to_be_executed(message)
        log_to_file("worler #{@job_id} current invocation chain : #{rake_tasks.inspect}")
        rake_tasks << message['task'] if rake_tasks.last != message['task']
        invocation_chain << message['task'] if invocation_chain.last != message['task']
      end

      def update_machine_state(name)
        log_to_file("worker #{@job_id} triest to transition from #{@machine.state} to #{name}")
        @machine.go_to_transition(name.to_s)
        error_message = "worker #{@job_id} task #{name} failed "
        raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed.new(error_message), error_message) if job.failed? # force worker to rollback
      end

      def send_msg(channel, message = nil)
        publish channel, message.present? && message.is_a?(Hash) ? { job_id: @job_id }.merge(message) : { job_id: @job_id, time: Time.now }
      end

      def finish_worker(exit_status)
        log_to_file("worker #{job_id} tries to terminate with exit_status #{exit_status}")
        @manager.mark_completed_remaining_tasks(@job) if Actor.current.alive?
        update_machine_state('FINISHED') if exit_status == 0
        @manager.workers_terminated.signal('completed') if @manager.present? && @manager.alive? && @manager.all_workers_finished?
      end

      def notify_finished(exit_status)
        finish_worker(exit_status)
        return if exit_status == 0
        error_message = "worker #{@job_id} task failed with exit status #{exit_status.inspect} "
        raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed.new(error_message), error_message)
      end
    end
#job_id ⇒ Object
Returns the value of attribute job_id.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def job_id
      @job_id
    end
#job_termination_condition ⇒ Object
Returns the value of attribute job_termination_condition.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def job_termination_condition
      @job_termination_condition
    end
#machine ⇒ Object
Returns the value of attribute machine.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def machine
      @machine
    end
#manager ⇒ CapistranoMulticonfigParallel::CelluloidManager
Returns the instance of the manager that delegated the job to this worker.
#publisher_channel ⇒ Object
Returns the value of attribute publisher_channel.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def publisher_channel
      @publisher_channel
    end
#rake_tasks ⇒ Object
Returns the value of attribute rake_tasks.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def rake_tasks
      @rake_tasks
    end
#subscription_channel ⇒ Object
Returns the value of attribute subscription_channel.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def subscription_channel
      @subscription_channel
    end
#successfull_subscription ⇒ Object
Returns the value of attribute successfull_subscription.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def successfull_subscription
      @successfull_subscription
    end
#task_argv ⇒ Object
Returns the value of attribute task_argv.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def task_argv
      @task_argv
    end
#worker_log ⇒ Object
Returns the value of attribute worker_log.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def worker_log
      @worker_log
    end
#worker_state ⇒ Object
Returns the value of attribute worker_state.
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 23
    def worker_state
      @worker_state
    end
Instance Method Details
#check_child_proces ⇒ Object
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 94
    def check_child_proces
      @child_process = CapistranoMulticonfigParallel::ChildProcess.new
      Actor.current.link @child_process
      @child_process
    end
#check_gitflow ⇒ Object
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 104
    def check_gitflow
      return if @job.stage != 'staging' || !@manager.can_tag_staging? || !executed_task?(CapistranoMulticonfigParallel::GITFLOW_TAG_STAGING_TASK)
      @manager.dispatch_new_job(@job, stage: 'production')
    end
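The guard in `check_gitflow` returns early unless three conditions all hold. A small sketch of the same condition written positively may make this easier to read; `promote_to_production?` and the task name `'example:tag_staging'` are hypothetical, since the value of `GITFLOW_TAG_STAGING_TASK` is not shown here.

```ruby
# Hypothetical restatement of the guard: a production job is dispatched only
# when the job ran on staging, the manager allows tagging staging, and the
# tagging task was among the executed tasks.
def promote_to_production?(stage:, can_tag_staging:, executed_tasks:, tag_task:)
  stage == 'staging' && can_tag_staging && executed_tasks.include?(tag_task)
end

puts promote_to_production?(
  stage: 'staging',
  can_tag_staging: true,
  executed_tasks: ['deploy', 'example:tag_staging'], # placeholder task names
  tag_task: 'example:tag_staging'
)
```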
#execute_after_succesfull_subscription ⇒ Object
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 73
    def execute_after_succesfull_subscription
      async.execute_deploy
      @manager.async.wait_task_confirmations_worker(@job)
    end
#execute_deploy ⇒ Object
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 86
    def execute_deploy
      log_to_file("invocation chain #{@job_id} is : #{@rake_tasks.inspect}")
      check_child_proces
      command = job.command.to_s
      log_to_file("worker #{@job_id} executes: #{command}")
      @child_process.async.work(@job, command, actor: Actor.current, silent: true)
    end
#executed_task?(task) ⇒ Boolean
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 124
    def executed_task?(task)
      rake_tasks.present? && rake_tasks.index(task.to_s).present?
    end
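`executed_task?` relies on ActiveSupport's `present?`. As a rough plain-Ruby equivalent (an illustration only, with the task list passed in explicitly rather than read from the worker), the check amounts to a membership test on the stringified task name:

```ruby
# Plain-Ruby sketch of the same check, without ActiveSupport.
# `rake_tasks` is passed as an argument here; in the worker it is an attribute.
def executed_task?(rake_tasks, task)
  !rake_tasks.nil? && rake_tasks.include?(task.to_s)
end

tasks = ['deploy:starting', 'deploy:updating'] # placeholder task names
puts executed_task?(tasks, :'deploy:starting')
puts executed_task?(tasks, 'deploy:finished')
```

Because the argument is converted with `to_s`, symbols and strings are treated alike.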
#finish_worker(exit_status) ⇒ Object
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 156
    def finish_worker(exit_status)
      log_to_file("worker #{job_id} tries to terminate with exit_status #{exit_status}")
      @manager.mark_completed_remaining_tasks(@job) if Actor.current.alive?
      update_machine_state('FINISHED') if exit_status == 0
      @manager.workers_terminated.signal('completed') if @manager.present? && @manager.alive? && @manager.all_workers_finished?
    end
#handle_subscription(message) ⇒ Object
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 109
    def handle_subscription(message)
      if () # predicate on message; its name is missing from this listing
        check_gitflow
        save_tasks_to_be_executed(message)
        async.update_machine_state(message['task']) # if message['action'] == 'invoke'
        log_to_file("worker #{@job_id} state is #{@machine.state}")
        task_approval(message)
      elsif () # predicate on message; its name is missing from this listing
        result = Celluloid::Actor[:terminal_server].show_confirmation(message['question'], message['default'])
        publish_rake_event(message.merge('action' => 'stdin', 'result' => result, 'client_action' => 'stdin'))
      else
        log_to_file(message, @job_id)
      end
    end
#notify_finished(exit_status) ⇒ Object
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 163
    def notify_finished(exit_status)
      finish_worker(exit_status)
      return if exit_status == 0
      error_message = "worker #{@job_id} task failed with exit status #{exit_status.inspect} "
      raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed.new(error_message), error_message)
    end
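The pattern in `notify_finished` is to treat exit status 0 as success and raise `TaskFailed` otherwise. A standalone sketch, assuming a top-level `TaskFailed` class and a `job_id` argument in place of the worker's instance state, could look like this:

```ruby
# Stand-in for CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed.
class TaskFailed < StandardError; end

# Hypothetical standalone version: returns :finished on success, raises otherwise.
def check_exit_status(job_id, exit_status)
  return :finished if exit_status == 0
  raise TaskFailed, "worker #{job_id} task failed with exit status #{exit_status.inspect}"
end

begin
  check_exit_status(42, 1)
rescue TaskFailed => e
  puts "caught: #{e.message}"
end
```

In the worker itself, the raised error is what lets the supervising actor react to the failure.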
#on_close(code, reason) ⇒ Object
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 100
    def on_close(code, reason)
      log_to_file("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}")
    end
#on_message(message) ⇒ Object
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 63
    def on_message(message)
      log_to_file("worker #{@job_id} received: #{message.inspect}")
      if @client.succesfull_subscription?(message)
        @successfull_subscription = true
        execute_after_succesfull_subscription
      else
        handle_subscription(message)
      end
    end
#publish_rake_event(data) ⇒ Object
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 55
    def publish_rake_event(data)
      @client.publish(rake_actor_id(data), data)
    end
#rake_actor_id(_data) ⇒ Object
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 59
    def rake_actor_id(_data)
      "rake_worker_#{@job_id}"
    end
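The worker listens on `worker_<job_id>` (set in `#work`) and publishes to `rake_worker_<job_id>`. The sketch below illustrates that channel pairing with a hypothetical in-memory bus; `InMemoryBus`, `worker_channel`, and `rake_channel` are made up for the example and are not the CelluloidPubsub API.

```ruby
# Hypothetical stand-in for a pub/sub client, used only to show channel naming.
class InMemoryBus
  def initialize
    @subscribers = Hash.new { |hash, channel| hash[channel] = [] }
  end

  def subscribe(channel, &handler)
    @subscribers[channel] << handler
  end

  def publish(channel, data)
    @subscribers[channel].each { |handler| handler.call(data) }
  end
end

# Channel the worker subscribes to.
def worker_channel(job_id)
  "worker_#{job_id}"
end

# Channel the worker publishes rake events to.
def rake_channel(job_id)
  "rake_worker_#{job_id}"
end

bus = InMemoryBus.new
received = []
bus.subscribe(rake_channel(7)) { |data| received << data }
bus.publish(rake_channel(7), { 'approved' => 'yes' })
puts received.inspect
```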
#save_tasks_to_be_executed(message) ⇒ Object
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 139
    def save_tasks_to_be_executed(message)
      log_to_file("worler #{@job_id} current invocation chain : #{rake_tasks.inspect}")
      rake_tasks << message['task'] if rake_tasks.last != message['task']
      invocation_chain << message['task'] if invocation_chain.last != message['task']
    end
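Note that `save_tasks_to_be_executed` only compares against the last recorded task, so it suppresses consecutive repeats rather than all duplicates. A minimal sketch of that behaviour, using a hypothetical `record_task` helper and placeholder task names:

```ruby
# Appends the task unless it equals the most recently recorded one.
def record_task(chain, task)
  chain << task if chain.last != task
  chain
end

chain = []
%w[deploy:starting deploy:starting deploy:updating deploy:starting].each do |task|
  record_task(chain, task)
end
puts chain.inspect
```

The second `deploy:starting` is dropped because it immediately follows the first, while the final one is kept because a different task was recorded in between.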
#send_msg(channel, message = nil) ⇒ Object
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 152
    def send_msg(channel, message = nil)
      publish channel, message.present? && message.is_a?(Hash) ? { job_id: @job_id }.merge(message) : { job_id: @job_id, time: Time.now }
    end
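The payload logic in `send_msg` can be read in isolation: a non-empty Hash is merged over `{ job_id: ... }`, and anything else falls back to a timestamped payload. The following plain-Ruby sketch assumes a hypothetical `build_payload` helper and an injectable `now:` keyword to make the result deterministic.

```ruby
# Builds the hash that would be published; publishing itself is left out.
def build_payload(job_id, message = nil, now: Time.now)
  if message.is_a?(Hash) && !message.empty?
    { job_id: job_id }.merge(message)
  else
    { job_id: job_id, time: now }
  end
end

puts build_payload(5, { action: 'stdout' }).inspect
puts build_payload(5).keys.inspect
```

Because the message is merged last, a `:job_id` key in the message would override the worker's own id.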
#start_task ⇒ Object
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 50
    def start_task
      log_to_file("exec worker #{@job_id} starts task")
      @client = CelluloidPubsub::Client.new(actor: Actor.current, enable_debug: debug_websocket?, channel: subscription_channel, log_file_path: websocket_config.fetch('log_file_path', nil))
    end
#task_approval(message) ⇒ Object
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 128
    def task_approval(message)
      job_conditions = @manager.job_to_condition[@job_id]
      if job_conditions.present? && configuration.task_confirmations.include?(message['task']) && message['action'] == 'invoke'
        task_confirmation = job_conditions[message['task']]
        task_confirmation[:status] = 'confirmed'
        task_confirmation[:condition].signal(message['task'])
      else
        publish_rake_event(message.merge('approved' => 'yes'))
      end
    end
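`task_approval` chooses between two paths: signal a waiting confirmation condition, or approve the task immediately. The decision alone can be sketched as a pure function; `approval_decision` and its return values are hypothetical, and the condition signalling and publishing are deliberately left out.

```ruby
# Returns what the worker would do for a message, without side effects.
# confirmable_tasks stands in for configuration.task_confirmations,
# job_conditions for the manager's per-job conditions hash.
def approval_decision(message, confirmable_tasks, job_conditions)
  waits_for_user = !(job_conditions.nil? || job_conditions.empty?) &&
                   confirmable_tasks.include?(message['task']) &&
                   message['action'] == 'invoke'
  return [:signal_condition, message['task']] if waits_for_user
  [:publish, message.merge('approved' => 'yes')]
end

message = { 'task' => 'deploy:migrate', 'action' => 'invoke' } # placeholder task
puts approval_decision(message, ['deploy:migrate'], { 'deploy:migrate' => {} }).inspect
puts approval_decision(message, [], {}).inspect
```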
#update_machine_state(name) ⇒ Object
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 145
    def update_machine_state(name)
      log_to_file("worker #{@job_id} triest to transition from #{@machine.state} to #{name}")
      @machine.go_to_transition(name.to_s)
      error_message = "worker #{@job_id} task #{name} failed "
      raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed.new(error_message), error_message) if job.failed? # force worker to rollback
    end
#work(job, manager) ⇒ Object
    # File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 28
    def work(job, manager)
      @job = job
      @job_id = job.id
      @worker_state = job.status
      @manager = manager
      @job_confirmation_conditions = []
      log_to_file("worker #{@job_id} received #{job.inspect}")
      @subscription_channel = "worker_#{@job_id}"
      @machine = CapistranoMulticonfigParallel::StateMachine.new(@job, Actor.current)
      @manager.setup_worker_conditions(@job)
      manager.register_worker_for_job(job, Actor.current)
    end