Class: ReceptorController::Client::DirectiveNonBlocking
- Defined in:
- lib/receptor_controller/client/directive_non_blocking.rb
Overview
Non-blocking directive for requests through POST /job Directive’s call returns either message ID or nil
Callback blocks can be specified for handling responses @example:
receiver = <object with methods below>
directive
.on_success do |msg_id, response|
receiver.process_response(msg_id, response)
end
.on_error do |msg_id, code, response|
receiver.process_error(msg_id, code, response)
end
.on_timeout do |msg_id|
receiver.process_timeout(msg_id)
end
.on_eof do |msg_id|
receiver.process_eof(msg_id)
end
.on_eof do |msg_id|
logger.debug("[#{msg_id}] EOF message received")
end
directive.call
Instance Attribute Summary
Attributes inherited from Directive
#account, #client, #log_message_common, #name, #node_id, #payload
Instance Method Summary collapse
-
#call(body = default_body) ⇒ Object
Entrypoint for request.
-
#initialize(name:, account:, node_id:, payload:, client:, log_message_common: nil) ⇒ DirectiveNonBlocking
constructor
A new instance of DirectiveNonBlocking.
- #on_eof(&block) ⇒ Object
- #on_error(&block) ⇒ Object
- #on_success(&block) ⇒ Object
- #on_timeout(&block) ⇒ Object
-
#response_error(msg_id, response_code, response) ⇒ Object
Handles error responses in Threads EOF processing waits until all threads are finished.
-
#response_success(msg_id, message_type, response) ⇒ Object
Handles successful responses in Threads EOF processing waits until all response threads are finished.
-
#response_timeout(msg_id) ⇒ Object
Error state: Any response wasn’t received in ‘Configuration.response_timeout`.
Methods inherited from Directive
Constructor Details
#initialize(name:, account:, node_id:, payload:, client:, log_message_common: nil) ⇒ DirectiveNonBlocking
Returns a new instance of DirectiveNonBlocking.
30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/receptor_controller/client/directive_non_blocking.rb', line 30 def initialize(name:, account:, node_id:, payload:, client:, log_message_common: nil) super @success_callbacks = [] @eof_callbacks = [] @timeout_callbacks = [] @error_callbacks = [] @responses_count = Concurrent::AtomicFixnum.new @eof_lock = Mutex.new @eof_wait = ConditionVariable.new end |
Instance Method Details
#call(body = default_body) ⇒ Object
Entrypoint for request
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/receptor_controller/client/directive_non_blocking.rb', line 44 def call(body = default_body) response = Faraday.post(config.job_url, body.to_json, client.headers) if response.success? msg_id = JSON.parse(response.body)['id'] # registers message id for kafka responses response_worker.(msg_id, self) logger.debug("Receptor response [#{ReceptorController::Client::Configuration.default.queue_persist_ref}]: registering message #{msg_id}, href_slug: #{log_message_common}") msg_id else logger.error(receptor_log_msg("Directive #{name} failed (#{log_message_common}): HTTP #{response.status}", account, node_id)) nil end rescue Faraday::Error => e logger.error(receptor_log_msg("Directive #{name} failed (#{log_message_common}). POST /job error", account, node_id, e)) nil rescue => e logger.error(receptor_log_msg("Directive #{name} failed (#{log_message_common})", account, node_id, e)) nil end |
#on_eof(&block) ⇒ Object
71 72 73 74 |
# File 'lib/receptor_controller/client/directive_non_blocking.rb', line 71 def on_eof(&block) @eof_callbacks << block if block_given? self end |
#on_error(&block) ⇒ Object
81 82 83 84 |
# File 'lib/receptor_controller/client/directive_non_blocking.rb', line 81 def on_error(&block) @error_callbacks << block if block_given? self end |
#on_success(&block) ⇒ Object
66 67 68 69 |
# File 'lib/receptor_controller/client/directive_non_blocking.rb', line 66 def on_success(&block) @success_callbacks << block if block_given? self end |
#on_timeout(&block) ⇒ Object
76 77 78 79 |
# File 'lib/receptor_controller/client/directive_non_blocking.rb', line 76 def on_timeout(&block) @timeout_callbacks << block if block_given? self end |
#response_error(msg_id, response_code, response) ⇒ Object
Handles error responses in Threads EOF processing waits until all threads are finished
102 103 104 105 106 |
# File 'lib/receptor_controller/client/directive_non_blocking.rb', line 102 def response_error(msg_id, response_code, response) response_thread do @error_callbacks.each { |block| block.call(msg_id, response_code, response) } end end |
#response_success(msg_id, message_type, response) ⇒ Object
Handles successful responses in Threads EOF processing waits until all response threads are finished
88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/receptor_controller/client/directive_non_blocking.rb', line 88 def response_success(msg_id, , response) if == MESSAGE_TYPE_EOF eof_thread do @eof_callbacks.each { |block| block.call(msg_id) } end else response_thread do @success_callbacks.each { |block| block.call(msg_id, response) } end end end |
#response_timeout(msg_id) ⇒ Object
Error state: Any response wasn’t received in ‘Configuration.response_timeout`
109 110 111 112 113 |
# File 'lib/receptor_controller/client/directive_non_blocking.rb', line 109 def response_timeout(msg_id) response_thread do @timeout_callbacks.each { |block| block.call(msg_id) } end end |