Class: RailwayIpc::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/railway_ipc/publisher.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Publisher

Returns a new instance of Publisher.



7
8
9
10
# File 'lib/railway_ipc/publisher.rb', line 7

# Builds a publisher from an options hash.
#
# @param opts [Hash] construction options
# @option opts [Object] :exchange_name required; stored in @exchange_name
# @option opts [Object] :message_store stored in @message_store; falls back
#   to RailwayIpc::PublishedMessage when the key is absent
# @raise [KeyError] if +opts+ has no +:exchange_name+ key
def initialize(opts = {})
  # fetch without a default raises KeyError, making :exchange_name mandatory.
  @exchange_name = opts.fetch(:exchange_name)
  @message_store = opts.fetch(:message_store, RailwayIpc::PublishedMessage)
end

Instance Attribute Details

#exchange_name ⇒ Object (readonly)

Returns the value of attribute exchange_name.



5
6
7
# File 'lib/railway_ipc/publisher.rb', line 5

# Reader for the exchange name this publisher was constructed with.
#
# @return [Object] the current value of @exchange_name
def exchange_name
  @exchange_name
end

#message_store ⇒ Object (readonly)

Returns the value of attribute message_store.



5
6
7
# File 'lib/railway_ipc/publisher.rb', line 5

# Reader for the store object used to persist outgoing messages.
#
# @return [Object] the current value of @message_store
def message_store
  @message_store
end

Instance Method Details

#exchange ⇒ Object

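Returns the fanout exchange named by #exchange_name, requested from the channel as durable and not auto-deleted. The result is memoized, so repeated calls return the same object.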



31
32
33
# File 'lib/railway_ipc/publisher.rb', line 31

# Returns the exchange this publisher writes to.
#
# On the first call the exchange is requested from +channel+ under
# +exchange_name+ with a fixed option set (fanout type, durable, not
# auto-deleted, no extra arguments). The result is cached in @exchange and
# reused by later calls.
#
# @return [Object] whatever +channel.exchange+ returned
def exchange
  return @exchange if @exchange

  @exchange = channel.exchange(
    exchange_name,
    type: :fanout,
    durable: true,
    auto_delete: false,
    arguments: {}
  )
end

#publish(message, format = 'binary_protobuf') ⇒ Object

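Wraps the message in an OutgoingMessage, stores it via the message store, logs the publication, and publishes the encoded payload to the exchange with a message_format header. Returns the OutgoingMessage.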



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/railway_ipc/publisher.rb', line 13

# Stores and publishes +message+ on this publisher's exchange.
#
# The message is wrapped in an OutgoingMessage, handed to
# +message_store.store_message+, logged, and then its encoded form is
# published with a +message_format+ header set to +format+.
#
# @param message [Object] the message to send
# @param format [String] value passed to OutgoingMessage and sent as the
#   +message_format+ header
# @return [OutgoingMessage] the wrapper that was published
# @raise [RailwayIpc::InvalidProtobuf] re-raised after an error log entry
# @raise [RailwayIpc::FailedToStoreOutgoingMessage] wrapping an
#   ActiveRecord::RecordInvalid, after an error log entry
# @raise [StandardError] any other error, re-raised after the stored record
#   (if one was created) has been destroyed
def publish(message, format = 'binary_protobuf')
  outgoing = OutgoingMessage.new(message, exchange_name, format)
  record = message_store.store_message(outgoing)
  RailwayIpc.logger.info('Publishing message', log_message_options(message))
  exchange.publish(outgoing.encoded, headers: { message_format: format })
  outgoing
rescue RailwayIpc::InvalidProtobuf
  RailwayIpc.logger.error('Invalid protobuf', log_message_options(message))
  raise
rescue ActiveRecord::RecordInvalid => error
  RailwayIpc.logger.error('Failed to store outgoing message', log_message_options(message))
  raise RailwayIpc::FailedToStoreOutgoingMessage.new(error)
rescue StandardError
  # +record+ is nil when the failure happened before the store call returned,
  # hence the safe navigation.
  record&.destroy
  raise
end