Class: ReceptorController::Client::ResponseWorker
- Inherits: Object (ReceptorController::Client::ResponseWorker < Object)
- Defined in: lib/receptor_controller/client/response_worker.rb
Overview
ResponseWorker listens on the Kafka topic platform.receptor-controller.responses (@see Configuration.queue_topic). It asynchronously receives the responses to requests sent to the receptor controller with POST /job. Each request is paired with its responses by message ID: the ID returned by POST /job matches the ‘in_response_to’ value of the Kafka response.
A successful exchange consists of at least two responses:
- 1+ of ‘response’ type, containing data
- 1 of ‘eof’ type, signaling the end of transmission
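The response/eof sequence described above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the gem's code: the `ResponseCollector` class and the `message_type` and `payload` keys are assumptions made for the example. Only the ‘response’ and ‘eof’ type values and the ‘in_response_to’ field come from this page.

```ruby
# Hypothetical sketch: gather 'response' chunks for one message ID until 'eof'.
# ResponseCollector and the "message_type"/"payload" keys are illustrative only.
class ResponseCollector
  EOF      = "eof".freeze
  RESPONSE = "response".freeze

  attr_reader :chunks

  def initialize(msg_id)
    @msg_id   = msg_id
    @chunks   = []
    @finished = false
  end

  def finished?
    @finished
  end

  # Returns true when the message was consumed, false when it was ignored.
  def handle(message)
    return false unless message["in_response_to"] == @msg_id
    return false if finished?

    case message["message_type"]
    when RESPONSE then @chunks << message["payload"]
    when EOF      then @finished = true
    else return false
    end
    true
  end
end

collector = ResponseCollector.new("abc-123")
[
  {"in_response_to" => "abc-123", "message_type" => "response", "payload" => "part 1"},
  {"in_response_to" => "other",   "message_type" => "response", "payload" => "ignored"},
  {"in_response_to" => "abc-123", "message_type" => "response", "payload" => "part 2"},
  {"in_response_to" => "abc-123", "message_type" => "eof"}
].each { |message| collector.handle(message) }
```

After the loop, `collector.chunks` holds the two data parts for "abc-123" and `collector.finished?` is true. The message addressed to another ID is skipped.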
Registered messages that receive no response are removed after a timeout (Configuration.response_timeout).
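One way to picture the timeout sweep is the sketch below. It is an assumption-laden illustration, not the gem's implementation: `expire_messages`, `TimeoutLogger`, the plain Hash registry, and the one-argument `response_timeout(msg_id)` callback signature are all hypothetical. The `:last_checked_at` and `:timeout_callback` keys mirror the entry stored by #register_message.

```ruby
# Hypothetical sketch: drop registered messages older than a timeout and
# notify their receivers. Names and callback signature are assumptions.
def expire_messages(registry, timeout_seconds, now: Time.now.utc)
  expired = registry.select { |_id, entry| now - entry[:last_checked_at] > timeout_seconds }
  expired.each do |msg_id, entry|
    registry.delete(msg_id)
    entry[:receiver].send(entry[:timeout_callback], msg_id)
  end
  expired.keys
end

# Illustrative receiver that records which message IDs timed out.
class TimeoutLogger
  attr_reader :timed_out

  def initialize
    @timed_out = []
  end

  def response_timeout(msg_id)
    @timed_out << msg_id
  end
end

receiver = TimeoutLogger.new
now      = Time.now.utc
registry = {
  "old" => {:receiver => receiver, :timeout_callback => :response_timeout, :last_checked_at => now - 120},
  "new" => {:receiver => receiver, :timeout_callback => :response_timeout, :last_checked_at => now - 5}
}
expired_ids = expire_messages(registry, 60, now: now)
```

With a 60-second timeout, only the entry registered 120 seconds ago is removed and reported to the receiver.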
All types of responses, as well as timeouts, can be sent to registered callbacks (@see #register_message).
Use the “start” and “stop” methods to start and stop listening on Kafka.
Constant Summary
- EOF = "eof".freeze
- RESPONSE = "response".freeze
- INITIALIZATION_TIMEOUT = 20
Instance Attribute Summary
- #received_messages ⇒ Object
  Returns the value of attribute received_messages.
- #started ⇒ Object (also: #started?) readonly
  Returns the value of attribute started.
Instance Method Summary
- #initialize(config, logger) ⇒ ResponseWorker (constructor)
  A new instance of ResponseWorker.
- #register_message(msg_id, receiver, response_callback: :response_success, timeout_callback: :response_timeout, error_callback: :response_error) ⇒ Object
  Registers the message ID returned by a request and defines the response, timeout, and error callback methods.
- #start ⇒ Object
  Start listening on Kafka.
- #stop ⇒ Object
  Stop listener.
Constructor Details
#initialize(config, logger) ⇒ ResponseWorker
Returns a new instance of ResponseWorker.
    # File 'lib/receptor_controller/client/response_worker.rb', line 31
    def initialize(config, logger)
      self.config              = config
      self.lock                = Mutex.new
      self.timeout_lock        = Mutex.new
      self.logger              = logger
      self.registered_messages = Concurrent::Map.new
      self.received_messages   = Concurrent::Array.new
      self.started             = Concurrent::AtomicBoolean.new(false)
      self.workers             = {}
    end
Instance Attribute Details
#received_messages ⇒ Object
Returns the value of attribute received_messages.
    # File 'lib/receptor_controller/client/response_worker.rb', line 29
    def received_messages
      @received_messages
    end
#started ⇒ Object Also known as: started?
Returns the value of attribute started.
    # File 'lib/receptor_controller/client/response_worker.rb', line 26
    def started
      @started
    end
Instance Method Details
#register_message(msg_id, receiver, response_callback: :response_success, timeout_callback: :response_timeout, error_callback: :response_error) ⇒ Object
Registers the message ID returned by a request and defines the response, timeout, and error callback methods.
    # File 'lib/receptor_controller/client/response_worker.rb', line 78
    def register_message(msg_id, receiver, response_callback: :response_success, timeout_callback: :response_timeout, error_callback: :response_error)
      registered_messages[msg_id] = {:receiver          => receiver,
                                     :response_callback => response_callback,
                                     :timeout_callback  => timeout_callback,
                                     :error_callback    => error_callback,
                                     :last_checked_at   => Time.now.utc}
    end
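To illustrate how a registered entry might later be used, the sketch below registers a receiver under a message ID and then invokes its callback by name. It is a stand-in written for this page, not the gem's class: `MiniRegistry`, its `dispatch` method, `InventoryReceiver`, the `on_data` callback, and the two-argument callback signature are all assumptions. Only the keyword arguments and the stored hash keys follow the method shown above.

```ruby
# Hypothetical stand-in for the worker's registry; not the gem's class.
class MiniRegistry
  def initialize
    @registered = {}
    @lock       = Mutex.new
  end

  # Same keyword arguments and stored keys as the documented method.
  def register_message(msg_id, receiver,
                       response_callback: :response_success,
                       timeout_callback: :response_timeout,
                       error_callback: :response_error)
    @lock.synchronize do
      @registered[msg_id] = {
        :receiver          => receiver,
        :response_callback => response_callback,
        :timeout_callback  => timeout_callback,
        :error_callback    => error_callback,
        :last_checked_at   => Time.now.utc
      }
    end
  end

  # Assumed helper: looks up the entry by ID and calls the callback by name.
  def dispatch(msg_id, payload)
    entry = @lock.synchronize { @registered[msg_id] }
    return false if entry.nil?

    entry[:receiver].send(entry[:response_callback], msg_id, payload)
    true
  end
end

# Illustrative receiver with a custom response callback.
class InventoryReceiver
  attr_reader :seen

  def initialize
    @seen = []
  end

  def on_data(msg_id, payload)
    @seen << [msg_id, payload]
  end
end

registry = MiniRegistry.new
receiver = InventoryReceiver.new
registry.register_message("abc-123", receiver, response_callback: :on_data)
delivered = registry.dispatch("abc-123", "hosts: 3")
unknown   = registry.dispatch("missing", "ignored")
```

Storing callback names as symbols keeps the registry entry a plain data hash. The receiver only needs to respond to the named methods.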
#start ⇒ Object
Start listening on Kafka.
    # File 'lib/receptor_controller/client/response_worker.rb', line 43
    def start
      init_lock, init_wait = Mutex.new, ConditionVariable.new

      lock.synchronize do
        return if started.value

        default_messaging_opts # Thread-safe init
        started.value         = true
        workers[:maintenance] = Thread.new { check_timeouts while started.value }
        workers[:listener]    = Thread.new { listen while started.value }
      end

      logger.info("Receptor Response worker started...")
    end
#stop ⇒ Object
Stop listener.
    # File 'lib/receptor_controller/client/response_worker.rb', line 60
    def stop
      lock.synchronize do
        return unless started.value

        started.value = false
        workers[:listener]&.terminate
        workers[:maintenance]&.join
      end
    end
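The start/stop pair follows a common pattern: a mutex guards a "started" flag so that repeated calls are harmless, and background threads loop while the flag is set. The sketch below shows that pattern using only the standard library. `MiniWorker` is hypothetical, uses a plain boolean instead of Concurrent::AtomicBoolean, and replaces the Kafka listener with a short sleep.

```ruby
# Hypothetical sketch of an idempotent start/stop lifecycle; not the gem's class.
class MiniWorker
  def initialize
    @lock    = Mutex.new
    @started = false
    @thread  = nil
  end

  def started?
    @started
  end

  # Returns true when this call started the worker, false if already running.
  def start
    @lock.synchronize do
      return false if @started

      @started = true
      # Stand-in for the listen/check_timeouts loops.
      @thread = Thread.new { sleep(0.01) while @started }
    end
    true
  end

  # Returns true when this call stopped the worker, false if it was not running.
  def stop
    @lock.synchronize do
      return false unless @started

      @started = false
      @thread&.join # the loop exits once it sees the cleared flag
    end
    true
  end
end

worker  = MiniWorker.new
first   = worker.start
second  = worker.start
sleep(0.05)
stopped = worker.stop
again   = worker.stop
```

The second `start` and the second `stop` are no-ops. In the method shown above, the listener thread is terminated rather than joined, presumably because it may be blocked waiting on Kafka, while the maintenance thread is joined so it can finish its current pass.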