Module: Blinkbox::CommonMessaging

Defined in:
lib/blinkbox/common_messaging.rb,
lib/blinkbox/common_messaging/queue.rb,
lib/blinkbox/common_messaging/version.rb,
lib/blinkbox/common_messaging/exchange.rb

Overview

A group of methods and classes which enable the delivery of messages through the blinkbox Books ecosystem via AMQP.

‘CommonMessaging.configure!` should be used to set up connection details first, then every subsequent call to `CommonMessaging::Queue.new` will create a `Bunny::Queue` object using the connection details that were present at the time.

Defined Under Namespace

Modules: JsonSchemaPowered Classes: Exchange, HeaderDetectors, Queue, UndeliverableMessageError

Constant Summary collapse

DEFAULT_CONFIG =

The default RabbitMQ connection details, in the format that Bunny needs them.

{
  bunny: {
    host: "localhost",
    port: 5672,
    user: "guest",
    pass: "guest",
    vhost: "/",
    log_level: Logger::WARN,
    automatically_recover: true,
    threaded: true,
    continuation_timeout: 4000
  },
  retry_interval: {
    initial: Unit("5 seconds"),
    max: Unit("5 seconds")
  },
  logger: Logger.new(nil)
}
VERSION =
begin
  File.read(File.join(File.dirname(__FILE__), "../../../VERSION"))
rescue Errno::ENOENT
  "0.0.0-unknown"
end

Class Method Summary collapse

Class Method Details

.class_from_content_type(content_type) ⇒ Object



225
226
227
228
229
230
231
232
233
# File 'lib/blinkbox/common_messaging.rb', line 225

def self.class_from_content_type(content_type)
  fail "No content type was given" if content_type.nil? || content_type.empty?
  begin
    schema_name = content_type.sub(%r{^application/vnd\.blinkbox\.books\.(.+)\+json$}, '\1')
    const_get(class_name_from_schema_name(schema_name))
  rescue
    raise "The schema for the #{content_type} content type has not been loaded"
  end
end

.class_name_from_schema_name(schema_name) ⇒ Object



235
236
237
# File 'lib/blinkbox/common_messaging.rb', line 235

def self.class_name_from_schema_name(schema_name)
  schema_name.tr("./", "_").camelcase
end

.close_connections(block_until_confirms: true) {|message_id| ... } ⇒ Object

Blocks until all the open connections have been closed, calling the block with any message_ids which haven’t been delivered

Parameters:

  • block_until_confirms (Boolean) (defaults to: true)

    Force the method to block until all messages have been acked or nacked.

Yields:

  • (message_id)

    Calls the given block for any message that was undeliverable (if block_until_confirms was ‘true`)

Yield Parameters:

  • message_id (String)

    The message_id of the message which could not be delivered



130
131
132
133
134
135
136
137
138
139
# File 'lib/blinkbox/common_messaging.rb', line 130

def self.close_connections(block_until_confirms: true)
  @@connections.each do |k, c|
    if block_until_confirms && !c.wait_for_confirms
      c.nacked_set.each do |message_id|
        yield message_id if block_given?
      end
    end
    c.close
  end
end

.configHash

Returns the current config being used (as used by Bunny)

Returns:

  • (Hash)


96
97
98
# File 'lib/blinkbox/common_messaging.rb', line 96

def self.config
  @@config rescue DEFAULT_CONFIG
end

.configure!(config, logger = nil) ⇒ Object

This method only stores connection details for calls to ‘CommonMessaging::Queue.new`. Any queues already created will not be affected by subsequent calls to this method.

This method converts the given options from the blinkbox Books common config format to the format required for Bunny so that calls like the following are possible:

Examples:

Using with CommonConfig

require "blinkbox/common_config"
require "blinkbox/common_messaging"

config = Blinkbox::CommonConfig.new
Blinkbox::CommonMessaging.configure!(config.tree(:rabbitmq))

Parameters:

  • config (Hash)

    The configuration options needed for an MQ connection.

  • logger (#debug, #info, #warn, #error, #fatal) (defaults to: nil)

    The logger instance which should be used by Bunny

Options Hash (config):

  • :url (String)

    The URL to the RabbitMQ server, eg. amqp://user:[email protected]:1234/virtual_host

  • :initialRetryInterval (Unit)

    The interval at which re-connection attempts should be made when a RabbitMQ failure first occurs.

  • :maxRetryInterval (Unit)

    The maximum interval at which RabbitMQ reconnection attempts should back off to.



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/blinkbox/common_messaging.rb', line 62

def self.configure!(config, logger = nil)
  @@config = DEFAULT_CONFIG

  unless config[:url].nil?
    uri = URI.parse(config[:url])
    @@config.deep_merge!(
      bunny: {
        host: uri.host,
        port: uri.port,
        user: uri.user,
        pass: uri.password,
        vhost: uri.path
      }
    )
  end

  %i{initialRetryInterval maxRetryInterval}.each do |unit_key|
    if config[unit_key]
      config[unit_key] = Unit(config[unit_key]) unless config[unit_key].is_a?(Unit)

      @@config.deep_merge!(
        retry_interval: {
          unit_key.to_s.sub('RetryInterval', '').to_sym => config[unit_key]
        }
      )
    end
  end

  self.logger = logger unless logger.nil?
end

.connectionBunny::Session

Returns (and starts if necessary) the connection to the RabbitMQ server as specified by the current config. Will keep only one connection per configuration at any time and will return or create a new connection as necessary. Channels are created with publisher confirmations.

Application code should not need to use this method.

Returns:

  • (Bunny::Session)


118
119
120
121
122
123
# File 'lib/blinkbox/common_messaging.rb', line 118

def self.connection
  @@connections ||= {}
  @@connections[config] ||= Bunny.new(config[:bunny])
  @@connections[config].start
  @@connections[config]
end

.init_from_schema_at(path, root = path) ⇒ Object

Generates ruby classes representing blinkbox Books messages from the schema files at the given path.

Examples:

Initialising CommonMessaging for sending

Blinkbox::CommonMessaging.init_from_schema_at("ingestion.book.metatdata.v2.schema.json")
msg = Blinkbox::CommonMessaging::IngestionBookMetadataV2.new(title: "A title")
exchange.publish(msg)

Using the root path

Blinkbox::CommonMessaging.init_from_schema_at("./schema/ingestion/book/metatdata/v2.schema.json")
# => [Blinkbox::CommonMessaging::SchemaIngestionBookMetadataV2]

Blinkbox::CommonMessaging.init_from_schema_at("./schema/ingestion/book/metatdata/v2.schema.json", "./schema")
# => [Blinkbox::CommonMessaging::IngestionBookMetadataV2]

Parameters:

  • path (String)

    The path to a (or a folder of) json-schema file(s) in the blinkbox Books format.

  • root (String) (defaults to: path)

    The root path from which namespaces will be calculated.

Returns:

  • Array of class names generated



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/blinkbox/common_messaging.rb', line 194

def self.init_from_schema_at(path, root = path)
  fail "The path #{path} does not exist" unless File.exist?(path)
  return Dir[File.join(path, "**/*.schema.json")].map { |file| init_from_schema_at(file, root) }.flatten if File.directory?(path)

  root = File.dirname(root) if root =~ /\.schema\.json$/
  schema_name = path.sub(%r{^(?:\./)?#{root}/?(.+)\.schema\.json$}, "\\1").tr("/",".")
  class_name = class_name_from_schema_name(schema_name)

  # We will re-declare these classes if required, rather than raise an error.
  remove_const(class_name) if constants.include?(class_name.to_sym)

  const_set(class_name, Class.new {
    include JsonSchemaPowered

    def initialize(data = {})
      @data = data
      @data = @data.stringify_keys if data.respond_to?(:stringify_keys)
      JSON::Validator.validate!(self.class.const_get("SCHEMA_FILE"), @data, insert_defaults: true)
    end

    def content_type
      self.class.const_get("CONTENT_TYPE")
    end
  })

  klass = const_get(class_name)
  klass.const_set('CONTENT_TYPE', "application/vnd.blinkbox.books.#{schema_name}+json")
  klass.const_set('SCHEMA_FILE', path)
  klass
end

.logger=(logger) ⇒ Object

Sets the logger delivered to Bunny when new connections are made

Parameters:

  • []

    logger The object to which log messages should be sent.



103
104
105
106
107
108
109
# File 'lib/blinkbox/common_messaging.rb', line 103

def self.logger=(logger)
  %i{debug info warn error fatal level= level}.each do |m|
    raise ArgumentError, "The logger did not respond to '#{m}'" unless logger.respond_to?(m)
  end
  @@config[:logger] = logger
  @@config[:bunny][:logger] = logger
end