Class: Mimi::Messaging::Adapters::Base

Inherits:
Object
Defined in:
lib/mimi/messaging/adapters/base.rb

Overview

An abstract messaging adapter.

An adapter implementation must implement the following methods:

  • #start()

  • #stop()

  • #command(target, message, opts)

  • #query(target, message, opts)

  • #event(target, message, opts)

  • #start_request_processor(queue_name, processor, opts)

  • #start_event_processor(topic_name, processor, opts)

  • #start_event_processor_with_queue(topic_name, queue_name, processor, opts)

  • #stop_all_processors

An adapter implementation must register itself using the .register_adapter_name class method.
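The contract above can be sketched with a small in-process adapter. This is only an illustration: the stand-in Mimi::Messaging::Adapters module and Base class are defined here so the sketch runs without the gem, and LoopbackAdapter and RecordingProcessor are hypothetical names, not part of the library. Only some of the required methods are shown.

```ruby
# Stand-in for the library's registry and base class, included only so this
# sketch runs without the gem. It mirrors .register_adapter_name as listed
# in this reference and nothing else.
module Mimi
  module Messaging
    module Adapters
      def self.registered_adapters
        @registered_adapters ||= {}
      end

      class Base
        def self.register_adapter_name(adapter_name)
          adapter_name = adapter_name.to_s
          if Mimi::Messaging::Adapters.registered_adapters.key?(adapter_name)
            raise "Mimi::Messaging adapter '#{adapter_name}' is already registered"
          end

          Mimi::Messaging::Adapters.registered_adapters[adapter_name] = self
        end
      end
    end
  end
end

# Hypothetical adapter that delivers events to in-process processors.
class LoopbackAdapter < Mimi::Messaging::Adapters::Base
  register_adapter_name "loopback"

  def initialize(params = {})
    @params = params
    @started = false
    @event_processors = Hash.new { |hash, topic| hash[topic] = [] }
  end

  def start
    @started = true
  end

  # Stops all processors first, then the adapter itself.
  def stop
    stop_all_processors
    @started = false
  end

  # Delivers the message to every processor subscribed to the topic.
  def event(target, message, opts = {})
    topic, event_type = target.split("#", 2)
    @event_processors[topic].each do |processor|
      processor.call_event(event_type, message, opts)
    end
    nil
  end

  # Refuses processors until the adapter has been started.
  def start_event_processor(topic_name, processor, _opts = {})
    raise "Adapter is not started" unless @started

    @event_processors[topic_name] << processor
    nil
  end

  def stop_all_processors
    @event_processors.clear
  end

  # #command, #query, #start_request_processor and
  # #start_event_processor_with_queue are omitted from this sketch.
end

# Hypothetical processor that records every event it receives.
class RecordingProcessor
  attr_reader :seen

  def initialize
    @seen = []
  end

  def call_event(event_type, message, _opts)
    @seen << [event_type, message]
  end
end

# Usage: look the class up by its registered name, start it, then subscribe.
adapter = Mimi::Messaging::Adapters.registered_adapters.fetch("loopback").new
processor = RecordingProcessor.new
adapter.start
adapter.start_event_processor("customers", processor)
adapter.event("customers#created", { "id" => 1 })
adapter.stop
```

A real adapter would talk to a broker rather than call processors directly; the sketch only shows the order of operations (register, start, attach processors, send, stop).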

Direct Known Subclasses

Memory, Test

Constructor Details

#initialize(params = {}) ⇒ Base

Creates an Adapter instance

Parameters:

  • params (Hash) (defaults to: {})

    adapter-specific configuration parameters



# File 'lib/mimi/messaging/adapters/base.rb', line 42

def initialize(params = {})
end

Instance Attribute Details

#serializer ⇒ Object (readonly)

Returns the value of attribute serializer.



# File 'lib/mimi/messaging/adapters/base.rb', line 23

def serializer
  @serializer
end

Class Method Details

.register_adapter_name(adapter_name) ⇒ Object

Registers adapter class with given adapter name

Parameters:

  • adapter_name (String, Symbol)


# File 'lib/mimi/messaging/adapters/base.rb', line 29

def self.register_adapter_name(adapter_name)
  adapter_name = adapter_name.to_s
  if Mimi::Messaging::Adapters.registered_adapters.key?(adapter_name)
    raise "Mimi::Messaging adapter '#{adapter_name}' is already registered"
  end

  Mimi::Messaging::Adapters.registered_adapters[adapter_name] = self
end

Instance Method Details

#command(_target, _message, _opts = {}) ⇒ Object

Sends the command to the given target

Parameters:

Returns:

  • nil

Raises:

  • (SomeError)


# File 'lib/mimi/messaging/adapters/base.rb', line 72

def command(_target, _message, _opts = {})
  raise "Method #command(target, message, opts) is not implemented by #{self.class}"
end

#event(_target, _message, _opts = {}) ⇒ Object

Broadcasts the event with the given target

Parameters:

  • target (String)

    “<topic>#<event_type>”, e.g. “customers#created”

  • message (Mimi::Messaging::Message)
  • opts (Hash)

    additional options



# File 'lib/mimi/messaging/adapters/base.rb', line 95

def event(_target, _message, _opts = {})
  raise "Method #event(target, message, opts) is not implemented by #{self.class}"
end

#query(_target, _message, _opts = {}) ⇒ Hash

Executes the query against the given target and returns the response

Parameters:

  • target (String)

    “<queue>/<method>”

  • message (Mimi::Messaging::Message)
  • opts (Hash)

    additional options, e.g. :timeout

Returns:

  • (Hash)

Raises:

  • (SomeError, TimeoutError)


# File 'lib/mimi/messaging/adapters/base.rb', line 85

def query(_target, _message, _opts = {})
  raise "Method #query(target, message, opts) is not implemented by #{self.class}"
end
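The target formats documented for #event ("<topic>#<event_type>") and #query ("<queue>/<method>") can be split as in the sketch below. The helper names parse_event_target and parse_request_target, and the sample target "customers/find", are hypothetical and not part of the library; an adapter may parse targets differently.

```ruby
# Hypothetical helper: splits "<topic>#<event_type>" into its two parts.
def parse_event_target(target)
  topic, event_type = target.to_s.split("#", 2)
  if topic.to_s.empty? || event_type.to_s.empty?
    raise ArgumentError, "expected \"<topic>#<event_type>\", got #{target.inspect}"
  end

  { topic: topic, event_type: event_type }
end

# Hypothetical helper: splits "<queue>/<method>" into its two parts.
def parse_request_target(target)
  queue, method_name = target.to_s.split("/", 2)
  if queue.to_s.empty? || method_name.to_s.empty?
    raise ArgumentError, "expected \"<queue>/<method>\", got #{target.inspect}"
  end

  { queue: queue, method_name: method_name }
end

parse_event_target("customers#created")
# => { topic: "customers", event_type: "created" }
parse_request_target("customers/find")
# => { queue: "customers", method_name: "find" }
```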

#register_message_serializer(serializer) ⇒ Object

Registers the message serializer

The message serializer must implement the methods #serialize(Hash) -> String and #deserialize(String) -> Hash.

Parameters:

  • serializer (#serialize(), #deserialize())


# File 'lib/mimi/messaging/adapters/base.rb', line 189

def register_message_serializer(serializer)
  raise "Message serializer is already registered in #{self.class}" if @serializer
  if !serializer.respond_to?(:serialize) || !serializer.respond_to?(:deserialize)
    raise "Invalid message serializer passed to #{self.class}"
  end

  @serializer = serializer
end
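A serializer only needs to satisfy the duck-type check shown in the listing above. A minimal sketch using Ruby's standard JSON library follows; the class name JsonSerializer is hypothetical, and the library may ship its own serializer.

```ruby
require "json"

# Hypothetical serializer: Hash -> String and String -> Hash via JSON.
class JsonSerializer
  def serialize(hash)
    JSON.generate(hash)
  end

  def deserialize(string)
    JSON.parse(string)
  end
end

serializer = JsonSerializer.new

# The same two checks #register_message_serializer performs.
serializer.respond_to?(:serialize)   # => true
serializer.respond_to?(:deserialize) # => true

payload = serializer.serialize("id" => 1, "name" => "Alice")
restored = serializer.deserialize(payload)
# restored == { "id" => 1, "name" => "Alice" }
```

With JSON, symbol keys come back as strings after a round trip, so processors should not rely on symbol keys in deserialized messages. Per the listing, an adapter accepts only one serializer; a second call to #register_message_serializer raises.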

#start ⇒ Object

Starts the adapter.

All the message processors must be started after the adapter is started. Before the adapter is started it MAY respond with an error to an attempt to start a message processor.

Serializer must be registered before any message is sent or received.



# File 'lib/mimi/messaging/adapters/base.rb', line 53

def start
  raise "Method #start() is not implemented by #{self.class}"
end

#start_event_processor(_topic_name, processor, _opts = {}) ⇒ Object

Starts an event processor without a queue

The processor must respond to #call_event(), which accepts 3 arguments: (event_type, message, opts).

TBD: It must #ack! or #nack! the message.

If the processor raises an error, the message will be NACK-ed and accepted again at a later time.

Parameters:

  • topic_name (String)

    “<topic>”

  • processor (#call_event())
  • opts (Hash)

    additional adapter-specific options

Raises:

  • (ArgumentError)


# File 'lib/mimi/messaging/adapters/base.rb', line 145

def start_event_processor(_topic_name, processor, _opts = {})
  # validates processor
  return if processor.respond_to?(:call_event) && processor.method(:call_event).arity >= 3

  raise(
    ArgumentError,
    "Invalid event processor passed to #{self.class}##{__method__}(), " \
    "expected to respond to #call_event(event_type, message, opts)"
  )
end
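The validation in the listing above relies on Method#arity, which reports a negative number when a method has optional arguments. The sketch below reproduces the same condition in a hypothetical helper (passes_base_check?) to show which processor signatures it accepts; the processor class names are also hypothetical.

```ruby
# Three required positional arguments: arity is 3.
class StrictProcessor
  def call_event(event_type, message, opts)
    [event_type, message, opts]
  end
end

# Optional third argument: arity is -3 (two mandatory arguments, then optional).
class LenientProcessor
  def call_event(event_type, message, opts = {})
    [event_type, message, opts]
  end
end

# Hypothetical helper with the same condition as the base validation.
def passes_base_check?(processor)
  processor.respond_to?(:call_event) && processor.method(:call_event).arity >= 3
end

StrictProcessor.new.method(:call_event).arity  # => 3
LenientProcessor.new.method(:call_event).arity # => -3

passes_base_check?(StrictProcessor.new)  # => true
passes_base_check?(LenientProcessor.new) # => false
```

If an adapter relies on this base validation, a processor whose opts argument has a default value would be rejected with ArgumentError, so declaring all three arguments as required is the safer choice.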

#start_event_processor_with_queue(_topic_name, _queue_name, processor, _opts = {}) ⇒ Object

Starts an event processor with a queue

The processor must respond to #call_event(), which accepts 3 arguments: (event_type, message, opts).

TBD: It must #ack! or #nack! the message.

If the processor raises an error, the message will be NACK-ed and accepted again at a later time.

Parameters:

  • topic_name (String)

    “<topic>”

  • queue_name (String)

    “<queue>”

  • processor (#call_event())
  • opts (Hash)

    additional adapter-specific options

Raises:

  • (ArgumentError)


# File 'lib/mimi/messaging/adapters/base.rb', line 171

def start_event_processor_with_queue(_topic_name, _queue_name, processor, _opts = {})
  # validates processor
  return if processor.respond_to?(:call_event) && processor.method(:call_event).arity >= 3

  raise(
    ArgumentError,
    "Invalid event processor passed to #{self.class}##{__method__}(), " \
    "expected to respond to #call_event(event_type, message, opts)"
  )
end

#start_request_processor(_queue_name, processor, _opts = {}) ⇒ Object

Starts a request (command/query) processor.

The processor must respond to both #call_command() and #call_query(), each of which accepts 3 arguments: (method_name, message, opts).

TBD: It must #ack! or #nack! the message.

If the processor raises an error, the message will be NACK-ed and accepted again at a later time.

NOTE: This method must be overridden by a subclass.

Parameters:

  • queue_name (String)

    “<queue>”

  • processor (#call_command(), #call_query())
  • opts (Hash)

    additional adapter-specific options

Raises:

  • (ArgumentError)


# File 'lib/mimi/messaging/adapters/base.rb', line 115

def start_request_processor(_queue_name, processor, _opts = {})
  # validates processor
  if (
    processor.respond_to?(:call_command) && processor.method(:call_command).arity >= 3 &&
    processor.respond_to?(:call_query) && processor.method(:call_query).arity >= 3
  )
    return
  end

  raise(
    ArgumentError,
    "Invalid request processor passed to #{self.class}##{__method__}(), " \
    "expected to respond to #call_command(method_name, message, opts) AND #call_query(...)"
  )
end
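A request processor that would pass the validation above might look like the following sketch. The class name CustomerRequests, the "find" method name, and the shape of the message (treated here as Hash-like) are assumptions made for illustration; the actual Mimi::Messaging::Message interface is not described in this reference.

```ruby
# Hypothetical request processor handling both commands and queries.
class CustomerRequests
  attr_reader :received_commands

  def initialize
    @received_commands = []
  end

  # Commands produce no response: record the call and return nil.
  def call_command(method_name, message, _opts)
    @received_commands << [method_name.to_s, message]
    nil
  end

  # Queries return a Hash response.
  def call_query(method_name, message, _opts)
    case method_name.to_s
    when "find"
      { "id" => message["id"], "name" => "Alice" }
    else
      { "error" => "unknown method #{method_name}" }
    end
  end
end

processor = CustomerRequests.new
processor.call_command("create", { "name" => "Alice" }, {})
response = processor.call_query("find", { "id" => 1 }, {})
# response == { "id" => 1, "name" => "Alice" }
```

Both methods declare three required arguments, so each reports an arity of 3 and satisfies the `arity >= 3` condition in the listing.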

#stop ⇒ Object

Stops all message processors and then stops the adapter.



# File 'lib/mimi/messaging/adapters/base.rb', line 59

def stop
  raise "Method #stop() is not implemented by #{self.class}"
end

#stop_all_processors ⇒ Object

Stops all message (command, query and event) processors.

Stops currently registered processors and stops accepting new messages for processors.



# File 'lib/mimi/messaging/adapters/base.rb', line 203

def stop_all_processors
  raise "Method #stop_all_processors() is not implemented by #{self.class}"
end