Class: ReceptorController::Client::ResponseWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/receptor_controller/client/response_worker.rb

Overview

ResponseWorker is listening on Kafka topic platform.receptor-controller.responses (@see Configuration.queue_topic) It asynchronously receives responses requested by POST /job to receptor controller. Request and response is paired by message ID (response of POST /job and ‘in_response_to’ value in kafka response here)

Successful responses are at least two:

  • 1+ of ‘response’ type, containing data

  • 1 of ‘eof’ type, signalizing end of transmission

Registered messages without response are removed after timeout (Configuration.response_timeout)

All type of responses/timeout can be sent to registered callbacks (@see :register_message)

Use “start” and “stop” methods to start/stop listening on Kafka

Constant Summary collapse

EOF =
"eof".freeze
RESPONSE =
"response".freeze
INITIALIZATION_TIMEOUT =
20

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, logger) ⇒ ResponseWorker

Returns a new instance of ResponseWorker.



31
32
33
34
35
36
37
38
39
40
# File 'lib/receptor_controller/client/response_worker.rb', line 31

def initialize(config, logger)
  self.config              = config
  self.lock                = Mutex.new
  self.timeout_lock        = Mutex.new
  self.logger              = logger
  self.registered_messages = Concurrent::Map.new
  self.received_messages   = Concurrent::Array.new
  self.started             = Concurrent::AtomicBoolean.new(false)
  self.workers             = {}
end

Instance Attribute Details

#received_messagesObject

Returns the value of attribute received_messages.



29
30
31
# File 'lib/receptor_controller/client/response_worker.rb', line 29

def received_messages
  @received_messages
end

#startedObject Also known as: started?

Returns the value of attribute started.



26
27
28
# File 'lib/receptor_controller/client/response_worker.rb', line 26

def started
  @started
end

Instance Method Details

#init_notifications(init_lock, init_wait) ⇒ Object

Listening for the consumer call to the Fetch API Then releasing original starting thread



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/receptor_controller/client/response_worker.rb', line 76

def init_notifications(init_lock, init_wait)
  logger.info("Initializing Receptor kafka client...")
  start_time = Time.now.utc
  ActiveSupport::Notifications.subscribe('request.connection.kafka') do |*args|
    event = ActiveSupport::Notifications::Event.new(*args)
    if default_messaging_opts[:client_ref] == event.payload[:client_id]
      if event.payload[:api] == :fetch
        logger.info "Receptor Kafka client successfully initialized "
        # initialized, unsubscribe notifications
        ActiveSupport::Notifications.unsubscribe('request.connection.kafka')
        init_lock.synchronize { init_wait.signal }
      end
    end

    if start_time <= INITIALIZATION_TIMEOUT.seconds.ago.utc
      ActiveSupport::Notifications.unsubscribe('request.connection.kafka')
      logger.error("Receptor Kafka client initialization timeout...")
      init_lock.synchronize { init_wait.signal }
    end
  end
end

#register_message(msg_id, receiver, response_callback: :response_success, timeout_callback: :response_timeout, error_callback: :response_error) ⇒ Object

Registers message_id received by request, Defines response and timeout callback methods

Parameters:

  • msg_id (String)

    UUID

  • receiver (Object)

    any object implementing callbacks

  • response_callback (Symbol) (defaults to: :response_success)

    name of receiver’s method processing responses

  • timeout_callback (Symbol) (defaults to: :response_timeout)

    name of receiver’s method processing timeout [optional]

  • error_callback (Symbol) (defaults to: :response_error)

    name of receiver’s method processing errors [optional]



112
113
114
115
116
117
118
# File 'lib/receptor_controller/client/response_worker.rb', line 112

def register_message(msg_id, receiver, response_callback: :response_success, timeout_callback: :response_timeout, error_callback: :response_error)
  registered_messages[msg_id] = {:receiver          => receiver,
                                 :response_callback => response_callback,
                                 :timeout_callback  => timeout_callback,
                                 :error_callback    => error_callback,
                                 :last_checked_at   => Time.now.utc}
end

#startObject

Start listening on Kafka



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/receptor_controller/client/response_worker.rb', line 43

def start
  init_lock, init_wait = Mutex.new, ConditionVariable.new

  lock.synchronize do
    return if started.value

    default_messaging_opts # Thread-safe init
    init_notifications(init_lock, init_wait)

    started.value         = true
    workers[:maintenance] = Thread.new { check_timeouts while started.value }
    workers[:listener]    = Thread.new { listen while started.value }
  end

  # Wait for kafka initialization
  wait_for_notifications(init_lock, init_wait)

  logger.info("Receptor Response worker started...")
end

#stopObject

Stop listener



64
65
66
67
68
69
70
71
72
# File 'lib/receptor_controller/client/response_worker.rb', line 64

def stop
  lock.synchronize do
    return unless started.value

    started.value = false
    workers[:listener]&.terminate
    workers[:maintenance]&.join
  end
end

#wait_for_notifications(init_lock, init_wait) ⇒ Object



98
99
100
101
102
# File 'lib/receptor_controller/client/response_worker.rb', line 98

def wait_for_notifications(init_lock, init_wait)
  init_lock.synchronize do
    init_wait.wait(init_lock)
  end
end