Class: CapistranoMulticonfigParallel::CelluloidWorker
- Inherits:
-
Object
- Object
- CapistranoMulticonfigParallel::CelluloidWorker
- Includes:
- ApplicationHelper, Celluloid, Celluloid::Logger, Celluloid::Notifications
- Defined in:
- lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb
Overview
worker that will spawn a child process in order to execute a capistrano job and monitor that process
Defined Under Namespace
Classes: TaskFailed
Instance Attribute Summary collapse
-
#action_name ⇒ Object
Returns the value of attribute action_name.
-
#app_name ⇒ Object
Returns the value of attribute app_name.
-
#client ⇒ Object
Returns the value of attribute client.
-
#current_task_number ⇒ Object
Returns the value of attribute current_task_number.
-
#env_name ⇒ Object
Returns the value of attribute env_name.
-
#env_options ⇒ Object
Returns the value of attribute env_options.
-
#exit_status ⇒ Object
Returns the value of attribute exit_status.
-
#filename ⇒ Object
Returns the value of attribute filename.
-
#invocation_chain ⇒ Object
Returns the value of attribute invocation_chain.
-
#job ⇒ Hash
Options used for executing capistrano task.
-
#job_id ⇒ Object
Returns the value of attribute job_id.
-
#job_termination_condition ⇒ Object
Returns the value of attribute job_termination_condition.
-
#machine ⇒ Object
Returns the value of attribute machine.
-
#manager ⇒ CapistranoMulticonfigParallel::CelluloidManager
The instance of the manager that delegated the job to this worker.
-
#publisher_channel ⇒ Object
Returns the value of attribute publisher_channel.
-
#rake_tasks ⇒ Object
Returns the value of attribute rake_tasks.
-
#subscription_channel ⇒ Object
Returns the value of attribute subscription_channel.
-
#successfull_subscription ⇒ Object
Returns the value of attribute successfull_subscription.
-
#task_argv ⇒ Object
Returns the value of attribute task_argv.
-
#worker_log ⇒ Object
Returns the value of attribute worker_log.
-
#worker_state ⇒ Object
Returns the value of attribute worker_state.
Instance Method Summary collapse
- #check_child_proces ⇒ Object
- #check_gitflow ⇒ Object
- #execute_after_succesfull_subscription ⇒ Object
- #execute_deploy ⇒ Object
- #executed_task?(task) ⇒ Boolean
- #finish_worker(exit_status) ⇒ Object
- #handle_subscription(message) ⇒ Object
- #message_is_about_a_task?(message) ⇒ Boolean
- #message_is_for_stdout?(message) ⇒ Boolean
- #notify_finished(exit_status) ⇒ Object
- #on_close(code, reason) ⇒ Object
- #on_message(message) ⇒ Object
- #publish_rake_event(data) ⇒ Object
- #rake_actor_id(_data) ⇒ Object
- #save_tasks_to_be_executed(message) ⇒ Object
- #send_msg(channel, message = nil) ⇒ Object
- #start_task ⇒ Object
- #task_approval(message) ⇒ Object
- #update_machine_state(name) ⇒ Object
- #work(job, manager) ⇒ Object
Methods included from ApplicationHelper
action_confirmed?, fetch_parsed_string, fetch_remaining_arguments, find_remaining_args, get_question_details, msg_for_stdin?, msg_for_task?, parse_task_string, percent_of, regex_last_match, setup_command_line_standard, wrap_string
Methods included from CapistranoHelper
env_key_format, env_prefix, filtered_env_keys_format, setup_flags_for_job, trace_flag
Methods included from GemHelper
fetch_gem_version, find_loaded_gem, find_loaded_gem_property, get_parsed_version, verify_gem_version
Methods included from StagesHelper
app_names_from_stages, check_stage_path, checks_paths, fetch_stages_app, fetch_stages_from_file, fetch_stages_paths, independent_deploy?, multi_apps?, sorted_paths, stages, stages_paths, stages_root
Methods included from ParseHelper
check_hash_set, check_numeric, strip_characters_from_string, value_is_array?, verify_array_of_strings, verify_empty_options, warn_array_without_strings
Methods included from CoreHelper
app_debug_enabled?, ask_confirm, ask_stdout_confirmation, check_terminal_tty, debug_websocket?, error_filtered?, execute_with_rescue, find_worker_log, force_confirmation, format_error, log_error, log_output_error, log_to_file, multi_fetch_argv, print_to_log_file, rescue_error, rescue_interrupt, setup_filename_logger, setup_logger_formatter, show_warning, terminal_actor, terminal_errors?, websocket_config, websocket_server_config
Methods included from InternalHelper
arg_is_in_default_config?, check_file, create_log_file, custom_commands, default_config_keys, default_internal_config, default_internal_configuration_params, detect_root, enable_main_log_file, fail_capfile_not_found, fetch_default_internal_config, find_config_type, find_env_multi_cap_root, find_file_in_directory, internal_config_directory, internal_config_file, log_directory, main_log_file, multi_level_prop, pathname_is_root?, pwd_directory, pwd_parent_dir, root, setup_default_configuration_types, sliced_default_config, try_detect_file
Instance Attribute Details
#action_name ⇒ Object
Returns the value of attribute action_name.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def action_name @action_name end |
#app_name ⇒ Object
Returns the value of attribute app_name.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def app_name @app_name end |
#client ⇒ Object
Returns the value of attribute client.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def client @client end |
#current_task_number ⇒ Object
Returns the value of attribute current_task_number.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def current_task_number @current_task_number end |
#env_name ⇒ Object
Returns the value of attribute env_name.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def env_name @env_name end |
#env_options ⇒ Object
Returns the value of attribute env_options.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def @env_options end |
#exit_status ⇒ Object
Returns the value of attribute exit_status.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def exit_status @exit_status end |
#filename ⇒ Object
Returns the value of attribute filename.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def filename @filename end |
#invocation_chain ⇒ Object
Returns the value of attribute invocation_chain.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def invocation_chain @invocation_chain end |
#job ⇒ Hash
Returns options used for executing capistrano task.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 19 class CelluloidWorker include Celluloid include Celluloid::Notifications include Celluloid::Logger include CapistranoMulticonfigParallel::ApplicationHelper class TaskFailed < StandardError; end attr_accessor :job, :manager, :job_id, :app_name, :env_name, :action_name, :env_options, :machine, :client, :task_argv, :rake_tasks, :current_task_number, # tracking tasks :successfull_subscription, :subscription_channel, :publisher_channel, # for subscriptions and publishing events :job_termination_condition, :worker_state, :invocation_chain, :filename, :worker_log, :exit_status def work(job, manager) @job = job @job_id = job.id @worker_state = job.status @manager = manager @job_confirmation_conditions = [] log_to_file("worker #{@job_id} received #{job.inspect}") @subscription_channel = "worker_#{@job_id}" @machine = CapistranoMulticonfigParallel::StateMachine.new(@job, Actor.current) @manager.setup_worker_conditions(@job) manager.register_worker_for_job(job, Actor.current) end def worker_state if Actor.current.alive? @machine.state.to_s.green else job.status = 'dead' job.status.upcase.red end end def start_task log_to_file("exec worker #{@job_id} starts task") @client = CelluloidPubsub::Client.new(actor: Actor.current, enable_debug: debug_websocket?, channel: subscription_channel, log_file_path: websocket_config.fetch('log_file_path', nil)) end def publish_rake_event(data) @client.publish(rake_actor_id(data), data) end def rake_actor_id(_data) "rake_worker_#{@job_id}" end def () log_to_file("worker #{@job_id} received: #{.inspect}") if @client.succesfull_subscription?() @successfull_subscription = true execute_after_succesfull_subscription else handle_subscription() end end def execute_after_succesfull_subscription async.execute_deploy @manager.async.wait_task_confirmations_worker(@job) end def rake_tasks @rake_tasks ||= [] end def invocation_chain @invocation_chain ||= [] end def execute_deploy log_to_file("invocation chain #{@job_id} is : #{@rake_tasks.inspect}") check_child_proces command = job.command.to_s log_to_file("worker #{@job_id} executes: #{command}") @child_process.async.work(@job, command, actor: Actor.current, silent: true) end def check_child_proces @child_process = CapistranoMulticonfigParallel::ChildProcess.new Actor.current.link @child_process @child_process end def on_close(code, reason) log_to_file("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}") end def check_gitflow return if @job.stage != 'staging' || !@manager.can_tag_staging? || !executed_task?(CapistranoMulticonfigParallel::GITFLOW_TAG_STAGING_TASK) @manager.dispatch_new_job(@job, stage: 'production') end def handle_subscription() if () check_gitflow save_tasks_to_be_executed() async.update_machine_state(['task']) # if message['action'] == 'invoke' log_to_file("worker #{@job_id} state is #{@machine.state}") task_approval() elsif () result = Celluloid::Actor[:terminal_server].show_confirmation(['question'], ['default']) publish_rake_event(.merge('action' => 'stdin', 'result' => result, 'client_action' => 'stdin')) else log_to_file(, @job_id) end end def () .present? && .is_a?(Hash) && ['action'].present? && ['job_id'].present? && ['action'] == 'stdout' end def () .present? && .is_a?(Hash) && ['action'].present? && ['job_id'].present? && ['task'].present? end def executed_task?(task) rake_tasks.present? && rake_tasks.index(task.to_s).present? end def task_approval() job_conditions = @manager.job_to_condition[@job_id] if job_conditions.present? && configuration.task_confirmations.include?(['task']) && ['action'] == 'invoke' task_confirmation = job_conditions[['task']] task_confirmation[:status] = 'confirmed' task_confirmation[:condition].signal(['task']) else publish_rake_event(.merge('approved' => 'yes')) end end def save_tasks_to_be_executed() log_to_file("worler #{@job_id} current invocation chain : #{rake_tasks.inspect}") rake_tasks << ['task'] if rake_tasks.last != ['task'] invocation_chain << ['task'] if invocation_chain.last != ['task'] end def update_machine_state(name) log_to_file("worker #{@job_id} triest to transition from #{@machine.state} to #{name}") @machine.go_to_transition(name.to_s) = "worker #{@job_id} task #{name} failed " raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed.new(), ) if job.failed? # force worker to rollback end def send_msg(channel, = nil) publish channel, .present? && .is_a?(Hash) ? { job_id: @job_id }.merge() : { job_id: @job_id, time: Time.now } end def finish_worker(exit_status) log_to_file("worker #{job_id} tries to terminate with exit_status #{exit_status}") @manager.mark_completed_remaining_tasks(@job) update_machine_state('FINISHED') if exit_status == 0 @manager.workers_terminated.signal('completed') if @manager.alive? && @manager.all_workers_finished? end def notify_finished(exit_status) finish_worker(exit_status) return if exit_status == 0 = "worker #{@job_id} task failed with exit status #{exit_status.inspect} " raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed.new(), ) end end |
#job_id ⇒ Object
Returns the value of attribute job_id.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def job_id @job_id end |
#job_termination_condition ⇒ Object
Returns the value of attribute job_termination_condition.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def job_termination_condition @job_termination_condition end |
#machine ⇒ Object
Returns the value of attribute machine.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def machine @machine end |
#manager ⇒ CapistranoMulticonfigParallel::CelluloidManager
Returns the instance of the manager that delegated the job to this worker.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 19 class CelluloidWorker include Celluloid include Celluloid::Notifications include Celluloid::Logger include CapistranoMulticonfigParallel::ApplicationHelper class TaskFailed < StandardError; end attr_accessor :job, :manager, :job_id, :app_name, :env_name, :action_name, :env_options, :machine, :client, :task_argv, :rake_tasks, :current_task_number, # tracking tasks :successfull_subscription, :subscription_channel, :publisher_channel, # for subscriptions and publishing events :job_termination_condition, :worker_state, :invocation_chain, :filename, :worker_log, :exit_status def work(job, manager) @job = job @job_id = job.id @worker_state = job.status @manager = manager @job_confirmation_conditions = [] log_to_file("worker #{@job_id} received #{job.inspect}") @subscription_channel = "worker_#{@job_id}" @machine = CapistranoMulticonfigParallel::StateMachine.new(@job, Actor.current) @manager.setup_worker_conditions(@job) manager.register_worker_for_job(job, Actor.current) end def worker_state if Actor.current.alive? @machine.state.to_s.green else job.status = 'dead' job.status.upcase.red end end def start_task log_to_file("exec worker #{@job_id} starts task") @client = CelluloidPubsub::Client.new(actor: Actor.current, enable_debug: debug_websocket?, channel: subscription_channel, log_file_path: websocket_config.fetch('log_file_path', nil)) end def publish_rake_event(data) @client.publish(rake_actor_id(data), data) end def rake_actor_id(_data) "rake_worker_#{@job_id}" end def () log_to_file("worker #{@job_id} received: #{.inspect}") if @client.succesfull_subscription?() @successfull_subscription = true execute_after_succesfull_subscription else handle_subscription() end end def execute_after_succesfull_subscription async.execute_deploy @manager.async.wait_task_confirmations_worker(@job) end def rake_tasks @rake_tasks ||= [] end def invocation_chain @invocation_chain ||= [] end def execute_deploy log_to_file("invocation chain #{@job_id} is : #{@rake_tasks.inspect}") check_child_proces command = job.command.to_s log_to_file("worker #{@job_id} executes: #{command}") @child_process.async.work(@job, command, actor: Actor.current, silent: true) end def check_child_proces @child_process = CapistranoMulticonfigParallel::ChildProcess.new Actor.current.link @child_process @child_process end def on_close(code, reason) log_to_file("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}") end def check_gitflow return if @job.stage != 'staging' || !@manager.can_tag_staging? || !executed_task?(CapistranoMulticonfigParallel::GITFLOW_TAG_STAGING_TASK) @manager.dispatch_new_job(@job, stage: 'production') end def handle_subscription() if () check_gitflow save_tasks_to_be_executed() async.update_machine_state(['task']) # if message['action'] == 'invoke' log_to_file("worker #{@job_id} state is #{@machine.state}") task_approval() elsif () result = Celluloid::Actor[:terminal_server].show_confirmation(['question'], ['default']) publish_rake_event(.merge('action' => 'stdin', 'result' => result, 'client_action' => 'stdin')) else log_to_file(, @job_id) end end def () .present? && .is_a?(Hash) && ['action'].present? && ['job_id'].present? && ['action'] == 'stdout' end def () .present? && .is_a?(Hash) && ['action'].present? && ['job_id'].present? && ['task'].present? end def executed_task?(task) rake_tasks.present? && rake_tasks.index(task.to_s).present? end def task_approval() job_conditions = @manager.job_to_condition[@job_id] if job_conditions.present? && configuration.task_confirmations.include?(['task']) && ['action'] == 'invoke' task_confirmation = job_conditions[['task']] task_confirmation[:status] = 'confirmed' task_confirmation[:condition].signal(['task']) else publish_rake_event(.merge('approved' => 'yes')) end end def save_tasks_to_be_executed() log_to_file("worler #{@job_id} current invocation chain : #{rake_tasks.inspect}") rake_tasks << ['task'] if rake_tasks.last != ['task'] invocation_chain << ['task'] if invocation_chain.last != ['task'] end def update_machine_state(name) log_to_file("worker #{@job_id} triest to transition from #{@machine.state} to #{name}") @machine.go_to_transition(name.to_s) = "worker #{@job_id} task #{name} failed " raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed.new(), ) if job.failed? # force worker to rollback end def send_msg(channel, = nil) publish channel, .present? && .is_a?(Hash) ? { job_id: @job_id }.merge() : { job_id: @job_id, time: Time.now } end def finish_worker(exit_status) log_to_file("worker #{job_id} tries to terminate with exit_status #{exit_status}") @manager.mark_completed_remaining_tasks(@job) update_machine_state('FINISHED') if exit_status == 0 @manager.workers_terminated.signal('completed') if @manager.alive? && @manager.all_workers_finished? end def notify_finished(exit_status) finish_worker(exit_status) return if exit_status == 0 = "worker #{@job_id} task failed with exit status #{exit_status.inspect} " raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed.new(), ) end end |
#publisher_channel ⇒ Object
Returns the value of attribute publisher_channel.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def publisher_channel @publisher_channel end |
#rake_tasks ⇒ Object
Returns the value of attribute rake_tasks.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def rake_tasks @rake_tasks end |
#subscription_channel ⇒ Object
Returns the value of attribute subscription_channel.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def subscription_channel @subscription_channel end |
#successfull_subscription ⇒ Object
Returns the value of attribute successfull_subscription.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def successfull_subscription @successfull_subscription end |
#task_argv ⇒ Object
Returns the value of attribute task_argv.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def task_argv @task_argv end |
#worker_log ⇒ Object
Returns the value of attribute worker_log.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def worker_log @worker_log end |
#worker_state ⇒ Object
Returns the value of attribute worker_state.
26 27 28 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 26 def worker_state @worker_state end |
Instance Method Details
#check_child_proces ⇒ Object
97 98 99 100 101 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 97 def check_child_proces @child_process = CapistranoMulticonfigParallel::ChildProcess.new Actor.current.link @child_process @child_process end |
#check_gitflow ⇒ Object
107 108 109 110 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 107 def check_gitflow return if @job.stage != 'staging' || !@manager.can_tag_staging? || !executed_task?(CapistranoMulticonfigParallel::GITFLOW_TAG_STAGING_TASK) @manager.dispatch_new_job(@job, stage: 'production') end |
#execute_after_succesfull_subscription ⇒ Object
76 77 78 79 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 76 def execute_after_succesfull_subscription async.execute_deploy @manager.async.wait_task_confirmations_worker(@job) end |
#execute_deploy ⇒ Object
89 90 91 92 93 94 95 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 89 def execute_deploy log_to_file("invocation chain #{@job_id} is : #{@rake_tasks.inspect}") check_child_proces command = job.command.to_s log_to_file("worker #{@job_id} executes: #{command}") @child_process.async.work(@job, command, actor: Actor.current, silent: true) end |
#executed_task?(task) ⇒ Boolean
135 136 137 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 135 def executed_task?(task) rake_tasks.present? && rake_tasks.index(task.to_s).present? end |
#finish_worker(exit_status) ⇒ Object
167 168 169 170 171 172 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 167 def finish_worker(exit_status) log_to_file("worker #{job_id} tries to terminate with exit_status #{exit_status}") @manager.mark_completed_remaining_tasks(@job) update_machine_state('FINISHED') if exit_status == 0 @manager.workers_terminated.signal('completed') if @manager.alive? && @manager.all_workers_finished? end |
#handle_subscription(message) ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 112 def handle_subscription() if () check_gitflow save_tasks_to_be_executed() async.update_machine_state(['task']) # if message['action'] == 'invoke' log_to_file("worker #{@job_id} state is #{@machine.state}") task_approval() elsif () result = Celluloid::Actor[:terminal_server].show_confirmation(['question'], ['default']) publish_rake_event(.merge('action' => 'stdin', 'result' => result, 'client_action' => 'stdin')) else log_to_file(, @job_id) end end |
#message_is_about_a_task?(message) ⇒ Boolean
131 132 133 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 131 def () .present? && .is_a?(Hash) && ['action'].present? && ['job_id'].present? && ['task'].present? end |
#message_is_for_stdout?(message) ⇒ Boolean
127 128 129 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 127 def () .present? && .is_a?(Hash) && ['action'].present? && ['job_id'].present? && ['action'] == 'stdout' end |
#notify_finished(exit_status) ⇒ Object
174 175 176 177 178 179 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 174 def notify_finished(exit_status) finish_worker(exit_status) return if exit_status == 0 = "worker #{@job_id} task failed with exit status #{exit_status.inspect} " raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed.new(), ) end |
#on_close(code, reason) ⇒ Object
103 104 105 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 103 def on_close(code, reason) log_to_file("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}") end |
#on_message(message) ⇒ Object
66 67 68 69 70 71 72 73 74 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 66 def () log_to_file("worker #{@job_id} received: #{.inspect}") if @client.succesfull_subscription?() @successfull_subscription = true execute_after_succesfull_subscription else handle_subscription() end end |
#publish_rake_event(data) ⇒ Object
58 59 60 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 58 def publish_rake_event(data) @client.publish(rake_actor_id(data), data) end |
#rake_actor_id(_data) ⇒ Object
62 63 64 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 62 def rake_actor_id(_data) "rake_worker_#{@job_id}" end |
#save_tasks_to_be_executed(message) ⇒ Object
150 151 152 153 154 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 150 def save_tasks_to_be_executed() log_to_file("worler #{@job_id} current invocation chain : #{rake_tasks.inspect}") rake_tasks << ['task'] if rake_tasks.last != ['task'] invocation_chain << ['task'] if invocation_chain.last != ['task'] end |
#send_msg(channel, message = nil) ⇒ Object
163 164 165 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 163 def send_msg(channel, = nil) publish channel, .present? && .is_a?(Hash) ? { job_id: @job_id }.merge() : { job_id: @job_id, time: Time.now } end |
#start_task ⇒ Object
53 54 55 56 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 53 def start_task log_to_file("exec worker #{@job_id} starts task") @client = CelluloidPubsub::Client.new(actor: Actor.current, enable_debug: debug_websocket?, channel: subscription_channel, log_file_path: websocket_config.fetch('log_file_path', nil)) end |
#task_approval(message) ⇒ Object
139 140 141 142 143 144 145 146 147 148 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 139 def task_approval() job_conditions = @manager.job_to_condition[@job_id] if job_conditions.present? && configuration.task_confirmations.include?(['task']) && ['action'] == 'invoke' task_confirmation = job_conditions[['task']] task_confirmation[:status] = 'confirmed' task_confirmation[:condition].signal(['task']) else publish_rake_event(.merge('approved' => 'yes')) end end |
#update_machine_state(name) ⇒ Object
156 157 158 159 160 161 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 156 def update_machine_state(name) log_to_file("worker #{@job_id} triest to transition from #{@machine.state} to #{name}") @machine.go_to_transition(name.to_s) = "worker #{@job_id} task #{name} failed " raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed.new(), ) if job.failed? # force worker to rollback end |
#work(job, manager) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 31 def work(job, manager) @job = job @job_id = job.id @worker_state = job.status @manager = manager @job_confirmation_conditions = [] log_to_file("worker #{@job_id} received #{job.inspect}") @subscription_channel = "worker_#{@job_id}" @machine = CapistranoMulticonfigParallel::StateMachine.new(@job, Actor.current) @manager.setup_worker_conditions(@job) manager.register_worker_for_job(job, Actor.current) end |