Module: Mimi::Messaging

Defined in:
lib/mimi/messaging.rb,
lib/mimi/messaging/errors.rb,
lib/mimi/messaging/message.rb,
lib/mimi/messaging/version.rb,
lib/mimi/messaging/adapters.rb,
lib/mimi/messaging/adapters/base.rb,
lib/mimi/messaging/adapters/test.rb,
lib/mimi/messaging/adapters/memory.rb,
lib/mimi/messaging/json_serializer.rb

Overview

Mimi::Messaging implements a messaging layer of a microservice application.

Usage: [TBD]

Defined Under Namespace

Modules: Adapters, JsonSerializer

Classes: ConfigurationError, ConnectionError, Error, Message, NACK

Constant Summary

REQUEST_TARGET_REGEX =

Request target validation pattern: “[<name>.]<name>/<identifier>”, where <name> consists of the characters A-Za-z0-9_- and <identifier> consists of the characters A-Za-z0-9_. The “<name>.” prefix is optional and may repeat.

Example: “shop.orders/list”

%r{^([\w\-]+\.)*([\w\-]+)\/(\w+)$}.freeze
EVENT_TARGET_REGEX =

Event target validation pattern: “[<name>.]<name>#<identifier>”, where <name> consists of the characters A-Za-z0-9_- and <identifier> consists of the characters A-Za-z0-9_. The “<name>.” prefix is optional and may repeat.

Example: “shop.orders#created”

%r{^([\w\-]+\.)*([\w\-]+)\#(\w+)$}.freeze
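
The two target patterns can be tried out without loading the library. The sketch below copies both regular expressions from the constants above; the TargetPatterns module, its kind helper and the sample targets other than the two documented examples are hypothetical and exist only for illustration.

```ruby
# Stand-alone sketch: the module and helper names are hypothetical,
# only the two patterns are copied from the constants above.
module TargetPatterns
  REQUEST = %r{^([\w\-]+\.)*([\w\-]+)\/(\w+)$}.freeze
  EVENT = %r{^([\w\-]+\.)*([\w\-]+)\#(\w+)$}.freeze

  # Classifies a target string as :request, :event or :invalid.
  def self.kind(target)
    return :request if REQUEST.match?(target)
    return :event if EVENT.match?(target)

    :invalid
  end
end

puts TargetPatterns.kind("shop.orders/list")    # request target
puts TargetPatterns.kind("shop.orders#created") # event target
puts TargetPatterns.kind("shop.orders")         # invalid: no identifier part
puts TargetPatterns.kind("orders/list-all")     # invalid: "-" is not allowed in the identifier
```

Note that the hyphen is accepted in the <name> parts only, so “shop-eu.orders/list” is a valid request target while “orders/list-all” is not.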
DEFAULT_LOG_AT_LEVEL =

Default level at which Mimi::Messaging logs; it can be overridden with the :mq_log_at_level option.

:info
VERSION =
"1.2.8"

Class Method Details

.adapter ⇒ Mimi::Messaging::Adapter

Returns the configured adapter

Returns:

  • (Mimi::Messaging::Adapter)

Raises:

  • (Error)

    if the adapter is not configured

# File 'lib/mimi/messaging.rb', line 84

def self.adapter
  raise Error, "Mimi::Messaging adapter is not configured" unless @adapter

  @adapter
end

.command(target, message = {}, opts = {}) ⇒ Object

Sends the command to the given target

Example:

Mimi::Messaging.command("users/create", name: "John Smith")

Parameters:

  • target (String)

    “<queue>/<method>”

  • message (Hash, Mimi::Messaging::Message) (defaults to: {})
  • opts (Hash) (defaults to: {})

    additional adapter-specific options

Returns:

  • nil

Raises:

  • (ArgumentError)


# File 'lib/mimi/messaging.rb', line 198

def self.command(target, message = {}, opts = {})
  raise ArgumentError, "Invalid target argument" unless REQUEST_TARGET_REGEX.match(target)
  raise ArgumentError, "Invalid message, Hash or Message is expected" unless message.is_a?(Hash)
  raise Error, "Failed to send command, adapter is not started" unless started?(:adapter)

  adapter.command(target, Mimi::Messaging::Message.new(message), opts)
end

.configure(options) ⇒ Object

Configure the Messaging layer

Configures the adapter (type) and the adapter specific options.

Parameters:

  • options (Hash)

    options passed to the adapter

Options Hash (options):

  • :mq_adapter (String, Symbol)

    Adapter type: the name of a registered adapter, e.g. “memory” or “test”

Raises:

  • (ArgumentError)


# File 'lib/mimi/messaging.rb', line 58

def self.configure(options)
  raise ArgumentError, "Hash is expected as options" unless options.is_a?(Hash)
  raise ConfigurationError, ":mq_adapter is expected to be set" unless options.key?(:mq_adapter)

  @options = options.dup
  adapter_name = options[:mq_adapter].to_s
  adapter_class = Mimi::Messaging::Adapters.registered_adapters[adapter_name]
  unless adapter_class
    registered_adapter_names = Mimi::Messaging::Adapters.registered_adapters.keys
    raise(
      ConfigurationError,
      "Failed to find adapter with name '#{adapter_name}', " \
      " registered adapters are: #{registered_adapter_names.join(', ')}"
    )
  end

  @adapter = adapter_class.new(@options)
  raise ConfigurationError, "Message serializer is not registered" unless @serializer

  @adapter.register_message_serializer(@serializer)
end
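
The adapter lookup above converts :mq_adapter to a String before consulting the registry, so a Symbol and a String select the same adapter. A minimal stand-alone sketch of that lookup follows; the AdapterLookup module, its placeholder classes and the use of KeyError (in place of the library's ConfigurationError) are assumptions made so the snippet runs without the gem.

```ruby
# Stand-in for the adapter registry; the placeholder classes and the
# AdapterLookup module are hypothetical and exist only for this sketch.
module AdapterLookup
  class MemoryAdapter; end
  class TestAdapter; end

  # Keys are Strings, mirroring the lookup by adapter_name.to_s above.
  REGISTRY = { "memory" => MemoryAdapter, "test" => TestAdapter }.freeze

  # Resolves options[:mq_adapter] to a class, accepting String or Symbol.
  # KeyError stands in for the library's ConfigurationError.
  def self.resolve(options)
    raise ArgumentError, "Hash is expected as options" unless options.is_a?(Hash)
    raise KeyError, ":mq_adapter is expected to be set" unless options.key?(:mq_adapter)

    name = options[:mq_adapter].to_s
    REGISTRY.fetch(name) do
      raise KeyError, "Failed to find adapter with name '#{name}', " \
                      "registered adapters are: #{REGISTRY.keys.join(', ')}"
    end
  end
end

AdapterLookup.resolve(mq_adapter: :memory) # same class as with "memory"
AdapterLookup.resolve(mq_adapter: "test")
```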

.event(target, message = {}, opts = {}) ⇒ Object

Broadcasts the event with the given target

Parameters:

  • target (String)

    “<topic>#<event_type>”, e.g. “customers#created”

  • message (Hash, Mimi::Messaging::Message) (defaults to: {})
  • opts (Hash) (defaults to: {})

    additional options

Raises:

  • (ArgumentError)


# File 'lib/mimi/messaging.rb', line 233

def self.event(target, message = {}, opts = {})
  raise ArgumentError, "Invalid target argument" unless EVENT_TARGET_REGEX.match(target)
  raise ArgumentError, "Invalid message, Hash or Message is expected" unless message.is_a?(Hash)
  raise Error, "Failed to broadcast event, adapter is not started" unless started?(:adapter)

  adapter.event(target, Mimi::Messaging::Message.new(message), opts)
end

.log(message) ⇒ Object

Logs the message with the configured logger at the configured logging level. Nothing is logged if no logger is configured or if the level is :none.

Parameters:

  • message (String)


# File 'lib/mimi/messaging.rb', line 381

def self.log(message)
  return unless logger

  log_at_level = options[:mq_log_at_level] || DEFAULT_LOG_AT_LEVEL
  log_at_level = log_at_level.to_sym
  return if log_at_level == :none

  logger.send(log_at_level, message)
end
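
The level dispatch above works with any object that responds to the usual Logger severity methods. The following sketch reproduces it against Ruby's standard Logger; the LogSketch module is a hypothetical stand-alone copy of the logic, not the library's code path.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-alone copy of the level dispatch shown above.
module LogSketch
  DEFAULT_LOG_AT_LEVEL = :info

  def self.log(logger, options, message)
    return unless logger

    # The option may be given as a String or a Symbol.
    level = (options[:mq_log_at_level] || DEFAULT_LOG_AT_LEVEL).to_sym
    return if level == :none

    logger.send(level, message)
  end
end

io = StringIO.new
logger = Logger.new(io)

LogSketch.log(logger, {}, "adapter started")                      # logged at INFO (the default)
LogSketch.log(logger, { mq_log_at_level: "warn" }, "slow broker") # logged at WARN
LogSketch.log(logger, { mq_log_at_level: :none }, "hidden")       # skipped entirely
LogSketch.log(nil, {}, "no logger configured")                    # no-op
puts io.string
```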

.logger ⇒ Logger

Returns the configured logger

Returns:

  • (Logger)

    or compatible



# File 'lib/mimi/messaging.rb', line 373

def self.logger
  @logger
end

.options ⇒ Hash

Returns the options the module was configured with

Returns:

  • (Hash)


# File 'lib/mimi/messaging.rb', line 94

def self.options
  @options
end

.query(target, message = {}, opts = {}) ⇒ Hash

Executes the query to the given target and returns response

Raises Timeout::Error if the response from the target was not received in time.

Example:

result = Mimi::Messaging.query("users/find", id: 157)

Parameters:

  • target (String)

    “<queue>/<method>”

  • message (Hash, Mimi::Messaging::Message) (defaults to: {})
  • opts (Hash) (defaults to: {})

    additional options, e.g. :timeout

Returns:

  • (Hash)

Raises:

  • (ArgumentError)


# File 'lib/mimi/messaging.rb', line 219

def self.query(target, message = {}, opts = {})
  raise ArgumentError, "Invalid target argument" unless REQUEST_TARGET_REGEX.match(target)
  raise ArgumentError, "Invalid message, Hash or Message is expected" unless message.is_a?(Hash)
  raise Error, "Failed to send query, adapter is not started" unless started?(:adapter)

  adapter.query(target, Mimi::Messaging::Message.new(message), opts)
end

.register_event_processor(topic_name, processor, opts = {}) ⇒ Object

Registers an event processor without a queue

If the adapter and the processors are started, the processor will be automatically started (registered with the adapter).

Processor must respond to #call_event() which accepts 3 arguments: (method, message, opts).

TBD: It must #ack! or #nack! the message.

If the processor raises an error, the message will be NACK-ed and accepted again at a later time.

Parameters:

  • topic_name (String)

    “<topic>”

  • processor (#call_event())
  • opts (Hash) (defaults to: {})

    additional adapter-specific options



# File 'lib/mimi/messaging.rb', line 301

def self.register_event_processor(topic_name, processor, opts = {})
  # validates processor
  if !processor.respond_to?(:call_event) || processor.method(:call_event).arity < 3
    raise(
      ArgumentError,
      "Invalid event processor passed to .register_event_processor(), " \
      "expected to respond to #call_event(method_name, request, opts)"
    )
  end

  message_processor_params = {
    type: :event,
    topic_name: topic_name,
    processor: processor,
    opts: opts.dup,
    started: false
  }
  if started?(:adapter) && started?(:processors)
    start_message_processor(message_processor_params)
  end
  message_processors << message_processor_params
end
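
The processor validation above relies on Method#arity, which is negative for methods with optional arguments. A processor that declares opts = {} therefore fails the check even though it can be called with three arguments. The sketch below illustrates this; the EventProcessorSketch module and both processor classes are hypothetical.

```ruby
# Hypothetical processors; only the arity check mirrors the source above.
module EventProcessorSketch
  # Accepted: call_event takes exactly three required arguments (arity 3).
  class OrderEvents
    attr_reader :seen

    def initialize
      @seen = []
    end

    def call_event(method_name, message, opts)
      @seen << [method_name, message, opts]
    end
  end

  # Rejected: the optional argument makes the arity -3, which is below 3.
  class LenientOrderEvents
    def call_event(method_name, message, opts = {}); end
  end

  # Same condition as in .register_event_processor, inverted to a predicate.
  def self.valid?(processor)
    processor.respond_to?(:call_event) && processor.method(:call_event).arity >= 3
  end
end

puts EventProcessorSketch.valid?(EventProcessorSketch::OrderEvents.new)        # accepted
puts EventProcessorSketch.valid?(EventProcessorSketch::LenientOrderEvents.new) # rejected
```

With the library loaded, an accepted processor instance would be passed as the second argument, e.g. Mimi::Messaging.register_event_processor("orders", processor), where "orders" is an illustrative topic name.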

.register_event_processor_with_queue(topic_name, queue_name, processor, opts = {}) ⇒ Object

Registers an event processor with a queue

If the adapter and the processors are started, the processor will be automatically started (registered with the adapter).

Processor must respond to #call_event() which accepts 3 arguments: (method, message, opts).

TBD: It must #ack! or #nack! the message.

If the processor raises an error, the message will be NACK-ed and accepted again at a later time.

Parameters:

  • topic_name (String)

    “<topic>”

  • queue_name (String)

    “<queue>”

  • processor (#call_event())
  • opts (Hash) (defaults to: {})

    additional adapter-specific options



# File 'lib/mimi/messaging.rb', line 342

def self.register_event_processor_with_queue(topic_name, queue_name, processor, opts = {})
  # validates processor
  if !processor.respond_to?(:call_event) || processor.method(:call_event).arity < 3
    raise(
      ArgumentError,
      "Invalid event processor passed to .register_event_processor_with_queue(), " \
      "expected to respond to #call_event(method_name, request, opts)"
    )
  end

  message_processor_params = {
    type: :event_with_queue,
    topic_name: topic_name,
    queue_name: queue_name,
    processor: processor,
    opts: opts.dup,
    started: false
  }
  if started?(:adapter) && started?(:processors)
    start_message_processor(message_processor_params)
  end
  message_processors << message_processor_params
end

.register_request_processor(queue_name, processor, opts = {}) ⇒ Object

Registers the request (command/query) processor.

If the adapter and the processors are started, the processor will be automatically started (registered with the adapter).

Processor must respond to both #call_command() and #call_query(), each of which accepts 3 arguments: (method, message, opts).

TBD: It must #ack! or #nack! the message.

If the processor raises an error, the message will be NACK-ed and accepted again at a later time.

Parameters:

  • queue_name (String)

    “<queue>”

  • processor (#call_command(), #call_query())
  • opts (Hash) (defaults to: {})

    additional adapter-specific options



# File 'lib/mimi/messaging.rb', line 258

def self.register_request_processor(queue_name, processor, opts = {})
  # validates processor
  unless (
    processor.respond_to?(:call_command) && processor.method(:call_command).arity >= 3 &&
    processor.respond_to?(:call_query) && processor.method(:call_query).arity >= 3
  )
    raise(
      ArgumentError,
      "Invalid request processor passed to .register_request_processor(), " \
      "expected to respond to #call_command(...) AND #call_query(method_name, request, opts)"
    )
  end

  message_processor_params = {
    type: :request,
    queue_name: queue_name,
    processor: processor,
    opts: opts.dup,
    started: false
  }
  if started?(:adapter) && started?(:processors)
    start_message_processor(message_processor_params)
  end
  message_processors << message_processor_params
end
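
A request processor needs both entry points, each with three required arguments. The sketch below shows one possible shape; the RequestProcessorSketch module, the Users class, its in-memory store, and the assumptions that method_name is the part of the target after “/” and that the message can be read like a Hash with Symbol keys are all illustrative, not confirmed by this page.

```ruby
# Hypothetical request processor; only the validity check mirrors the source above.
module RequestProcessorSketch
  class Users
    def initialize
      @users = {} # in-memory store, for illustration only
    end

    # Commands are fire-and-forget, so nothing meaningful is returned.
    def call_command(method_name, message, opts)
      @users[message[:id]] = message[:name] if method_name.to_s == "create"
      nil
    end

    # Queries return a Hash that becomes the response.
    def call_query(method_name, message, opts)
      return {} unless method_name.to_s == "find"

      { id: message[:id], name: @users[message[:id]] }
    end
  end

  # Same condition as in .register_request_processor.
  def self.valid?(processor)
    %i[call_command call_query].all? do |m|
      processor.respond_to?(m) && processor.method(m).arity >= 3
    end
  end
end

users = RequestProcessorSketch::Users.new
users.call_command("create", { id: 157, name: "John Smith" }, {})
p users.call_query("find", { id: 157 }, {})
```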

.start(params = {}) ⇒ Object

Starts the Messaging module

Starts the adapter if it is not started yet, and registers the current message serializer with it. Starting the adapter opens connections with a message broker.

Automatically starts all currently registered message processors, unless the :processors option is false.

Example:

# to only start the adapter, so that we can send messages,
# but not process incoming messages:
Mimi::Messaging.start(processors: false)

# to start everything
Mimi::Messaging.start

Parameters:

  • params (Hash) (defaults to: {})

    additional parameters

Options Hash (params):

  • :adapter (true, false) (defaults to: true)

    start the adapter

  • :processors (true, false) (defaults to: true)

    automatically start all registered message processors



# File 'lib/mimi/messaging.rb', line 121

def self.start(params = {})
  adapter # ensures that adapter is configured
  log("#{name} starting with adapter '#{options[:mq_adapter]}'")
  params = { # defaults
    adapter: true,
    processors: true
  }.merge(params)

  if !started?(:adapter) && params[:adapter]
    adapter.start
    started!(:adapter)
  end

  if !started?(:processors) && params[:processors]
    start_all_message_processors
    started!(:processors)
  end

  true
end
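
The effect of the defaults and of the already-started checks can be summarised in a small stand-alone sketch. The StartSketch module and its components_to_start helper are hypothetical; they only mirror the merge and the two conditions in the source above.

```ruby
# Hypothetical helper mirroring the defaults merge and started? checks above.
module StartSketch
  DEFAULTS = { adapter: true, processors: true }.freeze

  # Returns the components that a call to start would actually start,
  # given the params passed in and the components already started.
  def self.components_to_start(params = {}, started = {})
    effective = DEFAULTS.merge(params)
    %i[adapter processors].select { |c| !started[c] && effective[c] }
  end
end

p StartSketch.components_to_start                        # both components
p StartSketch.components_to_start(processors: false)     # adapter only
p StartSketch.components_to_start({}, { adapter: true }) # processors only, adapter already runs
```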

.stop(params = {}) ⇒ Object

Stops the Messaging module

Stops all currently registered message processors, unless the :processors option is false.

Stops the adapter, unless the :adapter option is false. Stopping the adapter closes connections with a message broker.

Example:

# to start everything
Mimi::Messaging.start

# to only stop the message processors, so that we can send messages
# but not process incoming messages:
Mimi::Messaging.stop(adapter: false, processors: true)

# to stop everything
Mimi::Messaging.stop

Parameters:

  • params (Hash) (defaults to: {})

    additional parameters

Options Hash (params):

  • :processors (true, false) (defaults to: true)

    stop all message processors

  • :adapter (true, false) (defaults to: true)

    stop the adapter



# File 'lib/mimi/messaging.rb', line 167

def self.stop(params = {})
  params = { # defaults
    adapter: true,
    processors: true
  }.merge(params)

  if params[:processors]
    stop_all_processors
    started!(:processors, false)
  end

  if params[:adapter]
    adapter.stop # TODO: stopping adapter without stopping processors? TBD
    started!(:adapter, false)
  end

  log("#{name} stopped")
  true
end

.unregister_all_processors ⇒ Object

Deregisters all message processors



# File 'lib/mimi/messaging.rb', line 473

def self.unregister_all_processors
  stop_all_processors
  message_processors.replace([])
end

.use(options) ⇒ Object

Sets up the Messaging module

Sets up Messaging layer dependencies configuration, e.g. configures logger, message serializer etc.



# File 'lib/mimi/messaging.rb', line 46

def self.use(options)
  @serializer = options[:serializer] if options.key?(:serializer)
  @logger = options[:logger] if options.key?(:logger)
end