Class: ReliableMsg::MessageStore::Base
- Inherits:
-
Object
- Object
- ReliableMsg::MessageStore::Base
- Defined in:
- lib/reliable-msg/message-store.rb
Overview
Base class for message stores.
Direct Known Subclasses
Constant Summary collapse
- ERROR_INVALID_MESSAGE_STORE =
:nodoc:
"No message store '%s' available (note: case is not important)"
- @@stores =
:nodoc:
{}
Class Method Summary collapse
-
.configure(config, logger) ⇒ Object
Returns a message store from the specified configuration (previously created with configure).
Instance Method Summary collapse
-
#activate ⇒ Object
Activates the message store.
-
#configuration ⇒ Object
Returns the message store configuration as a hash.
-
#deactivate ⇒ Object
Deactivates the message store.
-
#initialize(logger) ⇒ Base
constructor
A new instance of Base.
- #select(queue, &block) ⇒ Object
-
#setup ⇒ Object
Set up the message store.
- #transaction(&block) ⇒ Object
-
#type ⇒ Object
Returns the message store type name.
Constructor Details
#initialize(logger) ⇒ Base
Returns a new instance of Base.
30 31 32 |
# File 'lib/reliable-msg/message-store.rb', line 30

# Returns a new instance of Base that keeps +logger+ in <tt>@logger</tt>.
def initialize(logger)
  @logger = logger
end
Class Method Details
.configure(config, logger) ⇒ Object
Returns a message store from the specified configuration (previously created with configure).
:call-seq:
Base::configure(config, logger) -> store
119 120 121 122 123 124 |
# File 'lib/reliable-msg/message-store.rb', line 119

# Returns a message store from the specified configuration.
#
# The store class is looked up in <tt>@@stores</tt> under the lower-cased
# <tt>"type"</tt> entry of +config+ and instantiated with +config+ and
# +logger+. The entry is converted with +to_s+ before lower-casing, so a
# configuration without a <tt>"type"</tt> is reported through the same
# RuntimeError as an unknown type instead of failing on +nil+.
#
# Raises RuntimeError (message built from ERROR_INVALID_MESSAGE_STORE) when
# no store class is registered under that type.
#
# :call-seq:
#   Base::configure(config, logger) -> store
#
def self.configure(config, logger)
  type = config["type"].to_s.downcase
  cls = @@stores[type]
  raise RuntimeError, format(ERROR_INVALID_MESSAGE_STORE, type) unless cls
  cls.new(config, logger)
end
Instance Method Details
#activate ⇒ Object
Activates the message store. Call this method before using the message store.
:call-seq:
store.activate
66 67 68 69 70 71 |
# File 'lib/reliable-msg/message-store.rb', line 66

# Activates the message store. Call this method before using the message
# store.
#
# Creates a fresh mutex, resets the queue index so that it holds only an
# empty list under the Queue::DLQ key, and empties the message cache.
#
# :call-seq:
#   store.activate
#
def activate
  @mutex = Mutex.new
  @queues = { Queue::DLQ => [] }
  # TODO: add recovery logic
  @cache = {}
end
#configuration ⇒ Object
Returns the message store configuration as a hash.
:call-seq:
store.configuration -> hash
56 57 58 |
# File 'lib/reliable-msg/message-store.rb', line 56

# Returns the message store configuration as a hash.
#
# The base class provides no configuration and always raises RuntimeError
# with the message "Not implemented" (presumably concrete stores override
# this method -- confirm against the subclasses).
#
# :call-seq:
#   store.configuration -> hash
#
def configuration
  # RuntimeError is Ruby's generic error class; "RuntimeException" is not
  # defined and would surface as a NameError instead of this message.
  raise RuntimeError, "Not implemented"
end
#deactivate ⇒ Object
Deactivates the message store. Call this method when done using the message store.
:call-seq:
store.deactivate
79 80 81 |
# File 'lib/reliable-msg/message-store.rb', line 79

# Deactivates the message store. Call this method when done using the
# message store.
#
# Drops the message cache, the queue index and the mutex by setting the
# three instance variables to +nil+. Returns +nil+.
#
# :call-seq:
#   store.deactivate
#
def deactivate
  @cache = nil
  @queues = nil
  @mutex = nil
end
#select(queue, &block) ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/reliable-msg/message-store.rb', line 99

# Returns the first message in +queue+ whose headers are accepted by the
# block.
#
# +queue+ is the key of a queue in <tt>@queues</tt>. The block is called
# with each entry's headers in queue order; the first entry for which it
# returns a true value is selected. The message body is taken from
# <tt>@cache</tt> when present there, otherwise it is obtained with
# <tt>load(id, entries)</tt>.
#
# Returns a hash with the keys <tt>:id</tt>, <tt>:headers</tt> and
# <tt>:message</tt>, or +nil+ when the queue is unknown or no entry is
# selected.
def select(queue, &block)
  entries = @queues[queue]
  return nil unless entries

  headers = entries.find { |candidate| block.call(candidate) }
  return nil unless headers

  id = headers[:id]
  # Fall back to the store only when the body is not cached.
  message = @cache[id] || load(id, entries)
  { :id => id, :headers => headers, :message => message }
end
#setup ⇒ Object
Set up the message store. Create files, database tables, etc.
:call-seq:
store.setup
48 49 |
# File 'lib/reliable-msg/message-store.rb', line 48

# Set up the message store. Create files, database tables, etc.
#
# The base implementation does nothing and returns +nil+.
#
# :call-seq:
#   store.setup
#
def setup
  nil
end
#transaction(&block) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/reliable-msg/message-store.rb', line 83

# Runs the block as one store transaction and returns the block's result.
#
# The block is called with three fresh arrays (inserts, deletes, dlqs).
# After it returns, +update+ is called with the three arrays unless all of
# them are still empty.
#
# If +update+ raises, the error is logged, the cache and queue index are
# reset, +load_index+ is called, and the same error is raised again.
def transaction(&block)
  inserts = []
  deletes = []
  dlqs = []
  result = block.call(inserts, deletes, dlqs)

  begin
    nothing_to_write = inserts.empty? && deletes.empty? && dlqs.empty?
    update(inserts, deletes, dlqs) unless nothing_to_write
  rescue Exception => error # rescued broadly on purpose; always re-raised below
    @logger.error error
    # If an error occurs, the queue may be out of synch with the store.
    # Empty the cache and reload the queue, before raising the error.
    @cache = {}
    @queues = { Queue::DLQ => [] }
    load_index
    raise error
  end

  result
end
#type ⇒ Object
Returns the message store type name.
:call-seq:
store.type -> string
39 40 41 |
# File 'lib/reliable-msg/message-store.rb', line 39

# Returns the message store type name.
#
# The base class has no type name and always raises RuntimeError with the
# message "Not implemented" (presumably concrete stores override this
# method -- confirm against the subclasses).
#
# :call-seq:
#   store.type -> string
#
def type
  # RuntimeError is Ruby's generic error class; "RuntimeException" is not
  # defined and would surface as a NameError instead of this message.
  raise RuntimeError, "Not implemented"
end