Class: ReliableMsg::QueueManager

Inherits:
Object
Defined in:
lib/reliable-msg/queue-manager.rb

Overview

The QueueManager handles message storage and delivery. Applications connect to the QueueManager either locally or remotely using the client API objects Queue and Topic.

You can start a QueueManager from the command line using

queues manager start

Or from code using

qm = QueueManager.new
qm.start

A Ruby process can have only one active QueueManager at any given time. Do not run more than one QueueManager connected to the same database or file system storage, as the queue managers would then operate on different messages and queues. Instead, use a single QueueManager and connect to it remotely using DRb.

The client API (Queue and Topic) will automatically connect to any QueueManager running in the same Ruby process, or if not found, to a QueueManager running in a different process using DRb.

Constant Summary

TX_TIMEOUT_CHECK_EVERY =
30
ERROR_SEND_MISSING_QUEUE =
"You must specify a destination queue for the message"
ERROR_RECEIVE_MISSING_QUEUE =
"You must specify a queue to retrieve the message from"
ERROR_PUBLISH_MISSING_TOPIC =
"You must specify a destination topic for the message"
ERROR_RETRIEVE_MISSING_TOPIC =
"You must specify a topic to retrieve the message from"
ERROR_INVALID_HEADER_NAME =
"Invalid header '%s': expecting the name to be a symbol, found object of type %s"
ERROR_INVALID_HEADER_VALUE =
"Invalid header '%s': expecting the value to be %s, found object of type %s"
ERROR_NO_TRANSACTION =
"Transaction %s has completed, or was aborted"
ERROR_QM_STARTED =
"Queue manager already started for this process: stop the other queue manager before starting a new one"
ERROR_QM_NOT_STARTED =
"Queue manager not active"
INFO_MESSAGE_STORE =
"Using message store: %s"
INFO_ACCEPTING_DRB =
"Accepting requests at: %s"
INFO_QM_STOPPED =
"Stopped queue manager at: %s"
WARN_TRANSACTION_TIMEOUT =
"Timeout: aborting transaction %s"
WARN_TRANSACTION_ABORTED =
"Transaction %s aborted by client"
@@active =
nil

Constructor Details

#initialize(options = nil) ⇒ QueueManager

Create a new QueueManager with the specified options. Once created, you can start the QueueManager with QueueManager#start.

Accepted options are:

  • :logger – The logger to use. If not specified, will log messages to STDOUT.

  • :config – The configuration file to use. If not specified, will use queues.cfg.



# File 'lib/reliable-msg/queue-manager.rb', line 197

def initialize options = nil
    options ||= {}
    # Locks prevent two transactions from seeing the same message. We use a mutex
    # to ensure that each transaction can determine the state of a lock before
    # setting it.
    @mutex = Mutex.new
    @locks = {}
    # Transactions use this hash to hold all inserted messages (:inserts), deleted
    # messages (:deletes) and the transaction timeout (:timeout) until completion.
    @transactions = {}
    @logger = options[:logger] || Logger.new(STDOUT)
    @config = Config.new options[:config], @logger
    @config.load_or_create
end

Instance Method Details

#abort(tid) ⇒ Object

Called by client to abort a transaction.

Raises:

  • (RuntimeError)


# File 'lib/reliable-msg/queue-manager.rb', line 591

def abort tid
    tx = @transactions[tid]
    raise RuntimeError, format(ERROR_NO_TRANSACTION, tid) unless tx
    # Release locks here because we are no longer in possession of any
    # retrieved messages.
    @mutex.synchronize do
        tx[:deletes].each do |delete|
            @locks.delete delete[:id]
            delete[:headers][:redelivery] = (delete[:headers][:redelivery] || 0) + 1
            # TODO: move to DLQ if delivery count or expires
        end
    end
    @transactions.delete tid
    @logger.warn format(WARN_TRANSACTION_ABORTED, tid)
end
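
The bookkeeping that abort performs can be illustrated outside the library. The following is a minimal sketch, assuming plain hashes for the lock table and the transaction record; the helper name `release_aborted` is hypothetical and not part of ReliableMsg.

```ruby
# Hypothetical stand-alone sketch, not the library's code.
# Releases the lock held for every message the transaction retrieved and
# bumps each message's redelivery counter, as the listing above does.
def release_aborted(locks, transaction)
  transaction[:deletes].each do |delete|
    locks.delete(delete[:id])
    headers = delete[:headers]
    # A message that was never redelivered has no :redelivery header yet.
    headers[:redelivery] = (headers[:redelivery] || 0) + 1
  end
  locks
end

locks = { "m1" => true, "m2" => true }
tx = {
  inserts: [],
  deletes: [
    { id: "m1", headers: {} },
    { id: "m2", headers: { redelivery: 2 } }
  ]
}
release_aborted(locks, tx)
```

After the call, both locks are gone and the messages become visible to other transactions again, each carrying an incremented redelivery count.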

#alive? ⇒ Boolean

Returns true if the QueueManager is receiving remote requests.

Returns:

  • (Boolean)


# File 'lib/reliable-msg/queue-manager.rb', line 296

def alive?
    @drb_server && @drb_server.alive?
end

#begin(timeout) ⇒ Object

Called by client to begin a transaction.



# File 'lib/reliable-msg/queue-manager.rb', line 561

def begin timeout
    tid = UUID.new
    @transactions[tid] = {:inserts=>[], :deletes=>[], :timeout=>Time.new.to_i + timeout}
    tid
end
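
As a rough illustration of the record that begin creates, the sketch below builds the same three-key hash and checks it against a deadline. It assumes `SecureRandom.uuid` as a stand-in for the library's UUID generator and takes the current time as a parameter so the behavior is deterministic; `begin_transaction` and `timed_out?` are hypothetical names.

```ruby
require "securerandom"

# Hypothetical sketch: registers a transaction record holding pending
# inserts, pending deletes and an absolute timeout (seconds since epoch).
def begin_transaction(transactions, timeout, now = Time.now.to_i)
  tid = SecureRandom.uuid # stand-in for the library's UUID.new
  transactions[tid] = { inserts: [], deletes: [], timeout: now + timeout }
  tid
end

# A transaction is overdue once the clock reaches its timeout value.
def timed_out?(tx, now = Time.now.to_i)
  tx[:timeout] <= now
end

transactions = {}
tid = begin_transaction(transactions, 30, 1_000)
```

Storing an absolute deadline rather than a duration lets a periodic check compare it directly against the clock.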

#commit(tid) ⇒ Object

Called by client to commit a transaction.

Raises:

  • (RuntimeError)


# File 'lib/reliable-msg/queue-manager.rb', line 569

def commit tid
    tx = @transactions[tid]
    raise RuntimeError, format(ERROR_NO_TRANSACTION, tid) unless tx
    begin
        @store.transaction do |inserts, deletes, dlqs|
            inserts.concat tx[:inserts]
            deletes.concat tx[:deletes]
        end
        # Release locks here, otherwise we expose messages before the
        # transaction gets the chance to delete them from the queue.
        @mutex.synchronize do
            tx[:deletes].each { |delete| @locks.delete delete[:id] }
        end
        @transactions.delete tid
    rescue Exception=>error
        abort tid
        raise error
    end
end
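
The ordering in commit (write to the store first, release locks second) can be sketched with an in-memory stand-in for the message store. `MemoryStore` below is an assumption made for illustration: it only mimics the block-yielding shape of `@store.transaction` seen in the listing and ignores dead-letter handling.

```ruby
# Hypothetical in-memory store; the real message store is not shown here.
class MemoryStore
  attr_reader :messages

  def initialize
    @messages = {}
  end

  # Yields two arrays to fill, then applies them once the block returns.
  def transaction
    inserts, deletes = [], []
    yield inserts, deletes
    inserts.each { |m| @messages[m[:id]] = m }
    deletes.each { |m| @messages.delete(m[:id]) }
    nil
  end
end

# Sketch of the commit ordering: persist buffered work, then release locks.
def commit(store, locks, transactions, tid)
  tx = transactions[tid]
  raise "Transaction #{tid} has completed, or was aborted" unless tx
  store.transaction do |inserts, deletes|
    inserts.concat(tx[:inserts])
    deletes.concat(tx[:deletes])
  end
  # Locks are released only after the deletes reached the store, so no
  # other reader can see a message that is about to disappear.
  tx[:deletes].each { |d| locks.delete(d[:id]) }
  transactions.delete(tid)
  nil
end

store = MemoryStore.new
store.transaction { |inserts, _| inserts << { id: "old", message: "x" } }
locks = { "old" => true }
transactions = {
  "t1" => { inserts: [{ id: "new", message: "y" }], deletes: [{ id: "old" }] }
}
commit(store, locks, transactions, "t1")
```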

#enqueue(args) ⇒ Object

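Called by client to retrieve a message from a queue. Despite its name, this method performs the dequeue operation; use #queue to add a message.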

Raises:

  • (ArgumentError)


# File 'lib/reliable-msg/queue-manager.rb', line 387

def enqueue args
    # Get the arguments of this call.
    queue, selector, tid = args[:queue].downcase, args[:selector], args[:tid]
    id, headers = nil, nil
    raise ArgumentError, ERROR_RECEIVE_MISSING_QUEUE unless queue and queue.instance_of?(String) and !queue.empty?

    # We need to lock the selected message, before deleting, otherwise,
    # we allow another transaction to see the message we're about to delete.
    # This is true whether we delete the message inside or outside a client
    # transaction. We can wrap everything with a mutex, but it's faster to
    # release the locks mutex as fast as possible.
    message = @mutex.synchronize do
        message = @store.get_message queue do |headers|
            not @locks.has_key?(headers[:id]) and case selector
                when nil
                    true
                when String
                    headers[:id] == selector
                when Hash
                    selector.all? { |name, value| headers[name] == value }
                else
                    raise RuntimeError, "Internal error"
            end
        end
        if message
            @locks[message[:id]] = true
            message
        end
    end
    # Nothing to do if no message found.
    return unless message

    # If the message has expired, or maximum delivery count elapsed, we either
    # discard the message, or send it to the DLQ. Since we're out of a message,
    # we call to get a new one. (This can be changed to repeat instead of recurse).
    headers = message[:headers]
    if queue != Client::DLQ && ((headers[:expires_at] && headers[:expires_at] < Time.now.to_i) || (headers[:redelivery] && headers[:redelivery] >= headers[:max_deliveries]))
        expired = {:id=>message[:id], :queue=>queue, :headers=>headers}
        if headers[:delivery] == :once || headers[:delivery] == :repeated
            @store.transaction { |inserts, deletes, dlqs| dlqs << expired }
        else # :best_effort
            @store.transaction { |inserts, deletes, dlqs| deletes << expired }
        end
        @mutex.synchronize { @locks.delete message[:id] }
        return enqueue(args)
    end

    delete = {:id=>message[:id], :queue=>queue, :headers=>headers}
    begin
        if tid
            tx = @transactions[tid]
            raise RuntimeError, format(ERROR_NO_TRANSACTION, tid) unless tx
            if queue != Client::DLQ && headers[:delivery] == :once
                # Exactly once delivery: immediately move message to DLQ, so if
                # transaction aborts, message is not retrieved again. Do not
                # release lock here, to prevent message retrieved from DLQ.
                # Change delete record so message removed from DLQ on commit.
                @store.transaction do |inserts, deletes, dlqs|
                    dlqs << delete
                end
                delete[:queue] = Client::DLQ
                tx[:deletes] << delete
            else
                # At most once delivery: delete message if transaction commits.
                # Best effort: we don't need to delete on commit, but it's more
                # efficient this way.
                # Exactly once: message never gets to expire in DLQ.
                tx[:deletes] << delete
            end
        else
            @store.transaction do |inserts, deletes, dlqs|
                deletes << delete
            end
            @mutex.synchronize { @locks.delete message[:id] }
        end
    rescue Exception=>error
        # Because errors do happen.
        @mutex.synchronize { @locks.delete message[:id] }
        raise error
    end

    # To prevent a transaction from modifying a message and then returning it to the
    # queue by aborting, we instead clone the message by de-serializing (this happens
    # in Queue, see there). The headers are also cloned (shallow, all values are frozen).
    return :id=>message[:id], :headers=>message[:headers].clone, :message=>message[:message]
end
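
The selector logic inside the block passed to `get_message` can be read in isolation. The sketch below is an illustrative rewrite, not the library's code: `selected?` is a hypothetical helper, and it raises ArgumentError for unsupported selectors where the listing raises a generic internal error.

```ruby
# Hypothetical helper mirroring the three selector forms in the listing:
# nil matches anything, a String matches a message id, and a Hash must
# match every named header. Locked messages are never selected.
def selected?(headers, selector, locks)
  return false if locks.key?(headers[:id])
  case selector
  when nil    then true
  when String then headers[:id] == selector
  when Hash   then selector.all? { |name, value| headers[name] == value }
  else raise ArgumentError, "unsupported selector: #{selector.class}"
  end
end

locks = { "m2" => true } # m2 is held by another transaction
m1 = { id: "m1", type: "order" }
m2 = { id: "m2", type: "order" }

any_match   = selected?(m1, nil, locks)
locked      = selected?(m2, nil, locks)
by_id       = selected?(m1, "m1", locks)
by_header   = selected?(m1, { type: "order" }, locks)
wrong_value = selected?(m1, { type: "invoice" }, locks)
```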

#list(args) ⇒ Object

Called by client to list queue headers.

Raises:

  • (ArgumentError)


# File 'lib/reliable-msg/queue-manager.rb', line 360

def list args
    # Get the arguments of this call.
    queue = args[:queue].downcase
    raise ArgumentError, ERROR_SEND_MISSING_QUEUE unless queue and queue.instance_of?(String) and !queue.empty?

    return @mutex.synchronize do
        list = @store.get_headers queue
        now = Time.now.to_i
        list.inject([]) do |list, headers|
            if queue != Client::DLQ && ((headers[:expires_at] && headers[:expires_at] < now) || (headers[:redelivery] && headers[:redelivery] >= headers[:max_deliveries]))
                expired = {:id=>headers[:id], :queue=>queue, :headers=>headers}
                if headers[:delivery] == :once || headers[:delivery] == :repeated
                    @store.transaction { |inserts, deletes, dlqs| dlqs << expired }
                else # :best_effort
                    @store.transaction { |inserts, deletes, dlqs| deletes << expired }
                end
            else
                # Need to clone headers (shallow, values are frozen) when passing in same process.
                list << headers.clone
            end
            list
        end
    end
end
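
Both list and enqueue apply the same expiry test before handing out a message. A minimal sketch of that test follows, assuming plain header hashes; `expired?` and `disposition` are hypothetical names, and the special case that exempts the dead-letter queue itself is left out for brevity.

```ruby
# Hypothetical helpers restating the expiry rule from the listing.
# A message is expired when its deadline has passed or when it has been
# redelivered as many times as :max_deliveries allows.
def expired?(headers, now = Time.now.to_i)
  past_deadline = headers[:expires_at] && headers[:expires_at] < now
  over_limit = headers[:redelivery] &&
               headers[:redelivery] >= headers[:max_deliveries]
  !!(past_deadline || over_limit)
end

# Expired :once and :repeated messages go to the dead-letter queue;
# anything else (best effort) is simply deleted.
def disposition(headers)
  case headers[:delivery]
  when :once, :repeated then :dead_letter
  else :discard
  end
end

late      = expired?({ expires_at: 10 }, 11)
on_time   = expired?({ expires_at: 10 }, 10)
exhausted = expired?({ redelivery: 3, max_deliveries: 3 }, 0)
fresh     = expired?({}, 0)
```

Note that the deadline comparison is strict: a message whose `:expires_at` equals the current second is still delivered.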

#publish(args) ⇒ Object

Called by client to publish message.

Raises:

  • (ArgumentError)


# File 'lib/reliable-msg/queue-manager.rb', line 476

def publish args
    # Get the arguments of this call.
    message, headers, topic, tid = args[:message], args[:headers], args[:topic].downcase, args[:tid]
    raise ArgumentError, ERROR_PUBLISH_MISSING_TOPIC unless topic and topic.instance_of?(String) and !topic.empty?
    time = Time.new.to_i
    id = args[:id] || UUID.new
    created = args[:created] || time

    # Validate and freeze the headers. The cloning ensures that the headers we hold in memory
    # are not modified by the caller. The validation ensures that the headers we hold in memory
    # can be persisted safely. Basic types like string and integer are allowed, but application types
    # may prevent us from restoring the index. Strings are cloned since strings may be replaced.
    headers = if headers
        copy = {}
        headers.each_pair do |name, value|
            raise ArgumentError, format(ERROR_INVALID_HEADER_NAME, name, name.class) unless name.instance_of?(Symbol)
            case value
            when String, Numeric, Symbol, true, false, nil
                copy[name] = value.freeze
            else
                raise ArgumentError, format(ERROR_INVALID_HEADER_VALUE, name, "a string, numeric, symbol, true/false or nil", value.class)
            end
        end
        copy
    else
        {}
    end

    # Set the message headers controlled by the topic.
    headers[:id] = id
    headers[:created] = time
    if expires_at = headers[:expires_at]
        raise ArgumentError, format(ERROR_INVALID_HEADER_VALUE, :expires_at, "an integer", expires_at.class) unless expires_at.is_a?(Integer)
    elsif expires = headers[:expires]
        raise ArgumentError, format(ERROR_INVALID_HEADER_VALUE, :expires, "an integer", expires.class) unless expires.is_a?(Integer)
        headers[:expires_at] = Time.now.to_i + expires if expires > 0
    end
    # Create an insertion record for the new message.
    insert = {:id=>id, :topic=>topic, :headers=>headers, :message=>message}
    if tid
        tx = @transactions[tid]
        raise RuntimeError, format(ERROR_NO_TRANSACTION, tid) unless tx
        tx[:inserts] << insert
    else
        @store.transaction do |inserts, deletes, dlqs|
            inserts << insert
        end
    end
end
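
The header validation step shared by publish and queue can be sketched as a stand-alone function. `validate_headers` is a hypothetical name and the error messages are simplified. One deliberate difference from the listing: the sketch duplicates strings before freezing them, so the caller's own string stays unfrozen, whereas the listing calls `value.freeze` directly.

```ruby
# Hypothetical sketch of header validation: names must be symbols and
# values must be simple types that can be persisted safely.
def validate_headers(headers)
  copy = {}
  (headers || {}).each_pair do |name, value|
    unless name.instance_of?(Symbol)
      raise ArgumentError, "Invalid header '#{name}': expecting a symbol, found #{name.class}"
    end
    case value
    when String
      copy[name] = value.dup.freeze # copy, so the caller's string is untouched
    when Numeric, Symbol, true, false, nil
      copy[name] = value
    else
      raise ArgumentError, "Invalid header '#{name}': unsupported value type #{value.class}"
    end
  end
  copy
end

source = String.new("hello")
clean = validate_headers({ name: source, priority: 1 })
```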

#queue(args) ⇒ Object

Called by client to queue a message.

Raises:

  • (ArgumentError)


# File 'lib/reliable-msg/queue-manager.rb', line 302

def queue args
    # Get the arguments of this call.
    message, headers, queue, tid = args[:message], args[:headers], args[:queue].downcase, args[:tid]
    raise ArgumentError, ERROR_SEND_MISSING_QUEUE unless queue and queue.instance_of?(String) and !queue.empty?
    time = Time.new.to_i
    # TODO: change this to support the RM delivery protocol.
    id = args[:id] || UUID.new
    created = args[:created] || time

    # Validate and freeze the headers. The cloning ensures that the headers we hold in memory
    # are not modified by the caller. The validation ensures that the headers we hold in memory
    # can be persisted safely. Basic types like string and integer are allowed, but application types
    # may prevent us from restoring the index. Strings are cloned since strings may be replaced.
    headers = if headers
        copy = {}
        headers.each_pair do |name, value|
            raise ArgumentError, format(ERROR_INVALID_HEADER_NAME, name, name.class) unless name.instance_of?(Symbol)
            case value
                when String, Numeric, Symbol, true, false, nil
                    copy[name] = value.freeze
                else
                    raise ArgumentError, format(ERROR_INVALID_HEADER_VALUE, name, "a string, numeric, symbol, true/false or nil", value.class)
            end
        end
        copy
    else
        {}
    end

    # Set the message headers controlled by the queue.
    headers[:id] = id
    headers[:created] = time
    headers[:delivery] ||= :best_effort
    headers[:max_deliveries] = integer headers[:max_deliveries], 1, Queue::DEFAULT_MAX_DELIVERIES
    headers[:priority] = integer headers[:priority], 0, 0
    if expires_at = headers[:expires_at]
        raise ArgumentError, format(ERROR_INVALID_HEADER_VALUE, :expires_at, "an integer", expires_at.class) unless expires_at.is_a?(Integer)
    elsif expires = headers[:expires]
        raise ArgumentError, format(ERROR_INVALID_HEADER_VALUE, :expires, "an integer", expires.class) unless expires.is_a?(Integer)
        headers[:expires_at] = Time.now.to_i + expires if expires > 0
    end
    # Create an insertion record for the new message.
    insert = {:id=>id, :queue=>queue, :headers=>headers, :message=>message}
    if tid
        tx = @transactions[tid]
        raise RuntimeError, format(ERROR_NO_TRANSACTION, tid) unless tx
        tx[:inserts] << insert
    else
        @store.transaction do |inserts, deletes, dlqs|
            inserts << insert
        end
    end
    # Return the message identifier.
    id
end
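
The way the `:expires` and `:expires_at` headers interact is easy to miss in the listing. The sketch below isolates that step; `normalize_expiry` is a hypothetical name, the error messages are simplified, and the clock is passed in so the result is deterministic.

```ruby
# Hypothetical sketch: an explicit :expires_at wins; otherwise a positive
# relative :expires (in seconds) is converted to an absolute :expires_at.
def normalize_expiry(headers, now = Time.now.to_i)
  if (expires_at = headers[:expires_at])
    raise ArgumentError, "expires_at must be an integer" unless expires_at.is_a?(Integer)
  elsif (expires = headers[:expires])
    raise ArgumentError, "expires must be an integer" unless expires.is_a?(Integer)
    headers[:expires_at] = now + expires if expires > 0
  end
  headers
end

relative = normalize_expiry({ expires: 60 }, 1_000)
never    = normalize_expiry({ expires: 0 }, 1_000)
absolute = normalize_expiry({ expires_at: 5, expires: 60 }, 1_000)
```

A zero or negative `:expires` leaves the message without a deadline, and `:expires` is ignored entirely when `:expires_at` is already present.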

#retrieve(args) ⇒ Object

Called by client to retrieve message from topic.

Raises:

  • (ArgumentError)


# File 'lib/reliable-msg/queue-manager.rb', line 528

def retrieve args
    # Get the arguments of this call.
    seen, topic, selector, tid = args[:seen], args[:topic].downcase, args[:selector], args[:tid]
    id, headers = nil, nil
    raise ArgumentError, ERROR_RETRIEVE_MISSING_TOPIC unless topic and topic.instance_of?(String) and !topic.empty?

    # Very simple, we really only select one message and nothing to lock.
    message = @store.get_last topic, seen do |headers|
        case selector
        when nil
            true
        when Hash
            selector.all? { |name, value| headers[name] == value }
        else
            raise RuntimeError, "Internal error"
        end
    end
    # Nothing to do if no message found.
    return unless message

    # If the message has expired, we discard the message. This being the most recent
    # message on the topic, we simply return nil.
    headers = message[:headers]
    if (headers[:expires_at] && headers[:expires_at] < Time.now.to_i)
        expired = {:id=>message[:id], :topic=>topic, :headers=>headers}
        @store.transaction { |inserts, deletes, dlqs| deletes << expired }
        return nil
    end
    return :id=>message[:id], :headers=>message[:headers].clone, :message=>message[:message]
end

#start ⇒ Object

Starts the QueueManager. This method will block until the QueueManager has successfully started, and raise an exception if the QueueManager fails to start or if another QueueManager was already started in this process.



# File 'lib/reliable-msg/queue-manager.rb', line 216

def start
    @mutex.synchronize do
        return if @@active == self
        Thread.critical = true
        if @@active.nil?
            @@active = self
        else
            Thread.critical = false
            raise RuntimeError, ERROR_QM_STARTED
        end
        Thread.critical = false

        begin
            # Get the message store based on the configuration, or default store.
            @store = MessageStore::Base.configure(@config.store || Config::DEFAULT_STORE, @logger)
            @logger.info format(INFO_MESSAGE_STORE, @store.type)
            @store.activate

            # Get the DRb URI from the configuration, or use the default. Create a DRb server.
            drb = Config::DEFAULT_DRB
            drb.merge(@config.drb) if @config.drb
            drb_uri = "druby://localhost:#{drb['port']}"
            @drb_server = DRb::DRbServer.new drb_uri, self, :tcp_acl=>ACL.new(drb["acl"].split(" "), ACL::ALLOW_DENY)
            @logger.info format(INFO_ACCEPTING_DRB, drb_uri)

            # Create a background thread to stop timed-out transactions.
            @timeout_thread = Thread.new do
                begin
                    while true
                        time = Time.new.to_i
                        @transactions.each_pair do |tid, tx|
                            if tx[:timeout] <= time
                                begin
                                    @logger.warn format(WARN_TRANSACTION_TIMEOUT, tid)
                                    abort tid
                                rescue
                                end
                            end
                        end
                        sleep TX_TIMEOUT_CHECK_EVERY
                    end
                rescue Exception=>error
                    retry
                end
            end

            # Associate this queue manager with the local Queue class, instead of using DRb.
            Client.send :qm=, self
            nil
        rescue Exception=>error
            @@active = nil if @@active == self
            raise error
        end
    end
end
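
The background thread's job, finding overdue transactions and aborting them, can be sketched as a single sweep. This is an illustrative variant rather than the library's code: `sweep_timeouts` is a hypothetical name, it expects a block, and it collects the overdue ids before yielding so the hash is not modified while it is being iterated.

```ruby
# Hypothetical sketch of one pass of the timeout check. Yields the id of
# every transaction whose timeout has been reached and returns those ids.
def sweep_timeouts(transactions, now = Time.now.to_i)
  overdue = transactions.select { |_tid, tx| tx[:timeout] <= now }.keys
  overdue.each { |tid| yield tid }
  overdue
end

transactions = { "a" => { timeout: 100 }, "b" => { timeout: 200 } }
aborted = sweep_timeouts(transactions, 150) { |tid| transactions.delete(tid) }
```

In the listing, a pass like this runs in a loop that sleeps for TX_TIMEOUT_CHECK_EVERY between iterations, so a transaction may outlive its timeout by up to that interval.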

#stop ⇒ Object

Stops the QueueManager. Once stopped, you can start the same QueueManager again, or start a different QueueManager.



# File 'lib/reliable-msg/queue-manager.rb', line 275

def stop
    @mutex.synchronize do
        raise RuntimeError, ERROR_QM_NOT_STARTED unless @@active == self
        # Prevent transactions from timing out while we take down the server.
        @timeout_thread.terminate
        # Shut down the DRb server to prevent new requests from being processed.
        Client.send :qm=, nil
        drb_uri = @drb_server.uri
        @drb_server.stop_service
        # Deactivate the message store.
        @store.deactivate
        @store = nil
        @drb_server = @store = @timeout_thread = nil
        @logger.info format(INFO_QM_STOPPED, drb_uri)
        @@active = nil
    end
    true
end