Class: Blinkbox::CommonMessaging::Exchange
- Inherits:
-
Object
- Object
- Blinkbox::CommonMessaging::Exchange
- Extended by:
- Forwardable
- Defined in:
- lib/blinkbox/common_messaging/exchange.rb
Instance Method Summary collapse
-
#initialize(exchange_name, facility: File.basename($0, '.rb'), facility_version: "0.0.0-unknown") ⇒ Exchange
constructor
A wrapped class for Bunny::Exchange.
-
#publish(data, headers: {}, message_id_chain: [], confirm: true, validate: true) ⇒ Object
Publishes a message to the exchange with blinkbox Books default message headers and properties.
Constructor Details
#initialize(exchange_name, facility: File.basename($0, '.rb'), facility_version: "0.0.0-unknown") ⇒ Exchange
A wrapped class for Bunny::Exchange. Wrapped so we can take care of message validation and header conventions in the blinkbox Books format.
14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/blinkbox/common_messaging/exchange.rb', line 14 def initialize(exchange_name, facility: File.basename($0, '.rb'), facility_version: "0.0.0-unknown") @app_id = "#{facility}:v#{facility_version}" connection = CommonMessaging.connection channel = connection.create_channel channel.confirm_select @exchange = channel.headers( exchange_name, durable: true, auto_delete: false, passive: true ) end |
Instance Method Details
#publish(data, headers: {}, message_id_chain: [], confirm: true, validate: true) ⇒ Object
Publishes a message to the exchange with blinkbox Books default message headers and properties.
Worth noting that because of a quirk of the RabbitMQ Headers Exchange you cannot route on properties so, in order to facilitate routing on content-type, that key is written to the headers by default as well as to the properties.
@return [String] The correlation_id of the message which was delivered.
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/blinkbox/common_messaging/exchange.rb', line 39 def publish(data, headers: {}, message_id_chain: [], confirm: true, validate: true) raise ArgumentError, "All published messages must be validated. Please see Blinkbox::CommonMessaging.init_from_schema_at for details." if validate && !data.class.included_modules.include?(JsonSchemaPowered) raise ArgumentError, "message_id_chain must be an array of strings" unless .is_a?(Array) = = .dup << correlation_id = .first headers = headers.merge!("message_id_chain" => ) = {} if data.respond_to?(:content_type) hd = Blinkbox::CommonMessaging::HeaderDetectors.new(data) headers = hd.modified_headers(headers) # We have to do both of these because of RabbitMQ's weird header exchange protocol headers["content-type"] = data.content_type [:content_type] = data.content_type data = data.to_json end .merge!( persistent: true, correlation_id: correlation_id, message_id: , app_id: @app_id, timestamp: Time.now.to_i, headers: headers ) @exchange.publish(data, ) if confirm && !@exchange.channel.wait_for_confirms = @exchange.channel.nacked_set.first raise UndeliverableMessageError, "Message #{} was returned as undeliverable by RabbitMQ." end end |