Class: RailwayIpc::Publisher
- Inherits:
-
Object
- Object
- RailwayIpc::Publisher
- Defined in:
- lib/railway_ipc/publisher.rb
Instance Attribute Summary collapse
-
#exchange_name ⇒ Object
readonly
Returns the value of attribute exchange_name.
-
#message_store ⇒ Object
readonly
Returns the value of attribute message_store.
Instance Method Summary collapse
-
#exchange ⇒ Object
rubocop:enable Metrics/AbcSize.
-
#initialize(opts = {}) ⇒ Publisher
constructor
A new instance of Publisher.
-
#publish(message, format = 'binary_protobuf') ⇒ Object
rubocop:disable Metrics/AbcSize.
Constructor Details
#initialize(opts = {}) ⇒ Publisher
Returns a new instance of Publisher.
7 8 9 10 |
# File 'lib/railway_ipc/publisher.rb', line 7 def initialize(opts={}) @exchange_name = opts.fetch(:exchange_name) @message_store = opts.fetch(:message_store, RailwayIpc::PublishedMessage) end |
Instance Attribute Details
#exchange_name ⇒ Object (readonly)
Returns the value of attribute exchange_name.
5 6 7 |
# File 'lib/railway_ipc/publisher.rb', line 5 def exchange_name @exchange_name end |
#message_store ⇒ Object (readonly)
Returns the value of attribute message_store.
5 6 7 |
# File 'lib/railway_ipc/publisher.rb', line 5 def message_store @message_store end |
Instance Method Details
#exchange ⇒ Object
rubocop:enable Metrics/AbcSize
31 32 33 |
# File 'lib/railway_ipc/publisher.rb', line 31 def exchange @exchange ||= channel.exchange(exchange_name, type: :fanout, durable: true, auto_delete: false, arguments: {}) end |
#publish(message, format = 'binary_protobuf') ⇒ Object
rubocop:disable Metrics/AbcSize
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/railway_ipc/publisher.rb', line 13 def publish(message, format='binary_protobuf') = OutgoingMessage.new(, exchange_name, format) = .() RailwayIpc.logger.info('Publishing message', ()) exchange.publish(.encoded, headers: { message_format: format }) rescue RailwayIpc::InvalidProtobuf => e RailwayIpc.logger.error('Invalid protobuf', ()) raise e rescue ActiveRecord::RecordInvalid => e RailwayIpc.logger.error('Failed to store outgoing message', ()) raise RailwayIpc::FailedToStoreOutgoingMessage.new(e) rescue StandardError => e &.destroy raise e end |