Class: ReceptorController::Client::DirectiveBlocking
- Defined in:
- lib/receptor_controller/client/directive_blocking.rb
Overview
Blocking directive for requests sent through POST /job. A successful POST blocks the current thread until the response arrives from Kafka.
Raises a kind of ReceptorController::Client::Error on failure or timeout.
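A caller would typically wrap the blocking call in a rescue of the base error class. The sketch below shows that shape with stand-ins only: the `Sketch` module, `FakeDirective`, `run_directive`, and the assumption that every specific error inherits from a common `Error` parent are hypothetical, not the gem's definitions.

```ruby
# Stand-in error hierarchy. The real classes live under
# ReceptorController::Client; a shared Error parent is assumed here.
module Sketch
  class Error < StandardError; end
  class ResponseTimeoutError < Error; end
  class ResponseError < Error; end
end

# Hypothetical directive whose #call either returns data or raises.
class FakeDirective
  def initialize(outcome)
    @outcome = outcome
  end

  def call
    raise @outcome if @outcome.kind_of?(Exception)

    @outcome
  end
end

# Rescuing the base class covers timeouts and error responses alike.
def run_directive(directive)
  [:ok, directive.call]
rescue Sketch::Error => e
  [:failed, e.class.name]
end

p run_directive(FakeDirective.new('{"status":200}'))
p run_directive(FakeDirective.new(Sketch::ResponseTimeoutError.new('Timeout')))
```

Rescuing the parent class keeps the caller independent of which specific failure occurred; rescue a subclass first when a timeout needs different handling from an error response.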
Instance Attribute Summary
Attributes inherited from Directive
#account, #client, #log_message_common, #name, #node_id, #payload
Instance Method Summary
- #call(body = default_body) ⇒ Object
- #initialize(name:, account:, node_id:, payload:, client:, log_message_common: nil) ⇒ DirectiveBlocking (constructor)
  A new instance of DirectiveBlocking.
- #response_error(msg_id, response_code, err_message) ⇒ Object
- #response_success(msg_id, message_type, response) ⇒ Object
  TODO: Review when future plugins with more “response” messages come.
- #response_timeout(msg_id) ⇒ Object
- #wait_for_response(_msg_id) ⇒ Object
Methods inherited from Directive
Constructor Details
#initialize(name:, account:, node_id:, payload:, client:, log_message_common: nil) ⇒ DirectiveBlocking
Returns a new instance of DirectiveBlocking.
# File 'lib/receptor_controller/client/directive_blocking.rb', line 9
def initialize(name:, account:, node_id:, payload:, client:, log_message_common: nil)
  super
  self.response_lock = Mutex.new
  self.response_waiting = ConditionVariable.new
  self.response_data = nil
  self.response_exception = nil
end
Instance Method Details
#call(body = default_body) ⇒ Object
# File 'lib/receptor_controller/client/directive_blocking.rb', line 17
def call(body = default_body)
  @url = JSON.parse(body[:payload])['url']
  response = connection.post(config.job_path, body.to_json)

  msg_id = JSON.parse(response.body)['id']

  logger.debug("Receptor response [#{ReceptorController::Client::Configuration.default.queue_persist_ref}]: registering message #{msg_id}, href_slug: #{log_message_common}")
  # registers message id for kafka responses
  response_worker.(msg_id, self)
  wait_for_response(msg_id)
rescue Faraday::Error => e
  msg = receptor_log_msg("Directive #{name} failed (#{log_message_common}) [MSG: #{msg_id}]", account, node_id, e)
  raise ReceptorController::Client::ControllerResponseError.new(msg)
end
#response_error(msg_id, response_code, err_message) ⇒ Object
# File 'lib/receptor_controller/client/directive_blocking.rb', line 56
def response_error(msg_id, response_code, err_message)
  response_lock.synchronize do
    self.response_data = nil
    self.response_exception = ReceptorController::Client::ResponseError.new("#{err_message} (code: #{response_code}) (#{log_message_common}) [MSG: #{msg_id}]")
    response_waiting.signal
  end
end
#response_success(msg_id, message_type, response) ⇒ Object
TODO: Review when future plugins with more “response” messages come
# File 'lib/receptor_controller/client/directive_blocking.rb', line 43
def response_success(msg_id, message_type, response)
  response_lock.synchronize do
    if message_type == MESSAGE_TYPE_RESPONSE
      self.response_data = response
    elsif message_type == MESSAGE_TYPE_EOF
      response_waiting.signal
    else
      self.response_exception = ReceptorController::Client::UnknownResponseTypeError.new("#{log_message_common}[MSG: #{msg_id}]")
      response_waiting.signal
    end
  end
end
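The handler stores the payload when a response message arrives and wakes the waiting thread only on the end-of-stream marker or on an unknown type. A minimal stand-alone sketch of that sequencing follows. `ResponseCollector`, `receive`, `done?`, and the string values given to the two constants are hypothetical stand-ins, not the gem's internals.

```ruby
class ResponseCollector
  # Assumed values; the gem defines its own MESSAGE_TYPE_* constants.
  MESSAGE_TYPE_RESPONSE = 'response'.freeze
  MESSAGE_TYPE_EOF = 'eof'.freeze

  attr_reader :data, :error

  def initialize
    @data = nil
    @error = nil
    @done = false
  end

  # True once the waiting side would have been signalled.
  def done?
    @done
  end

  # Mirrors the branching in #response_success: keep the payload,
  # finish on EOF, and treat anything else as a failure.
  def receive(message_type, payload = nil)
    case message_type
    when MESSAGE_TYPE_RESPONSE
      @data = payload
    when MESSAGE_TYPE_EOF
      @done = true
    else
      @error = "unknown message type: #{message_type}"
      @done = true
    end
  end
end

collector = ResponseCollector.new
collector.receive('response', '{"status":200}')
collector.receive('eof')
```

With this branching, a second response message before EOF would overwrite the first payload, which is presumably what the TODO about plugins with more response messages refers to.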
#response_timeout(msg_id) ⇒ Object
# File 'lib/receptor_controller/client/directive_blocking.rb', line 64
def response_timeout(msg_id)
  response_lock.synchronize do
    self.response_data = nil
    self.response_exception = ReceptorController::Client::ResponseTimeoutError.new("Timeout (#{log_message_common}) [MSG: #{msg_id}]")
    response_waiting.signal
  end
end
#wait_for_response(_msg_id) ⇒ Object
# File 'lib/receptor_controller/client/directive_blocking.rb', line 32
def wait_for_response(_msg_id)
  response_lock.synchronize do
    response_waiting.wait(response_lock)

    raise response_exception if response_failed?

    response_data.dup
  end
end
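The wait-and-signal handshake between the calling thread and the response worker can be tried in isolation. The sketch below is not the gem's code: `BlockingSlot`, `deliver`, `fail_with`, and the `@done` flag are hypothetical. The flag is an addition that guards against a signal sent before the wait starts and against spurious wakeups, whereas the method above waits unconditionally.

```ruby
class BlockingSlot
  def initialize
    @lock = Mutex.new
    @waiting = ConditionVariable.new
    @data = nil
    @exception = nil
    @done = false
  end

  # Called from the worker thread on success.
  def deliver(data)
    finish { @data = data }
  end

  # Called from the worker thread on an error response or a timeout.
  def fail_with(exception)
    finish { @exception = exception }
  end

  # Blocks the calling thread until deliver or fail_with has run.
  def wait_for_response
    @lock.synchronize do
      @waiting.wait(@lock) until @done
      raise @exception if @exception

      @data.dup
    end
  end

  private

  # Records the outcome and wakes the waiter while holding the lock.
  def finish
    @lock.synchronize do
      yield
      @done = true
      @waiting.signal
    end
  end
end

slot = BlockingSlot.new
worker = Thread.new { slot.deliver('{"status":200}') }
result = slot.wait_for_response
worker.join
```

`ConditionVariable#wait` releases the mutex while sleeping and reacquires it before returning, so the worker can enter its own `synchronize` block while the caller is parked.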