Class: Leveret::Queue

Inherits:
Object
  • Object
Extended by:
Forwardable
Defined in:
lib/leveret/queue.rb

Overview

Facilitates the publishing or subscribing of messages to the message queue.

Constant Summary

PRIORITY_MAP =

Map the symbol names for priorities to the integers that RabbitMQ requires.

{ low: 0, normal: 1, high: 2 }.freeze

Constructor Details

#initialize(name = nil) ⇒ Queue

Create a new queue with the name given in the params. If no name is given, it defaults to Configuration#default_queue_name. On instantiation the constructor immediately connects to the RabbitMQ backend and creates a queue with the appropriate name, or joins an existing one.

Parameters:

  • name (String) (defaults to: nil)

    Name of the queue to connect to.



# File 'lib/leveret/queue.rb', line 25

def initialize(name = nil)
  @name = name || Leveret.configuration.default_queue_name
  @queue = connect_to_queue
end

Instance Attribute Details

#name ⇒ String (readonly)

Returns the name of the queue. The configured prefix (Leveret.configuration.queue_name_prefix) and an underscore are prepended to it when creating the corresponding queue in RabbitMQ.

Returns:

  • (String)

    Name of the queue, without the backend prefix.
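
The relationship between the queue name and the backend queue name can be sketched as follows. This is a minimal illustration rather than Leveret's own code: `Config` and `backend_queue_name` are hypothetical stand-ins, and the prefix and default name values are made up. It mirrors the defaulting in the constructor and the join performed by the private `mq_name` method shown in the source listing.

```ruby
# Hypothetical stand-in for Leveret.configuration; the values below are made up.
Config = Struct.new(:queue_name_prefix, :default_queue_name)

# Mirrors the logic of Queue#initialize and the private Queue#mq_name:
# fall back to the default name, then join prefix and name with an underscore.
def backend_queue_name(config, name = nil)
  name ||= config.default_queue_name
  [config.queue_name_prefix, name].join('_')
end

config = Config.new('myapp', 'standard')
default_name = backend_queue_name(config)
mail_name = backend_queue_name(config, 'mail')

puts default_name
puts mail_name
```

Note that the routing key used for binding and publishing is the unprefixed name; only the RabbitMQ queue itself carries the prefix.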



# File 'lib/leveret/queue.rb', line 10

class Queue
  extend Forwardable

  # Map the symbol names for priorities to the integers that RabbitMQ requires.
  PRIORITY_MAP = { low: 0, normal: 1, high: 2 }.freeze
  attr_reader :name, :queue

  def_delegators :Leveret, :exchange, :channel, :log
  def_delegators :queue, :pop, :purge

  # Create a new queue with the name given in the params. If no name is given, it defaults to
  # {Configuration#default_queue_name}. On instantiation the constructor immediately connects to the
  # RabbitMQ backend and creates a queue with the appropriate name, or joins an existing one.
  #
  # @param [String] name Name of the queue to connect to.
  def initialize(name = nil)
    @name = name || Leveret.configuration.default_queue_name
    @queue = connect_to_queue
  end

  # Publish a message onto the queue. Fire and forget: this method is non-blocking and will not wait until
  # the message is definitely on the queue.
  #
  # @param [Hash] payload The data we wish to send onto the queue, this will be serialized and automatically
  #   deserialized when received by a {#subscribe} block.
  # @option options [Symbol] :priority (:normal) The priority this message should be treated with on the queue
  #   see {PRIORITY_MAP} for available options.
  #
  # @return [void]
  def publish(payload, options = {})
    priority_id = PRIORITY_MAP[options[:priority]] || PRIORITY_MAP[:normal]
    payload = serialize_payload(payload)

    log.debug "Publishing #{payload.inspect} for queue #{name} (Priority: #{priority_id})"
    exchange.publish(payload, persistent: true, routing_key: name, priority: priority_id)
  end

  # Subscribe to this queue and yield to the block for every message received. This method does not block; receiving
  # and dispatching of messages is handled in a separate thread.
  #
  # The receiving block is responsible for acknowledging or rejecting the message. This must be done using the
  # same channel the message was received on, {Leveret.channel}. {Worker#ack_message} provides an example
  # implementation of this acknowledgement.
  #
  # @note The receiving block is responsible for acking/rejecting the message. See the description above for details.
  #
  # @yieldparam incoming [Message] Delivery info, properties and the params wrapped up into a convenient object
  #
  # @return [void]
  def subscribe
    log.info "Subscribing to #{name}"
    queue.subscribe(manual_ack: true) do |delivery_info, properties, msg|
      log.debug "Received #{msg} from #{name}"
      incoming = Leveret::Message.new(delivery_info, properties, deserialize_payload(msg))
      yield incoming
    end
  end

  private

  # Convert a set of parameters passed into a serialized form suitable for transport on the message queue
  #
  # @param [Hash] params Parameters to be serialized
  #
  # @return [String] Encoded params ready to be sent onto the queue
  def serialize_payload(params)
    Leveret::Parameters.new(params).serialize
  end

  # Convert a set of serialized parameters into a {Parameters} object
  #
  # @param [String] json JSON representation of the parameters
  #
  # @return [Parameters] Useful object representation of the parameters
  def deserialize_payload(json)
    Leveret::Parameters.from_json(json)
  end

  # Create or return a representation of the queue on the RabbitMQ backend
  #
  # @return [Bunny::Queue] RabbitMQ queue
  def connect_to_queue
    queue = channel.queue(mq_name, durable: true, auto_delete: false, arguments: { 'x-max-priority' => 2 })
    queue.bind(exchange, routing_key: name)
    log.debug "Connected to #{mq_name}, bound on #{name}"
    queue
  end

  # Calculate the name of the queue that should be used on the RabbitMQ backend
  #
  # @return [String] Backend queue name
  def mq_name
    @mq_name ||= [Leveret.configuration.queue_name_prefix, name].join('_')
  end
end

#queue ⇒ Bunny::Queue (readonly)

Returns the backend RabbitMQ queue.

Returns:

  • (Bunny::Queue)

    The backend RabbitMQ queue


Instance Method Details

#publish(payload, options = {}) ⇒ void

This method returns an undefined value.

Publish a message onto the queue. Fire and forget: this method is non-blocking and will not wait until the message is definitely on the queue.

Parameters:

  • payload (Hash)

    The data to send onto the queue. It is serialized on publish and automatically deserialized when received by a #subscribe block.

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :priority (Symbol) — default: :normal

    The priority this message should be treated with on the queue. See PRIORITY_MAP for available options; a missing or unrecognized priority falls back to :normal.
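
To make the serialize/deserialize step concrete, here is a minimal sketch using Ruby's standard `json` library. It assumes the wire format is plain JSON, which the `Parameters.from_json` call in the source suggests; it does not use `Leveret::Parameters` itself, whose exact behaviour (for example, how keys are exposed after deserialization) is not shown on this page. The payload contents are made up.

```ruby
require 'json'

# Hypothetical payload; any JSON-representable Hash would do.
payload = { user_id: 42, tags: %w[a b] }

# What a publisher would put on the wire under the plain-JSON assumption.
wire = JSON.generate(payload)

# What a subscriber would get back from a plain JSON parse.
received = JSON.parse(wire)

puts wire
puts received.inspect
```

With a plain JSON round trip, symbol keys come back as strings, so payloads are best limited to values that survive JSON encoding.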



# File 'lib/leveret/queue.rb', line 39

def publish(payload, options = {})
  priority_id = PRIORITY_MAP[options[:priority]] || PRIORITY_MAP[:normal]
  payload = serialize_payload(payload)

  log.debug "Publishing #{payload.inspect} for queue #{name} (Priority: #{priority_id})"
  exchange.publish(payload, persistent: true, routing_key: name, priority: priority_id)
end
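
The priority lookup on the first line of #publish can be exercised in isolation. The sketch below copies PRIORITY_MAP and the fallback expression from the source above; `resolve_priority` is a hypothetical helper name introduced only for illustration, and `:urgent` is a deliberately invalid priority.

```ruby
# Copied from Leveret::Queue::PRIORITY_MAP.
PRIORITY_MAP = { low: 0, normal: 1, high: 2 }.freeze

# Hypothetical helper wrapping the lookup used in #publish: unknown or
# missing priorities resolve to the :normal value.
def resolve_priority(options = {})
  PRIORITY_MAP[options[:priority]] || PRIORITY_MAP[:normal]
end

high = resolve_priority(priority: :high)
default = resolve_priority
unknown = resolve_priority(priority: :urgent)

puts [high, default, unknown].inspect
```

An unrecognized priority is therefore not an error; it is silently treated as :normal.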

#subscribe {|incoming| ... } ⇒ void

Note:

The receiving block is responsible for acking/rejecting the message. See the method description below for more details.

This method returns an undefined value.

Subscribe to this queue and yield to the block for every message received. This method does not block; receiving and dispatching of messages is handled in a separate thread.

The receiving block is responsible for acknowledging or rejecting the message. This must be done using the same channel the message was received on, Leveret.channel. Worker#ack_message provides an example implementation of this acknowledgement.

Yield Parameters:

  • incoming (Message)

    Delivery info, properties and the params wrapped up into a convenient object



# File 'lib/leveret/queue.rb', line 59

def subscribe
  log.info "Subscribing to #{name}"
  queue.subscribe(manual_ack: true) do |delivery_info, properties, msg|
    log.debug "Received #{msg} from #{name}"
    incoming = Leveret::Message.new(delivery_info, properties, deserialize_payload(msg))
    yield incoming
  end
end
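
The ack/reject responsibility of the receiving block can be sketched without a RabbitMQ connection. Everything in this example is a hypothetical stand-in: `FakeChannel` and `FakeQueue` are not part of Leveret or Bunny, the `Message` struct and its accessor names (`delivery_info`, `params`) are assumptions based on the constructor call in the source, and the fake queue yields synchronously whereas the real #subscribe dispatches from a separate thread. The `ack` and `reject` signatures follow the shape of the corresponding `Bunny::Channel` methods.

```ruby
# Hypothetical stand-ins so the sketch runs without RabbitMQ.
Message = Struct.new(:delivery_info, :properties, :params)
DeliveryInfo = Struct.new(:delivery_tag)

class FakeChannel
  attr_reader :acked, :rejected

  def initialize
    @acked = []
    @rejected = []
  end

  # Same shape as Bunny::Channel#ack(delivery_tag, multiple = false)
  def ack(delivery_tag, _multiple = false)
    @acked << delivery_tag
  end

  # Same shape as Bunny::Channel#reject(delivery_tag, requeue = false)
  def reject(delivery_tag, _requeue = false)
    @rejected << delivery_tag
  end
end

class FakeQueue
  def initialize(messages)
    @messages = messages
  end

  # Yields each message in turn, synchronously.
  def subscribe
    @messages.each { |message| yield message }
  end
end

channel = FakeChannel.new
queue = FakeQueue.new([
  Message.new(DeliveryInfo.new(1), {}, { 'job' => 'ok' }),
  Message.new(DeliveryInfo.new(2), {}, { 'job' => 'boom' })
])

queue.subscribe do |incoming|
  begin
    raise 'job failed' if incoming.params['job'] == 'boom'
    # Success: acknowledge on the channel the message arrived on.
    channel.ack(incoming.delivery_info.delivery_tag)
  rescue StandardError
    # Failure: reject so the broker does not keep the message unacknowledged.
    channel.reject(incoming.delivery_info.delivery_tag)
  end
end

puts "acked: #{channel.acked.inspect}, rejected: #{channel.rejected.inspect}"
```

In real use the channel would be Leveret.channel, and every yielded message should end in exactly one ack or reject.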