Class: ReliableMsg::MessageStore::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/reliable-msg/message-store.rb

Overview

Base class for message store.

Direct Known Subclasses

Disk

Constant Summary collapse

ERROR_INVALID_MESSAGE_STORE =

:nodoc:

"No message store '%s' available (note: case is not important)"
@@stores =

:nodoc:

{}

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger) ⇒ Base

Returns a new instance of Base.



30
31
32
# File 'lib/reliable-msg/message-store.rb', line 30

def initialize logger
    @logger = logger
end

Class Method Details

.configure(config, logger) ⇒ Object

Returns a message store from the specified configuration (previously created with configure).

:call-seq:

Base::configure(config, logger) -> store

Raises:

  • (RuntimeError)


119
120
121
122
123
124
# File 'lib/reliable-msg/message-store.rb', line 119

def self.configure config, logger
    type = config["type"].downcase
    cls = @@stores[type]
    raise RuntimeError, format(ERROR_INVALID_MESSAGE_STORE, type) unless cls
    cls.new config, logger
end

Instance Method Details

#activateObject

Activates the message store. Call this method before using the message store.

:call-seq:

store.activate


66
67
68
69
70
71
# File 'lib/reliable-msg/message-store.rb', line 66

def activate
    @mutex = Mutex.new
    @queues = {Queue::DLQ=>[]}
    @cache = {}
    # TODO: add recovery logic

end

#configurationObject

Returns the message store configuration as a hash.

:call-seq:

store.configuration -> hash

Raises:

  • (RuntimeException)


56
57
58
# File 'lib/reliable-msg/message-store.rb', line 56

def configuration
    raise RuntimeException, "Not implemented"
end

#deactivateObject

Deactivates the message store. Call this method when done using the message store.

:call-seq:

store.deactivate


79
80
81
# File 'lib/reliable-msg/message-store.rb', line 79

def deactivate
    @mutex = @queues = @cache = nil
end

#select(queue, &block) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/reliable-msg/message-store.rb', line 99

def select queue, &block
    queue = @queues[queue]
    return nil unless queue
    queue.each do |headers|
        selected = block.call(headers)
        if selected
            id = headers[:id]
            message = @cache[id] || load(id, queue)
            return {:id=>id, :headers=>headers, :message=>message}
        end
    end
    return nil
end

#setupObject

Set up the message store. Create files, database tables, etc.

:call-seq:

store.setup


48
49
# File 'lib/reliable-msg/message-store.rb', line 48

def setup
end

#transaction(&block) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/reliable-msg/message-store.rb', line 83

def transaction &block
    result = block.call inserts = [], deletes = [], dlqs= []
    begin
        update inserts, deletes, dlqs unless inserts.empty? && deletes.empty? && dlqs.empty?
    rescue Exception=>error
        @logger.error error
        # If an error occurs, the queue may be out of synch with the store.

        # Empty the cache and reload the queue, before raising the error.

        @cache = {}
        @queues = {Queue::DLQ=>[]}
        load_index
        raise error
    end
    result
end

#typeObject

Returns the message store type name.

:call-seq:

store.type -> string

Raises:

  • (RuntimeException)


39
40
41
# File 'lib/reliable-msg/message-store.rb', line 39

def type
    raise RuntimeException, "Not implemented"
end