Class: ReceptorController::Client::ResponseWorker

Inherits:
Object
Defined in:
lib/receptor_controller/client/response_worker.rb

Overview

ResponseWorker listens on the Kafka topic platform.receptor-controller.responses (@see Configuration.queue_topic). It asynchronously receives responses to requests sent to the receptor controller via POST /job. Each request is paired with its responses by message ID: the ID returned by POST /job matches the ‘in_response_to’ value of the Kafka response.

A successful exchange consists of at least two responses:

  • 1+ of ‘response’ type, containing data

  • 1 of ‘eof’ type, signaling the end of transmission

Registered messages that receive no response are removed after a timeout (Configuration.response_timeout).

All response types and timeouts can be sent to registered callbacks (@see :register_message).

Use the “start” and “stop” methods to start and stop listening on Kafka.
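The pairing described above can be sketched without Kafka. This is a minimal illustration, not the library's implementation: the `PairingSketch` class and the payload keys `message_type` and `payload` are assumptions made for the example; only `in_response_to` and the ‘response’/‘eof’ types come from the overview.

```ruby
# Hypothetical, Kafka-free sketch of pairing responses with requests.
# Class name and the "message_type"/"payload" keys are assumptions.
class PairingSketch
  EOF      = "eof".freeze
  RESPONSE = "response".freeze

  attr_reader :completed

  def initialize
    @pending   = {} # msg_id => data collected so far
    @completed = {} # msg_id => data collected before 'eof' arrived
  end

  # Remember the message ID returned by POST /job.
  def register(msg_id)
    @pending[msg_id] = []
  end

  # Handle one decoded response; messages with unknown IDs are ignored.
  def receive(message)
    msg_id = message["in_response_to"]
    return :ignored unless @pending.key?(msg_id)

    case message["message_type"]
    when RESPONSE
      @pending[msg_id] << message["payload"]
      :collected
    when EOF
      @completed[msg_id] = @pending.delete(msg_id)
      :finished
    else
      :unknown_type
    end
  end
end

sketch = PairingSketch.new
sketch.register("abc-123")
sketch.receive("in_response_to" => "abc-123", "message_type" => "response", "payload" => "part 1")
sketch.receive("in_response_to" => "abc-123", "message_type" => "response", "payload" => "part 2")
sketch.receive("in_response_to" => "abc-123", "message_type" => "eof")
```

After the ‘eof’ message, the collected data is available under the registered ID, and later messages for that ID are ignored.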

Constant Summary

EOF = "eof".freeze
RESPONSE = "response".freeze
INITIALIZATION_TIMEOUT = 20


Constructor Details

#initialize(config, logger) ⇒ ResponseWorker

Returns a new instance of ResponseWorker.



# File 'lib/receptor_controller/client/response_worker.rb', line 31

def initialize(config, logger)
  self.config              = config
  self.lock                = Mutex.new
  self.timeout_lock        = Mutex.new
  self.logger              = logger
  self.registered_messages = Concurrent::Map.new
  self.received_messages   = Concurrent::Array.new
  self.started             = Concurrent::AtomicBoolean.new(false)
  self.workers             = {}
end

Instance Attribute Details

#received_messages ⇒ Object

Returns the value of attribute received_messages.



# File 'lib/receptor_controller/client/response_worker.rb', line 29

def received_messages
  @received_messages
end

#started ⇒ Object (also known as: started?)

Returns the value of attribute started.



# File 'lib/receptor_controller/client/response_worker.rb', line 26

def started
  @started
end

Instance Method Details

#register_message(msg_id, receiver, response_callback: :response_success, timeout_callback: :response_timeout, error_callback: :response_error) ⇒ Object

Registers the message ID returned by a request and defines the response, timeout, and error callback methods.

Parameters:

  • msg_id (String)

    UUID

  • receiver (Object)

    any object implementing callbacks

  • response_callback (Symbol) (defaults to: :response_success)

    name of receiver’s method processing responses

  • timeout_callback (Symbol) (defaults to: :response_timeout)

    name of receiver’s method processing timeout [optional]

  • error_callback (Symbol) (defaults to: :response_error)

    name of receiver’s method processing errors [optional]



# File 'lib/receptor_controller/client/response_worker.rb', line 78

def register_message(msg_id, receiver, response_callback: :response_success, timeout_callback: :response_timeout, error_callback: :response_error)
  registered_messages[msg_id] = {:receiver          => receiver,
                                 :response_callback => response_callback,
                                 :timeout_callback  => timeout_callback,
                                 :error_callback    => error_callback,
                                 :last_checked_at   => Time.now.utc}
end
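The entry stored by register_message names callback methods on the receiver. The sketch below shows one way such an entry could be dispatched and how `:last_checked_at` could drive a timeout sweep. It is an illustration under assumptions: `DemoReceiver`, `build_entry`, `dispatch`, `sweep_timeouts`, and the callback argument lists are hypothetical and do not appear in the source above; only the entry layout is taken from it.

```ruby
# Hypothetical receiver; the callback argument lists are assumptions.
class DemoReceiver
  attr_reader :events

  def initialize
    @events = []
  end

  def response_success(msg_id, payload)
    @events << [:success, msg_id, payload]
  end

  def response_timeout(msg_id)
    @events << [:timeout, msg_id]
  end

  def response_error(msg_id, error)
    @events << [:error, msg_id, error]
  end
end

# Same entry layout as register_message builds, kept in a plain Hash here.
def build_entry(receiver, now)
  {:receiver          => receiver,
   :response_callback => :response_success,
   :timeout_callback  => :response_timeout,
   :error_callback    => :response_error,
   :last_checked_at   => now}
end

# Invoke the callback named in the entry on its receiver.
def dispatch(entry, callback_key, *args)
  entry[:receiver].public_send(entry[callback_key], *args)
end

# Remove entries at least `timeout` seconds old and notify their receivers.
def sweep_timeouts(registry, timeout, now)
  registry.keys.each do |msg_id|
    entry = registry[msg_id]
    next if now - entry[:last_checked_at] < timeout

    registry.delete(msg_id)
    dispatch(entry, :timeout_callback, msg_id)
  end
end

receiver   = DemoReceiver.new
started_at = Time.now.utc
registry   = {"id-1" => build_entry(receiver, started_at),
              "id-2" => build_entry(receiver, started_at + 50)}

dispatch(registry["id-1"], :response_callback, "id-1", "data")
sweep_timeouts(registry, 30, started_at + 60) # only "id-1" is old enough
```

Storing method names as symbols keeps the registry independent of the receiver's class: any object that responds to the named methods can be registered.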

#start ⇒ Object

Start listening on Kafka



# File 'lib/receptor_controller/client/response_worker.rb', line 43

def start
  init_lock, init_wait = Mutex.new, ConditionVariable.new

  lock.synchronize do
    return if started.value

    default_messaging_opts # Thread-safe init

    started.value         = true
    workers[:maintenance] = Thread.new { check_timeouts while started.value }
    workers[:listener]    = Thread.new { listen while started.value }
  end

  logger.info("Receptor Response worker started...")
end

#stop ⇒ Object

Stops the listener thread and waits for the maintenance thread to finish.



# File 'lib/receptor_controller/client/response_worker.rb', line 60

def stop
  lock.synchronize do
    return unless started.value

    started.value = false
    workers[:listener]&.terminate
    workers[:maintenance]&.join
  end
end
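The start/stop pattern above (a mutex-guarded flag plus threads that loop while the flag is set) can be tried in isolation. The following is a simplified sketch using only the standard library: `LifecycleSketch`, its return symbols, and the `ticks` counter are hypothetical, and a plain boolean stands in for the Concurrent::AtomicBoolean used by the real class.

```ruby
# Hypothetical stand-in for the worker's start/stop logic, stdlib only.
class LifecycleSketch
  attr_reader :ticks

  def initialize
    @lock    = Mutex.new
    @started = false # the real class uses Concurrent::AtomicBoolean
    @workers = {}
    @ticks   = 0
  end

  def started?
    @started
  end

  # Idempotent: a second call while running does nothing.
  def start
    @lock.synchronize do
      return :already_started if @started

      @started = true
      # Loop until the flag is cleared, like the maintenance thread above.
      @workers[:maintenance] = Thread.new do
        while @started
          @ticks += 1
          sleep(0.01)
        end
      end
    end
    :started
  end

  # Clear the flag, then wait for the loop to notice and exit.
  def stop
    @lock.synchronize do
      return :not_running unless @started

      @started = false
      @workers[:maintenance]&.join
    end
    :stopped
  end
end

worker  = LifecycleSketch.new
first   = worker.start
second  = worker.start # no second thread is created
sleep(0.05)
stopped = worker.stop
again   = worker.stop
```

Joining the looping thread lets it finish its current iteration, whereas the listener in the original is terminated, presumably because it may be blocked waiting for messages.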