Class: ReceptorController::Client::ResponseWorker
- Inherits:
-
Object
- Object
- ReceptorController::Client::ResponseWorker
- Defined in:
- lib/receptor_controller/client/response_worker.rb
Overview
ResponseWorker is listening on Kafka topic platform.receptor-controller.responses (@see Configuration.queue_topic) It asynchronously receives responses requested by POST /job to receptor controller. Request and response is paired by message ID (response of POST /job and ‘in_response_to’ value in kafka response here)
Successful responses are at least two:
-
1+ of ‘response’ type, containing data
-
1 of ‘eof’ type, signalizing end of transmission
Registered messages without response are removed after timeout (Configuration.response_timeout)
All type of responses/timeout can be sent to registered callbacks (@see :register_message)
Use “start” and “stop” methods to start/stop listening on Kafka
Constant Summary collapse
- EOF =
"eof".freeze
- RESPONSE =
"response".freeze
- INITIALIZATION_TIMEOUT =
20
Instance Attribute Summary collapse
-
#received_messages ⇒ Object
Returns the value of attribute received_messages.
-
#started ⇒ Object
(also: #started?)
readonly
Returns the value of attribute started.
Instance Method Summary collapse
-
#init_notifications(init_lock, init_wait) ⇒ Object
Listening for the consumer call to the Fetch API Then releasing original starting thread.
-
#initialize(config, logger) ⇒ ResponseWorker
constructor
A new instance of ResponseWorker.
-
#register_message(msg_id, receiver, response_callback: :response_success, timeout_callback: :response_timeout, error_callback: :response_error) ⇒ Object
Registers message_id received by request, Defines response and timeout callback methods.
-
#start ⇒ Object
Start listening on Kafka.
-
#stop ⇒ Object
Stop listener.
- #wait_for_notifications(init_lock, init_wait) ⇒ Object
Constructor Details
#initialize(config, logger) ⇒ ResponseWorker
Returns a new instance of ResponseWorker.
31 32 33 34 35 36 37 38 39 40 |
# File 'lib/receptor_controller/client/response_worker.rb', line 31 def initialize(config, logger) self.config = config self.lock = Mutex.new self.timeout_lock = Mutex.new self.logger = logger self. = Concurrent::Map.new self. = Concurrent::Array.new self.started = Concurrent::AtomicBoolean.new(false) self.workers = {} end |
Instance Attribute Details
#received_messages ⇒ Object
Returns the value of attribute received_messages.
29 30 31 |
# File 'lib/receptor_controller/client/response_worker.rb', line 29 def @received_messages end |
#started ⇒ Object Also known as: started?
Returns the value of attribute started.
26 27 28 |
# File 'lib/receptor_controller/client/response_worker.rb', line 26 def started @started end |
Instance Method Details
#init_notifications(init_lock, init_wait) ⇒ Object
Listening for the consumer call to the Fetch API Then releasing original starting thread
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/receptor_controller/client/response_worker.rb', line 76 def init_notifications(init_lock, init_wait) logger.info("Initializing Receptor kafka client...") start_time = Time.now.utc ActiveSupport::Notifications.subscribe('request.connection.kafka') do |*args| event = ActiveSupport::Notifications::Event.new(*args) if default_messaging_opts[:client_ref] == event.payload[:client_id] if event.payload[:api] == :fetch logger.info "Receptor Kafka client successfully initialized " # initialized, unsubscribe notifications ActiveSupport::Notifications.unsubscribe('request.connection.kafka') init_lock.synchronize { init_wait.signal } end end if start_time <= INITIALIZATION_TIMEOUT.seconds.ago.utc ActiveSupport::Notifications.unsubscribe('request.connection.kafka') logger.error("Receptor Kafka client initialization timeout...") init_lock.synchronize { init_wait.signal } end end end |
#register_message(msg_id, receiver, response_callback: :response_success, timeout_callback: :response_timeout, error_callback: :response_error) ⇒ Object
Registers message_id received by request, Defines response and timeout callback methods
112 113 114 115 116 117 118 |
# File 'lib/receptor_controller/client/response_worker.rb', line 112 def (msg_id, receiver, response_callback: :response_success, timeout_callback: :response_timeout, error_callback: :response_error) [msg_id] = {:receiver => receiver, :response_callback => response_callback, :timeout_callback => timeout_callback, :error_callback => error_callback, :last_checked_at => Time.now.utc} end |
#start ⇒ Object
Start listening on Kafka
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/receptor_controller/client/response_worker.rb', line 43 def start init_lock, init_wait = Mutex.new, ConditionVariable.new lock.synchronize do return if started.value default_messaging_opts # Thread-safe init init_notifications(init_lock, init_wait) started.value = true workers[:maintenance] = Thread.new { check_timeouts while started.value } workers[:listener] = Thread.new { listen while started.value } end # Wait for kafka initialization wait_for_notifications(init_lock, init_wait) logger.info("Receptor Response worker started...") end |
#stop ⇒ Object
Stop listener
64 65 66 67 68 69 70 71 72 |
# File 'lib/receptor_controller/client/response_worker.rb', line 64 def stop lock.synchronize do return unless started.value started.value = false workers[:listener]&.terminate workers[:maintenance]&.join end end |
#wait_for_notifications(init_lock, init_wait) ⇒ Object
98 99 100 101 102 |
# File 'lib/receptor_controller/client/response_worker.rb', line 98 def wait_for_notifications(init_lock, init_wait) init_lock.synchronize do init_wait.wait(init_lock) end end |