Class: RailwayIpc::Client

Inherits:
Object
  • Object
show all
Extended by:
RPC::ErrorAdapterConfigurable, RPC::PublishLocationConfigurable
Defined in:
lib/railway_ipc/rpc/client/client.rb,
lib/railway_ipc/rpc/client/errors/timeout_error.rb

Defined Under Namespace

Classes: TimeoutError

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RPC::PublishLocationConfigurable

exchange_name, publish_to

Methods included from RPC::ErrorAdapterConfigurable

rpc_error_adapter, rpc_error_adapter_class

Constructor Details

#initialize(request_message, opts = { automatic_recovery: false }, rabbit_adapter: RailwayIpc::Rabbitmq::Adapter) ⇒ Client

Returns a new instance of Client.



24
25
26
27
# File 'lib/railway_ipc/rpc/client/client.rb', line 24

# Builds a client for a single RPC request.
#
# @param request_message [Object] the message to send; exposed via #request_message
# @param opts [Hash] forwarded untouched to the adapter as +options:+
# @param rabbit_adapter [Class] class instantiated to obtain the connection;
#   it receives +exchange_name:+ (taken from the class-level +exchange_name+)
#   and +options:+
def initialize(request_message, opts={ automatic_recovery: false }, rabbit_adapter: RailwayIpc::Rabbitmq::Adapter)
  @request_message = request_message
  exchange = self.class.exchange_name
  @rabbit_connection = rabbit_adapter.new(exchange_name: exchange, options: opts)
end

Instance Attribute Details

#message ⇒ Object (readonly)

Returns the value of attribute message.



11
12
13
# File 'lib/railway_ipc/rpc/client/client.rb', line 11

# Reader for @message. In the code shown, #process_payload assigns it: the
# decoded response for a registered type, or the decoded error message for an
# unhandled type.
#
# @return [Object, nil] the current value of @message
def message
  @message
end

#rabbit_connection ⇒ Object (readonly)

Returns the value of attribute rabbit_connection.



11
12
13
# File 'lib/railway_ipc/rpc/client/client.rb', line 11

# Reader for @rabbit_connection, the adapter instance created in #initialize
# from the +rabbit_adapter+ class.
#
# @return [Object] the adapter instance
def rabbit_connection
  @rabbit_connection
end

#request_message ⇒ Object

Returns the value of attribute request_message.



10
11
12
# File 'lib/railway_ipc/rpc/client/client.rb', line 10

# Reader for @request_message, which #initialize sets to the message it was
# given.
#
# @return [Object] the current value of @request_message
def request_message
  @request_message
end

#response_message ⇒ Object

Returns the value of attribute response_message.



10
11
12
# File 'lib/railway_ipc/rpc/client/client.rb', line 10

# Reader for @response_message. #await_response stores its outcome through
# the matching writer (the writer itself is not shown here; presumably
# generated by attr_accessor — confirm).
#
# @return [Object, nil] the current value of @response_message
def response_message
  @response_message
end

Class Method Details

.handle_response(response_type) ⇒ Object



20
21
22
# File 'lib/railway_ipc/rpc/client/client.rb', line 20

# Registers +response_type+ with the ClientResponseHandlers instance, the
# same registry that #registered_handlers reads from.
#
# The registry is referenced by its fully qualified name so that this method
# and #registered_handlers resolve the constant identically, independent of
# lexical nesting.
#
# @param response_type [Object] the response type to register
# @return [Object] whatever +register+ returns
def self.handle_response(response_type)
  RailwayIpc::RPC::ClientResponseHandlers.instance.register(response_type)
end

.request(message) ⇒ Object



16
17
18
# File 'lib/railway_ipc/rpc/client/client.rb', line 16

# Convenience entry point: builds a client for +message+ with the default
# options and immediately performs the request.
#
# @param message [Object] the request message handed to +new+
# @return [Object] the result of the instance-level #request
def self.request(message)
  client = new(message)
  client.request
end

Instance Method Details

#await_response(timeout) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/railway_ipc/rpc/client/client.rb', line 68

# Waits for a reply on the connection and records the outcome in
# +response_message+.
#
# * When a payload arrives, it is run through #process_payload and the result
#   is stored.
# * When the adapter raises its TimeoutError, the class-level error adapter
#   builds an error from a client TimeoutError and the request message, and
#   that error is stored in an unsuccessful RailwayIpc::Response.
# * Any other StandardError is swallowed and an unsuccessful
#   RailwayIpc::Response wrapping the current +message+ is stored.
#
# The connection is disconnected in every case.
#
# @param timeout [Object] forwarded to the adapter as +timeout:+
def await_response(timeout)
  rabbit_connection.check_for_message(timeout: timeout) do |_, _, payload|
    self.response_message = process_payload(payload)
  end
rescue RailwayIpc::Rabbitmq::Adapter::TimeoutError
  error_adapter = self.class.rpc_error_adapter_class
  timeout_error = error_adapter.error_message(TimeoutError.new, request_message)
  self.response_message = RailwayIpc::Response.new(timeout_error, success: false)
rescue StandardError
  failure = RailwayIpc::Response.new(message, success: false)
  self.response_message = failure
ensure
  rabbit_connection.disconnect
end

#process_payload(response) ⇒ Object

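Decodes a response payload. When its type is a registered handler, the message is decoded, logged, and returned in a successful RailwayIpc::Response; otherwise RailwayIpc::UnhandledMessageError is raised. (Source annotation: rubocop:disable Metrics/AbcSize)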



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/railway_ipc/rpc/client/client.rb', line 42

# Decodes a raw response and turns it into a RailwayIpc::Response.
#
# If the decoded type matches one of the registered handlers, the inner
# message is decoded with the class returned by +get_message_class+, stored
# in @message, logged, and returned in a successful response. Otherwise the
# inner message is decoded as a LearnIpc::ErrorMessage, stored in @message,
# and an error is raised.
#
# @param response [Object] the raw payload handed to +decode_payload+
# @return [RailwayIpc::Response] a successful response wrapping the message
# @raise [RailwayIpc::UnhandledMessageError] when the type is not registered
def process_payload(response)
  envelope = decode_payload(response)
  case envelope.type
  when *registered_handlers
    message_class = get_message_class(envelope)
    @message = message_class.decode(envelope.message)
    RailwayIpc.logger.info(
      'Handling response',
      feature: 'railway_ipc_consumer',
      exchange: self.class.exchange_name,
      protobuf: { type: message.class, data: message }
    )
    RailwayIpc::Response.new(message, success: true)
  else
    # Keep the decoded error around so callers can still read it via #message.
    @message = LearnIpc::ErrorMessage.decode(envelope.message)
    raise RailwayIpc::UnhandledMessageError, "#{self.class} does not know how to handle #{envelope.type}"
  end
end

#registered_handlers ⇒ Object



37
38
39
# File 'lib/railway_ipc/rpc/client/client.rb', line 37

# Response types currently registered with the ClientResponseHandlers
# instance; #process_payload matches decoded payload types against these.
#
# @return [Object] whatever the registry's +registered+ returns
def registered_handlers
  registry = RailwayIpc::RPC::ClientResponseHandlers.instance
  registry.registered
end

#request(timeout = 10) ⇒ Object



29
30
31
32
33
34
35
# File 'lib/railway_ipc/rpc/client/client.rb', line 29

# Runs one full request/response round trip. The steps are order-dependent:
# set up the connection, attach the reply queue to the message, publish,
# then wait for the reply.
#
# @param timeout [Object] passed to #await_response; defaults to 10
#   (presumably seconds — confirm against the adapter)
# @return [Object] the value of #response_message after #await_response ran
def request(timeout=10)
  setup_rabbit_connection
  attach_reply_queue_to_message
  publish_message
  await_response(timeout)
  response_message
end

#setup_rabbit_connection ⇒ Object

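Connects the rabbit connection, then creates the exchange and an auto-delete, exclusive queue. (Source annotation: rubocop:enable Metrics/AbcSize)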



61
62
63
64
65
66
# File 'lib/railway_ipc/rpc/client/client.rb', line 61

# Prepares the connection for a request: connects, creates the exchange, and
# creates a queue with +auto_delete+ and +exclusive+ enabled. Each step is
# invoked on the return value of the previous one.
#
# @return [Object] whatever +create_queue+ returns
def setup_rabbit_connection
  connected = rabbit_connection.connect
  with_exchange = connected.create_exchange
  with_exchange.create_queue(auto_delete: true, exclusive: true)
end