Class: RabbitMQ::Actors::HeadersConsumer Abstract

Inherits:
Base::Consumer
Defined in:
lib/rabbitmq/actors/patterns/headers/headers_consumer.rb

Overview

This class is abstract.

Subclass and override #perform to define your customized headers worker class.

A consumer of RabbitMQ messages routed through a headers exchange, where delivery depends on matching the message headers against the binding headers.

Examples:

class NewYorkBranchListener < RabbitMQ::Actors::HeadersConsumer
  def initialize
    super(headers_name:    'reports',
          binding_headers: { 'type' => :economy, 'area' => 'Usa', 'x-match' => 'any' },
          logger:          Rails.logger,
          on_cancellation: ->{ ActiveRecord::Base.connection.close })
  end

  private

  def perform(**task)
    report_data = JSON.parse(task[:body])
    process_report(report_data)
  end

  def process_report(data)
    ...
  end
end

class LondonBranchListener < RabbitMQ::Actors::HeadersConsumer
  def initialize
    super(headers_name:    'reports',
          binding_headers: { 'type' => :industry, 'area' => 'Europe', 'x-match' => 'any' },
          logger:          Rails.logger,
          on_cancellation: ->{ ActiveRecord::Base.connection.close })
  end

  private

  def perform(**task)
    report_data = JSON.parse(task[:body])
    process_report(report_data)
  end

  def process_report(data)
    ...
  end
end

RabbitMQ::Server.url = 'amqp://localhost'

NewYorkBranchListener.new.start!
LondonBranchListener.new.start!
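
The 'x-match' entry in binding_headers controls how a headers exchange compares a binding with each message's headers: 'all' requires every binding header to match, while 'any' requires at least one. The following is a minimal sketch of that rule in plain Ruby, not the gem's or the broker's implementation. headers_match? is a hypothetical helper, and the header values are written as strings to keep the comparison simple.

```ruby
# Hypothetical helper: mimics headers-exchange matching for illustration only.
# binding: the hash passed as binding_headers, including 'x-match'
# headers: the headers carried by a published message
def headers_match?(binding, headers)
  mode = binding.fetch('x-match', 'all')
  # Keys starting with "x-" (such as 'x-match' itself) are not compared.
  criteria = binding.reject { |key, _| key.to_s.start_with?('x-') }

  hits = criteria.count { |key, value| headers.key?(key) && headers[key] == value }
  mode == 'any' ? hits.positive? : hits == criteria.size
end

new_york = { 'type' => 'economy',  'area' => 'Usa',    'x-match' => 'any' }
london   = { 'type' => 'industry', 'area' => 'Europe', 'x-match' => 'any' }
message  = { 'type' => 'industry', 'area' => 'Usa' }

puts headers_match?(new_york, message)                            # true: 'area' matches
puts headers_match?(london, message)                              # true: 'type' matches
puts headers_match?(new_york.merge('x-match' => 'all'), message)  # false: 'type' differs
```

With 'any', a single message can reach both listeners, as the sample message does here.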

Instance Attribute Summary

Attributes inherited from Base::Agent

#queue

Instance Method Summary

Methods inherited from Base::Consumer

#start!

Constructor Details

#initialize(headers_name:, binding_headers:, **opts) ⇒ HeadersConsumer

Takes the exchange name and binding headers as required keywords; any remaining options your subclass needs are collected in opts.

Parameters:

  • :headers_name (String)

    name of the headers exchange this worker will receive messages from.

  • :binding_headers (Hash)

    headers this worker is interested in, together with an 'x-match' entry. This keyword is required: the constructor signature declares no default.

  • opts (Hash)

    a customizable set of options

Options Hash (**opts):

  • :on_cancellation (Proc)

    to be executed before the worker is terminated

  • :logger (Logger)

    the logger to which info about this agent’s activity is written.



# File 'lib/rabbitmq/actors/patterns/headers/headers_consumer.rb', line 72

def initialize(headers_name:, binding_headers:, **opts)
  super(opts.merge(headers_name: headers_name, binding_headers: binding_headers))
end
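
The constructor above accepts two required keywords and merges them back into opts before handing a single hash to the superclass. A minimal sketch of that hand-off follows. FakeBaseConsumer and FakeHeadersConsumer are hypothetical stand-ins used only for illustration, since the real Base::Consumer signature is not shown here.

```ruby
# Hypothetical stand-in for the superclass; it just records what it receives.
class FakeBaseConsumer
  attr_reader :received

  def initialize(opts = {})
    @received = opts
  end
end

# Mirrors the constructor shown above: required keywords are merged into opts.
class FakeHeadersConsumer < FakeBaseConsumer
  def initialize(headers_name:, binding_headers:, **opts)
    super(opts.merge(headers_name: headers_name, binding_headers: binding_headers))
  end
end

consumer = FakeHeadersConsumer.new(
  headers_name:    'reports',
  binding_headers: { 'area' => 'Usa', 'x-match' => 'any' },
  on_cancellation: -> { puts 'closing' }
)

p consumer.received.keys
# => [:on_cancellation, :headers_name, :binding_headers]

# Omitting a required keyword raises ArgumentError before super is reached.
begin
  FakeHeadersConsumer.new(headers_name: 'reports')
rescue ArgumentError => e
  puts e.message
end
```

Because both keywords are declared without defaults, a subclass that calls super must supply headers_name and binding_headers itself, as the listeners in the Examples section do.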

Instance Attribute Details

#binding_headers ⇒ Object (readonly)

Returns the value of attribute binding_headers.



# File 'lib/rabbitmq/actors/patterns/headers/headers_consumer.rb', line 64

def binding_headers
  @binding_headers
end

#headers_name ⇒ Object (readonly)

Returns the value of attribute headers_name.



# File 'lib/rabbitmq/actors/patterns/headers/headers_consumer.rb', line 57

def headers_name
  @headers_name
end