Class: ReceptorController::Client::DirectiveNonBlocking

Inherits:
Directive
  • Object
show all
Defined in:
lib/receptor_controller/client/directive_non_blocking.rb

Overview

Non-blocking directive for requests through POST /job. The directive's #call returns either the message ID or nil.

Callback blocks can be specified for handling responses. Example:

receiver = <object with methods below>
directive
  .on_success do |msg_id, response|
    receiver.process_response(msg_id, response)
  end
  .on_error do |msg_id, code, response|
    receiver.process_error(msg_id, code, response)
  end
  .on_timeout do |msg_id|
    receiver.process_timeout(msg_id)
  end
  .on_eof do |msg_id|
    receiver.process_eof(msg_id)
  end
  .on_eof do |msg_id|
    logger.debug("[#{msg_id}] EOF message received")
  end

directive.call

Instance Attribute Summary

Attributes inherited from Directive

#account, #client, #log_message_common, #name, #node_id, #payload

Instance Method Summary collapse

Methods inherited from Directive

#default_body

Constructor Details

#initialize(name:, account:, node_id:, payload:, client:, log_message_common: nil) ⇒ DirectiveNonBlocking

Returns a new instance of DirectiveNonBlocking.



30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/receptor_controller/client/directive_non_blocking.rb', line 30

# Creates the directive and prepares its per-instance state.
#
# All keyword arguments are forwarded unchanged to the parent Directive
# constructor through the bare +super+ call.
#
# @param name [Object] forwarded to Directive
# @param account [Object] forwarded to Directive
# @param node_id [Object] forwarded to Directive
# @param payload [Object] forwarded to Directive
# @param client [Object] forwarded to Directive
# @param log_message_common [Object, nil] forwarded to Directive
def initialize(name:, account:, node_id:, payload:, client:, log_message_common: nil)
  super

  # One independent, initially empty registry per event kind; they are filled
  # through #on_success, #on_eof, #on_timeout and #on_error.
  @success_callbacks, @eof_callbacks, @timeout_callbacks, @error_callbacks = Array.new(4) { [] }

  # Synchronization primitives; their consumers are not visible in this
  # section (presumably the response/EOF thread helpers -- confirm there).
  @responses_count = Concurrent::AtomicFixnum.new
  @eof_lock        = Mutex.new
  @eof_wait        = ConditionVariable.new
end

Instance Method Details

#call(body = default_body) ⇒ Object

Entrypoint for request



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/receptor_controller/client/directive_non_blocking.rb', line 44

# Entrypoint for the request: POSTs +body+ (serialized to JSON) to the
# configured job URL.
#
# On a successful HTTP response the message id is read from the 'id' key of
# the JSON response body, registered with the response worker together with
# this directive, and returned. Every failure path (non-success HTTP status,
# Faraday error, or any other StandardError such as an unparsable response
# body) is logged and yields nil instead of raising.
#
# @param body [#to_json] request payload, defaults to +default_body+
# @return [Object, nil] the message id from the response, or nil on failure
def call(body = default_body)
  response = Faraday.post(config.job_url, body.to_json, client.headers)
  if response.success?
    msg_id = JSON.parse(response.body)['id']

    # registers message id for kafka responses
    response_worker.register_message(msg_id, self)
    logger.debug("Receptor response [#{ReceptorController::Client::Configuration.default.queue_persist_ref}]: registering message #{msg_id}, href_slug: #{log_message_common}")

    msg_id
  else
    # The account is passed alongside node_id so the log entry identifies the tenant.
    logger.error(receptor_log_msg("Directive #{name} failed (#{log_message_common}): HTTP #{response.status}", account, node_id))
    nil
  end
rescue Faraday::Error => e
  # Transport-level failure of the POST itself.
  logger.error(receptor_log_msg("Directive #{name} failed (#{log_message_common}). POST /job error", account, node_id, e))
  nil
rescue => e
  # Anything else (e.g. invalid JSON in the response body).
  logger.error(receptor_log_msg("Directive #{name} failed (#{log_message_common})", account, node_id, e))
  nil
end

#on_eof(&block) ⇒ Object



71
72
73
74
# File 'lib/receptor_controller/client/directive_non_blocking.rb', line 71

# Registers a callback invoked with the message id when an EOF message is
# handled. Several callbacks may be registered; calling without a block is a
# no-op.
#
# @yieldparam msg_id [Object] id of the message the EOF belongs to
# @return [self] to allow chaining of the on_* registrations
def on_eof(&block)
  @eof_callbacks.push(block) if block
  self
end

#on_error(&block) ⇒ Object



81
82
83
84
# File 'lib/receptor_controller/client/directive_non_blocking.rb', line 81

# Registers a callback invoked with the message id, response code and
# response when an error response is handled. Several callbacks may be
# registered; calling without a block is a no-op.
#
# @yieldparam msg_id [Object]
# @yieldparam code [Object] response code
# @yieldparam response [Object]
# @return [self] to allow chaining of the on_* registrations
def on_error(&block)
  @error_callbacks.push(block) if block
  self
end

#on_success(&block) ⇒ Object



66
67
68
69
# File 'lib/receptor_controller/client/directive_non_blocking.rb', line 66

# Registers a callback invoked with the message id and response when a
# successful (non-EOF) response is handled. Several callbacks may be
# registered; calling without a block is a no-op.
#
# @yieldparam msg_id [Object]
# @yieldparam response [Object]
# @return [self] to allow chaining of the on_* registrations
def on_success(&block)
  @success_callbacks.push(block) if block
  self
end

#on_timeout(&block) ⇒ Object



76
77
78
79
# File 'lib/receptor_controller/client/directive_non_blocking.rb', line 76

# Registers a callback invoked with the message id when a response timeout
# is handled. Several callbacks may be registered; calling without a block
# is a no-op.
#
# @yieldparam msg_id [Object]
# @return [self] to allow chaining of the on_* registrations
def on_timeout(&block)
  @timeout_callbacks.push(block) if block
  self
end

#response_error(msg_id, response_code, response) ⇒ Object

Handles error responses in threads. EOF processing waits until all response threads are finished.



102
103
104
105
106
# File 'lib/receptor_controller/client/directive_non_blocking.rb', line 102

# Dispatches an error response to every registered error callback, in
# registration order. The dispatch runs inside the block handed to
# +response_thread+ (defined elsewhere in the class).
#
# @param msg_id [Object] id of the message the response belongs to
# @param response_code [Object] error code passed through to the callbacks
# @param response [Object] response passed through to the callbacks
# @return [Object] whatever +response_thread+ returns
def response_error(msg_id, response_code, response)
  response_thread do
    @error_callbacks.each do |callback|
      callback.call(msg_id, response_code, response)
    end
  end
end

#response_success(msg_id, message_type, response) ⇒ Object

Handles successful responses in threads. EOF processing waits until all response threads are finished.



88
89
90
91
92
93
94
95
96
97
98
# File 'lib/receptor_controller/client/directive_non_blocking.rb', line 88

# Dispatches a successful response to the registered callbacks.
#
# An EOF message (message_type equal to MESSAGE_TYPE_EOF) is routed to the
# EOF callbacks inside +eof_thread+; any other message type is routed to the
# success callbacks inside +response_thread+. Both helpers are defined
# elsewhere in the class.
#
# @param msg_id [Object] id of the message the response belongs to
# @param message_type [Object] compared against MESSAGE_TYPE_EOF
# @param response [Object] passed to success callbacks (unused for EOF)
# @return [Object] whatever the chosen thread helper returns
def response_success(msg_id, message_type, response)
  end_of_stream = message_type == MESSAGE_TYPE_EOF
  return eof_thread { @eof_callbacks.each { |callback| callback.call(msg_id) } } if end_of_stream

  response_thread { @success_callbacks.each { |callback| callback.call(msg_id, response) } }
end

#response_timeout(msg_id) ⇒ Object

Error state: no response was received within `Configuration.response_timeout`.



109
110
111
112
113
# File 'lib/receptor_controller/client/directive_non_blocking.rb', line 109

# Dispatches a timeout for the given message to every registered timeout
# callback, in registration order. The dispatch runs inside the block handed
# to +response_thread+ (defined elsewhere in the class).
#
# @param msg_id [Object] id of the message that timed out
# @return [Object] whatever +response_thread+ returns
def response_timeout(msg_id)
  response_thread do
    @timeout_callbacks.each do |callback|
      callback.call(msg_id)
    end
  end
end