Class: RailwayIpc::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/railway_ipc/publisher.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Publisher

Returns a new instance of Publisher.



57
58
59
60
# File 'lib/railway_ipc/publisher.rb', line 57

# Builds a publisher bound to a named exchange.
#
# @param opts [Hash] construction options
# @option opts [Object] :exchange_name required; +Hash#fetch+ raises
#   KeyError when the key is absent
# @option opts [Object] :message_store store used for outgoing messages;
#   falls back to RailwayIpc::PublishedMessage when the key is absent
# @raise [KeyError] when +:exchange_name+ is missing
def initialize(opts = {})
  @exchange_name = opts.fetch(:exchange_name)

  fallback_store = RailwayIpc::PublishedMessage
  @message_store = opts.fetch(:message_store, fallback_store)
end

Instance Attribute Details

#exchange_name ⇒ Object (readonly)

Returns the value of attribute exchange_name.



55
56
57
# File 'lib/railway_ipc/publisher.rb', line 55

# Read-only accessor for the exchange name assigned in the constructor.
#
# @return [Object] the current value of +@exchange_name+
def exchange_name
  @exchange_name
end

#message_store ⇒ Object (readonly)

Returns the value of attribute message_store.



55
56
57
# File 'lib/railway_ipc/publisher.rb', line 55

# Read-only accessor for the message store assigned in the constructor.
#
# @return [Object] the current value of +@message_store+
def message_store
  @message_store
end

Instance Method Details

#exchange ⇒ Object

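Returns the durable, non-auto-deleting fanout exchange named by exchange_name. It is declared on the channel on the first call and memoized afterwards.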



82
83
84
# File 'lib/railway_ipc/publisher.rb', line 82

# Returns the exchange this publisher writes to, declaring it on first use.
#
# The exchange is requested from +channel+ as a durable fanout exchange
# that is not auto-deleted and carries no extra arguments. The result is
# cached in +@exchange+, so later calls reuse it.
#
# @return [Object] whatever +channel.exchange+ returned on the first call
def exchange
  return @exchange if @exchange

  @exchange = channel.exchange(
    exchange_name,
    type: :fanout,
    durable: true,
    auto_delete: false,
    arguments: {}
  )
end

#publish(message) ⇒ Object

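Assigns a UUID and a correlation ID to the message when they are blank, stores the message in the message store, and publishes its encoded payload to the exchange.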



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/railway_ipc/publisher.rb', line 63

# Stores +message+ and publishes its encoded payload to the exchange.
#
# A UUID and a correlation ID are generated with +SecureRandom.uuid+ when
# the message's current values are blank. The message is stored first and
# published second; if anything fails after the store succeeded, the stored
# record is destroyed so that no record remains for an unpublished message.
#
# @param message [Object] message responding to +uuid+, +uuid=+,
#   +correlation_id+ and +correlation_id=+ (presumably a protobuf message;
#   confirm against callers)
# @return [Object] the result of +exchange.publish+
# @raise [RailwayIpc::InvalidProtobuf] re-raised after being logged
# @raise [RailwayIpc::FailedToStoreOutgoingMessage] when storing the message
#   raises ActiveRecord::RecordInvalid
# @raise [StandardError] any other failure, re-raised after rollback
def publish(message)
  message.uuid = SecureRandom.uuid if message.uuid.blank?
  message.correlation_id = SecureRandom.uuid if message.correlation_id.blank?
  RailwayIpc.logger.info('Publishing message', log_message_options(message))

  stored_message = message_store.store_message(exchange_name, message)
  exchange.publish(RailwayIpc::Rabbitmq::Payload.encode(message))
rescue RailwayIpc::InvalidProtobuf
  RailwayIpc.logger.error('Invalid protobuf', log_message_options(message))
  # Encoding for publication happens after the store call, so this error can
  # arrive with a record already stored. Roll it back like any other
  # post-store failure; this is a no-op when nothing was stored yet.
  stored_message&.destroy
  raise
rescue ActiveRecord::RecordInvalid => e
  RailwayIpc.logger.error('Failed to store outgoing message', log_message_options(message))
  raise RailwayIpc::FailedToStoreOutgoingMessage.new(e)
rescue StandardError
  # Publishing failed after the message was stored: remove the stored record.
  stored_message&.destroy
  raise
end