Class: ReliableMsg::MessageStore::Base
- Inherits:
-
Object
- Object
- ReliableMsg::MessageStore::Base
- Defined in:
- lib/reliable-msg/message-store.rb
Overview
Base class for message store.
Direct Known Subclasses
Constant Summary collapse
- ERROR_INVALID_MESSAGE_STORE =
:nodoc:
"No message store '%s' available (note: case is not important)"- @@stores =
:nodoc:
{}
Class Method Summary collapse
-
.configure(config, logger) ⇒ Object
Returns a message store from the specified configuration (previously created with configure).
Instance Method Summary collapse
-
#activate ⇒ Object
Activates the message store.
-
#configuration ⇒ Object
Returns the message store configuration as a hash.
-
#deactivate ⇒ Object
Deactivates the message store.
-
#initialize(logger) ⇒ Base
constructor
A new instance of Base.
- #select(queue, &block) ⇒ Object
-
#setup ⇒ Object
Set up the message store.
- #transaction(&block) ⇒ Object
-
#type ⇒ Object
Returns the message store type name.
Constructor Details
#initialize(logger) ⇒ Base
Returns a new instance of Base.
32 33 34 |
# File 'lib/reliable-msg/message-store.rb', line 32 def initialize logger @logger = logger end |
Class Method Details
.configure(config, logger) ⇒ Object
Returns a message store from the specified configuration (previously created with configure).
:call-seq:
Base::configure(config, logger) -> store
121 122 123 124 125 126 |
# File 'lib/reliable-msg/message-store.rb', line 121 def self.configure config, logger type = config["type"].downcase cls = @@stores[type] raise RuntimeError, format(ERROR_INVALID_MESSAGE_STORE, type) unless cls cls.new config, logger end |
Instance Method Details
#activate ⇒ Object
Activates the message store. Call this method before using the message store.
:call-seq:
store.activate
68 69 70 71 72 73 |
# File 'lib/reliable-msg/message-store.rb', line 68 def activate @mutex = Mutex.new @queues = {Queue::DLQ=>[]} @cache = {} # TODO: add recovery logic end |
#configuration ⇒ Object
Returns the message store configuration as a hash.
:call-seq:
store.configuration -> hash
58 59 60 |
# File 'lib/reliable-msg/message-store.rb', line 58 def configuration raise RuntimeException, "Not implemented" end |
#deactivate ⇒ Object
Deactivates the message store. Call this method when done using the message store.
:call-seq:
store.deactivate
81 82 83 |
# File 'lib/reliable-msg/message-store.rb', line 81 def deactivate @mutex = @queues = @cache = nil end |
#select(queue, &block) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/reliable-msg/message-store.rb', line 101 def select queue, &block queue = @queues[queue] return nil unless queue queue.each do |headers| selected = block.call(headers) if selected id = headers[:id] = @cache[id] || load(id, queue) return {:id=>id, :headers=>headers, :message=>} end end return nil end |
#setup ⇒ Object
Set up the message store. Create files, database tables, etc.
:call-seq:
store.setup
50 51 |
# File 'lib/reliable-msg/message-store.rb', line 50 def setup end |
#transaction(&block) ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/reliable-msg/message-store.rb', line 85 def transaction &block result = block.call inserts = [], deletes = [], dlqs= [] begin update inserts, deletes, dlqs unless inserts.empty? && deletes.empty? && dlqs.empty? rescue Exception=>error @logger.error error # If an error occurs, the queue may be out of synch with the store. # Empty the cache and reload the queue, before raising the error. @cache = {} @queues = {Queue::DLQ=>[]} load_index raise error end result end |
#type ⇒ Object
Returns the message store type name.
:call-seq:
store.type -> string
41 42 43 |
# File 'lib/reliable-msg/message-store.rb', line 41 def type raise RuntimeException, "Not implemented" end |