Class: CapistranoMulticonfigParallel::CelluloidWorker

Inherits:
Object
  • Object
show all
Includes:
BaseActorHelper
Defined in:
lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb

Overview

worker that will spawn a child process in order to execute a capistrano job and monitor that process

Constant Summary collapse

ATTRIBUTE_LIST =
[
  :job, :manager, :job_id, :app_name, :env_name, :action_name, :env_options, :machine, :socket_connection, :task_argv,
  :rake_tasks, :current_task_number, # tracking tasks
  :successfull_subscription, :subscription_channel, :publisher_channel, # for subscriptions and publishing events
  :job_termination_condition, :invocation_chain, :filename, :worker_log, :exit_status, :old_job
]

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from BaseActorHelper

included

Constructor Details

#initialize(*args) ⇒ CelluloidWorker

Returns a new instance of CelluloidWorker.



34
35
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 34

def initialize(*args)
end

Instance Attribute Details

#jobHash

Returns options used for executing capistrano task.

Returns:

  • (Hash)

    options used for executing capistrano task



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 20

class CelluloidWorker
  include CapistranoMulticonfigParallel::BaseActorHelper


  ATTRIBUTE_LIST = [
    :job, :manager, :job_id, :app_name, :env_name, :action_name, :env_options, :machine, :socket_connection, :task_argv,
    :rake_tasks, :current_task_number, # tracking tasks
    :successfull_subscription, :subscription_channel, :publisher_channel, # for subscriptions and publishing events
    :job_termination_condition, :invocation_chain, :filename, :worker_log, :exit_status, :old_job
  ]

  attr_reader *CapistranoMulticonfigParallel::CelluloidWorker::ATTRIBUTE_LIST
  attr_accessor *CapistranoMulticonfigParallel::CelluloidWorker::ATTRIBUTE_LIST

  def initialize(*args)
  end

  def work(job, manager, old_job)
    @job = job
    @old_job = old_job
    @job_id = job.id
    @worker_state = job.status
    @manager = manager
    @job_confirmation_conditions = []
    log_to_file("worker #{@job_id} received #{job.inspect} and #{old_job.inspect}")
    @subscription_channel = "#{CapistranoSentinel::RequestHooks::PUBLISHER_PREFIX}#{@job_id}"
    @machine = CapistranoMulticonfigParallel::StateMachine.new(@job, Actor.current)
    @manager.setup_worker_conditions(@job)
    manager.register_worker_for_job(job, Actor.current)
  end


  def worker_state
    if job.status.to_s.downcase != 'dead' && Actor.current.alive?
      @machine.state.to_s.green
    else
      job.status = 'dead'
      job.status.upcase.red
    end
  end

  def start_task
    log_to_file("exec worker #{@job_id} starts task and subscribes to #{@subscription_channel}")
    if @old_job.present? && @old_job.is_a?(CapistranoMulticonfigParallel::Job)
      @old_job.new_jobs_dispatched << @job.id
    end
    @socket_connection = CelluloidPubsub::Client.new(actor: Actor.current, enable_debug: debug_websocket?, channel: subscription_channel, log_file_path: websocket_config.fetch('log_file_path', nil))
  end

  def publish_rake_event(data)
    log_to_file("worker #{@job_id} rties to publish into channel #{CapistranoSentinel::RequestHooks::SUBSCRIPTION_PREFIX}#{@job_id} data #{data.inspect}")
    @socket_connection.publish("#{CapistranoSentinel::RequestHooks::SUBSCRIPTION_PREFIX}#{@job_id}", data)
  end

  def on_message(message)
    log_to_file("worker #{@job_id} received:  #{message.inspect}")
    if @socket_connection.succesfull_subscription?(message)
      @successfull_subscription = true
      execute_after_succesfull_subscription
    else
      handle_subscription(message)
    end
  end

  def execute_after_succesfull_subscription
    async.execute_deploy
    @manager.async.wait_task_confirmations_worker(@job)
  end

  def rake_tasks
    @rake_tasks ||= []
  end

  def invocation_chain
    @invocation_chain ||= []
  end

  def execute_deploy
    log_to_file("invocation chain #{@job_id} is : #{@rake_tasks.inspect}")
    check_child_proces
    command = job.fetch_deploy_command
    log_to_file("worker #{@job_id} executes: #{command}")
    @child_process.async.work(@job, command, actor: Actor.current, silent: true, process_sync: :async, runner_status_klass: CapistranoMulticonfigParallel::ChildProcessStatus)
  end

  def check_child_proces
    @child_process = CapistranoMulticonfigParallel::ProcessRunner.new
    Actor.current.link @child_process
    @child_process
  end

  def on_close(code, reason)
    log_to_file("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}")
  end

  def check_gitflow
    return if @job.stage != 'staging' || !@manager.can_tag_staging? || !executed_task?(CapistranoMulticonfigParallel::GITFLOW_TAG_STAGING_TASK)
    mark_for_dispatching_new_job
    @manager.dispatch_new_job(@job, stage: 'production')
  end

  def handle_subscription(message)
    if message_is_about_a_task?(message)
      check_gitflow
      save_tasks_to_be_executed(message)
      async.update_machine_state(message['task']) # if message['action'] == 'invoke'
      log_to_file("worker #{@job_id} state is #{@machine.state}")
      task_approval(message)
    elsif message_is_for_stdout?(message)
      result = Celluloid::Actor[:terminal_server].show_confirmation(message['question'], message['default'])
      publish_rake_event(message.merge('action' => 'stdin', 'result' => result, 'client_action' => 'stdin'))
    elsif message_from_bundler?(message)

      #gem_messsage = job.gem_specs.find{|spec| message['task'].include?(spec.name) }
      # if gem_messsage.present?
      #     async.update_machine_state("insta")
      # else
      async.update_machine_state(message['task'])
      #end
    else
      log_to_file(message, job_id: @job_id)
    end
  end


  def executed_task?(task)
    rake_tasks.present? && rake_tasks.index(task.to_s).present?
  end

  def task_approval(message)
    job_conditions = @manager.job_to_condition[@job_id]
    log_to_file("worker #{@job_id} checks if task : #{message['task'].inspect} is included in #{configuration.task_confirmations.inspect}")
    if job_conditions.present? && configuration.task_confirmations.include?(message['task']) && message['action'] == 'invoke'
      log_to_file("worker #{@job_id} signals approval for task : #{message['task'].inspect}")
      task_confirmation = job_conditions[message['task']]
      task_confirmation[:status] = 'confirmed'
      task_confirmation[:condition].signal(message['task'])
    else
      publish_rake_event(message.merge('approved' => 'yes'))
    end
  end

  def save_tasks_to_be_executed(message)
    log_to_file("worler #{@job_id} current invocation chain : #{rake_tasks.inspect}")
    rake_tasks << message['task'] if rake_tasks.last != message['task']
    invocation_chain << message['task'] if invocation_chain.last != message['task']
  end

  def update_machine_state(name, options = {})
    log_to_file("worker #{@job_id} triest to transition from #{@machine.state} to  #{name}") unless options[:bundler]
    @machine.go_to_transition(name.to_s, options)
    error_message = "worker #{@job_id} task #{name} failed "
    raise(CapistranoMulticonfigParallel::TaskFailed.new(error_message), error_message) if job.failed? # force worker to rollback
  end

  def send_msg(channel, message = nil)
    message = message.present? && message.is_a?(Hash) ? { job_id: @job_id }.merge(message) : { job_id: @job_id, message: message }
    log_to_file("worker #{@job_id} triest to send to #{channel} #{message}")
    publish channel, message
  end

  def finish_worker(exit_status)
    log_to_file("worker #{job_id} tries to terminate with exit_status #{exit_status}")
    @manager.mark_completed_remaining_tasks(@job) if Actor.current.alive?
    update_machine_state('FINISHED') if exit_status == 0
    @manager.workers_terminated.signal('completed') if !@job.marked_for_dispatching_new_job? && @manager.present? && @manager.alive? && @manager.all_workers_finished?
  end


  def notify_finished(exit_status, _runner_status)
    @job.mark_for_dispatching_new_job if exit_status != 0
    @job.exit_status = exit_status
    finish_worker(exit_status)
    return if exit_status == 0
    error_message = "worker #{@job_id} task  failed with exit status #{exit_status.inspect}  "
    raise(CapistranoMulticonfigParallel::TaskFailed.new(error_message), error_message)
  end

  # def inspect
  #   to_s
  # end
  #
  # def to_s
  #    "#<#{self.class}(#{Actor.current.mailbox.address.inspect}) alive>"
  # rescue
  #   "#<#{self.class}(#{Actor.current.mailbox.address.inspect}) dead>"
  # end

end

#managerCapistranoMulticonfigParallel::CelluloidManager

Returns the instance of the manager that delegated the job to this worker.

Returns:



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 20

class CelluloidWorker
  include CapistranoMulticonfigParallel::BaseActorHelper


  ATTRIBUTE_LIST = [
    :job, :manager, :job_id, :app_name, :env_name, :action_name, :env_options, :machine, :socket_connection, :task_argv,
    :rake_tasks, :current_task_number, # tracking tasks
    :successfull_subscription, :subscription_channel, :publisher_channel, # for subscriptions and publishing events
    :job_termination_condition, :invocation_chain, :filename, :worker_log, :exit_status, :old_job
  ]

  attr_reader *CapistranoMulticonfigParallel::CelluloidWorker::ATTRIBUTE_LIST
  attr_accessor *CapistranoMulticonfigParallel::CelluloidWorker::ATTRIBUTE_LIST

  def initialize(*args)
  end

  def work(job, manager, old_job)
    @job = job
    @old_job = old_job
    @job_id = job.id
    @worker_state = job.status
    @manager = manager
    @job_confirmation_conditions = []
    log_to_file("worker #{@job_id} received #{job.inspect} and #{old_job.inspect}")
    @subscription_channel = "#{CapistranoSentinel::RequestHooks::PUBLISHER_PREFIX}#{@job_id}"
    @machine = CapistranoMulticonfigParallel::StateMachine.new(@job, Actor.current)
    @manager.setup_worker_conditions(@job)
    manager.register_worker_for_job(job, Actor.current)
  end


  def worker_state
    if job.status.to_s.downcase != 'dead' && Actor.current.alive?
      @machine.state.to_s.green
    else
      job.status = 'dead'
      job.status.upcase.red
    end
  end

  def start_task
    log_to_file("exec worker #{@job_id} starts task and subscribes to #{@subscription_channel}")
    if @old_job.present? && @old_job.is_a?(CapistranoMulticonfigParallel::Job)
      @old_job.new_jobs_dispatched << @job.id
    end
    @socket_connection = CelluloidPubsub::Client.new(actor: Actor.current, enable_debug: debug_websocket?, channel: subscription_channel, log_file_path: websocket_config.fetch('log_file_path', nil))
  end

  def publish_rake_event(data)
    log_to_file("worker #{@job_id} rties to publish into channel #{CapistranoSentinel::RequestHooks::SUBSCRIPTION_PREFIX}#{@job_id} data #{data.inspect}")
    @socket_connection.publish("#{CapistranoSentinel::RequestHooks::SUBSCRIPTION_PREFIX}#{@job_id}", data)
  end

  def on_message(message)
    log_to_file("worker #{@job_id} received:  #{message.inspect}")
    if @socket_connection.succesfull_subscription?(message)
      @successfull_subscription = true
      execute_after_succesfull_subscription
    else
      handle_subscription(message)
    end
  end

  def execute_after_succesfull_subscription
    async.execute_deploy
    @manager.async.wait_task_confirmations_worker(@job)
  end

  def rake_tasks
    @rake_tasks ||= []
  end

  def invocation_chain
    @invocation_chain ||= []
  end

  def execute_deploy
    log_to_file("invocation chain #{@job_id} is : #{@rake_tasks.inspect}")
    check_child_proces
    command = job.fetch_deploy_command
    log_to_file("worker #{@job_id} executes: #{command}")
    @child_process.async.work(@job, command, actor: Actor.current, silent: true, process_sync: :async, runner_status_klass: CapistranoMulticonfigParallel::ChildProcessStatus)
  end

  def check_child_proces
    @child_process = CapistranoMulticonfigParallel::ProcessRunner.new
    Actor.current.link @child_process
    @child_process
  end

  def on_close(code, reason)
    log_to_file("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}")
  end

  def check_gitflow
    return if @job.stage != 'staging' || !@manager.can_tag_staging? || !executed_task?(CapistranoMulticonfigParallel::GITFLOW_TAG_STAGING_TASK)
    mark_for_dispatching_new_job
    @manager.dispatch_new_job(@job, stage: 'production')
  end

  def handle_subscription(message)
    if message_is_about_a_task?(message)
      check_gitflow
      save_tasks_to_be_executed(message)
      async.update_machine_state(message['task']) # if message['action'] == 'invoke'
      log_to_file("worker #{@job_id} state is #{@machine.state}")
      task_approval(message)
    elsif message_is_for_stdout?(message)
      result = Celluloid::Actor[:terminal_server].show_confirmation(message['question'], message['default'])
      publish_rake_event(message.merge('action' => 'stdin', 'result' => result, 'client_action' => 'stdin'))
    elsif message_from_bundler?(message)

      #gem_messsage = job.gem_specs.find{|spec| message['task'].include?(spec.name) }
      # if gem_messsage.present?
      #     async.update_machine_state("insta")
      # else
      async.update_machine_state(message['task'])
      #end
    else
      log_to_file(message, job_id: @job_id)
    end
  end


  def executed_task?(task)
    rake_tasks.present? && rake_tasks.index(task.to_s).present?
  end

  def task_approval(message)
    job_conditions = @manager.job_to_condition[@job_id]
    log_to_file("worker #{@job_id} checks if task : #{message['task'].inspect} is included in #{configuration.task_confirmations.inspect}")
    if job_conditions.present? && configuration.task_confirmations.include?(message['task']) && message['action'] == 'invoke'
      log_to_file("worker #{@job_id} signals approval for task : #{message['task'].inspect}")
      task_confirmation = job_conditions[message['task']]
      task_confirmation[:status] = 'confirmed'
      task_confirmation[:condition].signal(message['task'])
    else
      publish_rake_event(message.merge('approved' => 'yes'))
    end
  end

  def save_tasks_to_be_executed(message)
    log_to_file("worler #{@job_id} current invocation chain : #{rake_tasks.inspect}")
    rake_tasks << message['task'] if rake_tasks.last != message['task']
    invocation_chain << message['task'] if invocation_chain.last != message['task']
  end

  def update_machine_state(name, options = {})
    log_to_file("worker #{@job_id} triest to transition from #{@machine.state} to  #{name}") unless options[:bundler]
    @machine.go_to_transition(name.to_s, options)
    error_message = "worker #{@job_id} task #{name} failed "
    raise(CapistranoMulticonfigParallel::TaskFailed.new(error_message), error_message) if job.failed? # force worker to rollback
  end

  def send_msg(channel, message = nil)
    message = message.present? && message.is_a?(Hash) ? { job_id: @job_id }.merge(message) : { job_id: @job_id, message: message }
    log_to_file("worker #{@job_id} triest to send to #{channel} #{message}")
    publish channel, message
  end

  def finish_worker(exit_status)
    log_to_file("worker #{job_id} tries to terminate with exit_status #{exit_status}")
    @manager.mark_completed_remaining_tasks(@job) if Actor.current.alive?
    update_machine_state('FINISHED') if exit_status == 0
    @manager.workers_terminated.signal('completed') if !@job.marked_for_dispatching_new_job? && @manager.present? && @manager.alive? && @manager.all_workers_finished?
  end


  def notify_finished(exit_status, _runner_status)
    @job.mark_for_dispatching_new_job if exit_status != 0
    @job.exit_status = exit_status
    finish_worker(exit_status)
    return if exit_status == 0
    error_message = "worker #{@job_id} task  failed with exit status #{exit_status.inspect}  "
    raise(CapistranoMulticonfigParallel::TaskFailed.new(error_message), error_message)
  end

  # def inspect
  #   to_s
  # end
  #
  # def to_s
  #    "#<#{self.class}(#{Actor.current.mailbox.address.inspect}) alive>"
  # rescue
  #   "#<#{self.class}(#{Actor.current.mailbox.address.inspect}) dead>"
  # end

end

Instance Method Details

#check_child_procesObject



105
106
107
108
109
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 105

def check_child_proces
  @child_process = CapistranoMulticonfigParallel::ProcessRunner.new
  Actor.current.link @child_process
  @child_process
end

#check_gitflowObject



115
116
117
118
119
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 115

def check_gitflow
  return if @job.stage != 'staging' || !@manager.can_tag_staging? || !executed_task?(CapistranoMulticonfigParallel::GITFLOW_TAG_STAGING_TASK)
  mark_for_dispatching_new_job
  @manager.dispatch_new_job(@job, stage: 'production')
end

#execute_after_succesfull_subscriptionObject



84
85
86
87
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 84

def execute_after_succesfull_subscription
  async.execute_deploy
  @manager.async.wait_task_confirmations_worker(@job)
end

#execute_deployObject



97
98
99
100
101
102
103
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 97

def execute_deploy
  log_to_file("invocation chain #{@job_id} is : #{@rake_tasks.inspect}")
  check_child_proces
  command = job.fetch_deploy_command
  log_to_file("worker #{@job_id} executes: #{command}")
  @child_process.async.work(@job, command, actor: Actor.current, silent: true, process_sync: :async, runner_status_klass: CapistranoMulticonfigParallel::ChildProcessStatus)
end

#executed_task?(task) ⇒ Boolean

Returns:

  • (Boolean)


145
146
147
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 145

def executed_task?(task)
  rake_tasks.present? && rake_tasks.index(task.to_s).present?
end

#finish_worker(exit_status) ⇒ Object



181
182
183
184
185
186
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 181

def finish_worker(exit_status)
  log_to_file("worker #{job_id} tries to terminate with exit_status #{exit_status}")
  @manager.mark_completed_remaining_tasks(@job) if Actor.current.alive?
  update_machine_state('FINISHED') if exit_status == 0
  @manager.workers_terminated.signal('completed') if !@job.marked_for_dispatching_new_job? && @manager.present? && @manager.alive? && @manager.all_workers_finished?
end

#handle_subscription(message) ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 121

def handle_subscription(message)
  if message_is_about_a_task?(message)
    check_gitflow
    save_tasks_to_be_executed(message)
    async.update_machine_state(message['task']) # if message['action'] == 'invoke'
    log_to_file("worker #{@job_id} state is #{@machine.state}")
    task_approval(message)
  elsif message_is_for_stdout?(message)
    result = Celluloid::Actor[:terminal_server].show_confirmation(message['question'], message['default'])
    publish_rake_event(message.merge('action' => 'stdin', 'result' => result, 'client_action' => 'stdin'))
  elsif message_from_bundler?(message)

    #gem_messsage = job.gem_specs.find{|spec| message['task'].include?(spec.name) }
    # if gem_messsage.present?
    #     async.update_machine_state("insta")
    # else
    async.update_machine_state(message['task'])
    #end
  else
    log_to_file(message, job_id: @job_id)
  end
end

#invocation_chainObject



93
94
95
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 93

def invocation_chain
  @invocation_chain ||= []
end

#notify_finished(exit_status, _runner_status) ⇒ Object

Raises:



189
190
191
192
193
194
195
196
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 189

def notify_finished(exit_status, _runner_status)
  @job.mark_for_dispatching_new_job if exit_status != 0
  @job.exit_status = exit_status
  finish_worker(exit_status)
  return if exit_status == 0
  error_message = "worker #{@job_id} task  failed with exit status #{exit_status.inspect}  "
  raise(CapistranoMulticonfigParallel::TaskFailed.new(error_message), error_message)
end

#on_close(code, reason) ⇒ Object



111
112
113
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 111

def on_close(code, reason)
  log_to_file("worker #{@job_id} websocket connection closed: #{code.inspect}, #{reason.inspect}")
end

#on_message(message) ⇒ Object



74
75
76
77
78
79
80
81
82
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 74

def on_message(message)
  log_to_file("worker #{@job_id} received:  #{message.inspect}")
  if @socket_connection.succesfull_subscription?(message)
    @successfull_subscription = true
    execute_after_succesfull_subscription
  else
    handle_subscription(message)
  end
end

#publish_rake_event(data) ⇒ Object



69
70
71
72
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 69

def publish_rake_event(data)
  log_to_file("worker #{@job_id} rties to publish into channel #{CapistranoSentinel::RequestHooks::SUBSCRIPTION_PREFIX}#{@job_id} data #{data.inspect}")
  @socket_connection.publish("#{CapistranoSentinel::RequestHooks::SUBSCRIPTION_PREFIX}#{@job_id}", data)
end

#rake_tasksObject



89
90
91
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 89

def rake_tasks
  @rake_tasks ||= []
end

#save_tasks_to_be_executed(message) ⇒ Object



162
163
164
165
166
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 162

def save_tasks_to_be_executed(message)
  log_to_file("worler #{@job_id} current invocation chain : #{rake_tasks.inspect}")
  rake_tasks << message['task'] if rake_tasks.last != message['task']
  invocation_chain << message['task'] if invocation_chain.last != message['task']
end

#send_msg(channel, message = nil) ⇒ Object



175
176
177
178
179
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 175

def send_msg(channel, message = nil)
  message = message.present? && message.is_a?(Hash) ? { job_id: @job_id }.merge(message) : { job_id: @job_id, message: message }
  log_to_file("worker #{@job_id} triest to send to #{channel} #{message}")
  publish channel, message
end

#start_taskObject



61
62
63
64
65
66
67
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 61

def start_task
  log_to_file("exec worker #{@job_id} starts task and subscribes to #{@subscription_channel}")
  if @old_job.present? && @old_job.is_a?(CapistranoMulticonfigParallel::Job)
    @old_job.new_jobs_dispatched << @job.id
  end
  @socket_connection = CelluloidPubsub::Client.new(actor: Actor.current, enable_debug: debug_websocket?, channel: subscription_channel, log_file_path: websocket_config.fetch('log_file_path', nil))
end

#task_approval(message) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 149

def task_approval(message)
  job_conditions = @manager.job_to_condition[@job_id]
  log_to_file("worker #{@job_id} checks if task : #{message['task'].inspect} is included in #{configuration.task_confirmations.inspect}")
  if job_conditions.present? && configuration.task_confirmations.include?(message['task']) && message['action'] == 'invoke'
    log_to_file("worker #{@job_id} signals approval for task : #{message['task'].inspect}")
    task_confirmation = job_conditions[message['task']]
    task_confirmation[:status] = 'confirmed'
    task_confirmation[:condition].signal(message['task'])
  else
    publish_rake_event(message.merge('approved' => 'yes'))
  end
end

#update_machine_state(name, options = {}) ⇒ Object

Raises:



168
169
170
171
172
173
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 168

def update_machine_state(name, options = {})
  log_to_file("worker #{@job_id} triest to transition from #{@machine.state} to  #{name}") unless options[:bundler]
  @machine.go_to_transition(name.to_s, options)
  error_message = "worker #{@job_id} task #{name} failed "
  raise(CapistranoMulticonfigParallel::TaskFailed.new(error_message), error_message) if job.failed? # force worker to rollback
end

#work(job, manager, old_job) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 37

def work(job, manager, old_job)
  @job = job
  @old_job = old_job
  @job_id = job.id
  @worker_state = job.status
  @manager = manager
  @job_confirmation_conditions = []
  log_to_file("worker #{@job_id} received #{job.inspect} and #{old_job.inspect}")
  @subscription_channel = "#{CapistranoSentinel::RequestHooks::PUBLISHER_PREFIX}#{@job_id}"
  @machine = CapistranoMulticonfigParallel::StateMachine.new(@job, Actor.current)
  @manager.setup_worker_conditions(@job)
  manager.register_worker_for_job(job, Actor.current)
end

#worker_stateObject



52
53
54
55
56
57
58
59
# File 'lib/capistrano_multiconfig_parallel/celluloid/celluloid_worker.rb', line 52

def worker_state
  if job.status.to_s.downcase != 'dead' && Actor.current.alive?
    @machine.state.to_s.green
  else
    job.status = 'dead'
    job.status.upcase.red
  end
end