Class: CapistranoMulticonfigParallel::RakeWorker
- Inherits:
-
Object
- Object
- CapistranoMulticonfigParallel::RakeWorker
- Includes:
- BaseActorHelper
- Defined in:
- lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb
Overview
Class that handles the Rake task and waits for approval from the Celluloid worker.
Instance Attribute Summary collapse
-
#action ⇒ Object
readonly
Returns the value of attribute action.
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#job_id ⇒ Object
readonly
Returns the value of attribute job_id.
-
#publisher_channel ⇒ Object
readonly
Returns the value of attribute publisher_channel.
-
#stdin_result ⇒ Object
readonly
Returns the value of attribute stdin_result.
-
#subscription_channel ⇒ Object
readonly
Returns the value of attribute subscription_channel.
-
#successfull_subscription ⇒ Object
readonly
Returns the value of attribute successfull_subscription.
-
#task ⇒ Object
readonly
Returns the value of attribute task.
-
#task_approved ⇒ Object
readonly
Returns the value of attribute task_approved.
Instance Method Summary collapse
- #custom_attributes ⇒ Object
- #default_settings ⇒ Object
- #initialize_subscription ⇒ Object
- #on_close(code, reason) ⇒ Object
- #on_message(message) ⇒ Object
- #publish_new_work(new_options = {}) ⇒ Object
- #publish_subscription_successfull(message) ⇒ Object
- #publish_to_worker(data) ⇒ Object
- #stdin_approval(message) ⇒ Object
- #task_approval(message) ⇒ Object
- #task_data ⇒ Object
- #task_name ⇒ Object
- #user_prompt_needed?(data) ⇒ Boolean
- #wait_execution(name = task_name, time = 0.1) ⇒ Object
- #wait_for(_name, time) ⇒ Object
- #wait_for_stdin_input ⇒ Object
- #work(options = {}) ⇒ Object
Methods included from BaseActorHelper
Instance Attribute Details
#action ⇒ Object (readonly)
Returns the value of attribute action.
7 8 9 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 7 def action @action end |
#client ⇒ Object (readonly)
Returns the value of attribute client.
7 8 9 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 7 def client @client end |
#job_id ⇒ Object (readonly)
Returns the value of attribute job_id.
7 8 9 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 7 def job_id @job_id end |
#publisher_channel ⇒ Object (readonly)
Returns the value of attribute publisher_channel.
7 8 9 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 7 def publisher_channel @publisher_channel end |
#stdin_result ⇒ Object (readonly)
Returns the value of attribute stdin_result.
7 8 9 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 7 def stdin_result @stdin_result end |
#subscription_channel ⇒ Object (readonly)
Returns the value of attribute subscription_channel.
7 8 9 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 7 def subscription_channel @subscription_channel end |
#successfull_subscription ⇒ Object (readonly)
Returns the value of attribute successfull_subscription.
7 8 9 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 7 def successfull_subscription @successfull_subscription end |
#task ⇒ Object (readonly)
Returns the value of attribute task.
7 8 9 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 7 def task @task end |
#task_approved ⇒ Object (readonly)
Returns the value of attribute task_approved.
7 8 9 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 7 def task_approved @task_approved end |
Instance Method Details
#custom_attributes ⇒ Object
18 19 20 21 22 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 18 def custom_attributes @publisher_channel = "worker_#{@job_id}" @action = 'invoke' @task = @options['task'] end |
#default_settings ⇒ Object
41 42 43 44 45 46 47 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 41 def default_settings @stdin_result = nil @job_id = @options['job_id'] @subscription_channel = @options['actor_id'] @task_approved = false @successfull_subscription = false end |
#initialize_subscription ⇒ Object
49 50 51 52 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 49 def initialize_subscription return if defined?(@client) && @client.present? @client = CelluloidPubsub::Client.new(actor: Actor.current, enable_debug: debug_websocket?, channel: @subscription_channel, log_file_path: websocket_config.fetch('log_file_path', nil)) end |
#on_close(code, reason) ⇒ Object
116 117 118 119 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 116 def on_close(code, reason) log_to_file("websocket connection closed: #{code.inspect}, #{reason.inspect}") terminate end |
#on_message(message) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 70 def on_message(message) return unless message.present? log_to_file("Rake worker #{@job_id} received after on message:", message) if @client.succesfull_subscription?(message) publish_subscription_successfull(message) elsif msg_for_task?(message) task_approval(message) elsif msg_for_stdin?(message) stdin_approval(message) else show_warning "unknown message: #{message.inspect}" end end |
#publish_new_work(new_options = {}) ⇒ Object
24 25 26 27 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 24 def publish_new_work(new_options = {}) work(@options.merge(new_options)) publish_to_worker(task_data) end |
#publish_subscription_successfull(message) ⇒ Object
84 85 86 87 88 89 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 84 def publish_subscription_successfull(message) return unless @client.succesfull_subscription?(message) log_to_file("Rake worker #{@job_id} received after publish_subscription_successfull:", message) @successfull_subscription = true publish_to_worker(task_data) end |
#publish_to_worker(data) ⇒ Object
66 67 68 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 66 def publish_to_worker(data) @client.publish(@publisher_channel, data) end |
#stdin_approval(message) ⇒ Object
98 99 100 101 102 103 104 105 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 98 def stdin_approval(message) return unless msg_for_stdin?(message) if @job_id == message['job_id'] @stdin_result = message.fetch('result', '') else show_warning "unknown stdin_approval #{message.inspect}" end end |
#task_approval(message) ⇒ Object
107 108 109 110 111 112 113 114 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 107 def task_approval(message) return unless msg_for_task?(message) if @job_id == message['job_id'] && message['task'].to_s == task_name.to_s && message['approved'] == 'yes' @task_approved = true else show_warning "unknown task_approval #{message.inspect} #{task_data}" end end |
#task_data ⇒ Object
58 59 60 61 62 63 64 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 58 def task_data { action: @action, task: task_name, job_id: @job_id } end |
#task_name ⇒ Object
54 55 56 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 54 def task_name @task.respond_to?(:name) ? @task.name : @task end |
#user_prompt_needed?(data) ⇒ Boolean
121 122 123 124 125 126 127 128 129 130 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 121 def user_prompt_needed?(data) question, default = get_question_details(data) log_to_file("Rake worker #{@job_id} tries to determine question #{data.inspect} #{question.inspect} #{default.inspect}") return if question.blank? || @action != 'invoke' publish_to_worker(action: 'stdout', question: question, default: default.present? ? default.delete('()') : '', job_id: @job_id) wait_for_stdin_input end |
#wait_execution(name = task_name, time = 0.1) ⇒ Object
29 30 31 32 33 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 29 def wait_execution(name = task_name, time = 0.1) # info "Before waiting #{name}" Actor.current.wait_for(name, time) # info "After waiting #{name}" end |
#wait_for(_name, time) ⇒ Object
35 36 37 38 39 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 35 def wait_for(_name, time) # info "waiting for #{time} seconds on #{name}" sleep time # info "done waiting on #{name} " end |
#wait_for_stdin_input ⇒ Object
91 92 93 94 95 96 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 91 def wait_for_stdin_input wait_execution until @stdin_result.present? output = @stdin_result.clone @stdin_result = nil output end |
#work(options = {}) ⇒ Object
11 12 13 14 15 16 |
# File 'lib/capistrano_multiconfig_parallel/celluloid/rake_worker.rb', line 11 def work(options = {}) @options = options.stringify_keys default_settings custom_attributes initialize_subscription end |