Class: Blinkbox::CommonMessaging::Exchange

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/blinkbox/common_messaging/exchange.rb

Instance Method Summary collapse

Constructor Details

#initialize(exchange_name, facility: File.basename($0, '.rb'), facility_version: "0.0.0-unknown") ⇒ Exchange

A wrapped class for Bunny::Exchange. Wrapped so we can take care of message validation and header conventions in the blinkbox Books format.

Parameters:

  • exchange_name (String)

    The name of the Exchange to connect to.

  • facility (String) (defaults to: File.basename($0, '.rb'))

    The name of the app or service (we’ve adopted the GELF naming term across ruby)

  • facility_version (String) (defaults to: "0.0.0-unknown")

    The version of the app or service which sent the message.

Raises:

  • (Bunny::NotFound)

    If the exchange does not exist.



14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/blinkbox/common_messaging/exchange.rb', line 14

def initialize(exchange_name, facility: File.basename($0, '.rb'), facility_version: "0.0.0-unknown")
  @app_id = "#{facility}:v#{facility_version}"
  connection = CommonMessaging.connection
  channel = connection.create_channel
  channel.confirm_select
  @exchange = channel.headers(
    exchange_name,
    durable: true,
    auto_delete: false,
    passive: true
  )
end

Instance Method Details

#publish(data, headers: {}, message_id_chain: [], confirm: true, validate: true) ⇒ Object

Publishes a message to the exchange with blinkbox Books default message headers and properties.

Worth noting that because of a quirk of the RabbitMQ Headers Exchange you cannot route on properties so, in order to facilitate routing on content-type, that key is written to the headers by default as well as to the properties.

 @return [String] The correlation_id of the message which was delivered.

Parameters:

  • data (Blinkbox::CommonMessaging::JsonSchemaPowered, String)

    The information which will be sent as the payload of the message. An instance of any class generated by Blinkbox::CommonMessaging.init_from_schema_at while :validate is true, or a String if false.

  • headers (Hash) (defaults to: {})

    A hash of string keys and string values which will be sent as headers with the message. Used for matching.

  • message_id_chain (Array<String>) (defaults to: [])

    Optional. The message_id_chain of the message which was received in order to prompt this one.

  • confirm (Boolean) (defaults to: true)

    Will block this method until the MQ server has confirmed the message has been persisted and routed.

  • validate (Boolean) (defaults to: true)

    if false will relax the constraint that the inbound data must be a JsonSchemaPowered object.

Raises:

  • (ArgumentError)


39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/blinkbox/common_messaging/exchange.rb', line 39

def publish(data, headers: {}, message_id_chain: [], confirm: true, validate: true)
  raise ArgumentError, "All published messages must be validated. Please see Blinkbox::CommonMessaging.init_from_schema_at for details." if validate && !data.class.included_modules.include?(JsonSchemaPowered)
  raise ArgumentError, "message_id_chain must be an array of strings" unless message_id_chain.is_a?(Array)

  message_id = generate_message_id
  new_message_id_chain = message_id_chain.dup << message_id
  correlation_id = new_message_id_chain.first

  headers = headers.merge!("message_id_chain" => new_message_id_chain)
  options = {}

  if data.respond_to?(:content_type)
    hd = Blinkbox::CommonMessaging::HeaderDetectors.new(data)
    headers = hd.modified_headers(headers)
    # We have to do both of these because of RabbitMQ's weird header exchange protocol
    headers["content-type"] = data.content_type 
    options[:content_type] = data.content_type
    data = data.to_json
  end

  options.merge!(
    persistent: true,
    correlation_id: correlation_id,
    message_id: message_id,
    app_id: @app_id,
    timestamp: Time.now.to_i,
    headers: headers
  )
  @exchange.publish(data, options)

  if confirm && !@exchange.channel.wait_for_confirms
    message_id = @exchange.channel.nacked_set.first
    raise UndeliverableMessageError, "Message #{message_id} was returned as undeliverable by RabbitMQ."
  end

  message_id
end