Class: CapistranoMulticonfigParallel::RakeWorker

Inherits:
Object
  • Object
show all
Includes:
ApplicationHelper, Celluloid, Celluloid::Logger
Defined in:
lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb

Overview

class that handles the rake task and waits for approval from the celluloid worker

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ApplicationHelper

action_confirmed?, check_hash_set, check_numeric, fetch_parsed_string, fetch_remaining_arguments, find_loaded_gem, find_remaining_args, get_question_details, msg_for_stdin?, msg_for_task?, multi_fetch_argv, parse_task_string, percent_of, regex_last_match, setup_command_line_standard, strip_characters_from_string, value_is_array?, verify_array_of_strings, verify_empty_options, warn_array_without_strings, wrap_string

Methods included from CoreHelper

app_debug_enabled?, ask_confirm, ask_stdout_confirmation, check_terminal_tty, debug_websocket?, error_filtered?, execute_with_rescue, find_worker_log, force_confirmation, format_error, log_error, log_output_error, log_to_file, print_to_log_file, rescue_error, rescue_interrupt, setup_filename_logger, setup_logger_formatter, show_warning, terminal_actor, websocket_config, websocket_server_config

Methods included from InternalHelper

check_file, config_file, create_log_file, custom_commands, default_internal_config, default_internal_configuration_params, detect_root, enable_main_log_file, fail_capfile_not_found, fetch_default_internal_config, find_config_type, find_env_multi_cap_root, find_file_in_directory, internal_config_directory, internal_config_file, log_directory, main_log_file, multi_level_prop, pathname_is_root?, pwd_directory, pwd_parent_dir, root, setup_default_configuration_types, sliced_default_config, try_detect_capfile

Instance Attribute Details

#actionObject (readonly)

Returns the value of attribute action.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 9

def action
  @action
end

#clientObject (readonly)

Returns the value of attribute client.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 9

def client
  @client
end

#envObject (readonly)

Returns the value of attribute env.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 9

def env
  @env
end

#job_idObject (readonly)

Returns the value of attribute job_id.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 9

def job_id
  @job_id
end

#publisher_channelObject (readonly)

Returns the value of attribute publisher_channel.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 9

def publisher_channel
  @publisher_channel
end

#stdin_resultObject (readonly)

Returns the value of attribute stdin_result.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 9

def stdin_result
  @stdin_result
end

#subscription_channelObject (readonly)

Returns the value of attribute subscription_channel.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 9

def subscription_channel
  @subscription_channel
end

#successfull_subscriptionObject (readonly)

Returns the value of attribute successfull_subscription.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 9

def successfull_subscription
  @successfull_subscription
end

#taskObject (readonly)

Returns the value of attribute task.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 9

def task
  @task
end

#task_approvedObject (readonly)

Returns the value of attribute task_approved.



9
10
11
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 9

def task_approved
  @task_approved
end

Instance Method Details

#custom_attributesObject



21
22
23
24
25
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 21

def custom_attributes
  @publisher_channel = "worker_#{@job_id}"
  @action = 'invoke'
  @task = @options['task']
end

#default_settingsObject



44
45
46
47
48
49
50
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 44

def default_settings
  @stdin_result = nil
  @job_id = @options['job_id']
  @subscription_channel = @options['actor_id']
  @task_approved = false
  @successfull_subscription = false
end

#initialize_subscriptionObject



52
53
54
55
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 52

def initialize_subscription
  return if defined?(@client) && @client.present?
  @client = CelluloidPubsub::Client.new(actor: Actor.current, enable_debug: debug_websocket?, channel: @subscription_channel, log_file_path: websocket_config.fetch('log_file_path', nil))
end

#on_close(code, reason) ⇒ Object



119
120
121
122
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 119

def on_close(code, reason)
  log_to_file("websocket connection closed: #{code.inspect}, #{reason.inspect}")
  terminate
end

#on_message(message) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 73

def on_message(message)
  return unless message.present?
  log_to_file("Rake worker #{@job_id} received after on message:", message)
  if @client.succesfull_subscription?(message)
    publish_subscription_successfull(message)
  elsif msg_for_task?(message)
    task_approval(message)
  elsif msg_for_stdin?(message)
    stdin_approval(message)
  else
    show_warning "unknown action: #{message.inspect}"
  end
end

#publish_new_work(env, new_options = {}) ⇒ Object



27
28
29
30
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 27

def publish_new_work(env, new_options = {})
  work(env, @options.merge(new_options))
  publish_to_worker(task_data)
end

#publish_subscription_successfull(message) ⇒ Object



87
88
89
90
91
92
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 87

def publish_subscription_successfull(message)
  return unless @client.succesfull_subscription?(message)
  log_to_file("Rake worker #{@job_id} received after publish_subscription_successfull:", message)
  @successfull_subscription = true
  publish_to_worker(task_data)
end

#publish_to_worker(data) ⇒ Object



69
70
71
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 69

def publish_to_worker(data)
  @client.publish(@publisher_channel, data)
end

#stdin_approval(message) ⇒ Object



101
102
103
104
105
106
107
108
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 101

def stdin_approval(message)
  return unless msg_for_stdin?(message)
  if @job_id == message['job_id']
    @stdin_result = message.fetch('result', '')
  else
    show_warning "unknown invocation #{message.inspect}"
  end
end

#task_approval(message) ⇒ Object



110
111
112
113
114
115
116
117
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 110

def task_approval(message)
  return unless msg_for_task?(message)
  if @job_id == message['job_id'] && message['task'] == task_name && message['approved'] == 'yes'
    @task_approved = true
  else
    show_warning "unknown invocation #{message.inspect}"
  end
end

#task_dataObject



61
62
63
64
65
66
67
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 61

def task_data
  {
    action: @action,
    task: task_name,
    job_id: @job_id
  }
end

#task_nameObject



57
58
59
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 57

def task_name
  @task.name
end

#user_prompt_needed?(data) ⇒ Boolean

Returns:

  • (Boolean)


124
125
126
127
128
129
130
131
132
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 124

def user_prompt_needed?(data)
  question, default = get_question_details(data)
  return if question.blank? || @action != 'invoke'
  publish_to_worker(action: 'stdout',
                    question: question,
                    default: default.delete('()'),
                    job_id: @job_id)
  wait_for_stdin_input
end

#wait_execution(name = task_name, time = 0.1) ⇒ Object



32
33
34
35
36
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 32

def wait_execution(name = task_name, time = 0.1)
  #    info "Before waiting #{name}"
  Actor.current.wait_for(name, time)
  #  info "After waiting #{name}"
end

#wait_for(_name, time) ⇒ Object



38
39
40
41
42
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 38

def wait_for(_name, time)
  # info "waiting for #{time} seconds on #{name}"
  sleep time
  # info "done waiting on #{name} "
end

#wait_for_stdin_inputObject



94
95
96
97
98
99
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 94

def wait_for_stdin_input
  wait_execution until @stdin_result.present?
  output = @stdin_result.clone
  @stdin_result = nil
  output
end

#work(env, options = {}) ⇒ Object



13
14
15
16
17
18
19
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 13

def work(env, options = {})
  @options = options.stringify_keys
  @env = env
  default_settings
  custom_attributes
  initialize_subscription
end