Class: RailwayIpc::Server

Inherits:
Object
  • Object
show all
Extended by:
RPC::ErrorAdapterConfigurable, RPC::MessageObservationConfigurable
Defined in:
lib/railway_ipc/rpc/server/server.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RPC::ErrorAdapterConfigurable

rpc_error_adapter, rpc_error_adapter_class

Methods included from RPC::MessageObservationConfigurable

exchange_name, listen_to, queue_name

Constructor Details

#initialize(_queue, _pool, opts = { automatic_recovery: true }, rabbit_adapter: RailwayIpc::Rabbitmq::Adapter) ⇒ Server

Returns a new instance of Server.



17
18
19
20
21
22
23
# File 'lib/railway_ipc/rpc/server/server.rb', line 17

def initialize(_queue, _pool, opts={ automatic_recovery: true }, rabbit_adapter: RailwayIpc::Rabbitmq::Adapter)
  @rabbit_connection = rabbit_adapter.new(
    queue_name: self.class.queue_name,
    exchange_name: self.class.exchange_name,
    options: opts
  )
end

Instance Attribute Details

#messageObject (readonly)

Returns the value of attribute message.



11
12
13
# File 'lib/railway_ipc/rpc/server/server.rb', line 11

def message
  @message
end

#responderObject (readonly)

Returns the value of attribute responder.



11
12
13
# File 'lib/railway_ipc/rpc/server/server.rb', line 11

def responder
  @responder
end

Class Method Details

.respond_to(message_type, with:) ⇒ Object



13
14
15
# File 'lib/railway_ipc/rpc/server/server.rb', line 13

def self.respond_to(message_type, with:)
  RailwayIpc::RPC::ServerResponseHandlers.instance.register(handler: with, message: message_type)
end

Instance Method Details

#handle_request(payload) ⇒ Object

rubocop:disable Metrics/AbcSize rubocop:disable Metrics/MethodLength



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/railway_ipc/rpc/server/server.rb', line 67

def handle_request(payload)
  response = work(payload)
rescue StandardError => e
  RailwayIpc.logger.error(
    'Error responding to message.',
    exception: e,
    feature: 'railway_ipc_consumer',
    exchange: self.class.exchange_name,
    queue: self.class.queue_name,
    protobuf: { type: message.class, data: message }
  )
  response = self.class.rpc_error_adapter_class.error_message(e, message)
ensure
  if response
    rabbit_connection.reply(
      RailwayIpc::Rabbitmq::Payload.encode(response), message.reply_to
    )
  end
end

#runObject



25
26
27
28
29
30
31
32
# File 'lib/railway_ipc/rpc/server/server.rb', line 25

def run
  rabbit_connection
    .connect
    .create_exchange
    .create_queue(durable: true)
    .bind_queue_to_exchange
  subscribe_to_queue
end

#stopObject



34
35
36
# File 'lib/railway_ipc/rpc/server/server.rb', line 34

def stop
  rabbit_connection.disconnect
end

#work(payload) ⇒ Object

rubocop:disable Metrics/AbcSize rubocop:disable Metrics/MethodLength



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/railway_ipc/rpc/server/server.rb', line 40

def work(payload)
  decoded_payload = RailwayIpc::Rabbitmq::Payload.decode(payload)
  case decoded_payload.type
  when *registered_handlers
    responder = get_responder(decoded_payload)
    @message = get_message_class(decoded_payload).decode(decoded_payload.message)
    responder.respond(message)
  else
    @message = LearnIpc::ErrorMessage.decode(decoded_payload.message)
    raise RailwayIpc::UnhandledMessageError.new("#{self.class} does not know how to handle #{decoded_payload.type}")
  end
rescue StandardError => e
  RailwayIpc.logger.error(
    e.message,
    feature: 'railway_ipc_consumer',
    exchange: self.class.exchange_name,
    queue: self.class.queue_name,
    error: e.class,
    payload: payload
  )
  raise e
end