Class: CapistranoMulticonfigParallel::RakeWorker

Inherits:
Object
  • Object
show all
Includes:
BaseActorHelper
Defined in:
lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb

Overview

class that handles the rake task and waits for approval from the celluloid worker

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from BaseActorHelper

included

Instance Attribute Details

#actionObject (readonly)

Returns the value of attribute action.



7
8
9
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 7

def action
  @action
end

#clientObject (readonly)

Returns the value of attribute client.



7
8
9
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 7

def client
  @client
end

#job_idObject (readonly)

Returns the value of attribute job_id.



7
8
9
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 7

def job_id
  @job_id
end

#publisher_channelObject (readonly)

Returns the value of attribute publisher_channel.



7
8
9
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 7

def publisher_channel
  @publisher_channel
end

#stdin_resultObject (readonly)

Returns the value of attribute stdin_result.



7
8
9
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 7

def stdin_result
  @stdin_result
end

#subscription_channelObject (readonly)

Returns the value of attribute subscription_channel.



7
8
9
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 7

def subscription_channel
  @subscription_channel
end

#successfull_subscriptionObject (readonly)

Returns the value of attribute successfull_subscription.



7
8
9
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 7

def successfull_subscription
  @successfull_subscription
end

#taskObject (readonly)

Returns the value of attribute task.



7
8
9
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 7

def task
  @task
end

#task_approvedObject (readonly)

Returns the value of attribute task_approved.



7
8
9
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 7

def task_approved
  @task_approved
end

Instance Method Details

#custom_attributesObject



18
19
20
21
22
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 18

def custom_attributes
  @publisher_channel = "worker_#{@job_id}"
  @action = 'invoke'
  @task = @options['task']
end

#default_settingsObject



41
42
43
44
45
46
47
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 41

def default_settings
  @stdin_result = nil
  @job_id = @options['job_id']
  @subscription_channel = @options['actor_id']
  @task_approved = false
  @successfull_subscription = false
end

#initialize_subscriptionObject



49
50
51
52
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 49

def initialize_subscription
  return if defined?(@client) && @client.present?
  @client = CelluloidPubsub::Client.new(actor: Actor.current, enable_debug: debug_websocket?, channel: @subscription_channel, log_file_path: websocket_config.fetch('log_file_path', nil))
end

#on_close(code, reason) ⇒ Object



116
117
118
119
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 116

def on_close(code, reason)
  log_to_file("websocket connection closed: #{code.inspect}, #{reason.inspect}")
  terminate
end

#on_message(message) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 70

def on_message(message)
  return unless message.present?
  log_to_file("Rake worker #{@job_id} received after on message:", message)
  if @client.succesfull_subscription?(message)
    publish_subscription_successfull(message)
  elsif msg_for_task?(message)
    task_approval(message)
  elsif msg_for_stdin?(message)
    stdin_approval(message)
  else
    show_warning "unknown message: #{message.inspect}"
  end
end

#publish_new_work(new_options = {}) ⇒ Object



24
25
26
27
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 24

def publish_new_work(new_options = {})
  work(@options.merge(new_options))
  publish_to_worker(task_data)
end

#publish_subscription_successfull(message) ⇒ Object



84
85
86
87
88
89
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 84

def publish_subscription_successfull(message)
  return unless @client.succesfull_subscription?(message)
  log_to_file("Rake worker #{@job_id} received after publish_subscription_successfull:", message)
  @successfull_subscription = true
  publish_to_worker(task_data)
end

#publish_to_worker(data) ⇒ Object



66
67
68
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 66

def publish_to_worker(data)
  @client.publish(@publisher_channel, data)
end

#stdin_approval(message) ⇒ Object



98
99
100
101
102
103
104
105
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 98

def stdin_approval(message)
  return unless msg_for_stdin?(message)
  if @job_id == message['job_id']
    @stdin_result = message.fetch('result', '')
  else
    show_warning "unknown stdin_approval #{message.inspect}"
  end
end

#task_approval(message) ⇒ Object



107
108
109
110
111
112
113
114
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 107

def task_approval(message)
  return unless msg_for_task?(message)
  if @job_id == message['job_id'] && message['task'].to_s == task_name.to_s && message['approved'] == 'yes'
    @task_approved = true
  else
    show_warning "unknown task_approval #{message.inspect} #{task_data}"
  end
end

#task_dataObject



58
59
60
61
62
63
64
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 58

def task_data
  {
    action: @action,
    task: task_name,
    job_id: @job_id
  }
end

#task_nameObject



54
55
56
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 54

def task_name
  @task.respond_to?(:name) ? @task.name : @task
end

#user_prompt_needed?(data) ⇒ Boolean

Returns:

  • (Boolean)


121
122
123
124
125
126
127
128
129
130
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 121

def user_prompt_needed?(data)
  question, default = get_question_details(data)
  log_to_file("Rake worker #{@job_id} tries to determine question #{data.inspect} #{question.inspect} #{default.inspect}")
  return if question.blank? || @action != 'invoke'
  publish_to_worker(action: 'stdout',
                    question: question,
                    default: default.present? ? default.delete('()') : '',
                    job_id: @job_id)
  wait_for_stdin_input
end

#wait_execution(name = task_name, time = 0.1) ⇒ Object



29
30
31
32
33
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 29

def wait_execution(name = task_name, time = 0.1)
  #    info "Before waiting #{name}"
  Actor.current.wait_for(name, time)
  #  info "After waiting #{name}"
end

#wait_for(_name, time) ⇒ Object



35
36
37
38
39
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 35

def wait_for(_name, time)
  # info "waiting for #{time} seconds on #{name}"
  sleep time
  # info "done waiting on #{name} "
end

#wait_for_stdin_inputObject



91
92
93
94
95
96
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 91

def wait_for_stdin_input
  wait_execution until @stdin_result.present?
  output = @stdin_result.clone
  @stdin_result = nil
  output
end

#work(options = {}) ⇒ Object



11
12
13
14
15
16
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 11

def work(options = {})
  @options = options.stringify_keys
  default_settings
  custom_attributes
  initialize_subscription
end