Class: ReliableMsg::QueueManager

Inherits:
Object
  • Object
show all
Defined in:
lib/reliable-msg/queue-manager.rb

Constant Summary collapse

TX_TIMEOUT_CHECK_EVERY =
30
ERROR_SEND_MISSING_QUEUE =

:nodoc:

"You must specify a destination queue for the message"
ERROR_RECEIVE_MISSING_QUEUE =

:nodoc:

"You must specify a queue to retrieve the message from"
ERROR_INVALID_HEADER_NAME =

:nodoc:

"Invalid header '%s': expecting the name to be a symbol, found object of type %s"
ERROR_INVALID_HEADER_VALUE =

:nodoc:

"Invalid header '%s': expecting the value to be %s, found object of type %s"
ERROR_NO_TRANSACTION =

:nodoc:

"Transaction %s has completed, or was aborted"

Instance Method Summary collapse

Constructor Details

#initialize(options = nil) ⇒ QueueManager



134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/reliable-msg/queue-manager.rb', line 134

# Builds a queue manager from an optional options hash.
#
# Only in-memory state is prepared here, and the configuration is loaded or
# created. Recognised options:
#   :logger - logger to use; when absent a Logger writing to STDOUT is created
#   :config - handed to Config.new as its first argument
def initialize(options = nil)
    options = {} unless options

    @logger = options[:logger] || Logger.new(STDOUT)
    @config = Config.new(options[:config], @logger)
    @config.load_or_create

    # Pending work per transaction: inserted messages (:inserts), deleted
    # messages (:deletes) and the transaction timeout (:timeout), all held
    # until the transaction completes.
    @transactions = {}

    # A lock keeps two transactions from seeing the same message. The mutex
    # lets each transaction inspect the state of a lock before setting it.
    @locks = {}
    @mutex = Mutex.new
end

Instance Method Details

#abort(tid) ⇒ Object

Raises:

  • (RuntimeError)


385
386
387
388
389
390
391
392
393
394
395
396
397
398
# File 'lib/reliable-msg/queue-manager.rb', line 385

# Aborts the transaction registered under +tid+.
#
# Each message the transaction had retrieved is unlocked and has its :retry
# header incremented; the transaction record is then dropped and a warning
# is logged.
#
# Raises RuntimeError when no transaction is registered under +tid+.
def abort(tid)
    transaction = @transactions[tid] || raise(RuntimeError, format(ERROR_NO_TRANSACTION, tid))

    # The locks are released here because the transaction is no longer in
    # possession of any retrieved messages.
    @mutex.synchronize do
        transaction[:deletes].each do |record|
            @locks.delete(record[:id])
            record[:headers][:retry] += 1
        end
    end

    @transactions.delete(tid)
    @logger.warn("Transaction #{tid} aborted")
end

#alive? ⇒ Boolean



210
211
212
# File 'lib/reliable-msg/queue-manager.rb', line 210

# Reports whether this queue manager currently has a live DRb server.
#
# Returns true only when a DRb server object is held and it answers true to
# alive?. In every other case (no server held, or the server reports it is
# not alive) the result is false, never nil, so the predicate always yields
# a strict Boolean.
def alive?
    !!(@drb_server && @drb_server.alive?)
end

#begin(timeout) ⇒ Object



359
360
361
362
363
# File 'lib/reliable-msg/queue-manager.rb', line 359

# Opens a new transaction and returns its identifier.
#
# timeout - number of seconds; it is added to the current epoch time and the
#           sum is stored as the transaction's :timeout.
#
# The new transaction record starts with empty :inserts and :deletes lists.
def begin(timeout)
    tid = UUID.new
    deadline = Time.new.to_i + timeout
    @transactions.store(tid, {:inserts=>[], :deletes=>[], :timeout=>deadline})
    tid
end

#commit(tid) ⇒ Object

Raises:

  • (RuntimeError)


365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
# File 'lib/reliable-msg/queue-manager.rb', line 365

# Commits the transaction registered under +tid+.
#
# The transaction's pending inserts and deletes are applied to the message
# store in a single store transaction. Afterwards the locks on the deleted
# messages are released and the transaction record is removed.
#
# If anything fails along the way the transaction is aborted and the original
# error is raised again.
#
# Raises RuntimeError when no transaction is registered under +tid+.
def commit(tid)
    pending = @transactions[tid] || raise(RuntimeError, format(ERROR_NO_TRANSACTION, tid))
    begin
        @store.transaction do |inserts, deletes, _dlqs|
            inserts.concat(pending[:inserts])
            deletes.concat(pending[:deletes])
        end

        # Locks are released only now; doing it earlier would expose the
        # messages before the store had the chance to delete them.
        @mutex.synchronize do
            pending[:deletes].each do |record|
                @locks.delete(record[:id])
            end
        end

        @transactions.delete(tid)
    rescue Exception => failure
        abort(tid)
        raise failure
    end
end

#enqueue(args) ⇒ Object

Raises:

  • (ArgumentError)


272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
# File 'lib/reliable-msg/queue-manager.rb', line 272

# Retrieves the next matching message from a queue. (Despite its name, this
# method selects a message from the store and removes it; it does not add one.)
#
# args is a Hash with the keys:
#   :queue    - name of the queue to read from; required, must be a non-empty
#               String, and is lower-cased before use
#   :selector - optional filter: nil accepts any message, a String is compared
#               with the message id, a Hash must match every listed header,
#               and a Selector is evaluated against the headers. Any other
#               selector type matches nothing.
#   :tid      - optional transaction identifier
#
# Returns nil when no unlocked message matches; otherwise a Hash holding :id,
# :headers (a shallow copy of the stored headers) and :message.
#
# Raises ArgumentError when :queue is missing, not a String, or empty, and
# RuntimeError when :tid does not name a registered transaction.
def enqueue(args)
    queue, selector, tid = args[:queue], args[:selector], args[:tid]
    # Validate before calling any String method, so that a missing queue is
    # reported as an ArgumentError instead of a NoMethodError on nil.
    if !queue.instance_of?(String) || queue.empty?
        raise ArgumentError, ERROR_RECEIVE_MISSING_QUEUE
    end
    queue = queue.downcase

    # We need to lock the selected message before deleting it, otherwise we
    # allow another transaction to see the message we're about to delete.
    # This is true whether we delete the message inside or outside a client
    # transaction. We could wrap everything with the mutex, but it's faster
    # to release the locks mutex as quickly as possible.
    message = @mutex.synchronize do
        found = @store.select queue do |candidate|
            # Messages locked by someone else are never candidates.
            next false if @locks.has_key?(candidate[:id])
            case selector
                when nil
                    true
                when String
                    candidate[:id] == selector
                when Hash
                    selector.all? { |name, value| candidate[name] == value }
                when Selector
                    selector.__evaluate__ candidate
            end
        end
        if found
            @locks[found[:id]] = true
            found
        end
    end
    # Nothing to do if no message was found.
    return unless message

    # If the message has expired, or its retry count exceeds the maximum, we
    # either move it to the DLQ (:once and :repeated delivery) or discard it
    # (any other delivery mode, i.e. :best_effort). Since that leaves us
    # without a message, we call ourselves again to fetch the next one.
    # (This could be changed to a loop instead of recursion.)
    headers = message[:headers]
    if queue != Queue::DLQ && ((headers[:expires_at] && headers[:expires_at] < Time.now.to_i) || (headers[:retry] > headers[:max_retries]))
        expired = {:id=>message[:id], :queue=>queue, :headers=>headers}
        if headers[:delivery] == :once || headers[:delivery] == :repeated
            @store.transaction { |inserts, deletes, dlqs| dlqs << expired }
        else # :best_effort
            @store.transaction { |inserts, deletes, dlqs| deletes << expired }
        end
        @mutex.synchronize { @locks.delete message[:id] }
        return enqueue(args)
    end

    delete = {:id=>message[:id], :queue=>queue, :headers=>headers}
    begin
        if tid
            tx = @transactions[tid]
            raise RuntimeError, format(ERROR_NO_TRANSACTION, tid) unless tx
            if queue != Queue::DLQ && headers[:delivery] == :once
                # Exactly once delivery: immediately move the message to the
                # DLQ, so that if the transaction aborts, the message is not
                # retrieved again. The lock is not released here, to prevent
                # the message from being retrieved from the DLQ. The delete
                # record is changed so the message is removed from the DLQ on
                # commit.
                @store.transaction do |inserts, deletes, dlqs|
                    dlqs << delete
                end
                delete[:queue] = Queue::DLQ
                tx[:deletes] << delete
            else
                # At most once delivery: delete the message if the transaction
                # commits.
                # Best effort: we don't need to delete on commit, but it's
                # more efficient this way.
                # Exactly once: message never gets to expire in the DLQ.
                tx[:deletes] << delete
            end
        else
            # No client transaction: delete right away and release the lock.
            @store.transaction do |inserts, deletes, dlqs|
                deletes << delete
            end
            @mutex.synchronize { @locks.delete message[:id] }
        end
    rescue Exception
        # Because errors do happen. The rescue is deliberately broad: the lock
        # must be released whatever went wrong, and the error is re-raised.
        @mutex.synchronize { @locks.delete message[:id] }
        raise
    end

    # To prevent a transaction from modifying a message and then returning it
    # to the queue by aborting, the message is cloned by de-serializing (this
    # happens in Queue, see there). The headers are cloned here as well
    # (shallow, all values are frozen).
    {:id=>message[:id], :headers=>message[:headers].clone, :message=>message[:message]}
end

#queue(args) ⇒ Object

Raises:

  • (ArgumentError)


214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/reliable-msg/queue-manager.rb', line 214

# Stores a message in a queue and returns the message identifier.
#
# args is a Hash with the keys:
#   :message - the message payload, stored as given
#   :headers - optional Hash of headers; every name must be a Symbol and every
#              value a String, Numeric, Symbol, true, false or nil
#   :queue   - destination queue; required, must be a non-empty String, and is
#              lower-cased before use
#   :tid     - optional transaction identifier; when given, the insert is only
#              recorded in the transaction instead of going to the store
#   :id      - optional message identifier; UUID.new is used when absent
#
# Raises ArgumentError for a missing/invalid queue or an invalid header, and
# RuntimeError when :tid does not name a registered transaction.
def queue(args)
    message, headers, queue, tid = args[:message], args[:headers], args[:queue], args[:tid]
    # Validate before calling any String method, so that a missing queue is
    # reported as an ArgumentError instead of a NoMethodError on nil.
    if !queue.instance_of?(String) || queue.empty?
        raise ArgumentError, ERROR_SEND_MISSING_QUEUE
    end
    queue = queue.downcase
    time = Time.new.to_i
    # TODO: change this to support the RM delivery protocol (args[:created]
    # is not used yet).
    id = args[:id] || UUID.new

    # Validate and freeze the headers. Copying ensures that the headers we
    # hold in memory are not modified by the caller. The validation ensures
    # that the headers can be persisted safely: basic types like string and
    # integer are allowed, but application types may prevent us from restoring
    # the index.
    headers = if headers
        copy = {}
        headers.each_pair do |name, value|
            raise ArgumentError, format(ERROR_INVALID_HEADER_NAME, name, name.class) unless name.instance_of?(Symbol)
            case value
                when String
                    # Freeze a private copy; freezing the caller's own string
                    # would make it immutable behind the caller's back.
                    copy[name] = value.dup.freeze
                when Numeric, Symbol, true, false, nil
                    copy[name] = value.freeze
                else
                    raise ArgumentError, format(ERROR_INVALID_HEADER_VALUE, name, "a string, numeric, symbol, true/false or nil", value.class)
            end
        end
        copy
    else
        {}
    end

    # Set the message headers controlled by the queue. :id, :received and
    # :retry always overwrite whatever the caller supplied.
    headers[:id] = id
    headers[:received] = time
    headers[:delivery] ||= :best_effort
    headers[:retry] = 0
    # NOTE(review): `integer` is a helper defined outside this listing;
    # presumably (value, minimum, default) -- confirm against its definition.
    headers[:max_retries] = integer headers[:max_retries], 0, Queue::DEFAULT_MAX_RETRIES
    headers[:priority] = integer headers[:priority], 0, 0

    # An explicit :expires_at wins over :expires; a positive :expires is
    # turned into :expires_at by adding it to the current epoch time.
    expires_at = headers[:expires_at]
    expires = headers[:expires]
    if expires_at
        raise ArgumentError, format(ERROR_INVALID_HEADER_VALUE, :expires_at, "an integer", expires_at.class) unless expires_at.is_a?(Integer)
    elsif expires
        raise ArgumentError, format(ERROR_INVALID_HEADER_VALUE, :expires, "an integer", expires.class) unless expires.is_a?(Integer)
        headers[:expires_at] = Time.now.to_i + expires if expires > 0
    end

    # Create an insertion record for the new message.
    insert = {:id=>id, :queue=>queue, :headers=>headers, :message=>message}
    if tid
        tx = @transactions[tid]
        raise RuntimeError, format(ERROR_NO_TRANSACTION, tid) unless tx
        tx[:inserts] << insert
    else
        @store.transaction do |inserts, deletes, dlqs|
            inserts << insert
        end
    end

    # Return the message identifier.
    id
end

#start ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/reliable-msg/queue-manager.rb', line 149

# Starts the queue manager. Does nothing when it is already started.
#
# Under the manager's mutex this:
#   * configures and activates the message store (the configured store, or
#     Config::DEFAULT_STORE),
#   * creates a DRb server on druby://localhost:<port> serving this object,
#     using the default DRb settings overridden by the configured ones,
#   * spawns a background thread that aborts timed-out transactions every
#     TX_TIMEOUT_CHECK_EVERY seconds,
#   * registers this manager with the local Queue class.
def start
    @mutex.synchronize do
        return if @started

        # Get the message store based on the configuration, or default store.
        @store = MessageStore::Base.configure(@config.store || Config::DEFAULT_STORE, @logger)
        @logger.info "Using message store #{@store.type}"
        @store.activate

        # Get the DRb settings from the configuration, on top of the defaults.
        # Hash#merge returns a new hash, so the result must be kept; merge!
        # is not used because it would modify the shared defaults constant.
        drb = Config::DEFAULT_DRB
        drb = drb.merge(@config.drb) if @config.drb
        drb_uri = "druby://localhost:#{drb['port']}"
        @drb_server = DRb::DRbServer.new drb_uri, self, :tcp_acl=>ACL.new(drb["acl"].split(" "), ACL::ALLOW_DENY), :verbose=>true
        @logger.info "Accepting requests at '#{drb_uri}'"

        # Create a background thread to stop timed-out transactions.
        @timeout_thread = Thread.new do
            loop do
                begin
                    time = Time.new.to_i
                    # Walk a snapshot: abort deletes from @transactions and
                    # other threads may add to it while we are checking.
                    @transactions.to_a.each do |tid, tx|
                        next unless tx[:timeout] <= time
                        begin
                            @logger.warn "Timeout: aborting transaction #{tid}"
                            abort tid
                        rescue
                            # Best effort: abort raises when the transaction
                            # is already gone (completed or aborted).
                        end
                    end
                rescue Exception=>error
                    # Keep the thread alive no matter what, but leave a trace
                    # and fall through to the sleep instead of spinning.
                    @logger.error "Transaction timeout check failed: #{error.class}: #{error.message}"
                end
                sleep TX_TIMEOUT_CHECK_EVERY
            end
        end

        # Associate this queue manager with the local Queue class, instead of using DRb.
        Queue.send :qm=, self
        @started = true
    end
end

#stop ⇒ Object



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/reliable-msg/queue-manager.rb', line 192

# Stops the queue manager. Does nothing when it is not started.
#
# Under the manager's mutex this terminates the timeout thread, unregisters
# the manager from the local Queue class, stops the DRb service, deactivates
# the message store, drops the references to all three and clears the started
# flag, so that stop may safely be called again and start can bring the
# manager back up.
def stop
    @mutex.synchronize do
        return unless @started

        # Prevent transactions from timing out while we take down the server.
        @timeout_thread.terminate
        # Shutdown DRb server to prevent new requests from being processed.
        Queue.send :qm=, nil
        drb_uri = @drb_server.uri
        @drb_server.stop_service
        # Deactivate the message store.
        @store.deactivate
        @drb_server = @store = @timeout_thread = nil
        # Without resetting the flag a second stop would dereference the nil
        # thread, and start would refuse to run again.
        @started = false
        @logger.info "Stopped queue manager at '#{drb_uri}'"
    end
end