Class: CapistranoMulticonfigParallel::CelluloidWorker
- Inherits:
-
Object
- Object
- CapistranoMulticonfigParallel::CelluloidWorker
- Includes:
- Celluloid, Celluloid::Logger, Celluloid::Notifications
- Defined in:
- lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb
Overview
Worker that will spawn a child process in order to execute a capistrano job and monitor that process.
Defined Under Namespace
Classes: TaskFailed
Instance Attribute Summary collapse
-
#action_name ⇒ Object
Returns the value of attribute action_name.
-
#app_name ⇒ Object
Returns the value of attribute app_name.
-
#client ⇒ Object
Returns the value of attribute client.
-
#current_task_number ⇒ Object
Returns the value of attribute current_task_number.
-
#env_name ⇒ Object
Returns the value of attribute env_name.
-
#env_options ⇒ Object
Returns the value of attribute env_options.
-
#execute_deploy ⇒ Object
Returns the value of attribute execute_deploy.
-
#executed_dry_run ⇒ Object
Returns the value of attribute executed_dry_run.
-
#job ⇒ Hash
Options used for executing capistrano task.
-
#job_id ⇒ Object
Returns the value of attribute job_id.
-
#job_termination_condition ⇒ Object
Returns the value of attribute job_termination_condition.
-
#machine ⇒ Object
Returns the value of attribute machine.
-
#manager ⇒ CapistranoMulticonfigParallel::CelluloidManager
The instance of the manager that delegated the job to this worker.
-
#publisher_channel ⇒ Object
Returns the value of attribute publisher_channel.
-
#rake_tasks ⇒ Object
Returns the value of attribute rake_tasks.
-
#subscription_channel ⇒ Object
Returns the value of attribute subscription_channel.
-
#successfull_subscription ⇒ Object
Returns the value of attribute successfull_subscription.
-
#task_argv ⇒ Object
Returns the value of attribute task_argv.
-
#worker_state ⇒ Object
Returns the value of attribute worker_state.
Instance Method Summary collapse
- #cd_working_directory ⇒ Object
- #check_child_proces ⇒ Object
- #check_gitflow ⇒ Object
- #crashed? ⇒ Boolean
- #debug_enabled? ⇒ Boolean
- #execute_after_succesfull_subscription ⇒ Object
- #executed_task?(task) ⇒ Boolean
- #finish_worker ⇒ Object
- #generate_command ⇒ Object
- #handle_subscription(message) ⇒ Object
- #message_is_about_a_task?(message) ⇒ Boolean
- #notify_finished(exit_status) ⇒ Object
- #on_close(code, reason) ⇒ Object
- #on_message(message) ⇒ Object
- #process_job(job) ⇒ Object
- #publish_rake_event(data) ⇒ Object
- #rake_actor_id(data) ⇒ Object
- #save_tasks_to_be_executed(message) ⇒ Object
- #send_msg(channel, message = nil) ⇒ Object
- #setup_command_line(*options) ⇒ Object
- #setup_task_arguments ⇒ Object
- #start_task ⇒ Object
- #task_approval(message) ⇒ Object
- #update_machine_state(name) ⇒ Object
- #work(job, manager) ⇒ Object
Instance Attribute Details
#action_name ⇒ Object
Returns the value of attribute action_name.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def action_name @action_name end |
#app_name ⇒ Object
Returns the value of attribute app_name.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def app_name @app_name end |
#client ⇒ Object
Returns the value of attribute client.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def client @client end |
#current_task_number ⇒ Object
Returns the value of attribute current_task_number.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def current_task_number @current_task_number end |
#env_name ⇒ Object
Returns the value of attribute env_name.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def env_name @env_name end |
#env_options ⇒ Object
Returns the value of attribute env_options.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def env_options @env_options end |
#execute_deploy ⇒ Object
Returns the value of attribute execute_deploy.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def execute_deploy @execute_deploy end |
#executed_dry_run ⇒ Object
Returns the value of attribute executed_dry_run.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def executed_dry_run @executed_dry_run end |
#job ⇒ Hash
Returns options used for executing capistrano task.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 19 class CelluloidWorker include Celluloid include Celluloid::Notifications include Celluloid::Logger class TaskFailed < StandardError; end attr_accessor :job, :manager, :job_id, :app_name, :env_name, :action_name, :env_options, :machine, :client, :task_argv, :execute_deploy, :executed_dry_run, :rake_tasks, :current_task_number, # tracking tasks :successfull_subscription, :subscription_channel, :publisher_channel, # for subscriptions and publishing events :job_termination_condition, :worker_state def work(job, manager) @job = job @worker_state = 'started' @manager = manager @job_confirmation_conditions = [] process_job(job) if job.present? debug("worker #{@job_id} received #{job.inspect}") if debug_enabled? @subscription_channel = "worker_#{@job_id}" @machine = CapistranoMulticonfigParallel::StateMachine.new(job, Actor.current) manager.register_worker_for_job(job, Actor.current) end def debug_enabled? @manager.class.debug_enabled? end def start_task @manager.setup_worker_conditions(Actor.current) debug("exec worker #{@job_id} starts task with #{@job.inspect}") if debug_enabled? @client = CelluloidPubsub::Client.connect(actor: Actor.current, enable_debug: @manager.class.debug_websocket?) do |ws| ws.subscribe(@subscription_channel) end end def publish_rake_event(data) @client.publish(rake_actor_id(data), data) end def rake_actor_id(data) data['action'].present? && data['action'] == 'count' ? "rake_worker_#{@job_id}_count" : "rake_worker_#{@job_id}" end def () debug("worker #{@job_id} received: #{message.inspect}") if debug_enabled? 
if @client.succesfull_subscription?() @successfull_subscription = true execute_after_succesfull_subscription else handle_subscription() end end def execute_after_succesfull_subscription setup_task_arguments if (@action_name == 'deploy' || @action_name == 'deploy:rollback') && CapistranoMulticonfigParallel.show_task_progress @executed_dry_run = true @rake_tasks = [] @task_argv << '--dry-run' @task_argv << 'count_rake=true' @child_process = CapistranoMulticonfigParallel::ChildProcess.new Actor.current.link @child_process debug("worker #{@job_id} executes: #{generate_command}") if debug_enabled? @child_process.async.work(generate_command, actor: Actor.current, silent: true, dry_run: true) else async.execute_deploy end end def rake_tasks @rake_tasks ||= [] end def cd_working_directory "cd #{CapistranoMulticonfigParallel.detect_root}" end def generate_command " \#{cd_working_directory} && RAILS_ENV=\#{@env_name} bundle exec multi_cap \#{@task_argv.join(' ')}\n CMD\n end\n\n def execute_deploy\n @execute_deploy = true\n debug(\"invocation chain \#{@job_id} is : \#{@rake_tasks.inspect}\") if debug_enabled? && CapistranoMulticonfigParallel.show_task_progress\n check_child_proces\n setup_task_arguments\n debug(\"worker \#{@job_id} executes: \#{generate_command}\") if debug_enabled?\n @child_process.async.work(generate_command, actor: Actor.current, silent: true)\n @manager.wait_task_confirmations_worker(Actor.current)\n end\n\n def check_child_proces\n if !defined?(@child_process) || @child_process.nil?\n @child_process = CapistranoMulticonfigParallel::ChildProcess.new\n Actor.current.link @child_process\n else\n @client.unsubscribe(\"rake_worker_\#{@job_id}_count\")\n @child_process.exit_status = nil\n end\n end\n\n def on_close(code, reason)\n debug(\"worker \#{@job_id} websocket connection closed: \#{code.inspect}, \#{reason.inspect}\") if debug_enabled?\n end\n\n def check_gitflow\n return if !@env_name == 'staging' || [email protected]_tag_staging? 
|| !executed_task?(CapistranoMulticonfigParallel::GITFLOW_VERIFY_UPTODATE_TASK)\n @manager.dispatch_new_job(@job.merge('env' => 'production'))\n end\n\n def handle_subscription(message)\n if message_is_about_a_task?(message)\n check_gitflow\n save_tasks_to_be_executed(message)\n update_machine_state(message['task']) # if message['action'] == 'invoke'\n debug(\"worker \#{@job_id} state is \#{@machine.state}\") if debug_enabled?\n task_approval(message)\n else\n debug(\"worker \#{@job_id} could not handle \#{message}\") if debug_enabled?\n end\n end\n\n def message_is_about_a_task?(message)\n message.present? && message.is_a?(Hash) && message['action'].present? && message['job_id'].present? && message['task'].present?\n end\n\n def executed_task?(task)\n @rake_tasks.present? && @rake_tasks[task].present?\n end\n\n def task_approval(message)\n if @manager.apply_confirmations? && CapistranoMulticonfigParallel.configuration.task_confirmations.include?(message['task']) && message['action'] == 'invoke'\n task_confirmation = @manager.job_to_condition[@job_id][message['task']]\n task_confirmation[:status] = 'confirmed'\n task_confirmation[:condition].signal(message['task'])\n else\n publish_rake_event(message.merge('approved' => 'yes'))\n end\n end\n\n def save_tasks_to_be_executed(message)\n return unless message['action'] == 'count'\n debug(\"worler \#{@job_id} current invocation chain : \#{@rake_tasks.inspect}\") if debug_enabled?\n @rake_tasks = [] if @rake_tasks.blank?\n @rake_tasks << message['task'] if @rake_tasks.last != message['task']\n end\n\n def update_machine_state(name)\n debug(\"worker \#{@job_id} triest to transition from \#{@machine.state} to \#{name}\") if debug_enabled?\n @machine.transitions.on(name.to_s, @machine.state => name.to_s)\n @machine.go_to_transition(name.to_s)\n raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed, \"task \#{@action} failed \") if name == 'deploy:failed' # force worker to rollback\n end\n\n def 
setup_command_line(*options)\n @task_argv = []\n options.each do |option|\n @task_argv << option\n end\n @task_argv\n end\n\n def setup_task_arguments\n # stage = \"\#{@app_name}:\#{@env_name} \#{@action_name}\"\n stage = @app_name.present? ? \"\#{@app_name}:\#{@env_name}\" : \"\#{@env_name}\"\n array_options = [\"\#{stage}\"]\n array_options << \"\#{@action_name}[\#{@task_arguments.join(',')}]\"\n @env_options.each do |key, value|\n array_options << \"\#{key}=\#{value}\" if value.present?\n end\n array_options << '--trace' if debug_enabled?\n setup_command_line(*array_options)\n end\n\n def send_msg(channel, message = nil)\n publish channel, message.present? && message.is_a?(Hash) ? { job_id: @job_id }.merge(message) : { job_id: @job_id, time: Time.now }\n end\n\n def process_job(job)\n processed_job = @manager.process_job(job)\n @job_id = processed_job['job_id']\n @app_name = processed_job['app_name']\n @env_name = processed_job['env_name']\n @action_name = processed_job['action_name']\n @env_options = processed_job['env_options']\n @task_arguments = processed_job['task_arguments']\n end\n\n def crashed?\n @action_name == 'deploy:rollback' || @action_name == 'deploy:failed' || @manager.job_failed?(@job)\n end\n\n def finish_worker\n @manager.mark_completed_remaining_tasks(Actor.current)\n @worker_state = 'finished'\n @manager.job_to_worker.each do|_job_id, worker|\n debug(\"worker \#{worker.job_id}has state \#{worker.worker_state}\") if worker.alive? && debug_enabled?\n end\n end\n\n def notify_finished(exit_status)\n return unless @execute_deploy\n if exit_status.exitstatus != 0\n debug(\"worker \#{job_id} tries to terminate\") if debug_enabled?\n terminate\n else\n update_machine_state('FINISHED')\n debug(\"worker \#{job_id} notifies manager has finished\") if debug_enabled?\n finish_worker\n end\n end\nend\n" |
#job_id ⇒ Object
Returns the value of attribute job_id.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def job_id @job_id end |
#job_termination_condition ⇒ Object
Returns the value of attribute job_termination_condition.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def job_termination_condition @job_termination_condition end |
#machine ⇒ Object
Returns the value of attribute machine.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def machine @machine end |
#manager ⇒ CapistranoMulticonfigParallel::CelluloidManager
Returns the instance of the manager that delegated the job to this worker.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 19 class CelluloidWorker include Celluloid include Celluloid::Notifications include Celluloid::Logger class TaskFailed < StandardError; end attr_accessor :job, :manager, :job_id, :app_name, :env_name, :action_name, :env_options, :machine, :client, :task_argv, :execute_deploy, :executed_dry_run, :rake_tasks, :current_task_number, # tracking tasks :successfull_subscription, :subscription_channel, :publisher_channel, # for subscriptions and publishing events :job_termination_condition, :worker_state def work(job, manager) @job = job @worker_state = 'started' @manager = manager @job_confirmation_conditions = [] process_job(job) if job.present? debug("worker #{@job_id} received #{job.inspect}") if debug_enabled? @subscription_channel = "worker_#{@job_id}" @machine = CapistranoMulticonfigParallel::StateMachine.new(job, Actor.current) manager.register_worker_for_job(job, Actor.current) end def debug_enabled? @manager.class.debug_enabled? end def start_task @manager.setup_worker_conditions(Actor.current) debug("exec worker #{@job_id} starts task with #{@job.inspect}") if debug_enabled? @client = CelluloidPubsub::Client.connect(actor: Actor.current, enable_debug: @manager.class.debug_websocket?) do |ws| ws.subscribe(@subscription_channel) end end def publish_rake_event(data) @client.publish(rake_actor_id(data), data) end def rake_actor_id(data) data['action'].present? && data['action'] == 'count' ? "rake_worker_#{@job_id}_count" : "rake_worker_#{@job_id}" end def () debug("worker #{@job_id} received: #{message.inspect}") if debug_enabled? 
if @client.succesfull_subscription?() @successfull_subscription = true execute_after_succesfull_subscription else handle_subscription() end end def execute_after_succesfull_subscription setup_task_arguments if (@action_name == 'deploy' || @action_name == 'deploy:rollback') && CapistranoMulticonfigParallel.show_task_progress @executed_dry_run = true @rake_tasks = [] @task_argv << '--dry-run' @task_argv << 'count_rake=true' @child_process = CapistranoMulticonfigParallel::ChildProcess.new Actor.current.link @child_process debug("worker #{@job_id} executes: #{generate_command}") if debug_enabled? @child_process.async.work(generate_command, actor: Actor.current, silent: true, dry_run: true) else async.execute_deploy end end def rake_tasks @rake_tasks ||= [] end def cd_working_directory "cd #{CapistranoMulticonfigParallel.detect_root}" end def generate_command " \#{cd_working_directory} && RAILS_ENV=\#{@env_name} bundle exec multi_cap \#{@task_argv.join(' ')}\n CMD\n end\n\n def execute_deploy\n @execute_deploy = true\n debug(\"invocation chain \#{@job_id} is : \#{@rake_tasks.inspect}\") if debug_enabled? && CapistranoMulticonfigParallel.show_task_progress\n check_child_proces\n setup_task_arguments\n debug(\"worker \#{@job_id} executes: \#{generate_command}\") if debug_enabled?\n @child_process.async.work(generate_command, actor: Actor.current, silent: true)\n @manager.wait_task_confirmations_worker(Actor.current)\n end\n\n def check_child_proces\n if !defined?(@child_process) || @child_process.nil?\n @child_process = CapistranoMulticonfigParallel::ChildProcess.new\n Actor.current.link @child_process\n else\n @client.unsubscribe(\"rake_worker_\#{@job_id}_count\")\n @child_process.exit_status = nil\n end\n end\n\n def on_close(code, reason)\n debug(\"worker \#{@job_id} websocket connection closed: \#{code.inspect}, \#{reason.inspect}\") if debug_enabled?\n end\n\n def check_gitflow\n return if !@env_name == 'staging' || [email protected]_tag_staging? 
|| !executed_task?(CapistranoMulticonfigParallel::GITFLOW_VERIFY_UPTODATE_TASK)\n @manager.dispatch_new_job(@job.merge('env' => 'production'))\n end\n\n def handle_subscription(message)\n if message_is_about_a_task?(message)\n check_gitflow\n save_tasks_to_be_executed(message)\n update_machine_state(message['task']) # if message['action'] == 'invoke'\n debug(\"worker \#{@job_id} state is \#{@machine.state}\") if debug_enabled?\n task_approval(message)\n else\n debug(\"worker \#{@job_id} could not handle \#{message}\") if debug_enabled?\n end\n end\n\n def message_is_about_a_task?(message)\n message.present? && message.is_a?(Hash) && message['action'].present? && message['job_id'].present? && message['task'].present?\n end\n\n def executed_task?(task)\n @rake_tasks.present? && @rake_tasks[task].present?\n end\n\n def task_approval(message)\n if @manager.apply_confirmations? && CapistranoMulticonfigParallel.configuration.task_confirmations.include?(message['task']) && message['action'] == 'invoke'\n task_confirmation = @manager.job_to_condition[@job_id][message['task']]\n task_confirmation[:status] = 'confirmed'\n task_confirmation[:condition].signal(message['task'])\n else\n publish_rake_event(message.merge('approved' => 'yes'))\n end\n end\n\n def save_tasks_to_be_executed(message)\n return unless message['action'] == 'count'\n debug(\"worler \#{@job_id} current invocation chain : \#{@rake_tasks.inspect}\") if debug_enabled?\n @rake_tasks = [] if @rake_tasks.blank?\n @rake_tasks << message['task'] if @rake_tasks.last != message['task']\n end\n\n def update_machine_state(name)\n debug(\"worker \#{@job_id} triest to transition from \#{@machine.state} to \#{name}\") if debug_enabled?\n @machine.transitions.on(name.to_s, @machine.state => name.to_s)\n @machine.go_to_transition(name.to_s)\n raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed, \"task \#{@action} failed \") if name == 'deploy:failed' # force worker to rollback\n end\n\n def 
setup_command_line(*options)\n @task_argv = []\n options.each do |option|\n @task_argv << option\n end\n @task_argv\n end\n\n def setup_task_arguments\n # stage = \"\#{@app_name}:\#{@env_name} \#{@action_name}\"\n stage = @app_name.present? ? \"\#{@app_name}:\#{@env_name}\" : \"\#{@env_name}\"\n array_options = [\"\#{stage}\"]\n array_options << \"\#{@action_name}[\#{@task_arguments.join(',')}]\"\n @env_options.each do |key, value|\n array_options << \"\#{key}=\#{value}\" if value.present?\n end\n array_options << '--trace' if debug_enabled?\n setup_command_line(*array_options)\n end\n\n def send_msg(channel, message = nil)\n publish channel, message.present? && message.is_a?(Hash) ? { job_id: @job_id }.merge(message) : { job_id: @job_id, time: Time.now }\n end\n\n def process_job(job)\n processed_job = @manager.process_job(job)\n @job_id = processed_job['job_id']\n @app_name = processed_job['app_name']\n @env_name = processed_job['env_name']\n @action_name = processed_job['action_name']\n @env_options = processed_job['env_options']\n @task_arguments = processed_job['task_arguments']\n end\n\n def crashed?\n @action_name == 'deploy:rollback' || @action_name == 'deploy:failed' || @manager.job_failed?(@job)\n end\n\n def finish_worker\n @manager.mark_completed_remaining_tasks(Actor.current)\n @worker_state = 'finished'\n @manager.job_to_worker.each do|_job_id, worker|\n debug(\"worker \#{worker.job_id}has state \#{worker.worker_state}\") if worker.alive? && debug_enabled?\n end\n end\n\n def notify_finished(exit_status)\n return unless @execute_deploy\n if exit_status.exitstatus != 0\n debug(\"worker \#{job_id} tries to terminate\") if debug_enabled?\n terminate\n else\n update_machine_state('FINISHED')\n debug(\"worker \#{job_id} notifies manager has finished\") if debug_enabled?\n finish_worker\n end\n end\nend\n" |
#publisher_channel ⇒ Object
Returns the value of attribute publisher_channel.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def publisher_channel @publisher_channel end |
#rake_tasks ⇒ Object
Returns the value of attribute rake_tasks.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def rake_tasks @rake_tasks end |
#subscription_channel ⇒ Object
Returns the value of attribute subscription_channel.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def subscription_channel @subscription_channel end |
#successfull_subscription ⇒ Object
Returns the value of attribute successfull_subscription.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def successfull_subscription @successfull_subscription end |
#task_argv ⇒ Object
Returns the value of attribute task_argv.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def task_argv @task_argv end |
#worker_state ⇒ Object
Returns the value of attribute worker_state.
25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25 def worker_state @worker_state end |
Instance Method Details
#cd_working_directory ⇒ Object
92 93 94 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 92
# Builds the shell fragment that changes into the project root.
#
# @return [String] "cd " followed by the value of
#   CapistranoMulticonfigParallel.detect_root
def cd_working_directory
  project_root = CapistranoMulticonfigParallel.detect_root
  "cd #{project_root}"
end
#check_child_proces ⇒ Object
112 113 114 115 116 117 118 119 120 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 112
# Makes sure a child process object is available in @child_process.
#
# When none exists yet, a new ChildProcess is created and linked to the
# current actor. Otherwise the existing one is reused: the client
# unsubscribes from the "rake_worker_<job_id>_count" channel and the stored
# exit status is reset to nil.
def check_child_proces
  if @child_process.nil?
    @child_process = CapistranoMulticonfigParallel::ChildProcess.new
    return Actor.current.link(@child_process)
  end

  @client.unsubscribe("rake_worker_#{@job_id}_count")
  @child_process.exit_status = nil
end
#check_gitflow ⇒ Object
126 127 128 129 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 126
# Asks the manager to dispatch a follow-up job with 'env' set to 'production'
# once a staging job has executed the gitflow "verify up to date" task.
#
# Nothing happens unless all of the following hold:
# * this worker's environment is 'staging'
# * the manager reports that staging can be tagged
# * the gitflow verification task is among the executed tasks
#
# The previous guard was written as `!@env_name == 'staging'`, which Ruby
# parses as `(!@env_name) == 'staging'` and is therefore always false, so
# the environment was never actually checked.
def check_gitflow
  return unless @env_name == 'staging'
  return unless @manager.can_tag_staging?
  return unless executed_task?(CapistranoMulticonfigParallel::GITFLOW_VERIFY_UPTODATE_TASK)

  @manager.dispatch_new_job(@job.merge('env' => 'production'))
end
#crashed? ⇒ Boolean
209 210 211 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 209
# Tells whether this worker is considered crashed.
#
# @return [Boolean] true when the action name is 'deploy:rollback' or
#   'deploy:failed'; otherwise whatever @manager.job_failed? returns for
#   the current job
def crashed?
  %w(deploy:rollback deploy:failed).include?(@action_name) || @manager.job_failed?(@job)
end
#debug_enabled? ⇒ Boolean
42 43 44 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 42
# Delegates the debug flag lookup to the manager's class.
#
# @return [Boolean] the result of calling debug_enabled? on @manager.class
def debug_enabled?
  manager_class = @manager.class
  manager_class.debug_enabled?
end
#execute_after_succesfull_subscription ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 72
# Runs once the websocket subscription has been acknowledged.
#
# Task arguments are always rebuilt first. For 'deploy' and 'deploy:rollback'
# actions with task progress enabled, a fresh child process executes the
# command with '--dry-run' and 'count_rake=true' appended. In every other
# case the deploy is started asynchronously right away.
def execute_after_succesfull_subscription
  setup_task_arguments
  needs_dry_run = %w(deploy deploy:rollback).include?(@action_name) &&
                  CapistranoMulticonfigParallel.show_task_progress
  return async.execute_deploy unless needs_dry_run

  @executed_dry_run = true
  @rake_tasks = []
  @task_argv.push('--dry-run', 'count_rake=true')
  @child_process = CapistranoMulticonfigParallel::ChildProcess.new
  Actor.current.link(@child_process)
  debug("worker #{@job_id} executes: #{generate_command}") if debug_enabled?
  @child_process.async.work(generate_command, actor: Actor.current, silent: true, dry_run: true)
end
#executed_task?(task) ⇒ Boolean
147 148 149 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 147
# Tells whether the given task name has been recorded in @rake_tasks.
#
# @rake_tasks is built as an Array of task names, so membership has to be
# tested with include?. Indexing the Array with a String task name (as the
# previous `@rake_tasks[task]` did) raises TypeError.
#
# @param task [String] the task name to look for
# @return [Boolean] true when the task is present, false otherwise
#   (including when no tasks have been recorded yet)
def executed_task?(task)
  Array(@rake_tasks).include?(task)
end
#finish_worker ⇒ Object
213 214 215 216 217 218 219 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 213
# Marks this worker as finished.
#
# The manager is asked to mark the remaining tasks of the current actor as
# completed, the worker state becomes 'finished', and (when debugging is
# enabled) the state of every worker that is still alive is logged.
def finish_worker
  @manager.mark_completed_remaining_tasks(Actor.current)
  @worker_state = 'finished'
  @manager.job_to_worker.each do |_job_id, worker|
    # The message previously lacked the space between the job id and "has".
    debug("worker #{worker.job_id} has state #{worker.worker_state}") if worker.alive? && debug_enabled?
  end
end
#generate_command ⇒ Object
96 97 98 99 100 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 96
# Builds the shell command that runs multi_cap for this job.
#
# The listing had been rendered as a plain string with escaped
# interpolations and a dangling CMD terminator; it is restored here as the
# heredoc it was written as. Exact leading whitespace inside the heredoc is
# not recoverable from the listing (NOTE(review): confirm against the
# source file); the result is a shell command, where it is not significant.
#
# @return [String] "<cd fragment> && RAILS_ENV=<env> bundle exec multi_cap <argv>"
#   surrounded by the heredoc's leading whitespace and trailing newline
def generate_command
  <<-CMD
    #{cd_working_directory} && RAILS_ENV=#{@env_name} bundle exec multi_cap #{@task_argv.join(' ')}
  CMD
end
#handle_subscription(message) ⇒ Object
131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 131
# Processes a message received on the worker channel after the subscription
# has been established.
#
# Messages that describe a task run through, in order: the gitflow check,
# task recording, the state machine update and task approval. Any other
# message is only logged when debugging is enabled.
#
# @param message [Hash, Object] the received message
def handle_subscription(message)
  if message_is_about_a_task?(message)
    check_gitflow
    save_tasks_to_be_executed(message)
    update_machine_state(message['task']) # if message['action'] == 'invoke'
    debug("worker #{@job_id} state is #{@machine.state}") if debug_enabled?
    task_approval(message)
  else
    debug("worker #{@job_id} could not handle #{message}") if debug_enabled?
  end
end
#message_is_about_a_task?(message) ⇒ Boolean
143 144 145 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 143
# Tells whether a message describes a task event.
#
# @param message [Object] the received message
# @return [Boolean] true only for a non-empty Hash whose 'action', 'job_id'
#   and 'task' entries are all present
def message_is_about_a_task?(message)
  message.present? && message.is_a?(Hash) &&
    message['action'].present? && message['job_id'].present? && message['task'].present?
end
#notify_finished(exit_status) ⇒ Object
221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 221
# Reacts to the exit status reported for the child process.
#
# Ignored until the real deploy has been started (@execute_deploy set).
# An exit status of 0 moves the state machine to 'FINISHED' and finishes the
# worker; any other status terminates this worker.
#
# @param exit_status [#exitstatus] status object of the finished process
def notify_finished(exit_status)
  return unless @execute_deploy

  if exit_status.exitstatus == 0
    update_machine_state('FINISHED')
    debug("worker #{job_id} notifies manager has finished") if debug_enabled?
    finish_worker
  else
    debug("worker #{job_id} tries to terminate") if debug_enabled?
    terminate
  end
end
#on_close(code, reason) ⇒ Object
122 123 124 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 122
# Callback for a closed websocket connection; only logs the event when
# debugging is enabled.
#
# @param code [Object] close code, logged via inspect
# @param reason [Object] close reason, logged via inspect
def on_close(code, reason)
  return unless debug_enabled?

  debug("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}")
end
#on_message(message) ⇒ Object
62 63 64 65 66 67 68 69 70 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 62
# Callback for every message received from the pub/sub client.
#
# When the client recognises the message as a successful subscription
# acknowledgement, the worker records that and starts executing; every other
# message is passed on to handle_subscription.
#
# @param message [Object] the received message
def on_message(message)
  debug("worker #{@job_id} received: #{message.inspect}") if debug_enabled?
  if @client.succesfull_subscription?(message)
    @successfull_subscription = true
    execute_after_succesfull_subscription
  else
    handle_subscription(message)
  end
end
#process_job(job) ⇒ Object
199 200 201 202 203 204 205 206 207 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 199
# Lets the manager process the job and copies the resulting fields into
# instance variables.
#
# @param job [Hash] the job handed to this worker
# @return [Object] the 'task_arguments' entry of the processed job (the
#   last assignment)
def process_job(job)
  processed_job = @manager.process_job(job)
  @job_id = processed_job['job_id']
  @app_name = processed_job['app_name']
  @env_name = processed_job['env_name']
  @action_name = processed_job['action_name']
  @env_options = processed_job['env_options']
  @task_arguments = processed_job['task_arguments']
end
#publish_rake_event(data) ⇒ Object
54 55 56 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 54
# Publishes the given data through the client on the channel that
# rake_actor_id selects for it.
#
# @param data [Hash] the event payload
def publish_rake_event(data)
  target_channel = rake_actor_id(data)
  @client.publish(target_channel, data)
end
#rake_actor_id(data) ⇒ Object
58 59 60 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 58
# Picks the channel name for an event.
#
# The separate presence check on data['action'] was redundant: a value equal
# to 'count' is necessarily present, so a single comparison is enough.
#
# @param data [Hash] event payload; only its 'action' entry is inspected
# @return [String] "rake_worker_<job_id>_count" when the action is 'count',
#   otherwise "rake_worker_<job_id>"
def rake_actor_id(data)
  suffix = data['action'] == 'count' ? '_count' : ''
  "rake_worker_#{@job_id}#{suffix}"
end
#save_tasks_to_be_executed(message) ⇒ Object
161 162 163 164 165 166 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 161
# Records the task of a 'count' message in the invocation chain.
#
# Messages with any other action are ignored. A task is appended only when
# it differs from the most recently recorded one.
#
# @param message [Hash] message with 'action' and 'task' entries
def save_tasks_to_be_executed(message)
  return unless message['action'] == 'count'
  # Log prefix corrected from "worler" to "worker" to match the other messages.
  debug("worker #{@job_id} current invocation chain : #{@rake_tasks.inspect}") if debug_enabled?
  @rake_tasks = [] if @rake_tasks.blank?
  @rake_tasks << message['task'] if @rake_tasks.last != message['task']
end
#send_msg(channel, message = nil) ⇒ Object
195 196 197 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 195
# Publishes a notification on the given channel.
#
# @param channel [String] the channel to publish on
# @param message [Hash, nil] optional payload; a non-empty Hash is merged
#   on top of { job_id: @job_id }, anything else is replaced by a payload
#   holding the job id and the current time
def send_msg(channel, message = nil)
  payload = if message.present? && message.is_a?(Hash)
              { job_id: @job_id }.merge(message)
            else
              { job_id: @job_id, time: Time.now }
            end
  publish channel, payload
end
#setup_command_line(*options) ⇒ Object
175 176 177 178 179 180 181 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 175
# Replaces @task_argv with the given options.
#
# The splat already yields a fresh Array, so it is stored directly instead
# of being copied element by element.
#
# @param options [Array] command line arguments, in order
# @return [Array] the new value of @task_argv
def setup_command_line(*options)
  @task_argv = options
end
#setup_task_arguments ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 183
# Builds the argument list for the command and stores it via
# setup_command_line.
#
# The list consists of, in order: the stage ("<app>:<env>" or just "<env>"
# when no app name is set), the action with its comma-joined task arguments
# in square brackets, one "key=value" entry per environment option with a
# present value, and '--trace' when debugging is enabled.
def setup_task_arguments
  # stage = "#{@app_name}:#{@env_name} #{@action_name}"
  stage = @app_name.present? ? "#{@app_name}:#{@env_name}" : "#{@env_name}"
  array_options = ["#{stage}"]
  array_options << "#{@action_name}[#{@task_arguments.join(',')}]"
  @env_options.each do |key, value|
    array_options << "#{key}=#{value}" if value.present?
  end
  array_options << '--trace' if debug_enabled?
  setup_command_line(*array_options)
end
#start_task ⇒ Object
46 47 48 49 50 51 52 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 46
# Starts the job: lets the manager set up this worker's conditions, then
# connects a pub/sub client for the current actor and subscribes it to the
# worker's subscription channel.
def start_task
  @manager.setup_worker_conditions(Actor.current)
  debug("exec worker #{@job_id} starts task with #{@job.inspect}") if debug_enabled?
  websocket_debug = @manager.class.debug_websocket?
  @client = CelluloidPubsub::Client.connect(actor: Actor.current, enable_debug: websocket_debug) do |socket|
    socket.subscribe(@subscription_channel)
  end
end
#task_approval(message) ⇒ Object
151 152 153 154 155 156 157 158 159 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 151
# Approves a task, either through the manager's confirmation mechanism or
# directly.
#
# When confirmations apply, the task is listed in the configured
# task_confirmations and the action is 'invoke', the matching confirmation
# entry is marked 'confirmed' and its condition is signalled with the task
# name. Otherwise the message is published back with 'approved' => 'yes'.
#
# @param message [Hash] message with 'task' and 'action' entries
def task_approval(message)
  if @manager.apply_confirmations? && CapistranoMulticonfigParallel.configuration.task_confirmations.include?(message['task']) && message['action'] == 'invoke'
    task_confirmation = @manager.job_to_condition[@job_id][message['task']]
    task_confirmation[:status] = 'confirmed'
    task_confirmation[:condition].signal(message['task'])
  else
    publish_rake_event(message.merge('approved' => 'yes'))
  end
end
#update_machine_state(name) ⇒ Object
168 169 170 171 172 173 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 168
# Moves the state machine to the state named after the given task.
#
# A transition from the current state to the new one is registered first and
# then taken.
#
# @param name [#to_s] name of the task / target state
# @raise [CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed] when
#   name is 'deploy:failed', to force the worker to roll back
def update_machine_state(name)
  debug("worker #{@job_id} tries to transition from #{@machine.state} to #{name}") if debug_enabled?
  @machine.transitions.on(name.to_s, @machine.state => name.to_s)
  @machine.go_to_transition(name.to_s)
  return unless name == 'deploy:failed'

  # The message used to interpolate @action, which is never assigned; the
  # action is stored in @action_name.
  raise CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed, "task #{@action_name} failed "
end
#work(job, manager) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 30
# Entry point called with the job this worker has to run.
#
# Stores the job and manager, marks the worker as 'started', extracts the
# job fields when a job is present, derives the subscription channel name
# from the job id, creates the state machine and finally registers this
# actor with the manager for the job.
#
# @param job [Hash] the job to execute
# @param manager [CapistranoMulticonfigParallel::CelluloidManager] the
#   manager that delegated the job
def work(job, manager)
  @job, @manager = job, manager
  @worker_state = 'started'
  @job_confirmation_conditions = []
  process_job(job) if job.present?
  debug("worker #{@job_id} received #{job.inspect}") if debug_enabled?
  @subscription_channel = "worker_#{@job_id}"
  @machine = CapistranoMulticonfigParallel::StateMachine.new(job, Actor.current)
  manager.register_worker_for_job(job, Actor.current)
end