Class: CapistranoMulticonfigParallel::CelluloidWorker

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Celluloid::Logger, Celluloid::Notifications
Defined in:
lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb

Overview

Worker that spawns a child process in order to execute a capistrano job and monitors that process.

Defined Under Namespace

Classes: TaskFailed

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#action_nameObject

Returns the value of attribute action_name.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for the action_name attribute (assigned from job['action'] in #process_job).
#
# @return [Object, nil] the current value of @action_name
def action_name
  @action_name
end

#app_nameObject

Returns the value of attribute app_name.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for the app_name attribute (assigned from job['app'] in #process_job).
#
# @return [Object, nil] the current value of @app_name
def app_name
  @app_name
end

#clientObject

Returns the value of attribute client.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for the client attribute; #start_task stores the result of
# CelluloidPubsub::Client.connect here.
#
# @return [Object, nil] the current value of @client
def client
  @client
end

#current_task_numberObject

Returns the value of attribute current_task_number.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for the current_task_number attribute.
# NOTE(review): no method in the class listing on this page assigns
# @current_task_number — confirm whether it is set from outside.
#
# @return [Object, nil] the current value of @current_task_number
def current_task_number
  @current_task_number
end

#env_nameObject

Returns the value of attribute env_name.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for the env_name attribute (assigned from job['env'] in #process_job).
#
# @return [Object, nil] the current value of @env_name
def env_name
  @env_name
end

#env_optionsObject

Returns the value of attribute env_options.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for the env_options attribute; #process_job fills it with the
# entries of job['env_options'] whose value is present.
#
# @return [Hash, nil] the current value of @env_options
def env_options
  @env_options
end

#execute_deployObject

Returns the value of attribute execute_deploy.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader generated for the execute_deploy attribute.
# NOTE(review): the class listing on this page also defines an instance
# method named execute_deploy (it starts the child process), which replaces
# this generated reader; only the generated writer stays in effect.
#
# @return [Object, nil] the current value of @execute_deploy
def execute_deploy
  @execute_deploy
end

#executed_dry_runObject

Returns the value of attribute executed_dry_run.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for the executed_dry_run attribute; set to true by
# #execute_after_succesfull_subscription when it launches the --dry-run pass.
#
# @return [Boolean, nil] the current value of @executed_dry_run
def executed_dry_run
  @executed_dry_run
end

#jobHash

Returns options used for executing capistrano task.

Returns:

  • (Hash)

    options used for executing capistrano task



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 19

# Celluloid actor that executes one capistrano job in a child process and
# relays task events between that process and the manager.
class CelluloidWorker
  include Celluloid
  include Celluloid::Notifications
  include Celluloid::Logger
  # Raised by #update_machine_state when the 'deploy:failed' task is reported.
  class TaskFailed < StandardError; end

  # NOTE: #rake_tasks and #execute_deploy are redefined further down, so for
  # those two names only the generated writers remain in effect.
  attr_accessor :job, :manager, :job_id, :app_name, :env_name, :action_name, :env_options, :machine, :client, :task_argv, :execute_deploy, :executed_dry_run,
                :rake_tasks, :current_task_number, # tracking tasks
                :successfull_subscription, :subscription_channel, :publisher_channel, # for subscriptions and publishing events
                :task_confirmations, :manager_condition, :last_manager_condition # for task conifirmations from manager

  # Stores the job and the manager, builds the state machine and the
  # conditions, then registers this actor with the manager.
  #
  # @param job [Hash] job description; read with string keys by #process_job
  # @param manager [Object] must respond to job_to_condition and register_worker_for_job
  # @return [Object] whatever manager.register_worker_for_job returns
  def work(job, manager)
    @job = job
    @manager = manager

    process_job(job) if job.present?
    debug("worker #{@job_id} received #{job.inspect}") if debug_enabled?
    @subscription_channel = "worker_#{@job_id}"
    @machine = CapistranoMulticonfigParallel::StateMachine.new(job, Actor.current)
    setup_worker_condition
    manager.register_worker_for_job(job, Actor.current)
  end

  # @return [Object] the debug flag exposed by the manager's class
  def debug_enabled?
    @manager.class.debug_enabled?
  end

  # Opens the pubsub client and subscribes it to this worker's channel.
  #
  # @return [Object] the client returned by CelluloidPubsub::Client.connect
  def start_task
    debug("exec worker #{@job_id} starts task with #{@job.inspect}") if debug_enabled?
    @task_confirmations = CapistranoMulticonfigParallel.configuration.task_confirmations
    @client = CelluloidPubsub::Client.connect(actor: Actor.current, enable_debug: @manager.class.debug_websocket?) do |ws|
      ws.subscribe(@subscription_channel)
    end
  end

  # Publishes +data+ on the channel selected by #rake_actor_id.
  #
  # @param data [Hash] message to publish
  def publish_rake_event(data)
    @client.publish(rake_actor_id(data), data)
  end

  # @param data [Hash] message; its 'action' entry selects the channel
  # @return [String] the "_count" channel for 'count' actions, the plain channel otherwise
  def rake_actor_id(data)
    data['action'].present? && data['action'] == 'count' ? "rake_worker_#{@job_id}_count" : "rake_worker_#{@job_id}"
  end

  # Entry point for messages arriving on the pubsub client.
  #
  # @param message [Object] incoming message
  def on_message(message)
    debug("worker #{@job_id} received:  #{message.inspect}") if debug_enabled?
    if @client.succesfull_subscription?(message)
      @successfull_subscription = true
      execute_after_succesfull_subscription
    else
      handle_subscription(message)
    end
  end

  # Runs once the subscription is confirmed: for deploy-like actions with
  # task progress enabled it first launches a --dry-run pass, otherwise it
  # schedules the real run.
  def execute_after_succesfull_subscription
    setup_task_arguments
    if executes_deploy? && CapistranoMulticonfigParallel.show_task_progress
      @executed_dry_run = true
      @rake_tasks = []
      @task_argv << '--dry-run'
      @task_argv << 'count_rake=true'
      @child_process = CapistranoMulticonfigParallel::ChildProcess.new
      Actor.current.link @child_process
      debug("worker #{@job_id} executes: #{multi_cap_command}") if debug_enabled?
      @child_process.async.work(multi_cap_command, actor: Actor.current, dry_run: true)
    else
      async.execute_deploy
    end
  end

  # @return [Array] the collected task names, created empty on first use
  def rake_tasks
    @rake_tasks ||= []
  end

  # Launches the real run, reusing the child process of the dry run if there
  # was one.
  def execute_deploy
    @execute_deploy = true
    debug("invocation chain #{@job_id} is : #{@rake_tasks.inspect}") if debug_enabled? && CapistranoMulticonfigParallel.show_task_progress
    if @child_process.nil?
      @child_process = CapistranoMulticonfigParallel::ChildProcess.new
      Actor.current.link @child_process
    else
      # A child process already exists: drop the count channel and clear
      # the exit status left by the previous run.
      @client.unsubscribe("rake_worker_#{@job_id}_count")
      @child_process.exit_status = nil
    end
    setup_task_arguments
    debug("worker #{@job_id} executes: #{multi_cap_command}") if debug_enabled?
    @child_process.async.work(multi_cap_command, actor: Actor.current, silent: true)
  end

  # Logs the close of the websocket connection when debugging is enabled.
  #
  # @param code [Object] close code
  # @param reason [Object] close reason
  def on_close(code, reason)
    debug("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}") if debug_enabled?
  end

  # Handles a non-subscription message: records counted tasks, advances the
  # state machine and approves (or asks confirmation for) the task.
  #
  # @param message [Object] incoming message
  def handle_subscription(message)
    if message_is_about_a_task?(message)
      save_tasks_to_be_executed(message)
      update_machine_state(message['task']) # if message['action'] == 'invoke'
      debug("worker #{@job_id} state is #{@machine.state}") if debug_enabled?
      task_approval(message)
    else
      debug("worker #{@job_id} could not handle  #{message}") if debug_enabled?
    end
  end

  # @param message [Object] incoming message
  # @return [Boolean] true for a Hash with present 'action', 'job_id' and 'task' entries
  def message_is_about_a_task?(message)
    message.present? && message.is_a?(Hash) && message['action'].present? && message['job_id'].present? && message['task'].present?
  end

  # Calls the manager condition for 'invoke' messages about a configured
  # confirmation task; every other message is approved straight away.
  #
  # @param message [Hash] task message
  def task_approval(message)
    if @task_confirmations.include?(message['task']) && message['action'] == 'invoke'
      @manager_condition[message['task']].call(message['task'])
    else
      publish_rake_event(message.merge('approved' => 'yes'))
    end
  end

  # Appends the task of a 'count' message unless it repeats the last one.
  #
  # @param message [Hash] task message
  def save_tasks_to_be_executed(message)
    return unless message['action'] == 'count'
    debug("worler #{@job_id} current invocation chain : #{@rake_tasks.inspect}") if debug_enabled?
    @rake_tasks = [] if @rake_tasks.blank?
    @rake_tasks << message['task'] if @rake_tasks.last != message['task']
  end

  # Registers a transition from the current state to +name+ and takes it.
  #
  # @param name [#to_s] task name that becomes the new state
  # @raise [TaskFailed] when the task is 'deploy:failed'
  def update_machine_state(name)
    debug("worker #{@job_id} triest to transition from #{@machine.state} to  #{name}") if debug_enabled?
    task_name = name.to_s
    @machine.transitions.on(task_name, @machine.state => task_name)
    @machine.go_to_transition(task_name)
    return unless task_name == 'deploy:failed'
    # force worker to rollback; the message names the job's action
    # (@action_name — the previously used @action is never assigned)
    raise CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed, "task #{@action_name} failed "
  end

  # Replaces @task_argv with the given options.
  #
  # @param options [Array] command line fragments
  # @return [Array] the new @task_argv
  def setup_command_line(*options)
    @task_argv = options.dup
  end

  # Builds the argument list: stage, action with task arguments, KEY=value
  # pairs for present env options, and --trace when debugging.
  #
  # @return [Array<String>] the new @task_argv
  def setup_task_arguments
    stage = @app_name.present? ? "#{@app_name}:#{@env_name}" : @env_name.to_s
    array_options = [stage]
    # Array()/|| {} keep this usable when the job carried no arguments/options.
    array_options << "#{@action_name}[#{Array(@task_arguments).join(',')}]"
    (@env_options || {}).each do |key, value|
      array_options << "#{key}=#{value}" if value.present?
    end
    array_options << '--trace' if debug_enabled?
    setup_command_line(*array_options)
  end

  # Publishes a notification tagged with this worker's job id.
  #
  # @param channel [Object] notification channel
  # @param message [Hash, nil] merged over { job_id: } when it is a present Hash;
  #   otherwise a { job_id:, time: } payload is sent
  def send_msg(channel, message = nil)
    publish channel, message.present? && message.is_a?(Hash) ? { job_id: @job_id }.merge(message) : { job_id: @job_id, time: Time.now }
  end

  # Copies the job fields into instance variables.
  #
  # @param job [Hash] read with the keys 'id', 'app', 'env', 'action',
  #   'env_options' and 'task_arguments'
  # @return [Array] the stored task arguments ([] when the job has none)
  def process_job(job)
    @job_id = job['id']
    @app_name = job['app']
    @env_name = job['env']
    @action_name = job['action']
    @env_options = {}
    # A job without 'env_options' is treated as having none.
    (job['env_options'] || {}).each do |key, value|
      @env_options[key] = value if value.present?
    end
    @task_arguments = job['task_arguments'] || []
  end

  # @return [Object] truthy when the action is deploy-like and task confirmation is active
  def need_confirmation_for_tasks?
    executes_deploy? && CapistranoMulticonfigParallel.configuration.task_confirmation_active
  end

  # @return [Boolean] true when the action is 'deploy' or 'deploy:rollback'
  def executes_deploy?
    (@action_name == 'deploy' || @action_name == 'deploy:rollback')
  end

  # Creates the termination condition and one confirmation entry per
  # configured task, publishes them to the manager, and builds the callables.
  def setup_worker_condition
    job_termination_condition = Celluloid::Condition.new
    job_confirmation_conditions = CapistranoMulticonfigParallel.configuration.task_confirmations.map do |_task|
      # Without confirmation a pass-through proc stands in for the condition.
      need_confirmation_for_tasks? ? Celluloid::Condition.new : proc { |sum| sum }
    end
    @manager.job_to_condition[@job_id] = { first_condition: job_confirmation_conditions, last_condition: job_termination_condition }
    construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition)
  end

  # Stores a task => lambda hash in @manager_condition and the termination
  # lambda in @last_manager_condition.
  #
  # @param job_confirmation_conditions [Array] one entry per configured task
  # @param job_termination_condition [#signal] signalled when the job finishes
  def construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition)
    hash_conditions = {}
    CapistranoMulticonfigParallel.configuration.task_confirmations.each_with_index do |task, index|
      blk = lambda do |sum|
        need_confirmation_for_tasks? ? job_confirmation_conditions[index].signal(sum) : job_confirmation_conditions[index].call(sum)
      end
      hash_conditions[task] = blk
    end
    blk_termination = lambda do |sum|
      job_termination_condition.signal(sum)
    end
    @manager_condition = hash_conditions
    @last_manager_condition = blk_termination
  end

  # @return [Boolean] true when the action is 'deploy:rollback'
  def crashed?
    @action_name == 'deploy:rollback'
  end

  # Called with the child's exit status; ignored until #execute_deploy has
  # run. A non-zero status terminates the actor, otherwise the machine moves
  # to 'FINISHED' and the termination lambda is called with 'yes'.
  #
  # @param exit_status [#exitstatus] status of the finished child process
  def notify_finished(exit_status)
    return unless @execute_deploy
    if exit_status.exitstatus != 0
      debug("worker #{job_id} tries to terminate")
      terminate
    else
      update_machine_state('FINISHED')
      debug("worker #{job_id} notifies manager has finished")
      @last_manager_condition.call('yes')
    end
  end

  private

  # Command line handed to the child process, built from the current @task_argv.
  def multi_cap_command
    "bundle exec multi_cap #{@task_argv.join(' ')}"
  end
end

#job_idObject

Returns the value of attribute job_id.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for the job_id attribute (assigned from job['id'] in #process_job).
#
# @return [Object, nil] the current value of @job_id
def job_id
  @job_id
end

#last_manager_conditionObject

Returns the value of attribute last_manager_condition.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for the last_manager_condition attribute;
# #construct_blocks_for_conditions stores here a lambda that signals the
# job's termination condition.
#
# @return [Proc, nil] the current value of @last_manager_condition
def last_manager_condition
  @last_manager_condition
end

#machineObject

Returns the value of attribute machine.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for the machine attribute; #work stores a
# CapistranoMulticonfigParallel::StateMachine here.
#
# @return [Object, nil] the current value of @machine
def machine
  @machine
end

#managerCapistranoMulticonfigParallel::CelluloidManager

Returns the instance of the manager that delegated the job to this worker.

Returns:



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 19

# Celluloid actor that executes one capistrano job in a child process and
# relays task events between that process and the manager.
class CelluloidWorker
  include Celluloid
  include Celluloid::Notifications
  include Celluloid::Logger
  # Raised by #update_machine_state when the 'deploy:failed' task is reported.
  class TaskFailed < StandardError; end

  # NOTE: #rake_tasks and #execute_deploy are redefined further down, so for
  # those two names only the generated writers remain in effect.
  attr_accessor :job, :manager, :job_id, :app_name, :env_name, :action_name, :env_options, :machine, :client, :task_argv, :execute_deploy, :executed_dry_run,
                :rake_tasks, :current_task_number, # tracking tasks
                :successfull_subscription, :subscription_channel, :publisher_channel, # for subscriptions and publishing events
                :task_confirmations, :manager_condition, :last_manager_condition # for task conifirmations from manager

  # Stores the job and the manager, builds the state machine and the
  # conditions, then registers this actor with the manager.
  #
  # @param job [Hash] job description; read with string keys by #process_job
  # @param manager [Object] must respond to job_to_condition and register_worker_for_job
  # @return [Object] whatever manager.register_worker_for_job returns
  def work(job, manager)
    @job = job
    @manager = manager

    process_job(job) if job.present?
    debug("worker #{@job_id} received #{job.inspect}") if debug_enabled?
    @subscription_channel = "worker_#{@job_id}"
    @machine = CapistranoMulticonfigParallel::StateMachine.new(job, Actor.current)
    setup_worker_condition
    manager.register_worker_for_job(job, Actor.current)
  end

  # @return [Object] the debug flag exposed by the manager's class
  def debug_enabled?
    @manager.class.debug_enabled?
  end

  # Opens the pubsub client and subscribes it to this worker's channel.
  #
  # @return [Object] the client returned by CelluloidPubsub::Client.connect
  def start_task
    debug("exec worker #{@job_id} starts task with #{@job.inspect}") if debug_enabled?
    @task_confirmations = CapistranoMulticonfigParallel.configuration.task_confirmations
    @client = CelluloidPubsub::Client.connect(actor: Actor.current, enable_debug: @manager.class.debug_websocket?) do |ws|
      ws.subscribe(@subscription_channel)
    end
  end

  # Publishes +data+ on the channel selected by #rake_actor_id.
  #
  # @param data [Hash] message to publish
  def publish_rake_event(data)
    @client.publish(rake_actor_id(data), data)
  end

  # @param data [Hash] message; its 'action' entry selects the channel
  # @return [String] the "_count" channel for 'count' actions, the plain channel otherwise
  def rake_actor_id(data)
    data['action'].present? && data['action'] == 'count' ? "rake_worker_#{@job_id}_count" : "rake_worker_#{@job_id}"
  end

  # Entry point for messages arriving on the pubsub client.
  #
  # @param message [Object] incoming message
  def on_message(message)
    debug("worker #{@job_id} received:  #{message.inspect}") if debug_enabled?
    if @client.succesfull_subscription?(message)
      @successfull_subscription = true
      execute_after_succesfull_subscription
    else
      handle_subscription(message)
    end
  end

  # Runs once the subscription is confirmed: for deploy-like actions with
  # task progress enabled it first launches a --dry-run pass, otherwise it
  # schedules the real run.
  def execute_after_succesfull_subscription
    setup_task_arguments
    if executes_deploy? && CapistranoMulticonfigParallel.show_task_progress
      @executed_dry_run = true
      @rake_tasks = []
      @task_argv << '--dry-run'
      @task_argv << 'count_rake=true'
      @child_process = CapistranoMulticonfigParallel::ChildProcess.new
      Actor.current.link @child_process
      debug("worker #{@job_id} executes: #{multi_cap_command}") if debug_enabled?
      @child_process.async.work(multi_cap_command, actor: Actor.current, dry_run: true)
    else
      async.execute_deploy
    end
  end

  # @return [Array] the collected task names, created empty on first use
  def rake_tasks
    @rake_tasks ||= []
  end

  # Launches the real run, reusing the child process of the dry run if there
  # was one.
  def execute_deploy
    @execute_deploy = true
    debug("invocation chain #{@job_id} is : #{@rake_tasks.inspect}") if debug_enabled? && CapistranoMulticonfigParallel.show_task_progress
    if @child_process.nil?
      @child_process = CapistranoMulticonfigParallel::ChildProcess.new
      Actor.current.link @child_process
    else
      # A child process already exists: drop the count channel and clear
      # the exit status left by the previous run.
      @client.unsubscribe("rake_worker_#{@job_id}_count")
      @child_process.exit_status = nil
    end
    setup_task_arguments
    debug("worker #{@job_id} executes: #{multi_cap_command}") if debug_enabled?
    @child_process.async.work(multi_cap_command, actor: Actor.current, silent: true)
  end

  # Logs the close of the websocket connection when debugging is enabled.
  #
  # @param code [Object] close code
  # @param reason [Object] close reason
  def on_close(code, reason)
    debug("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}") if debug_enabled?
  end

  # Handles a non-subscription message: records counted tasks, advances the
  # state machine and approves (or asks confirmation for) the task.
  #
  # @param message [Object] incoming message
  def handle_subscription(message)
    if message_is_about_a_task?(message)
      save_tasks_to_be_executed(message)
      update_machine_state(message['task']) # if message['action'] == 'invoke'
      debug("worker #{@job_id} state is #{@machine.state}") if debug_enabled?
      task_approval(message)
    else
      debug("worker #{@job_id} could not handle  #{message}") if debug_enabled?
    end
  end

  # @param message [Object] incoming message
  # @return [Boolean] true for a Hash with present 'action', 'job_id' and 'task' entries
  def message_is_about_a_task?(message)
    message.present? && message.is_a?(Hash) && message['action'].present? && message['job_id'].present? && message['task'].present?
  end

  # Calls the manager condition for 'invoke' messages about a configured
  # confirmation task; every other message is approved straight away.
  #
  # @param message [Hash] task message
  def task_approval(message)
    if @task_confirmations.include?(message['task']) && message['action'] == 'invoke'
      @manager_condition[message['task']].call(message['task'])
    else
      publish_rake_event(message.merge('approved' => 'yes'))
    end
  end

  # Appends the task of a 'count' message unless it repeats the last one.
  #
  # @param message [Hash] task message
  def save_tasks_to_be_executed(message)
    return unless message['action'] == 'count'
    debug("worler #{@job_id} current invocation chain : #{@rake_tasks.inspect}") if debug_enabled?
    @rake_tasks = [] if @rake_tasks.blank?
    @rake_tasks << message['task'] if @rake_tasks.last != message['task']
  end

  # Registers a transition from the current state to +name+ and takes it.
  #
  # @param name [#to_s] task name that becomes the new state
  # @raise [TaskFailed] when the task is 'deploy:failed'
  def update_machine_state(name)
    debug("worker #{@job_id} triest to transition from #{@machine.state} to  #{name}") if debug_enabled?
    task_name = name.to_s
    @machine.transitions.on(task_name, @machine.state => task_name)
    @machine.go_to_transition(task_name)
    return unless task_name == 'deploy:failed'
    # force worker to rollback; the message names the job's action
    # (@action_name — the previously used @action is never assigned)
    raise CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed, "task #{@action_name} failed "
  end

  # Replaces @task_argv with the given options.
  #
  # @param options [Array] command line fragments
  # @return [Array] the new @task_argv
  def setup_command_line(*options)
    @task_argv = options.dup
  end

  # Builds the argument list: stage, action with task arguments, KEY=value
  # pairs for present env options, and --trace when debugging.
  #
  # @return [Array<String>] the new @task_argv
  def setup_task_arguments
    stage = @app_name.present? ? "#{@app_name}:#{@env_name}" : @env_name.to_s
    array_options = [stage]
    # Array()/|| {} keep this usable when the job carried no arguments/options.
    array_options << "#{@action_name}[#{Array(@task_arguments).join(',')}]"
    (@env_options || {}).each do |key, value|
      array_options << "#{key}=#{value}" if value.present?
    end
    array_options << '--trace' if debug_enabled?
    setup_command_line(*array_options)
  end

  # Publishes a notification tagged with this worker's job id.
  #
  # @param channel [Object] notification channel
  # @param message [Hash, nil] merged over { job_id: } when it is a present Hash;
  #   otherwise a { job_id:, time: } payload is sent
  def send_msg(channel, message = nil)
    publish channel, message.present? && message.is_a?(Hash) ? { job_id: @job_id }.merge(message) : { job_id: @job_id, time: Time.now }
  end

  # Copies the job fields into instance variables.
  #
  # @param job [Hash] read with the keys 'id', 'app', 'env', 'action',
  #   'env_options' and 'task_arguments'
  # @return [Array] the stored task arguments ([] when the job has none)
  def process_job(job)
    @job_id = job['id']
    @app_name = job['app']
    @env_name = job['env']
    @action_name = job['action']
    @env_options = {}
    # A job without 'env_options' is treated as having none.
    (job['env_options'] || {}).each do |key, value|
      @env_options[key] = value if value.present?
    end
    @task_arguments = job['task_arguments'] || []
  end

  # @return [Object] truthy when the action is deploy-like and task confirmation is active
  def need_confirmation_for_tasks?
    executes_deploy? && CapistranoMulticonfigParallel.configuration.task_confirmation_active
  end

  # @return [Boolean] true when the action is 'deploy' or 'deploy:rollback'
  def executes_deploy?
    (@action_name == 'deploy' || @action_name == 'deploy:rollback')
  end

  # Creates the termination condition and one confirmation entry per
  # configured task, publishes them to the manager, and builds the callables.
  def setup_worker_condition
    job_termination_condition = Celluloid::Condition.new
    job_confirmation_conditions = CapistranoMulticonfigParallel.configuration.task_confirmations.map do |_task|
      # Without confirmation a pass-through proc stands in for the condition.
      need_confirmation_for_tasks? ? Celluloid::Condition.new : proc { |sum| sum }
    end
    @manager.job_to_condition[@job_id] = { first_condition: job_confirmation_conditions, last_condition: job_termination_condition }
    construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition)
  end

  # Stores a task => lambda hash in @manager_condition and the termination
  # lambda in @last_manager_condition.
  #
  # @param job_confirmation_conditions [Array] one entry per configured task
  # @param job_termination_condition [#signal] signalled when the job finishes
  def construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition)
    hash_conditions = {}
    CapistranoMulticonfigParallel.configuration.task_confirmations.each_with_index do |task, index|
      blk = lambda do |sum|
        need_confirmation_for_tasks? ? job_confirmation_conditions[index].signal(sum) : job_confirmation_conditions[index].call(sum)
      end
      hash_conditions[task] = blk
    end
    blk_termination = lambda do |sum|
      job_termination_condition.signal(sum)
    end
    @manager_condition = hash_conditions
    @last_manager_condition = blk_termination
  end

  # @return [Boolean] true when the action is 'deploy:rollback'
  def crashed?
    @action_name == 'deploy:rollback'
  end

  # Called with the child's exit status; ignored until #execute_deploy has
  # run. A non-zero status terminates the actor, otherwise the machine moves
  # to 'FINISHED' and the termination lambda is called with 'yes'.
  #
  # @param exit_status [#exitstatus] status of the finished child process
  def notify_finished(exit_status)
    return unless @execute_deploy
    if exit_status.exitstatus != 0
      debug("worker #{job_id} tries to terminate")
      terminate
    else
      update_machine_state('FINISHED')
      debug("worker #{job_id} notifies manager has finished")
      @last_manager_condition.call('yes')
    end
  end

  private

  # Command line handed to the child process, built from the current @task_argv.
  def multi_cap_command
    "bundle exec multi_cap #{@task_argv.join(' ')}"
  end
end

#manager_conditionObject

Returns the value of attribute manager_condition.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for the manager_condition attribute;
# #construct_blocks_for_conditions stores here a hash that maps each
# configured confirmation task to a lambda.
#
# @return [Hash, nil] the current value of @manager_condition
def manager_condition
  @manager_condition
end

#publisher_channelObject

Returns the value of attribute publisher_channel.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for the publisher_channel attribute.
# NOTE(review): no method in the class listing on this page assigns
# @publisher_channel — confirm whether it is set from outside.
#
# @return [Object, nil] the current value of @publisher_channel
def publisher_channel
  @publisher_channel
end

#rake_tasksObject

Returns the value of attribute rake_tasks.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader generated for the rake_tasks attribute.
# NOTE(review): the class listing on this page also defines #rake_tasks
# explicitly as `@rake_tasks ||= []`, which replaces this generated reader,
# so on a worker instance the result is an Array rather than nil.
#
# @return [Array, nil] the current value of @rake_tasks
def rake_tasks
  @rake_tasks
end

#subscription_channelObject

Returns the value of attribute subscription_channel.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for the subscription_channel attribute; #work sets it to
# "worker_<job id>".
#
# @return [String, nil] the current value of @subscription_channel
def subscription_channel
  @subscription_channel
end

#successfull_subscriptionObject

Returns the value of attribute successfull_subscription.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for the successfull_subscription attribute; #on_message sets it to
# true when the client reports a successful subscription.
#
# @return [Boolean, nil] the current value of @successfull_subscription
def successfull_subscription
  @successfull_subscription
end

#task_argvObject

Returns the value of attribute task_argv.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for the task_argv attribute; #setup_command_line rebuilds it as the
# list of command line fragments.
#
# @return [Array, nil] the current value of @task_argv
def task_argv
  @task_argv
end

#task_confirmationsObject

Returns the value of attribute task_confirmations.



25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 25

# Reader for the task_confirmations attribute; #start_task copies it from
# CapistranoMulticonfigParallel.configuration.task_confirmations.
#
# @return [Object, nil] the current value of @task_confirmations
def task_confirmations
  @task_confirmations
end

Instance Method Details

#construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition) ⇒ Object



206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 206

# Builds the callables through which the worker reaches the manager's
# conditions: @manager_condition maps every configured confirmation task to
# a lambda, @last_manager_condition is a lambda signalling termination.
#
# @param job_confirmation_conditions [Array] one entry per configured task, in configuration order
# @param job_termination_condition [#signal] object signalled when the job ends
# @return [Proc] the termination lambda (value of the last assignment)
def construct_blocks_for_conditions(job_confirmation_conditions, job_termination_condition)
  confirmation_tasks = CapistranoMulticonfigParallel.configuration.task_confirmations
  @manager_condition = confirmation_tasks.each_with_index.each_with_object({}) do |(task, position), callbacks|
    callbacks[task] = lambda do |sum|
      # The confirmation check runs when the lambda is called, not when it is built.
      if need_confirmation_for_tasks?
        job_confirmation_conditions[position].signal(sum)
      else
        job_confirmation_conditions[position].call(sum)
      end
    end
  end
  @last_manager_condition = ->(sum) { job_termination_condition.signal(sum) }
end

#crashed?Boolean

Returns:

  • (Boolean)


221
222
223
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 221

# Reports whether the worker's action is the rollback action.
#
# @return [Boolean] true exactly when @action_name equals 'deploy:rollback'
def crashed?
  @action_name == 'deploy:rollback'
end

#debug_enabled?Boolean

Returns:

  • (Boolean)


42
43
44
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 42

# Delegates to the debug_enabled? class-level method of the manager's class.
#
# @return [Object] whatever @manager.class.debug_enabled? returns
def debug_enabled?
  @manager.class.debug_enabled?
end

#execute_after_succesfull_subscriptionObject



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 72

# Runs once the subscription is confirmed. Deploy-like actions with task
# progress enabled first get a --dry-run pass in a new child process; any
# other case schedules #execute_deploy asynchronously.
#
# @return [Object] result of the async call that was issued
def execute_after_succesfull_subscription
  setup_task_arguments
  deploy_like = %w[deploy deploy:rollback].include?(@action_name)
  return async.execute_deploy unless deploy_like && CapistranoMulticonfigParallel.show_task_progress

  @executed_dry_run = true
  @rake_tasks = []
  @task_argv.push('--dry-run', 'count_rake=true')
  @child_process = CapistranoMulticonfigParallel::ChildProcess.new
  Actor.current.link(@child_process)
  command = "bundle exec multi_cap #{@task_argv.join(' ')}"
  debug("worker #{@job_id} executes: #{command}") if debug_enabled?
  @child_process.async.work(command, actor: Actor.current, dry_run: true)
end

#executes_deploy?Boolean

Returns:

  • (Boolean)


188
189
190
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 188

# Tells whether the worker's action is one of the two deploy-like actions.
#
# @return [Boolean] true when @action_name is 'deploy' or 'deploy:rollback'
def executes_deploy?
  %w[deploy deploy:rollback].include?(@action_name)
end

#handle_subscription(message) ⇒ Object



111
112
113
114
115
116
117
118
119
120
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 111

# Processes a message that is not a subscription confirmation. Task messages
# are recorded, pushed through the state machine and then approved; anything
# else is only logged (when debugging is enabled).
#
# @param message [Object] incoming message
# @return [Object] result of #task_approval for task messages
def handle_subscription(message)
  unless message_is_about_a_task?(message)
    return unless debug_enabled?
    return debug("worker #{@job_id} could not handle  #{message}")
  end

  save_tasks_to_be_executed(message)
  update_machine_state(message['task']) # if message['action'] == 'invoke'
  debug("worker #{@job_id} state is #{@machine.state}") if debug_enabled?
  task_approval(message)
end

#message_is_about_a_task?(message) ⇒ Boolean

Returns:

  • (Boolean)


122
123
124
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 122

# Checks that a message describes a task: it must be a non-blank Hash whose
# 'action', 'job_id' and 'task' entries are all present.
#
# @param message [Object] incoming message
# @return [Boolean]
def message_is_about_a_task?(message)
  return false unless message.present? && message.is_a?(Hash)

  %w[action job_id task].all? { |key| message[key].present? }
end

#need_confirmation_for_tasks?Boolean

Returns:

  • (Boolean)


184
185
186
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 184

# Confirmation is needed only for deploy-like actions, and then only when
# the configuration's task_confirmation_active setting says so.
#
# @return [Object] the falsy result of #executes_deploy?, or the configured setting
def need_confirmation_for_tasks?
  deploying = executes_deploy?
  return deploying unless deploying

  CapistranoMulticonfigParallel.configuration.task_confirmation_active
end

#notify_finished(exit_status) ⇒ Object



225
226
227
228
229
230
231
232
233
234
235
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 225

# Reacts to the end of the child process. Does nothing until
# #execute_deploy has set @execute_deploy. A non-zero exit status terminates
# the actor; otherwise the state machine moves to 'FINISHED' and the
# termination lambda is called with 'yes'.
#
# @param exit_status [#exitstatus] status of the finished child process
# @return [Object, nil] result of terminate or of the termination lambda
def notify_finished(exit_status)
  return unless @execute_deploy

  unless exit_status.exitstatus == 0
    debug("worker #{job_id} tries to terminate")
    return terminate
  end

  update_machine_state('FINISHED')
  debug("worker #{job_id} notifies manager has finished")
  @last_manager_condition.call('yes')
end

#on_close(code, reason) ⇒ Object



107
108
109
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 107

# Callback for the closing of the websocket connection; only writes a debug
# line, and only when debugging is enabled.
#
# @param code [Object] close code
# @param reason [Object] close reason
# @return [Object, nil] result of debug, or nil when debugging is off
def on_close(code, reason)
  return unless debug_enabled?

  debug("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}")
end

#on_message(message) ⇒ Object



62
63
64
65
66
67
68
69
70
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 62

# Callback for every message received by the pubsub client. A subscription
# confirmation flags the worker as subscribed and starts execution; any
# other message goes to #handle_subscription.
#
# @param message [Object] incoming message
# @return [Object] result of the handler that was chosen
def on_message(message)
  debug("worker #{@job_id} received:  #{message.inspect}") if debug_enabled?
  return handle_subscription(message) unless @client.succesfull_subscription?(message)

  @successfull_subscription = true
  execute_after_succesfull_subscription
end

#process_job(job) ⇒ Object



172
173
174
175
176
177
178
179
180
181
182
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 172

# Copies the fields of a job description into instance variables.
#
# Only env options whose value is present are kept. A job without
# 'env_options' is treated as having none (previously this raised
# NoMethodError on nil), and a job without 'task_arguments' gets an empty
# list so that later joins of @task_arguments do not fail.
#
# @param job [Hash] read with the keys 'id', 'app', 'env', 'action',
#   'env_options' and 'task_arguments'
# @return [Array] the stored task arguments
def process_job(job)
  @job_id = job['id']
  @app_name = job['app']
  @env_name = job['env']
  @action_name = job['action']
  @env_options = {}
  (job['env_options'] || {}).each do |key, value|
    @env_options[key] = value if value.present?
  end
  @task_arguments = job['task_arguments'] || []
end

#publish_rake_event(data) ⇒ Object



54
55
56
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 54

# Sends +data+ through the pubsub client on the channel that
# #rake_actor_id selects for it.
#
# @param data [Hash] message to publish
# @return [Object] whatever @client.publish returns
def publish_rake_event(data)
  channel = rake_actor_id(data)
  @client.publish(channel, data)
end

#rake_actor_id(data) ⇒ Object



58
59
60
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 58

# Picks the channel name for a message: 'count' actions go to the
# "rake_worker_<job id>_count" channel, all others to "rake_worker_<job id>".
#
# @param data [Hash] message; only its 'action' entry is inspected
# @return [String] channel name
def rake_actor_id(data)
  channel = "rake_worker_#{@job_id}"
  counting = data['action'].present? && data['action'] == 'count'
  counting ? "#{channel}_count" : channel
end

#save_tasks_to_be_executed(message) ⇒ Object



134
135
136
137
138
139
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 134

# Records the task of a 'count' message in @rake_tasks, skipping a task that
# merely repeats the most recent entry. Other actions are ignored.
#
# @param message [Hash] task message
# @return [Array, nil] @rake_tasks when a task was appended, nil otherwise
def save_tasks_to_be_executed(message)
  return unless message['action'] == 'count'

  debug("worler #{@job_id} current invocation chain : #{@rake_tasks.inspect}") if debug_enabled?
  @rake_tasks = [] if @rake_tasks.blank?
  task = message['task']
  @rake_tasks << task unless @rake_tasks.last == task
end

#send_msg(channel, message = nil) ⇒ Object



168
169
170
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 168

# Publishes a notification tagged with this worker's job id.
#
# @param channel [Object] notification channel
# @param message [Hash, nil] when it is a present Hash it is merged over
#   { job_id: }; otherwise a { job_id:, time: } payload is sent
# @return [Object] whatever publish returns
def send_msg(channel, message = nil)
  payload =
    if message.present? && message.is_a?(Hash)
      { job_id: @job_id }.merge(message)
    else
      { job_id: @job_id, time: Time.now }
    end
  publish channel, payload
end

#setup_command_line(*options) ⇒ Object



148
149
150
151
152
153
154
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 148

# Replaces @task_argv with a new array holding the given options in order.
#
# @param options [Array] command line fragments
# @return [Array] the new @task_argv
def setup_command_line(*options)
  @task_argv = options.dup
end

#setup_task_argumentsObject



156
157
158
159
160
161
162
163
164
165
166
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 156

# Assembles the argument list for the child command and stores it through
# #setup_command_line. In order: the stage ("app:env", or just the env when
# no app name is present), the action with its comma-joined task arguments
# in brackets, one KEY=value entry per present env option, and --trace when
# debugging is enabled.
#
# @return [Array] the value returned by #setup_command_line
def setup_task_arguments
  target = @app_name.present? ? "#{@app_name}:#{@env_name}" : @env_name.to_s
  argv = [target, "#{@action_name}[#{@task_arguments.join(',')}]"]
  @env_options.each do |name, setting|
    next unless setting.present?

    argv << "#{name}=#{setting}"
  end
  argv << '--trace' if debug_enabled?
  setup_command_line(*argv)
end

#setup_worker_conditionObject



192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 192

# Creates the termination condition plus one confirmation entry per
# configured task (a Celluloid::Condition when confirmation is needed, a
# pass-through proc otherwise), registers both under the job id in the
# manager's job_to_condition table, then builds the worker-side callables.
#
# @return [Object] whatever #construct_blocks_for_conditions returns
def setup_worker_condition
  termination = Celluloid::Condition.new
  confirmations = CapistranoMulticonfigParallel.configuration.task_confirmations.map do |_task|
    need_confirmation_for_tasks? ? Celluloid::Condition.new : proc { |sum| sum }
  end
  @manager.job_to_condition[@job_id] = { first_condition: confirmations, last_condition: termination }
  construct_blocks_for_conditions(confirmations, termination)
end

#start_taskObject



46
47
48
49
50
51
52
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 46

# Caches the configured confirmation tasks and connects the pubsub client,
# subscribing it to this worker's channel inside the connect block.
#
# @return [Object] the client returned by CelluloidPubsub::Client.connect
def start_task
  debug("exec worker #{@job_id} starts task with #{@job.inspect}") if debug_enabled?
  @task_confirmations = CapistranoMulticonfigParallel.configuration.task_confirmations
  connection_options = { actor: Actor.current, enable_debug: @manager.class.debug_websocket? }
  @client = CelluloidPubsub::Client.connect(connection_options) { |ws| ws.subscribe(@subscription_channel) }
end

#task_approval(message) ⇒ Object



126
127
128
129
130
131
132
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 126

# Decides how a task message is answered. An 'invoke' of a task listed in
# @task_confirmations calls that task's entry in @manager_condition; every
# other message is published back with 'approved' => 'yes'.
#
# @param message [Hash] task message
# @return [Object] result of the manager condition call or of the publish
def task_approval(message)
  task = message['task']
  awaits_manager = @task_confirmations.include?(task) && message['action'] == 'invoke'
  return publish_rake_event(message.merge('approved' => 'yes')) unless awaits_manager

  @manager_condition[task].call(task)
end

#update_machine_state(name) ⇒ Object



141
142
143
144
145
146
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 141

# Registers a transition from the machine's current state to +name+ and
# takes it.
#
# The failure message names the job's action via @action_name; the
# previously interpolated @action is never assigned anywhere in the class,
# so the message used to read "task  failed ". The failure check now uses
# the same stringified name as the transition itself.
#
# @param name [#to_s] task name that becomes the new state
# @return [nil]
# @raise [CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed] when
#   the task is 'deploy:failed'
def update_machine_state(name)
  debug("worker #{@job_id} triest to transition from #{@machine.state} to  #{name}") if debug_enabled?
  task_name = name.to_s
  @machine.transitions.on(task_name, @machine.state => task_name)
  @machine.go_to_transition(task_name)
  return unless task_name == 'deploy:failed'

  # force worker to rollback
  raise CapistranoMulticonfigParallel::CelluloidWorker::TaskFailed, "task #{@action_name} failed "
end

#work(job, manager) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 30

# Entry point called with the job to run and the delegating manager. Stores
# both, extracts the job fields (when the job is present), derives the
# subscription channel, builds the state machine and the conditions, and
# finally registers this actor with the manager.
#
# @param job [Hash] job description; read by #process_job
# @param manager [Object] must respond to register_worker_for_job
# @return [Object] whatever manager.register_worker_for_job returns
def work(job, manager)
  @job = job
  @manager = manager
  process_job(job) if job.present?

  debug("worker #{@job_id} received #{job.inspect}") if debug_enabled?

  this_actor = Actor.current
  @subscription_channel = "worker_#{@job_id}"
  @machine = CapistranoMulticonfigParallel::StateMachine.new(job, this_actor)
  setup_worker_condition
  manager.register_worker_for_job(job, this_actor)
end