Class: Mimi::Messaging::Adapters::Base
- Inherits: Object
  - Object
  - Mimi::Messaging::Adapters::Base
- Defined in: lib/mimi/messaging/adapters/base.rb
Overview
An abstract messaging adapter.
An adapter implementation must implement the following methods:
- #start()
- #stop()
- #command(target, message, opts)
- #query(target, message, opts)
- #event(target, message, opts)
- #start_request_processor(queue_name, processor, opts)
- #start_event_processor(topic_name, processor, opts)
- #start_event_processor_with_queue(topic_name, queue_name, processor, opts)
- #stop_all_processors
An adapter implementation must register itself by calling the .register_adapter_name class method.
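The requirements above can be sketched as a minimal subclass. This is an illustration under stated assumptions, not the library's own implementation: the Base stand-in below mirrors only the registration call so that the example runs without the gem, and RecordingAdapter, the adapter name "recording_example", #started? and the sent log are hypothetical names introduced here.

```ruby
# Stand-in for the real classes (assumption: it mirrors only the registration
# call documented on this page, so the sketch runs without the gem).
module Mimi
  module Messaging
    module Adapters
      def self.registered_adapters
        @registered_adapters ||= {}
      end

      class Base
        def self.register_adapter_name(adapter_name)
          Mimi::Messaging::Adapters.registered_adapters[adapter_name.to_s] = self
        end
      end
    end
  end
end

# Hypothetical adapter that records outgoing messages instead of sending them.
class RecordingAdapter < Mimi::Messaging::Adapters::Base
  register_adapter_name "recording_example"

  attr_reader :sent

  def initialize(params = {})
    @params = params
    @sent = []
    @processors = []
    @started = false
  end

  def start
    @started = true
  end

  # Stops all processors first, then the adapter itself.
  def stop
    stop_all_processors
    @started = false
  end

  def started?
    @started
  end

  def command(target, message, opts = {})
    @sent << [:command, target, message, opts]
    nil
  end

  def query(target, message, opts = {})
    @sent << [:query, target, message, opts]
    {} # placeholder; a real adapter would return the response as a Hash
  end

  def event(target, message, opts = {})
    @sent << [:event, target, message, opts]
    nil
  end

  def start_request_processor(queue_name, processor, opts = {})
    @processors << [:request, queue_name, processor, opts]
  end

  def start_event_processor(topic_name, processor, opts = {})
    @processors << [:event, topic_name, processor, opts]
  end

  def start_event_processor_with_queue(topic_name, queue_name, processor, opts = {})
    @processors << [:event_with_queue, topic_name, queue_name, processor, opts]
  end

  def stop_all_processors
    @processors.clear
  end
end
```

Calling the class-level registration inside the class body means the adapter is registered as soon as its file is loaded.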
Instance Attribute Summary
- #serializer ⇒ Object (readonly)
  Returns the value of attribute serializer.
Class Method Summary
- .register_adapter_name(adapter_name) ⇒ Object
  Registers adapter class with given adapter name.
Instance Method Summary
- #command(_target, _message, _opts = {}) ⇒ Object
  Sends the command to the given target.
- #event(_target, _message, _opts = {}) ⇒ Object
  Broadcasts the event with the given target.
- #initialize(params = {}) ⇒ Base (constructor)
  Creates an Adapter instance.
- #query(_target, _message, _opts = {}) ⇒ Hash
  Executes the query to the given target and returns response.
- #register_message_serializer(serializer) ⇒ Object
  Registers the message serializer.
- #start ⇒ Object
  Starts the adapter.
- #start_event_processor(_topic_name, processor, _opts = {}) ⇒ Object
  Starts an event processor without a queue.
- #start_event_processor_with_queue(_topic_name, _queue_name, processor, _opts = {}) ⇒ Object
  Starts an event processor with a queue.
- #start_request_processor(_queue_name, processor, _opts = {}) ⇒ Object
  Starts a request (command/query) processor.
- #stop ⇒ Object
  Stops all message processors and then stops the adapter.
- #stop_all_processors ⇒ Object
  Stops all message (command, query and event) processors.
Constructor Details
#initialize(params = {}) ⇒ Base
Creates an Adapter instance.
# File 'lib/mimi/messaging/adapters/base.rb', line 42
def initialize(params = {})
end
Instance Attribute Details
#serializer ⇒ Object (readonly)
Returns the value of attribute serializer.
# File 'lib/mimi/messaging/adapters/base.rb', line 23
def serializer
  @serializer
end
Class Method Details
.register_adapter_name(adapter_name) ⇒ Object
Registers adapter class with given adapter name.
# File 'lib/mimi/messaging/adapters/base.rb', line 29
def self.register_adapter_name(adapter_name)
  adapter_name = adapter_name.to_s
  if Mimi::Messaging::Adapters.registered_adapters.key?(adapter_name)
    raise "Mimi::Messaging adapter '#{adapter_name}' is already registered"
  end
  Mimi::Messaging::Adapters.registered_adapters[adapter_name] = self
end
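The listing converts the name with #to_s and raises if the name is already taken. A small sketch of that behavior follows. The module and Base class are a stand-in that copies the listed method so the example runs without the gem; FirstAdapter and the name "duplicate_example" are hypothetical.

```ruby
# Stand-in registry and base class (assumption: copied from the listing on
# this page so the sketch is runnable without the gem).
module Mimi
  module Messaging
    module Adapters
      def self.registered_adapters
        @registered_adapters ||= {}
      end

      class Base
        def self.register_adapter_name(adapter_name)
          adapter_name = adapter_name.to_s
          if Mimi::Messaging::Adapters.registered_adapters.key?(adapter_name)
            raise "Mimi::Messaging adapter '#{adapter_name}' is already registered"
          end
          Mimi::Messaging::Adapters.registered_adapters[adapter_name] = self
        end
      end
    end
  end
end

# Hypothetical adapter; the Symbol is stored under its String form.
class FirstAdapter < Mimi::Messaging::Adapters::Base
  register_adapter_name :duplicate_example
end

# A second class claiming the same name fails with a RuntimeError.
duplicate_error =
  begin
    Class.new(Mimi::Messaging::Adapters::Base) do
      register_adapter_name "duplicate_example"
    end
    nil
  rescue RuntimeError => e
    e.message
  end
```

Because a Symbol and a String map to the same registry key, the two registrations above collide even though they were written differently.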
Instance Method Details
#command(_target, _message, _opts = {}) ⇒ Object
Sends the command to the given target.
# File 'lib/mimi/messaging/adapters/base.rb', line 72
def command(_target, _message, _opts = {})
  raise "Method #command(target, message, opts) is not implemented by #{self.class}"
end
#event(_target, _message, _opts = {}) ⇒ Object
Broadcasts the event with the given target.
# File 'lib/mimi/messaging/adapters/base.rb', line 95
def event(_target, _message, _opts = {})
  raise "Method #event(target, message, opts) is not implemented by #{self.class}"
end
#query(_target, _message, _opts = {}) ⇒ Hash
Executes the query to the given target and returns response.
# File 'lib/mimi/messaging/adapters/base.rb', line 85
def query(_target, _message, _opts = {})
  raise "Method #query(target, message, opts) is not implemented by #{self.class}"
end
#register_message_serializer(serializer) ⇒ Object
Registers the message serializer.
Message serializer must implement methods #serialize(Hash) -> String and #deserialize(String) -> Hash.
# File 'lib/mimi/messaging/adapters/base.rb', line 189
def register_message_serializer(serializer)
  raise "Message serializer is already registered in #{self.class}" if @serializer
  if !serializer.respond_to?(:serialize) || !serializer.respond_to?(:deserialize)
    raise "Invalid message serializer passed to #{self.class}"
  end

  @serializer = serializer
end
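A serializer that satisfies this contract might look like the sketch below. JsonExampleSerializer is a hypothetical name, and the choice of JSON with symbolized keys is an assumption made for illustration; the source only requires #serialize(Hash) -> String and #deserialize(String) -> Hash.

```ruby
require "json"

# Hypothetical serializer implementing the documented contract:
# #serialize(Hash) -> String and #deserialize(String) -> Hash.
class JsonExampleSerializer
  def serialize(message)
    JSON.generate(message)
  end

  # Assumption: keys are symbolized on the way back; the source does not
  # say which key type adapters expect.
  def deserialize(payload)
    JSON.parse(payload, symbolize_names: true)
  end
end

serializer = JsonExampleSerializer.new
payload = serializer.serialize({ id: 42, tags: ["a", "b"] })
restored = serializer.deserialize(payload)
```

An instance of such a class would be passed once to #register_message_serializer. According to the listing, a second registration on the same adapter raises an error.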
#start ⇒ Object
Starts the adapter.
All the message processors must be started after the adapter is started. Before the adapter is started it MAY respond with an error to an attempt to start a message processor.
Serializer must be registered before any message is sent or received.
# File 'lib/mimi/messaging/adapters/base.rb', line 53
def start
  raise "Method #start() is not implemented by #{self.class}"
end
#start_event_processor(_topic_name, processor, _opts = {}) ⇒ Object
Starts an event processor without a queue.
Processor must respond to #call_event(), which accepts 3 arguments: (event_type, message, opts).
TBD: It must #ack! or #nack! the message.
If the processor raises an error, the message will be NACK-ed and accepted again at a later time.
# File 'lib/mimi/messaging/adapters/base.rb', line 145
def start_event_processor(_topic_name, processor, _opts = {})
  # validates processor
  return if processor.respond_to?(:call_event) && processor.method(:call_event).arity >= 3

  raise(
    ArgumentError,
    "Invalid event processor passed to #{self.class}##{__method__}(), " \
    "expected to respond to #call_event(event_type, message, opts)"
  )
end
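The validation in the listing relies on Method#arity, which has a consequence worth illustrating: a method with an optional parameter reports a negative arity and so fails an arity >= 3 check. The sketch below uses hypothetical processor classes, and a local helper that mirrors the condition from the listing rather than calling the library.

```ruby
# Hypothetical processor: three required parameters, so Method#arity is 3.
class AuditEventProcessor
  attr_reader :seen

  def initialize
    @seen = []
  end

  def call_event(event_type, message, opts)
    @seen << [event_type, message, opts]
  end
end

# Hypothetical processor with an optional third parameter: Method#arity
# is -3, which does not satisfy `arity >= 3`.
class OptionalOptsProcessor
  def call_event(event_type, message, opts = {}); end
end

# Local helper mirroring the validation condition from the listing.
def valid_event_processor?(processor)
  processor.respond_to?(:call_event) && processor.method(:call_event).arity >= 3
end

accepted = valid_event_processor?(AuditEventProcessor.new)
rejected = valid_event_processor?(OptionalOptsProcessor.new)
```

Under this check, a processor should declare all three parameters as required, even if it ignores opts.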
#start_event_processor_with_queue(_topic_name, _queue_name, processor, _opts = {}) ⇒ Object
Starts an event processor with a queue.
Processor must respond to #call_event(), which accepts 3 arguments: (event_type, message, opts).
TBD: It must #ack! or #nack! the message.
If the processor raises an error, the message will be NACK-ed and accepted again at a later time.
# File 'lib/mimi/messaging/adapters/base.rb', line 171
def start_event_processor_with_queue(_topic_name, _queue_name, processor, _opts = {})
  # validates processor
  return if processor.respond_to?(:call_event) && processor.method(:call_event).arity >= 3

  raise(
    ArgumentError,
    "Invalid event processor passed to #{self.class}##{__method__}(), " \
    "expected to respond to #call_event(event_type, message, opts)"
  )
end
#start_request_processor(_queue_name, processor, _opts = {}) ⇒ Object
Starts a request (command/query) processor.
Processor must respond to #call_command() AND #call_query(), each of which accepts 3 arguments: (method_name, message, opts).
TBD: It must #ack! or #nack! the message.
If the processor raises an error, the message will be NACK-ed and accepted again at a later time.
NOTE: Method must be overridden by a subclass.
# File 'lib/mimi/messaging/adapters/base.rb', line 115
def start_request_processor(_queue_name, processor, _opts = {})
  # validates processor
  if (
    processor.respond_to?(:call_command) && processor.method(:call_command).arity >= 3 &&
    processor.respond_to?(:call_query) && processor.method(:call_query).arity >= 3
  )
    return
  end

  raise(
    ArgumentError,
    "Invalid request processor passed to #{self.class}##{__method__}(), " \
    "expected to respond to #call_command(method_name, message, opts) AND #call_query(...)"
  )
end
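A request processor has to provide both handlers. The following sketch shows one that would pass the check and one that would not. EchoRequestProcessor, CommandOnlyProcessor and valid_request_processor? are hypothetical names; the helper reproduces the condition from the listing locally instead of calling the library.

```ruby
# Hypothetical request processor: both handlers take three required
# parameters, matching the `arity >= 3` condition in the listing.
class EchoRequestProcessor
  attr_reader :commands

  def initialize
    @commands = []
  end

  # This sketch records the command and returns nil.
  def call_command(method_name, message, opts)
    @commands << [method_name, message, opts]
    nil
  end

  # This sketch answers every query by echoing the request back as a Hash.
  def call_query(method_name, message, opts)
    { method: method_name, echo: message }
  end
end

# Hypothetical incomplete processor: it lacks #call_query.
class CommandOnlyProcessor
  def call_command(method_name, message, opts); end
end

# Local helper mirroring the validation condition from the listing.
def valid_request_processor?(processor)
  %i[call_command call_query].all? do |name|
    processor.respond_to?(name) && processor.method(name).arity >= 3
  end
end

complete = valid_request_processor?(EchoRequestProcessor.new)
incomplete = valid_request_processor?(CommandOnlyProcessor.new)
```

A processor that handles only commands would still need a #call_query method with three required parameters to be accepted by this check.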
#stop ⇒ Object
Stops all message processors and then stops the adapter.
# File 'lib/mimi/messaging/adapters/base.rb', line 59
def stop
  raise "Method #stop() is not implemented by #{self.class}"
end
#stop_all_processors ⇒ Object
Stops all message (command, query and event) processors.
Stops currently registered processors and stops accepting new messages for processors.
# File 'lib/mimi/messaging/adapters/base.rb', line 203
def stop_all_processors
  raise "Method #stop_all_processors() is not implemented by #{self.class}"
end