Module: Blinkbox::CommonMessaging
- Defined in:
- lib/blinkbox/common_messaging.rb,
lib/blinkbox/common_messaging/queue.rb,
lib/blinkbox/common_messaging/version.rb,
lib/blinkbox/common_messaging/exchange.rb
Overview
A group of methods and classes which enable the delivery of messages through the blinkbox Books ecosystem via AMQP.
‘CommonMessaging.configure!` should be used to set up connection details first, then every subsequent call to `CommonMessaging::Queue.new` will create a `Bunny::Queue` object using the connection details that were present at the time.
Defined Under Namespace
Modules: JsonSchemaPowered Classes: Exchange, HeaderDetectors, Queue, UndeliverableMessageError
Constant Summary collapse
- DEFAULT_CONFIG =
The default RabbitMQ connection details, in the format that Bunny needs them.
{ bunny: { host: "localhost", port: 5672, user: "guest", pass: "guest", vhost: "/", log_level: Logger::WARN, automatically_recover: true, threaded: true, continuation_timeout: 4000 }, retry_interval: { initial: Unit("5 seconds"), max: Unit("5 seconds") }, logger: Logger.new(nil) }
- VERSION =
begin File.read(File.join(File.dirname(__FILE__), "../../../VERSION")) rescue Errno::ENOENT "0.0.0-unknown" end
Class Method Summary collapse
- .class_from_content_type(content_type) ⇒ Object
- .class_name_from_schema_name(schema_name) ⇒ Object
-
.close_connections(block_until_confirms: true) {|message_id| ... } ⇒ Object
Blocks until all the open connections have been closed, calling the block with any message_ids which haven’t been delivered.
-
.config ⇒ Hash
Returns the current config being used (as used by Bunny).
-
.configure!(config, logger = nil) ⇒ Object
This method only stores connection details for calls to ‘CommonMessaging::Queue.new`.
-
.connection ⇒ Bunny::Session
Returns (and starts if necessary) the connection to the RabbitMQ server as specified by the current config.
-
.init_from_schema_at(path, root = path) ⇒ Object
Generates ruby classes representing blinkbox Books messages from the schema files at the given path.
-
.logger=(logger) ⇒ Object
Sets the logger delivered to Bunny when new connections are made.
Class Method Details
.class_from_content_type(content_type) ⇒ Object
225 226 227 228 229 230 231 232 233 |
# File 'lib/blinkbox/common_messaging.rb', line 225 def self.class_from_content_type(content_type) fail "No content type was given" if content_type.nil? || content_type.empty? begin schema_name = content_type.sub(%r{^application/vnd\.blinkbox\.books\.(.+)\+json$}, '\1') const_get(class_name_from_schema_name(schema_name)) rescue raise "The schema for the #{content_type} content type has not been loaded" end end |
.class_name_from_schema_name(schema_name) ⇒ Object
235 236 237 |
# File 'lib/blinkbox/common_messaging.rb', line 235 def self.class_name_from_schema_name(schema_name) schema_name.tr("./", "_").camelcase end |
.close_connections(block_until_confirms: true) {|message_id| ... } ⇒ Object
Blocks until all the open connections have been closed, calling the block with any message_ids which haven’t been delivered
130 131 132 133 134 135 136 137 138 139 |
# File 'lib/blinkbox/common_messaging.rb', line 130 def self.close_connections(block_until_confirms: true) @@connections.each do |k, c| if block_until_confirms && !c.wait_for_confirms c.nacked_set.each do || yield if block_given? end end c.close end end |
.config ⇒ Hash
Returns the current config being used (as used by Bunny)
96 97 98 |
# File 'lib/blinkbox/common_messaging.rb', line 96 def self.config @@config rescue DEFAULT_CONFIG end |
.configure!(config, logger = nil) ⇒ Object
This method only stores connection details for calls to ‘CommonMessaging::Queue.new`. Any queues already created will not be affected by subsequent calls to this method.
This method converts the given options from the blinkbox Books common config format to the format required for Bunny so that calls like the following are possible:
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/blinkbox/common_messaging.rb', line 62 def self.configure!(config, logger = nil) @@config = DEFAULT_CONFIG unless config[:url].nil? uri = URI.parse(config[:url]) @@config.deep_merge!( bunny: { host: uri.host, port: uri.port, user: uri.user, pass: uri.password, vhost: uri.path } ) end %i{initialRetryInterval maxRetryInterval}.each do |unit_key| if config[unit_key] config[unit_key] = Unit(config[unit_key]) unless config[unit_key].is_a?(Unit) @@config.deep_merge!( retry_interval: { unit_key.to_s.sub('RetryInterval', '').to_sym => config[unit_key] } ) end end self.logger = logger unless logger.nil? end |
.connection ⇒ Bunny::Session
Returns (and starts if necessary) the connection to the RabbitMQ server as specified by the current config. Will keep only one connection per configuration at any time and will return or create a new connection as necessary. Channels are created with publisher confirmations.
Application code should not need to use this method.
118 119 120 121 122 123 |
# File 'lib/blinkbox/common_messaging.rb', line 118 def self.connection @@connections ||= {} @@connections[config] ||= Bunny.new(config[:bunny]) @@connections[config].start @@connections[config] end |
.init_from_schema_at(path, root = path) ⇒ Object
Generates ruby classes representing blinkbox Books messages from the schema files at the given path.
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 |
# File 'lib/blinkbox/common_messaging.rb', line 194 def self.init_from_schema_at(path, root = path) fail "The path #{path} does not exist" unless File.exist?(path) return Dir[File.join(path, "**/*.schema.json")].map { |file| init_from_schema_at(file, root) }.flatten if File.directory?(path) root = File.dirname(root) if root =~ /\.schema\.json$/ schema_name = path.sub(%r{^(?:\./)?#{root}/?(.+)\.schema\.json$}, "\\1").tr("/",".") class_name = class_name_from_schema_name(schema_name) # We will re-declare these classes if required, rather than raise an error. remove_const(class_name) if constants.include?(class_name.to_sym) const_set(class_name, Class.new { include JsonSchemaPowered def initialize(data = {}) @data = data @data = @data.stringify_keys if data.respond_to?(:stringify_keys) JSON::Validator.validate!(self.class.const_get("SCHEMA_FILE"), @data, insert_defaults: true) end def content_type self.class.const_get("CONTENT_TYPE") end }) klass = const_get(class_name) klass.const_set('CONTENT_TYPE', "application/vnd.blinkbox.books.#{schema_name}+json") klass.const_set('SCHEMA_FILE', path) klass end |
.logger=(logger) ⇒ Object
Sets the logger delivered to Bunny when new connections are made
103 104 105 106 107 108 109 |
# File 'lib/blinkbox/common_messaging.rb', line 103 def self.logger=(logger) %i{debug info warn error fatal level= level}.each do |m| raise ArgumentError, "The logger did not respond to '#{m}'" unless logger.respond_to?(m) end @@config[:logger] = logger @@config[:bunny][:logger] = logger end |