Class: CapistranoMulticonfigParallel::CelluloidWorker

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Celluloid::Logger, Celluloid::Notifications
Defined in:
lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb

Overview

Worker that spawns a child process in order to execute a Capistrano job and monitors that process.

Defined Under Namespace

Classes: TaskFailed

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#action_nameObject

Returns the value of attribute action_name.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for @action_name, which #process_job copies from the processed job's 'action_name' entry.
def action_name
  @action_name
end

#app_nameObject

Returns the value of attribute app_name.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for @app_name, which #process_job copies from the processed job's 'app_name' entry.
def app_name
  @app_name
end

#clientObject

Returns the value of attribute client.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for @client, the value #start_task stores from CelluloidPubsub::Client.connect.
def client
  @client
end

#current_task_numberObject

Returns the value of attribute current_task_number.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for @current_task_number.
# NOTE(review): none of the worker methods listed on this page assigns this ivar — confirm it is still used.
def current_task_number
  @current_task_number
end

#env_nameObject

Returns the value of attribute env_name.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for @env_name, which #process_job copies from the processed job's 'env_name' entry.
def env_name
  @env_name
end

#env_optionsObject

Returns the value of attribute env_options.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for @env_options, which #process_job copies from the processed job's 'env_options' entry.
def env_options
  @env_options
end

#execute_deployObject

Returns the value of attribute execute_deploy.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for @execute_deploy, the flag #notify_finished checks before doing anything.
# NOTE(review): the class listing also appears to define an #execute_deploy action
# method with the same name — confirm which definition is actually in effect.
def execute_deploy
  @execute_deploy
end

#executed_dry_runObject

Returns the value of attribute executed_dry_run.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for @executed_dry_run, set to true by #execute_after_succesfull_subscription
# when it launches the '--dry-run' child process.
def executed_dry_run
  @executed_dry_run
end

#jobHash

Returns options used for executing capistrano task.

Returns:

  • (Hash)

    options used for executing capistrano task



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 19

class CelluloidWorker
  include Celluloid
  include Celluloid::Notifications
  include Celluloid::Logger
  class TaskFailed < StandardError; end

  attr_accessor :job, :manager, :job_id, :app_name, :env_name, :action_name, :env_options, :machine, :client, :task_argv, :execute_deploy, :executed_dry_run,
                :rake_tasks, :current_task_number, # tracking tasks
                :successfull_subscription, :subscription_channel, :publisher_channel, # for subscriptions and publishing events
                :job_termination_condition, :worker_state

  def work(job, manager)
    @job = job
    @worker_state = 'started'
    @manager = manager
    @job_confirmation_conditions = []
    process_job(job) if job.present?
    debug("worker #{@job_id} received #{job.inspect}") if debug_enabled?
    @subscription_channel = "worker_#{@job_id}"
    @machine = CapistranoMulticonfigParallel::StateMachine.new(job, Actor.current)
    manager.register_worker_for_job(job, Actor.current)
  end

  def debug_enabled?
    @manager.class.debug_enabled?
  end

  def start_task
    @manager.setup_worker_conditions(Actor.current)
    debug("exec worker #{@job_id} starts task with #{@job.inspect}") if debug_enabled?
    @client = CelluloidPubsub::Client.connect(actor: Actor.current, enable_debug: @manager.class.debug_websocket?) do |ws|
      ws.subscribe(@subscription_channel)
    end
  end

  def publish_rake_event(data)
    @client.publish(rake_actor_id(data), data)
  end

  def rake_actor_id(data)
    data['action'].present? && data['action'] == 'count' ? "rake_worker_#{@job_id}_count" : "rake_worker_#{@job_id}"
  end

  def on_message(message)
    debug("worker #{@job_id} received:  #{message.inspect}") if debug_enabled?
    if @client.succesfull_subscription?(message)
      @successfull_subscription = true
      execute_after_succesfull_subscription
    else
      handle_subscription(message)
    end
  end

  def execute_after_succesfull_subscription
    setup_task_arguments
    if (@action_name == 'deploy' || @action_name == 'deploy:rollback') && CapistranoMulticonfigParallel.show_task_progress
      @executed_dry_run = true
      @rake_tasks = []
      @task_argv << '--dry-run'
      @task_argv << 'count_rake=true'
      @child_process = CapistranoMulticonfigParallel::ChildProcess.new
      Actor.current.link @child_process
      debug("worker #{@job_id} executes: #{generate_command}") if debug_enabled?
      @child_process.async.work(generate_command, actor: Actor.current, silent: true, dry_run: true)
    else
      async.execute_deploy
    end
  end

  def rake_tasks
    @rake_tasks ||= []
  end

  def cd_working_directory
    "cd #{CapistranoMulticonfigParallel.detect_root}"
  end

  def generate_command
    "         \#{cd_working_directory} && RAILS_ENV=\#{@env_name} bundle exec multi_cap \#{@task_argv.join(' ')}\n    CMD\n  end\n\n  def execute_deploy\n    @execute_deploy = true\n    debug(\"invocation chain \#{@job_id} is : \#{@rake_tasks.inspect}\") if debug_enabled? && CapistranoMulticonfigParallel.show_task_progress\n    check_child_proces\n    setup_task_arguments\n    debug(\"worker \#{@job_id} executes: \#{generate_command}\") if debug_enabled?\n    @child_process.async.work(generate_command, actor: Actor.current, silent: true)\n    @manager.wait_task_confirmations_worker(Actor.current)\n  end\n\n  def check_child_proces\n    if !defined?(@child_process) || @child_process.nil?\n      @child_process = CapistranoMulticonfigParallel::ChildProcess.new\n      Actor.current.link @child_process\n    else\n      @client.unsubscribe(\"rake_worker_\#{@job_id}_count\")\n      @child_process.exit_status = nil\n    end\n  end\n\n  def on_close(code, reason)\n    debug(\"worker \#{@job_id} websocket connection closed: \#{code.inspect}, \#{reason.inspect}\") if debug_enabled?\n  end\n\n  def check_gitflow\n    return if !@env_name == 'staging' || [email protected]_tag_staging? || !executed_task?(CapistranoMulticonfigParallel::GITFLOW_VERIFY_UPTODATE_TASK)\n    @manager.dispatch_new_job(@job.merge('env' => 'production'))\n  end\n\n  def handle_subscription(message)\n    if message_is_about_a_task?(message)\n      check_gitflow\n      save_tasks_to_be_executed(message)\n      update_machine_state(message['task']) # if message['action'] == 'invoke'\n      debug(\"worker \#{@job_id} state is \#{@machine.state}\") if debug_enabled?\n      task_approval(message)\n    else\n      debug(\"worker \#{@job_id} could not handle  \#{message}\") if debug_enabled?\n    end\n  end\n\n  def message_is_about_a_task?(message)\n    message.present? && message.is_a?(Hash) && message['action'].present? && message['job_id'].present? 
&& message['task'].present?\n  end\n\n  def executed_task?(task)\n    @rake_tasks.present? && @rake_tasks[task].present?\n  end\n\n  def task_approval(message)\n    if @manager.apply_confirmations? && CapistranoMulticonfigParallel.configuration.task_confirmations.include?(message['task']) && message['action'] == 'invoke'\n      task_confirmation = @manager.job_to_condition[@job_id][message['task']]\n      task_confirmation[:status] = 'confirmed'\n      task_confirmation[:condition].signal(message['task'])\n    else\n      publish_rake_event(message.merge('approved' => 'yes'))\n    end\n  end\n\n  def save_tasks_to_be_executed(message)\n    return unless message['action'] == 'count'\n    debug(\"worler \#{@job_id} current invocation chain : \#{@rake_tasks.inspect}\") if debug_enabled?\n    @rake_tasks = [] if @rake_tasks.blank?\n    @rake_tasks << message['task'] if @rake_tasks.last != message['task']\n  end\n\n  def update_machine_state(name)\n    debug(\"worker \#{@job_id} triest to transition from \#{@machine.state} to  \#{name}\") if debug_enabled?\n    @machine.transitions.on(name.to_s, @machine.state => name.to_s)\n    @machine.go_to_transition(name.to_s)\n    raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed, \"task \#{@action} failed \") if name == 'deploy:failed' # force worker to rollback\n  end\n\n  def setup_command_line(*options)\n    @task_argv = []\n    options.each do |option|\n      @task_argv << option\n    end\n    @task_argv\n  end\n\n  def setup_task_arguments\n    #   stage = \"\#{@app_name}:\#{@env_name} \#{@action_name}\"\n    stage = @app_name.present? ? 
\"\#{@app_name}:\#{@env_name}\" : \"\#{@env_name}\"\n    array_options = [\"\#{stage}\"]\n    array_options << \"\#{@action_name}[\#{@task_arguments.join(',')}]\"\n    @env_options.each do |key, value|\n      array_options << \"\#{key}=\#{value}\" if value.present?\n    end\n    array_options << '--trace' if debug_enabled?\n    setup_command_line(*array_options)\n  end\n\n  def send_msg(channel, message = nil)\n    publish channel, message.present? && message.is_a?(Hash) ? { job_id: @job_id }.merge(message) : { job_id: @job_id, time: Time.now }\n  end\n\n  def process_job(job)\n    processed_job = @manager.process_job(job)\n    @job_id = processed_job['job_id']\n    @app_name = processed_job['app_name']\n    @env_name = processed_job['env_name']\n    @action_name = processed_job['action_name']\n    @env_options = processed_job['env_options']\n    @task_arguments = processed_job['task_arguments']\n  end\n\n  def crashed?\n    @action_name == 'deploy:rollback' || @action_name == 'deploy:failed' || @manager.job_failed?(@job)\n  end\n\n  def finish_worker\n    @manager.mark_completed_remaining_tasks(Actor.current)\n    @worker_state = 'finished'\n    @manager.job_to_worker.each do|_job_id, worker|\n      debug(\"worker \#{worker.job_id}has state \#{worker.worker_state}\") if worker.alive? && debug_enabled?\n    end\n  end\n\n  def notify_finished(exit_status)\n    return unless @execute_deploy\n    if exit_status.exitstatus != 0\n      debug(\"worker \#{job_id} tries to terminate\") if debug_enabled?\n      terminate\n    else\n      update_machine_state('FINISHED')\n      debug(\"worker \#{job_id} notifies manager has finished\") if debug_enabled?\n      finish_worker\n    end\n  end\nend\n"

#job_idObject

Returns the value of attribute job_id.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for @job_id, which #process_job copies from the processed job's 'job_id' entry.
def job_id
  @job_id
end

#job_termination_conditionObject

Returns the value of attribute job_termination_condition.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for @job_termination_condition.
# NOTE(review): none of the worker methods listed on this page assigns this ivar —
# presumably it is set from outside through the accessor; verify against the manager.
def job_termination_condition
  @job_termination_condition
end

#machineObject

Returns the value of attribute machine.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for @machine, the CapistranoMulticonfigParallel::StateMachine instance created in #work.
def machine
  @machine
end

#managerCapistranoMulticonfigParallel::CelluloidManager

Returns the instance of the manager that delegated the job to this worker.

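Returns:

  • (CapistranoMulticonfigParallel::CelluloidManager)

    the instance of the manager that delegated the job to this worker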



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 19

class CelluloidWorker
  include Celluloid
  include Celluloid::Notifications
  include Celluloid::Logger
  class TaskFailed < StandardError; end

  attr_accessor :job, :manager, :job_id, :app_name, :env_name, :action_name, :env_options, :machine, :client, :task_argv, :execute_deploy, :executed_dry_run,
                :rake_tasks, :current_task_number, # tracking tasks
                :successfull_subscription, :subscription_channel, :publisher_channel, # for subscriptions and publishing events
                :job_termination_condition, :worker_state

  def work(job, manager)
    @job = job
    @worker_state = 'started'
    @manager = manager
    @job_confirmation_conditions = []
    process_job(job) if job.present?
    debug("worker #{@job_id} received #{job.inspect}") if debug_enabled?
    @subscription_channel = "worker_#{@job_id}"
    @machine = CapistranoMulticonfigParallel::StateMachine.new(job, Actor.current)
    manager.register_worker_for_job(job, Actor.current)
  end

  def debug_enabled?
    @manager.class.debug_enabled?
  end

  def start_task
    @manager.setup_worker_conditions(Actor.current)
    debug("exec worker #{@job_id} starts task with #{@job.inspect}") if debug_enabled?
    @client = CelluloidPubsub::Client.connect(actor: Actor.current, enable_debug: @manager.class.debug_websocket?) do |ws|
      ws.subscribe(@subscription_channel)
    end
  end

  def publish_rake_event(data)
    @client.publish(rake_actor_id(data), data)
  end

  def rake_actor_id(data)
    data['action'].present? && data['action'] == 'count' ? "rake_worker_#{@job_id}_count" : "rake_worker_#{@job_id}"
  end

  def on_message(message)
    debug("worker #{@job_id} received:  #{message.inspect}") if debug_enabled?
    if @client.succesfull_subscription?(message)
      @successfull_subscription = true
      execute_after_succesfull_subscription
    else
      handle_subscription(message)
    end
  end

  def execute_after_succesfull_subscription
    setup_task_arguments
    if (@action_name == 'deploy' || @action_name == 'deploy:rollback') && CapistranoMulticonfigParallel.show_task_progress
      @executed_dry_run = true
      @rake_tasks = []
      @task_argv << '--dry-run'
      @task_argv << 'count_rake=true'
      @child_process = CapistranoMulticonfigParallel::ChildProcess.new
      Actor.current.link @child_process
      debug("worker #{@job_id} executes: #{generate_command}") if debug_enabled?
      @child_process.async.work(generate_command, actor: Actor.current, silent: true, dry_run: true)
    else
      async.execute_deploy
    end
  end

  def rake_tasks
    @rake_tasks ||= []
  end

  def cd_working_directory
    "cd #{CapistranoMulticonfigParallel.detect_root}"
  end

  def generate_command
    "         \#{cd_working_directory} && RAILS_ENV=\#{@env_name} bundle exec multi_cap \#{@task_argv.join(' ')}\n    CMD\n  end\n\n  def execute_deploy\n    @execute_deploy = true\n    debug(\"invocation chain \#{@job_id} is : \#{@rake_tasks.inspect}\") if debug_enabled? && CapistranoMulticonfigParallel.show_task_progress\n    check_child_proces\n    setup_task_arguments\n    debug(\"worker \#{@job_id} executes: \#{generate_command}\") if debug_enabled?\n    @child_process.async.work(generate_command, actor: Actor.current, silent: true)\n    @manager.wait_task_confirmations_worker(Actor.current)\n  end\n\n  def check_child_proces\n    if !defined?(@child_process) || @child_process.nil?\n      @child_process = CapistranoMulticonfigParallel::ChildProcess.new\n      Actor.current.link @child_process\n    else\n      @client.unsubscribe(\"rake_worker_\#{@job_id}_count\")\n      @child_process.exit_status = nil\n    end\n  end\n\n  def on_close(code, reason)\n    debug(\"worker \#{@job_id} websocket connection closed: \#{code.inspect}, \#{reason.inspect}\") if debug_enabled?\n  end\n\n  def check_gitflow\n    return if !@env_name == 'staging' || [email protected]_tag_staging? || !executed_task?(CapistranoMulticonfigParallel::GITFLOW_VERIFY_UPTODATE_TASK)\n    @manager.dispatch_new_job(@job.merge('env' => 'production'))\n  end\n\n  def handle_subscription(message)\n    if message_is_about_a_task?(message)\n      check_gitflow\n      save_tasks_to_be_executed(message)\n      update_machine_state(message['task']) # if message['action'] == 'invoke'\n      debug(\"worker \#{@job_id} state is \#{@machine.state}\") if debug_enabled?\n      task_approval(message)\n    else\n      debug(\"worker \#{@job_id} could not handle  \#{message}\") if debug_enabled?\n    end\n  end\n\n  def message_is_about_a_task?(message)\n    message.present? && message.is_a?(Hash) && message['action'].present? && message['job_id'].present? 
&& message['task'].present?\n  end\n\n  def executed_task?(task)\n    @rake_tasks.present? && @rake_tasks[task].present?\n  end\n\n  def task_approval(message)\n    if @manager.apply_confirmations? && CapistranoMulticonfigParallel.configuration.task_confirmations.include?(message['task']) && message['action'] == 'invoke'\n      task_confirmation = @manager.job_to_condition[@job_id][message['task']]\n      task_confirmation[:status] = 'confirmed'\n      task_confirmation[:condition].signal(message['task'])\n    else\n      publish_rake_event(message.merge('approved' => 'yes'))\n    end\n  end\n\n  def save_tasks_to_be_executed(message)\n    return unless message['action'] == 'count'\n    debug(\"worler \#{@job_id} current invocation chain : \#{@rake_tasks.inspect}\") if debug_enabled?\n    @rake_tasks = [] if @rake_tasks.blank?\n    @rake_tasks << message['task'] if @rake_tasks.last != message['task']\n  end\n\n  def update_machine_state(name)\n    debug(\"worker \#{@job_id} triest to transition from \#{@machine.state} to  \#{name}\") if debug_enabled?\n    @machine.transitions.on(name.to_s, @machine.state => name.to_s)\n    @machine.go_to_transition(name.to_s)\n    raise(CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed, \"task \#{@action} failed \") if name == 'deploy:failed' # force worker to rollback\n  end\n\n  def setup_command_line(*options)\n    @task_argv = []\n    options.each do |option|\n      @task_argv << option\n    end\n    @task_argv\n  end\n\n  def setup_task_arguments\n    #   stage = \"\#{@app_name}:\#{@env_name} \#{@action_name}\"\n    stage = @app_name.present? ? 
\"\#{@app_name}:\#{@env_name}\" : \"\#{@env_name}\"\n    array_options = [\"\#{stage}\"]\n    array_options << \"\#{@action_name}[\#{@task_arguments.join(',')}]\"\n    @env_options.each do |key, value|\n      array_options << \"\#{key}=\#{value}\" if value.present?\n    end\n    array_options << '--trace' if debug_enabled?\n    setup_command_line(*array_options)\n  end\n\n  def send_msg(channel, message = nil)\n    publish channel, message.present? && message.is_a?(Hash) ? { job_id: @job_id }.merge(message) : { job_id: @job_id, time: Time.now }\n  end\n\n  def process_job(job)\n    processed_job = @manager.process_job(job)\n    @job_id = processed_job['job_id']\n    @app_name = processed_job['app_name']\n    @env_name = processed_job['env_name']\n    @action_name = processed_job['action_name']\n    @env_options = processed_job['env_options']\n    @task_arguments = processed_job['task_arguments']\n  end\n\n  def crashed?\n    @action_name == 'deploy:rollback' || @action_name == 'deploy:failed' || @manager.job_failed?(@job)\n  end\n\n  def finish_worker\n    @manager.mark_completed_remaining_tasks(Actor.current)\n    @worker_state = 'finished'\n    @manager.job_to_worker.each do|_job_id, worker|\n      debug(\"worker \#{worker.job_id}has state \#{worker.worker_state}\") if worker.alive? && debug_enabled?\n    end\n  end\n\n  def notify_finished(exit_status)\n    return unless @execute_deploy\n    if exit_status.exitstatus != 0\n      debug(\"worker \#{job_id} tries to terminate\") if debug_enabled?\n      terminate\n    else\n      update_machine_state('FINISHED')\n      debug(\"worker \#{job_id} notifies manager has finished\") if debug_enabled?\n      finish_worker\n    end\n  end\nend\n"

#publisher_channelObject

Returns the value of attribute publisher_channel.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for @publisher_channel.
# NOTE(review): none of the worker methods listed on this page assigns this ivar — confirm it is still used.
def publisher_channel
  @publisher_channel
end

#rake_tasksObject

Returns the value of attribute rake_tasks.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for @rake_tasks, the list #save_tasks_to_be_executed appends counted task names to.
# NOTE(review): the class listing also shows a memoizing variant (`@rake_tasks ||= []`);
# confirm which definition is in effect before relying on a nil return here.
def rake_tasks
  @rake_tasks
end

#subscription_channelObject

Returns the value of attribute subscription_channel.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for @subscription_channel, which #work sets to "worker_<job id>" and
# #start_task subscribes the pubsub client to.
def subscription_channel
  @subscription_channel
end

#successfull_subscriptionObject

Returns the value of attribute successfull_subscription.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for @successfull_subscription, set to true by #on_message once the client
# reports the message as a successful subscription.
def successfull_subscription
  @successfull_subscription
end

#task_argvObject

Returns the value of attribute task_argv.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for @task_argv, the argument list rebuilt by #setup_command_line.
def task_argv
  @task_argv
end

#worker_stateObject

Returns the value of attribute worker_state.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for @worker_state: 'started' after #work, 'finished' after #finish_worker.
def worker_state
  @worker_state
end

Instance Method Details

#cd_working_directoryObject



92
93
94
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 92

# Builds the shell fragment that changes into the detected project root.
#
# @return [String] "cd " followed by CapistranoMulticonfigParallel.detect_root
def cd_working_directory
  project_root = CapistranoMulticonfigParallel.detect_root
  "cd #{project_root}"
end

#check_child_procesObject



112
113
114
115
116
117
118
119
120
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 112

# Makes sure a child process actor is available for the next command.
#
# With no child process yet, a new one is created and linked to the current
# actor. Otherwise the existing one is reused: the client unsubscribes from the
# "rake_worker_<job id>_count" channel and the stored exit status is cleared.
def check_child_proces
  already_spawned = !@child_process.nil?
  if already_spawned
    @client.unsubscribe("rake_worker_#{@job_id}_count")
    @child_process.exit_status = nil
  else
    @child_process = CapistranoMulticonfigParallel::ChildProcess.new
    Actor.current.link(@child_process)
  end
end

#check_gitflowObject



126
127
128
129
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 126

# Dispatches a follow-up production job once a staging job has run the gitflow
# verification task.
#
# Nothing happens unless all three hold: the worker's environment is
# 'staging', the manager allows tagging staging, and the gitflow verify task
# has been recorded as executed. The environment check used to be written as
# `!@env_name == 'staging'`, which Ruby parses as `(!@env_name) == 'staging'`
# and is therefore always false, so the guard never stopped non-staging jobs.
#
# NOTE(review): #handle_subscription calls this for every task message; confirm
# the manager copes with the production job being dispatched more than once.
def check_gitflow
  return unless @env_name == 'staging'
  return unless @manager.can_tag_staging?
  return unless executed_task?(CapistranoMulticonfigParallel::GITFLOW_VERIFY_UPTODATE_TASK)

  @manager.dispatch_new_job(@job.merge('env' => 'production'))
end

#crashed?Boolean

Returns:

  • (Boolean)


209
210
211
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 209

# Tells whether this worker's job counts as crashed.
#
# @return [Boolean] true when the action is 'deploy:rollback' or
#   'deploy:failed'; otherwise whatever the manager's job_failed? reports
def crashed?
  return true if %w[deploy:rollback deploy:failed].include?(@action_name)

  @manager.job_failed?(@job)
end

#debug_enabled?Boolean

Returns:

  • (Boolean)


42
43
44
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 42

# Asks the manager's class whether debug logging is switched on.
#
# @return [Boolean] the result of debug_enabled? on @manager's class
def debug_enabled?
  manager_class = @manager.class
  manager_class.debug_enabled?
end

#execute_after_succesfull_subscriptionObject



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 72

# Runs once the pubsub subscription has been acknowledged.
#
# The task arguments are rebuilt first. For 'deploy' and 'deploy:rollback'
# actions with task-progress display enabled, a child process is started with
# '--dry-run' and 'count_rake=true' appended to the arguments. In every other
# case the deploy is started asynchronously straight away.
def execute_after_succesfull_subscription
  setup_task_arguments
  counts_tasks_first = %w[deploy deploy:rollback].include?(@action_name) &&
                       CapistranoMulticonfigParallel.show_task_progress
  return async.execute_deploy unless counts_tasks_first

  @executed_dry_run = true
  @rake_tasks = []
  @task_argv.push('--dry-run', 'count_rake=true')
  @child_process = CapistranoMulticonfigParallel::ChildProcess.new
  Actor.current.link(@child_process)
  debug("worker #{@job_id} executes: #{generate_command}") if debug_enabled?
  @child_process.async.work(generate_command, actor: Actor.current, silent: true, dry_run: true)
end

#executed_task?(task) ⇒ Boolean

Returns:

  • (Boolean)


147
148
149
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 147

# Tells whether the given task name has been recorded in @rake_tasks.
#
# @rake_tasks is an Array of task names (see #save_tasks_to_be_executed), so
# this is a membership test. The previous `@rake_tasks[task]` indexed the
# Array with the task name, which raises TypeError for a String argument.
#
# @param task [String] task name to look for
# @return [Boolean] true when the task is in the list; false when it is not
#   or when no list exists yet
def executed_task?(task)
  Array(@rake_tasks).include?(task)
end

#finish_workerObject



213
214
215
216
217
218
219
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 213

# Marks this worker as finished.
#
# The manager is asked to mark the worker's remaining tasks as completed, the
# worker state becomes 'finished', and (with debug enabled) the state of every
# worker still alive in the manager's job_to_worker map is logged.
def finish_worker
  @manager.mark_completed_remaining_tasks(Actor.current)
  @worker_state = 'finished'
  @manager.job_to_worker.each do |_job_id, worker|
    next unless worker.alive? && debug_enabled?

    debug("worker #{worker.job_id}has state #{worker.worker_state}")
  end
end

#generate_commandObject



96
97
98
99
100
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 96

# Assembles the shell command that runs the job in a child process.
#
# The command changes into the working directory, sets RAILS_ENV to the
# worker's environment name and invokes `bundle exec multi_cap` with the
# current @task_argv joined by spaces. The listing on this page had lost its
# heredoc (it showed an escaped string literal and no closing `end`); this
# restores it as a `<<-CMD` heredoc, so the leading indentation and trailing
# newline stay part of the returned string.
#
# @return [String] the command line, indentation and trailing newline included
def generate_command
  <<-CMD
       #{cd_working_directory} && RAILS_ENV=#{@env_name} bundle exec multi_cap #{@task_argv.join(' ')}
  CMD
end

#handle_subscription(message) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 131

# Processes a message that was not a subscription acknowledgement.
#
# When the message passes #message_is_about_a_task?, the steps run in this
# order: #check_gitflow, #save_tasks_to_be_executed, #update_machine_state and
# finally #task_approval. Any other message is only written to the debug log.
#
# @param message [Object] payload handed over by #on_message
def handle_subscription(message)
  if message_is_about_a_task?(message)
    # Runs before the current task is saved, so it only sees tasks recorded by
    # earlier messages.
    check_gitflow
    save_tasks_to_be_executed(message)
    # Raises TaskFailed for the 'deploy:failed' task; in that case the approval
    # step below is never reached.
    update_machine_state(message['task']) # if message['action'] == 'invoke'
    debug("worker #{@job_id} state is #{@machine.state}") if debug_enabled?
    task_approval(message)
  else
    debug("worker #{@job_id} could not handle  #{message}") if debug_enabled?
  end
end

#message_is_about_a_task?(message) ⇒ Boolean

Returns:

  • (Boolean)


143
144
145
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 143

# Checks whether a message describes a task event.
#
# @param message [Object] the received payload
# @return [Boolean] true only for a non-blank Hash whose 'action', 'job_id'
#   and 'task' entries are all present
def message_is_about_a_task?(message)
  return false unless message.present? && message.is_a?(Hash)

  %w[action job_id task].all? { |field| message[field].present? }
end

#notify_finished(exit_status) ⇒ Object



221
222
223
224
225
226
227
228
229
230
231
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 221

# Reacts to the child process having exited.
#
# Ignored until the real deploy has been started (@execute_deploy set). An
# exit status of 0 moves the state machine to 'FINISHED' and finishes the
# worker; any other status terminates the worker.
#
# @param exit_status [#exitstatus] status object of the finished child process
def notify_finished(exit_status)
  return unless @execute_deploy

  if exit_status.exitstatus == 0
    update_machine_state('FINISHED')
    debug("worker #{job_id} notifies manager has finished") if debug_enabled?
    finish_worker
  else
    debug("worker #{job_id} tries to terminate") if debug_enabled?
    terminate
  end
end

#on_close(code, reason) ⇒ Object



122
123
124
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 122

# Callback for the websocket connection being closed; only logs when debug is on.
#
# @param code [Object] close code reported by the connection
# @param reason [Object] close reason reported by the connection
def on_close(code, reason)
  return unless debug_enabled?

  debug("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}")
end

#on_message(message) ⇒ Object



62
63
64
65
66
67
68
69
70
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 62

# Callback for every message received on the pubsub connection.
#
# A message the client recognises as a successful subscription flags the
# worker as subscribed and kicks off #execute_after_succesfull_subscription.
# Everything else goes to #handle_subscription.
#
# @param message [Object] the received payload
def on_message(message)
  debug("worker #{@job_id} received:  #{message.inspect}") if debug_enabled?
  return handle_subscription(message) unless @client.succesfull_subscription?(message)

  @successfull_subscription = true
  execute_after_succesfull_subscription
end

#process_job(job) ⇒ Object



199
200
201
202
203
204
205
206
207
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 199

# Has the manager process the job and copies the result into instance state.
#
# Sets @job_id, @app_name, @env_name, @action_name, @env_options and
# @task_arguments from the entries of the same name in the processed job.
#
# @param job [Object] the raw job passed to the manager's process_job
# @return [Object] the processed job's 'task_arguments' entry
def process_job(job)
  details = @manager.process_job(job)
  %w[job_id app_name env_name action_name env_options task_arguments].each do |field|
    instance_variable_set("@#{field}", details[field])
  end
  @task_arguments
end

#publish_rake_event(data) ⇒ Object



54
55
56
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 54

# Publishes data through the pubsub client on the channel #rake_actor_id picks for it.
#
# @param data [Hash] payload to publish
def publish_rake_event(data)
  channel = rake_actor_id(data)
  @client.publish(channel, data)
end

#rake_actor_id(data) ⇒ Object



58
59
60
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 58

# Picks the channel name used to reach the rake side for this job.
#
# @param data [Hash] payload whose 'action' entry selects the channel
# @return [String] "rake_worker_<job id>_count" for the 'count' action,
#   "rake_worker_<job id>" for everything else
def rake_actor_id(data)
  suffix = data['action'] == 'count' ? '_count' : ''
  "rake_worker_#{@job_id}#{suffix}"
end

#save_tasks_to_be_executed(message) ⇒ Object



161
162
163
164
165
166
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 161

# Records the task named in a 'count' message in the invocation chain.
#
# Messages with any other action are ignored. A task is appended only when it
# differs from the last recorded one, so consecutive repeats are collapsed.
#
# @param message [Hash] task message with 'action' and 'task' entries
def save_tasks_to_be_executed(message)
  return unless message['action'] == 'count'

  debug("worler #{@job_id} current invocation chain : #{@rake_tasks.inspect}") if debug_enabled?
  @rake_tasks = [] if @rake_tasks.blank?
  task = message['task']
  @rake_tasks.push(task) unless @rake_tasks.last == task
end

#send_msg(channel, message = nil) ⇒ Object



195
196
197
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 195

# Publishes a notification on the given channel, tagged with this worker's job id.
#
# @param channel [Object] channel passed straight to publish
# @param message [Hash, nil] extra data; a present Hash is merged over
#   { job_id: ... }, anything else is replaced by { job_id: ..., time: Time.now }
def send_msg(channel, message = nil)
  payload = if message.present? && message.is_a?(Hash)
              { job_id: @job_id }.merge(message)
            else
              { job_id: @job_id, time: Time.now }
            end
  publish channel, payload
end

#setup_command_line(*options) ⇒ Object



175
176
177
178
179
180
181
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 175

# Replaces @task_argv with a fresh list holding the given options in order.
#
# @param options [Array] command-line arguments for the capistrano invocation
# @return [Array] the new @task_argv
def setup_command_line(*options)
  @task_argv = [*options]
end

#setup_task_argumentsObject



183
184
185
186
187
188
189
190
191
192
193
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 183

# Rebuilds @task_argv for the worker's job.
#
# The arguments are, in order: the stage ("<app>:<env>", or just "<env>" when
# no app name is present), "<action>[<comma-joined task arguments>]", one
# "KEY=value" entry per env option with a present value, and '--trace' when
# debug is enabled. The list is handed to #setup_command_line.
#
# @return [Array<String>] the value returned by #setup_command_line
def setup_task_arguments
  app_prefix = @app_name.present? ? "#{@app_name}:" : ''
  stage = "#{app_prefix}#{@env_name}"
  task_with_args = "#{@action_name}[#{@task_arguments.join(',')}]"
  argv = @env_options.each_with_object([stage, task_with_args]) do |(key, value), collected|
    collected << "#{key}=#{value}" if value.present?
  end
  argv << '--trace' if debug_enabled?
  setup_command_line(*argv)
end

#start_taskObject



46
47
48
49
50
51
52
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 46

# Starts the job: asks the manager to set up this worker's conditions, then
# opens the pubsub client and subscribes it to @subscription_channel.
# The client's messages come back through #on_message.
def start_task
  @manager.setup_worker_conditions(Actor.current)
  debug("exec worker #{@job_id} starts task with #{@job.inspect}") if debug_enabled?
  websocket_debug = @manager.class.debug_websocket?
  @client = CelluloidPubsub::Client.connect(actor: Actor.current, enable_debug: websocket_debug) do |socket|
    socket.subscribe(@subscription_channel)
  end
end

#task_approval(message) ⇒ Object



151
152
153
154
155
156
157
158
159
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 151

# Decides how a task message gets approved.
#
# When the manager applies confirmations, the task is listed in the configured
# task_confirmations and the action is 'invoke', the matching confirmation
# entry is marked 'confirmed' and its condition is signalled with the task
# name. Otherwise the message is published back with 'approved' => 'yes'.
#
# @param message [Hash] task message with 'task' and 'action' entries
def task_approval(message)
  task = message['task']
  needs_confirmation = @manager.apply_confirmations? &&
                       CapistranoMulticonfigParallel.configuration.task_confirmations.include?(task) &&
                       message['action'] == 'invoke'
  return publish_rake_event(message.merge('approved' => 'yes')) unless needs_confirmation

  confirmation = @manager.job_to_condition[@job_id][task]
  confirmation[:status] = 'confirmed'
  confirmation[:condition].signal(task)
end

#update_machine_state(name) ⇒ Object



168
169
170
171
172
173
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 168

# Moves the worker's state machine to the state named after the given task.
#
# A transition from the machine's current state to the new one is registered
# and then taken. Reaching 'deploy:failed' raises TaskFailed afterwards, to
# force the worker to roll back.
#
# Two fixes over the previous version: the error message now interpolates
# @action_name (it used @action, which is never assigned, so the action was
# always blank), and the failure check compares the stringified name, matching
# how the transition itself is registered.
#
# @param name [#to_s] task or state name
# @raise [CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed] when the
#   new state is 'deploy:failed'
def update_machine_state(name)
  state_name = name.to_s
  debug("worker #{@job_id} triest to transition from #{@machine.state} to  #{name}") if debug_enabled?
  @machine.transitions.on(state_name, @machine.state => state_name)
  @machine.go_to_transition(state_name)
  return unless state_name == 'deploy:failed'

  # force worker to rollback
  raise CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed, "task #{@action_name} failed "
end

#work(job, manager) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 30

# Prepares the worker for a job and registers it with the manager.
#
# Statement order matters here: @manager is stored before #process_job and
# #debug_enabled? run because both read it, and @subscription_channel is built
# from the @job_id that #process_job sets.
#
# @param job [Object] the job to execute; only processed when job.present?
# @param manager [Object] the manager that delegated the job; it must respond
#   to process_job and register_worker_for_job
def work(job, manager)
  @job = job
  @worker_state = 'started'
  @manager = manager
  # NOTE(review): not read by any other worker method listed on this page — confirm it is still needed.
  @job_confirmation_conditions = []
  process_job(job) if job.present?
  debug("worker #{@job_id} received #{job.inspect}") if debug_enabled?
  # Channel this worker subscribes to in #start_task.
  @subscription_channel = "worker_#{@job_id}"
  @machine = CapistranoMulticonfigParallel::StateMachine.new(job, Actor.current)
  manager.register_worker_for_job(job, Actor.current)
end