Class: ReliableMsg::MessageStore::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/reliable-msg/message-store.rb

Overview

Base class for message store.

Direct Known Subclasses

Disk

Constant Summary collapse

ERROR_INVALID_MESSAGE_STORE =

:nodoc:

"No message store '%s' available (note: case is not important)"
@@stores =

:nodoc:

{}

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger) ⇒ Base

Returns a new instance of Base.



31
32
33
# File 'lib/reliable-msg/message-store.rb', line 31

def initialize logger
    @logger = logger
end

Class Method Details

.configure(config, logger) ⇒ Object

Returns a message store from the specified configuration (previously created with configure).

:call-seq:

Base::configure(config, logger) -> store

Raises:

  • (RuntimeError)


145
146
147
148
149
150
# File 'lib/reliable-msg/message-store.rb', line 145

def self.configure config, logger
    type = config["type"].downcase
    cls = @@stores[type]
    raise RuntimeError, format(ERROR_INVALID_MESSAGE_STORE, type) unless cls
    cls.new config, logger
end

Instance Method Details

#activateObject

Activates the message store. Call this method before using the message store.

:call-seq:

store.activate


71
72
73
74
75
76
77
# File 'lib/reliable-msg/message-store.rb', line 71

def activate
    @mutex = Mutex.new
    @queues = {Queue::DLQ=>[]}
    @topics = {}
    @cache = {}
    # TODO: add recovery logic
end

#configurationObject

Returns the message store configuration as a hash.

:call-seq:

store.configuration -> hash

Raises:

  • (RuntimeException)


60
61
62
# File 'lib/reliable-msg/message-store.rb', line 60

def configuration
    raise RuntimeException, "Not implemented"
end

#deactivateObject

Deactivates the message store. Call this method when done using the message store.

:call-seq:

store.deactivate


86
87
88
# File 'lib/reliable-msg/message-store.rb', line 86

def deactivate
    @mutex = @queues = @topics = @cache = nil
end

#get_headers(queue) ⇒ Object



123
124
125
# File 'lib/reliable-msg/message-store.rb', line 123

def get_headers queue
    return @queues[queue] || []
end

#get_last(topic, seen, &block) ⇒ Object



128
129
130
131
132
133
134
135
136
# File 'lib/reliable-msg/message-store.rb', line 128

def get_last topic, seen, &block
    headers = @topics[topic]
    return nil if headers.nil? || headers[:id] == seen
    if block.call(headers)
        id = headers[:id]
        message = @cache[id] || load(id, :topic, topic)
        {:id=>id, :headers=>headers, :message=>message}
    end
end

#get_message(queue, &block) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/reliable-msg/message-store.rb', line 109

def get_message queue, &block
    messages = @queues[queue]
    return nil unless messages
    messages.each do |headers|
        if block.call(headers)
            id = headers[:id]
            message = @cache[id] || load(id, :queue, queue)
            return {:id=>id, :headers=>headers, :message=>message}
        end
    end
    return nil
end

#setupObject

Set up the message store. Create files, database tables, etc.

:call-seq:

store.setup


51
52
# File 'lib/reliable-msg/message-store.rb', line 51

def setup
end

#transaction(&block) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/reliable-msg/message-store.rb', line 91

def transaction &block
    result = block.call inserts = [], deletes = [], dlqs= []
    begin
        update inserts, deletes, dlqs unless inserts.empty? && deletes.empty? && dlqs.empty?
    rescue Exception=>error
        @logger.error error
        # If an error occurs, the queue may be out of synch with the store.
        # Empty the cache and reload the queue, before raising the error.
        @cache = {}
        @queues = {Queue::DLQ=>[]}
        @topics = {}
        load_index
        raise error
    end
    result
end

#typeObject

Returns the message store type name.

:call-seq:

store.type -> string

Raises:

  • (RuntimeException)


41
42
43
# File 'lib/reliable-msg/message-store.rb', line 41

def type
    raise RuntimeException, "Not implemented"
end