Class: ReceptorController::Client::DirectiveBlocking

Inherits:
Directive
  • Object
show all
Defined in:
lib/receptor_controller/client/directive_blocking.rb

Overview

Blocking directive for requests sent through POST /job. A successful POST blocks the current thread until the response arrives from Kafka.

Raises a kind of ReceptorController::Client::Error if a problem or timeout occurs.

Instance Attribute Summary

Attributes inherited from Directive

#account, #client, #log_message_common, #name, #node_id, #payload

Instance Method Summary collapse

Methods inherited from Directive

#default_body

Constructor Details

#initialize(name:, account:, node_id:, payload:, client:, log_message_common: nil) ⇒ DirectiveBlocking

Returns a new instance of DirectiveBlocking.



9
10
11
12
13
14
15
# File 'lib/receptor_controller/client/directive_blocking.rb', line 9

# Builds a blocking directive and resets the state used to hand a response
# over to the thread that waits for it.
#
# All keyword arguments are passed on unchanged to the parent initializer
# (bare +super+).
#
# @return [DirectiveBlocking] a new instance of DirectiveBlocking
def initialize(name:, account:, node_id:, payload:, client:, log_message_common: nil)
  super

  # Outcome slots: both start empty and are filled in by the response_* callbacks.
  self.response_exception = nil
  self.response_data = nil

  # Condition variable and the mutex it is waited on in #wait_for_response.
  self.response_waiting = ConditionVariable.new
  self.response_lock = Mutex.new
end

Instance Method Details

#call(body = default_body) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/receptor_controller/client/directive_blocking.rb', line 17

# Posts the directive to the job endpoint, registers the returned message id
# with the response worker and then blocks in #wait_for_response.
#
# @param body [Hash] request body; body[:payload] is parsed as JSON and its
#   'url' entry is stored in @url. The whole body is sent as JSON.
# @return [Object] whatever #wait_for_response returns
# @raise [ReceptorController::Client::ControllerResponseError] when a
#   Faraday::Error is raised while the method body runs
def call(body = default_body)
  @url = JSON.parse(body[:payload])['url']
  response = connection.post(config.job_path, body.to_json)

  msg_id = JSON.parse(response.body)['id']

  logger.debug("Receptor response [#{ReceptorController::Client::Configuration.default.queue_persist_ref}]: registering message #{msg_id}, href_slug: #{log_message_common}")
  # registers message id for kafka responses
  response_worker.register_message(msg_id, self)
  wait_for_response(msg_id)
rescue Faraday::Error => e
  # msg_id is still nil here when the POST itself failed before an id was parsed.
  # NOTE(review): the second argument was missing (empty slot, a syntax error);
  # `account` is restored as the attribute paired with node_id — confirm against
  # receptor_log_msg's signature.
  msg = receptor_log_msg("Directive #{name} failed (#{log_message_common}) [MSG: #{msg_id}]", account, node_id, e)
  raise ReceptorController::Client::ControllerResponseError.new(msg)
end

#response_error(msg_id, response_code, err_message) ⇒ Object



56
57
58
59
60
61
62
# File 'lib/receptor_controller/client/directive_blocking.rb', line 56

# Records an error response for the message and wakes up the waiting thread.
#
# Any previously stored response data is discarded; the error is stored as a
# ReceptorController::Client::ResponseError in response_exception.
#
# @param msg_id [Object] id of the message the error belongs to (used in the error text)
# @param response_code [Object] code reported with the error (used in the error text)
# @param err_message [Object] error description (used in the error text)
def response_error(msg_id, response_code, err_message)
  failure = ReceptorController::Client::ResponseError.new("#{err_message} (code: #{response_code}) (#{log_message_common}) [MSG: #{msg_id}]")

  response_lock.synchronize do
    self.response_exception = failure
    self.response_data = nil
    # Signalled under the lock, after the outcome has been stored.
    response_waiting.signal
  end
end

#response_success(msg_id, message_type, response) ⇒ Object

TODO: Review when future plugins with more “response” messages come



43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/receptor_controller/client/directive_blocking.rb', line 43

# Handles a successfully delivered message for this directive.
#
# * MESSAGE_TYPE_RESPONSE: stores the response; the waiter is not woken.
# * MESSAGE_TYPE_EOF: wakes the waiting thread.
# * anything else: stores an UnknownResponseTypeError and wakes the waiting thread.
#
# TODO: Review when future plugins with more "response" messages come
#
# @param msg_id [Object] id of the message (used in the error text)
# @param message_type [Object] compared against MESSAGE_TYPE_RESPONSE / MESSAGE_TYPE_EOF
# @param response [Object] payload stored as response_data for a response message
def response_success(msg_id, message_type, response)
  response_lock.synchronize do
    if message_type == MESSAGE_TYPE_RESPONSE
      self.response_data = response
    else
      # Every non-response message ends the wait; only an unexpected type
      # additionally records a failure.
      unless message_type == MESSAGE_TYPE_EOF
        self.response_exception = ReceptorController::Client::UnknownResponseTypeError.new("#{log_message_common}[MSG: #{msg_id}]")
      end
      response_waiting.signal
    end
  end
end

#response_timeout(msg_id) ⇒ Object



64
65
66
67
68
69
70
# File 'lib/receptor_controller/client/directive_blocking.rb', line 64

# Records a timeout for the message and wakes up the waiting thread.
#
# Any previously stored response data is discarded; the timeout is stored as a
# ReceptorController::Client::ResponseTimeoutError in response_exception.
#
# @param msg_id [Object] id of the message that timed out (used in the error text)
def response_timeout(msg_id)
  timeout_error = ReceptorController::Client::ResponseTimeoutError.new("Timeout (#{log_message_common}) [MSG: #{msg_id}]")

  response_lock.synchronize do
    self.response_exception = timeout_error
    self.response_data = nil
    # Signalled under the lock, after the outcome has been stored.
    response_waiting.signal
  end
end

#wait_for_response(_msg_id) ⇒ Object



32
33
34
35
36
37
38
39
40
# File 'lib/receptor_controller/client/directive_blocking.rb', line 32

# Blocks the calling thread until a response callback signals
# response_waiting, then returns the response or raises the recorded failure.
#
# @param _msg_id [Object] unused
# @return [Object] a shallow copy (+dup+) of response_data
# @raise [Exception] response_exception, when response_failed? is true
def wait_for_response(_msg_id)
  response_lock.synchronize do
    # A failure callback may have run (and signalled) before this thread got
    # here; waiting then would block on a signal that was already sent.
    # NOTE(review): a success that completes before this point still cannot be
    # detected from here, because no completion flag exists — confirm whether
    # the callbacks should record one.
    response_waiting.wait(response_lock) unless response_failed?

    raise response_exception if response_failed?

    response_data.dup
  end
end