Class: Leveret::Queue
- Inherits:
- Object
- Object
- Leveret::Queue
- Extended by:
- Forwardable
- Defined in:
- lib/leveret/queue.rb
Overview
Facilitates publishing messages to, and subscribing to messages from, the message queue.
Constant Summary
- PRIORITY_MAP =
Map the symbol names for priorities to the integers that RabbitMQ requires.
{ low: 0, normal: 1, high: 2 }.freeze
Instance Attribute Summary
-
#name ⇒ String
readonly
Name of the queue.
-
#queue ⇒ Bunny::Queue
readonly
The backend RabbitMQ queue.
Instance Method Summary
-
#initialize(name = nil) ⇒ Queue
constructor
Creates a new queue with the given name. If no name is given, it defaults to Configuration#default_queue_name.
-
#publish(payload, options = {}) ⇒ void
Publishes a message onto the queue.
-
#subscribe {|incoming| ... } ⇒ void
Subscribe to this queue and yield a block for every message received.
Constructor Details
#initialize(name = nil) ⇒ Queue
Creates a new queue with the given name. If no name is given, it defaults to Configuration#default_queue_name. On instantiation, the constructor immediately connects to the RabbitMQ backend and creates a queue with the appropriate name, or joins an existing one.
# File 'lib/leveret/queue.rb', line 25
def initialize(name = nil)
  @name = name || Leveret.configuration.default_queue_name
  @queue = connect_to_queue
end
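The name fallback in the constructor can be illustrated in isolation. This is a minimal sketch, not Leveret's code: `Config` is a hypothetical stand-in for `Leveret.configuration`, `resolve_queue_name` is an illustrative helper, and the value `'example_default'` is made up for the example.

```ruby
# Hypothetical stand-in for Leveret.configuration; the default value is illustrative only.
Config = Struct.new(:default_queue_name)

# Mirrors the `name || default` expression used in the constructor.
def resolve_queue_name(name, config)
  name || config.default_queue_name
end

config = Config.new('example_default')

puts resolve_queue_name(nil, config)      # falls back to the configured default
puts resolve_queue_name('emails', config) # an explicit name wins
```

Because the fallback relies on `||`, only `nil` (or `false`) triggers the default. An empty string is truthy in Ruby and would be used as the name as-is.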
Instance Attribute Details
#name ⇒ String (readonly)
Returns the name of the queue. The configured prefix (Leveret.configuration.queue_name_prefix) is prepended to it, joined with an underscore, when the corresponding queue is created in RabbitMQ.
# File 'lib/leveret/queue.rb', line 10
class Queue
  extend Forwardable

  # Map the symbol names for priorities to the integers that RabbitMQ requires.
  PRIORITY_MAP = { low: 0, normal: 1, high: 2 }.freeze

  attr_reader :name, :queue

  def_delegators :Leveret, :exchange, :channel, :log
  def_delegators :queue, :pop, :purge

  # Create a new queue with the name given in the params. If no name is given it will default to
  # {Configuration#default_queue_name}. On instantiation the constructor will immediately connect to the
  # RabbitMQ backend and create a queue with the appropriate name, or join an existing one.
  #
  # @param [String] name Name of the queue to connect to.
  def initialize(name = nil)
    @name = name || Leveret.configuration.default_queue_name
    @queue = connect_to_queue
  end

  # Publish a message onto the queue. Fire and forget: this method is non-blocking and will not wait until
  # the message is definitely on the queue.
  #
  # @param [Hash] payload The data we wish to send onto the queue. This will be serialized and automatically
  #   deserialized when received by a {#subscribe} block.
  # @option options [Symbol] :priority (:normal) The priority this message should be treated with on the queue,
  #   see {PRIORITY_MAP} for available options.
  #
  # @return [void]
  def publish(payload, options = {})
    priority_id = PRIORITY_MAP[options[:priority]] || PRIORITY_MAP[:normal]
    payload = serialize_payload(payload)

    log.debug "Publishing #{payload.inspect} for queue #{name} (Priority: #{priority_id})"
    exchange.publish(payload, persistent: true, routing_key: name, priority: priority_id)
  end

  # Subscribe to this queue and yield a block for every message received. This method does not block; receiving
  # and dispatching of messages will be handled in a separate thread.
  #
  # The receiving block is responsible for acknowledging or rejecting the message. This must be done using the
  # same channel the message was received on, {Leveret.channel}. {Worker#ack_message} provides an example
  # implementation of this acknowledgement.
  #
  # @note The receiving block is responsible for acking/rejecting the message. Please see the description for
  #   more details.
  #
  # @yieldparam incoming [Message] Delivery info, properties and the params wrapped up into a convenient object
  #
  # @return [void]
  def subscribe
    log.info "Subscribing to #{name}"
    queue.subscribe(manual_ack: true) do |delivery_info, properties, msg|
      log.debug "Received #{msg} from #{name}"
      incoming = Leveret::Message.new(delivery_info, properties, deserialize_payload(msg))
      yield incoming
    end
  end

  private

  # Convert a set of parameters passed into a serialized form suitable for transport on the message queue
  #
  # @param [Hash] params Parameters to be serialized
  #
  # @return [String] Encoded params ready to be sent onto the queue
  def serialize_payload(params)
    Leveret::Parameters.new(params).serialize
  end

  # Convert a set of serialized parameters into a {Parameters} object
  #
  # @param [String] json JSON representation of the parameters
  #
  # @return [Parameters] Useful object representation of the parameters
  def deserialize_payload(json)
    Leveret::Parameters.from_json(json)
  end

  # Create or return a representation of the queue on the RabbitMQ backend
  #
  # @return [Bunny::Queue] RabbitMQ queue
  def connect_to_queue
    queue = channel.queue(mq_name, durable: true, auto_delete: false, arguments: { 'x-max-priority' => 2 })
    queue.bind(exchange, routing_key: name)
    log.debug "Connected to #{mq_name}, bound on #{name}"
    queue
  end

  # Calculate the name of the queue that should be used on the RabbitMQ backend
  #
  # @return [String] Backend queue name
  def mq_name
    @mq_name ||= [Leveret.configuration.queue_name_prefix, name].join('_')
  end
end
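The backend queue name is derived by joining the configured prefix and the queue name with an underscore, as the private `mq_name` method in the listing shows. A minimal sketch of that derivation follows; `backend_queue_name` is an illustrative helper rather than part of Leveret, and the prefix `'myapp'` is a made-up value.

```ruby
# Mirrors the [prefix, name].join('_') expression from the private mq_name method.
def backend_queue_name(prefix, name)
  [prefix, name].join('_')
end

puts backend_queue_name('myapp', 'emails')
puts backend_queue_name(nil, 'emails')
```

Note that `Array#join` renders `nil` as an empty string, so a missing prefix would still leave a leading underscore in the backend name.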
#queue ⇒ Bunny::Queue (readonly)
Returns the backend RabbitMQ queue.
Instance Method Details
#publish(payload, options = {}) ⇒ void
This method returns an undefined value.
Publishes a message onto the queue. This is fire and forget: the method is non-blocking and does not wait until the message is definitely on the queue.
# File 'lib/leveret/queue.rb', line 39
def publish(payload, options = {})
  priority_id = PRIORITY_MAP[options[:priority]] || PRIORITY_MAP[:normal]
  payload = serialize_payload(payload)

  log.debug "Publishing #{payload.inspect} for queue #{name} (Priority: #{priority_id})"
  exchange.publish(payload, persistent: true, routing_key: name, priority: priority_id)
end
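The priority lookup used by #publish can be tried on its own. The sketch below copies PRIORITY_MAP from this class and wraps the lookup in an illustrative helper, `priority_id_for`, which is not part of Leveret.

```ruby
# Copied from Leveret::Queue::PRIORITY_MAP.
PRIORITY_MAP = { low: 0, normal: 1, high: 2 }.freeze

# Illustrative helper: resolves the :priority option to the integer RabbitMQ expects,
# falling back to :normal when the option is missing or unknown.
def priority_id_for(options = {})
  PRIORITY_MAP[options[:priority]] || PRIORITY_MAP[:normal]
end

puts priority_id_for(priority: :high)
puts priority_id_for(priority: :low)
puts priority_id_for
puts priority_id_for(priority: :urgent)
```

The `||` fallback is safe for `:low` because `0` is truthy in Ruby, so only a missing or unrecognised priority falls through to `:normal`.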
#subscribe {|incoming| ... } ⇒ void
Note: The receiving block is responsible for acknowledging or rejecting the message. See the description below for details.
This method returns an undefined value.
Subscribes to this queue and yields to the given block for every message received. This method does not block; receiving and dispatching of messages is handled in a separate thread.
The receiving block is responsible for acknowledging or rejecting the message. This must be done using the same channel the message was received on, Leveret.channel. Worker#ack_message provides an example implementation of this acknowledgement.
# File 'lib/leveret/queue.rb', line 59
def subscribe
  log.info "Subscribing to #{name}"
  queue.subscribe(manual_ack: true) do |delivery_info, properties, msg|
    log.debug "Received #{msg} from #{name}"
    incoming = Leveret::Message.new(delivery_info, properties, deserialize_payload(msg))
    yield incoming
  end
end
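To make the acknowledgement responsibility concrete, here is a hedged sketch that runs without RabbitMQ. Everything in it is a stand-in: `FakeChannel`, `FakeQueue`, `FakeDeliveryInfo` and `IncomingMessage` are hypothetical classes written for this example, and the accessor names on the yielded object are assumptions rather than the documented Leveret::Message interface. The only real API it imitates is Bunny's `channel.ack(delivery_tag)` and `channel.reject(delivery_tag, requeue)`.

```ruby
require 'json'

# Hypothetical stand-ins so the sketch runs without a broker.
FakeDeliveryInfo = Struct.new(:delivery_tag)
IncomingMessage = Struct.new(:delivery_info, :properties, :params)

# Records acks and rejects instead of talking to RabbitMQ.
class FakeChannel
  attr_reader :acked, :rejected

  def initialize
    @acked = []
    @rejected = []
  end

  def ack(delivery_tag)
    @acked << delivery_tag
  end

  def reject(delivery_tag, requeue = false)
    @rejected << [delivery_tag, requeue]
  end
end

# Yields each stored JSON message wrapped in an object, similar in spirit to Queue#subscribe.
class FakeQueue
  def initialize(raw_messages)
    @raw_messages = raw_messages
  end

  def subscribe
    @raw_messages.each_with_index do |json, index|
      yield IncomingMessage.new(FakeDeliveryInfo.new(index + 1), {}, JSON.parse(json))
    end
  end
end

# The block decides the fate of every message: ack what it can handle, reject the rest.
def process_all(queue, channel)
  queue.subscribe do |incoming|
    tag = incoming.delivery_info.delivery_tag
    if incoming.params.key?('job')
      channel.ack(tag)
    else
      channel.reject(tag, false) # false: do not requeue
    end
  end
end

channel = FakeChannel.new
queue = FakeQueue.new(['{"job":"resize"}', '{"unknown":true}'])
process_all(queue, channel)

puts "acked: #{channel.acked.inspect}, rejected: #{channel.rejected.inspect}"
```

Unlike the real #subscribe, this stand-in delivers messages synchronously on the calling thread. With a real queue the block runs on a separate thread, and the ack or reject has to go through Leveret.channel.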