Class: RailwayIpc::Publisher
- Inherits:
-
Object
- Object
- RailwayIpc::Publisher
- Defined in:
- lib/railway_ipc/publisher.rb
Instance Attribute Summary collapse
-
#exchange_name ⇒ Object
readonly
Returns the value of attribute exchange_name.
-
#message_store ⇒ Object
readonly
Returns the value of attribute message_store.
Instance Method Summary collapse
-
#exchange ⇒ Object
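Returns the memoized durable fanout exchange named by exchange_name (the generated summary was only the stray linter directive "rubocop:enable Metrics/AbcSize").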
-
#initialize(opts = {}) ⇒ Publisher
constructor
A new instance of Publisher.
-
#publish(message) ⇒ Object
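Fills in a blank uuid and correlation_id on the message, logs it, and publishes its encoded payload to the exchange (the generated summary was only the stray linter directive "rubocop:disable Metrics/AbcSize").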
Constructor Details
#initialize(opts = {}) ⇒ Publisher
Returns a new instance of Publisher.
57 58 59 60 |
# File 'lib/railway_ipc/publisher.rb', line 57 def initialize(opts={}) @exchange_name = opts.fetch(:exchange_name) @message_store = opts.fetch(:message_store, RailwayIpc::PublishedMessage) end |
Instance Attribute Details
#exchange_name ⇒ Object (readonly)
Returns the value of attribute exchange_name.
55 56 57 |
# File 'lib/railway_ipc/publisher.rb', line 55 def exchange_name @exchange_name end |
#message_store ⇒ Object (readonly)
Returns the value of attribute message_store.
55 56 57 |
# File 'lib/railway_ipc/publisher.rb', line 55 def message_store @message_store end |
Instance Method Details
#exchange ⇒ Object
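Returns the exchange obtained from channel.exchange for exchange_name, declared as a durable, non-auto-deleting fanout exchange and memoized in @exchange. (The generated description was only the stray linter directive "rubocop:enable Metrics/AbcSize".)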
82 83 84 |
# File 'lib/railway_ipc/publisher.rb', line 82 def exchange @exchange ||= channel.exchange(exchange_name, type: :fanout, durable: true, auto_delete: false, arguments: {}) end |
#publish(message) ⇒ Object
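Assigns a SecureRandom UUID to the message's uuid and correlation_id when they are blank, logs the message, and publishes its encoded payload to the exchange. Re-raises RailwayIpc::InvalidProtobuf after logging, wraps ActiveRecord::RecordInvalid in RailwayIpc::FailedToStoreOutgoingMessage, and re-raises any other StandardError. (The generated description was only the stray linter directive "rubocop:disable Metrics/AbcSize".)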
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/railway_ipc/publisher.rb', line 63 def publish(message) message.uuid = SecureRandom.uuid if message.uuid.blank? message.correlation_id = SecureRandom.uuid if message.correlation_id.blank? RailwayIpc.logger.info('Publishing message', log_message_options(message)) stored_message = message_store.store_message(exchange_name, message) exchange.publish(RailwayIpc::Rabbitmq::Payload.encode(message)) rescue RailwayIpc::InvalidProtobuf => e RailwayIpc.logger.error('Invalid protobuf', log_message_options(message)) raise e rescue ActiveRecord::RecordInvalid => e RailwayIpc.logger.error('Failed to store outgoing message', log_message_options(message)) raise RailwayIpc::FailedToStoreOutgoingMessage.new(e) rescue StandardError => e stored_message&.destroy raise e end |