Class: ReliableMsg::MessageStore::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/reliable-msg/message-store.rb

Overview

Base class for message store.

Direct Known Subclasses

Disk

Constant Summary collapse

ERROR_INVALID_MESSAGE_STORE =

:nodoc:

"No message store '%s' available (note: case is not important)"
@@stores =

:nodoc:

{}

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger) ⇒ Base

Returns a new instance of Base.



32
33
34
# File 'lib/reliable-msg/message-store.rb', line 32

# Creates the store and keeps the given logger in @logger.
#
# logger:: object stored for later use; #transaction sends it +error+
#          when applying changes fails.
def initialize(logger)
    @logger = logger
end

Class Method Details

.configure(config, logger) ⇒ Object

Returns a message store from the specified configuration (previously created with configure).

:call-seq:

Base::configure(config, logger) -> store

Raises:

  • (RuntimeError)


121
122
123
124
125
126
# File 'lib/reliable-msg/message-store.rb', line 121

# Returns a message store built from the specified configuration.
#
# :call-seq:
#   Base::configure(config, logger) -> store
#
# config:: indexed with "type" to obtain the store type name; the name is
#          lower-cased before lookup, so case is not important.
# logger:: passed unchanged, together with config, to the store class
#          constructor.
#
# Raises RuntimeError when @@stores has no entry for the type, including
# when the "type" entry is missing.
def self.configure(config, logger)
    # to_s turns a missing (nil) or non-String type into a string, so an
    # unusable type is reported by the RuntimeError below instead of
    # surfacing as a NoMethodError on nil.
    type = config["type"].to_s.downcase
    cls = @@stores[type]
    raise RuntimeError, format(ERROR_INVALID_MESSAGE_STORE, type) unless cls
    cls.new(config, logger)
end

Instance Method Details

#activate ⇒ Object

Activates the message store. Call this method before using the message store.

:call-seq:

store.activate


68
69
70
71
72
73
# File 'lib/reliable-msg/message-store.rb', line 68

# Activates the message store. Call this before using the store.
#
# Creates a new mutex, a queue index whose only entry is an empty list
# under Queue::DLQ, and an empty message cache.
def activate
    @mutex  = Mutex.new
    @queues = { Queue::DLQ => [] }
    @cache  = {}
    # TODO: add recovery logic
end

#configuration ⇒ Object

Returns the message store configuration as a hash.

:call-seq:

store.configuration -> hash

Raises:

  • (RuntimeException)


58
59
60
# File 'lib/reliable-msg/message-store.rb', line 58

# Returns the message store configuration as a hash.
#
# :call-seq:
#   store.configuration -> hash
#
# This base implementation always raises RuntimeError ("Not implemented");
# presumably concrete stores override it.
def configuration
    # RuntimeException is not a Ruby class, so naming it raised NameError
    # instead of the intended error. RuntimeError is what Base.configure
    # already raises.
    raise RuntimeError, "Not implemented"
end

#deactivate ⇒ Object

Deactivates the message store. Call this method when done using the message store.

:call-seq:

store.deactivate


81
82
83
# File 'lib/reliable-msg/message-store.rb', line 81

# Deactivates the message store. Call this when done using the store.
#
# Clears the mutex, the queue index and the message cache by setting
# each of them to nil. Returns nil.
def deactivate
    @mutex = nil
    @queues = nil
    @cache = nil
end

#select(queue, &block) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/reliable-msg/message-store.rb', line 101

# Finds the first message in a queue whose headers the block accepts.
#
# queue:: key used to look up the list of message headers in @queues.
# block:: called with each headers entry, in order, until it returns a
#         truthy value.
#
# Returns nil when @queues has no entry for the queue or when the block
# accepts no headers. Otherwise returns a hash with :id, :headers and
# :message, where the message is taken from @cache or, on a cache miss,
# obtained from load.
def select(queue, &block)
    entries = @queues[queue]
    return nil unless entries
    entries.each do |headers|
        next unless block.call(headers)
        id = headers[:id]
        # NOTE(review): load is given the list of headers here, not the queue
        # key the caller passed in -- confirm that is what load expects.
        message = @cache[id] || load(id, entries)
        return { :id => id, :headers => headers, :message => message }
    end
    nil
end

#setup ⇒ Object

Set up the message store. Create files, database tables, etc.

:call-seq:

store.setup


50
51
# File 'lib/reliable-msg/message-store.rb', line 50

# Set up the message store. Create files, database tables, etc.
#
# This base implementation does nothing and returns nil.
def setup
    nil
end

#transaction(&block) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/reliable-msg/message-store.rb', line 85

# Runs the block against three fresh change lists, then applies them.
#
# The block is called with (inserts, deletes, dlqs), each an empty array
# it may fill. If all three are still empty afterwards, update is not
# called; otherwise update receives the three lists.
#
# If update raises, the error is logged through @logger, @cache and
# @queues are reset, load_index is called, and the same error is re-raised.
#
# Returns the value the block returned.
def transaction(&block)
    inserts, deletes, dlqs = [], [], []
    outcome = block.call(inserts, deletes, dlqs)
    begin
        changed = !(inserts.empty? && deletes.empty? && dlqs.empty?)
        update(inserts, deletes, dlqs) if changed
    rescue Exception => failure
        # Every exception class is caught here, but it is always re-raised
        # below once the in-memory state has been rebuilt.
        @logger.error(failure)
        # If an error occurs, the queue may be out of synch with the store.
        # Empty the cache and reload the queue, before raising the error.
        @cache = {}
        @queues = { Queue::DLQ => [] }
        load_index
        raise failure
    end
    outcome
end

#type ⇒ Object

Returns the message store type name.

:call-seq:

store.type -> string

Raises:

  • (RuntimeException)


41
42
43
# File 'lib/reliable-msg/message-store.rb', line 41

# Returns the message store type name.
#
# :call-seq:
#   store.type -> string
#
# This base implementation always raises RuntimeError ("Not implemented");
# presumably concrete stores override it.
def type
    # RuntimeException is not a Ruby class, so naming it raised NameError
    # instead of the intended error. RuntimeError is what Base.configure
    # already raises.
    raise RuntimeError, "Not implemented"
end