Class: RabbitMQ::Actors::Worker Abstract

Inherits:
Base::Consumer show all
Defined in:
lib/rabbitmq/actors/patterns/master_workers/worker.rb

Overview

This class is abstract.

Subclass and override #perform to define your customized worker class.

A consumer of messages from RabbitMQ based on queue name.

Examples:

class MyListener < RabbitMQ::Actors::Worker
  class_attribute :queue_name, instance_writer: false

  def initialize
    super(queue_name:      queue_name,
          manual_ack:      true
          logger:          Rails.logger,
          on_cancellation: ->{ ActiveRecord::Base.connection.close }
  end

private

  def perform(**task)
    answer, transaction_guid = JSON.parse(task[:body]), task[:properties][:correlation_id]
    transaction = referred_transaction(transaction_guid: transaction_guid, tid: answer['tid'])
    return _no_transaction!(message_id: transaction_guid, transaction: transaction) if transaction.errors.present?
    update_transaction(transaction, answer.symbolize_keys)
  end

  def referred_transaction(transaction_guid:, tid:)
    ...
  end

  def update_transaction(purchase, **attrs)
    ...
  end

  def _no_transaction!(message_id:, transaction:)
    log_event(type:        'Transaction',
              message_id:  message_id,
              description: 'ERROR: answer does not identify any transaction',
              payload:     { errors: transaction.errors.full_messages },
              severity:    :high)
    transaction
  end
end

RabbitMQ::Server.url = 'amqp://localhost'

MyListener.queue_name = "transactions"
MyListener.new.start!

Instance Attribute Summary

Attributes inherited from Base::Agent

#queue

Instance Method Summary collapse

Methods inherited from Base::Consumer

#start!

Constructor Details

#initialize(queue_name:, **opts) ⇒ Worker

Rest of options required by your subclass.

Parameters:

  • :queue_name (String)

    name of the durable queue where to receive messages from.

  • opts (Hash)

    a customizable set of options

Options Hash (**opts):

  • :manual_ack (Boolean)

    to acknowledge messages to the RabbitMQ server once executed.

  • :on_cancellation (Proc)

    to be executed before the worker is terminated

  • :logger (Logger)

    the logger where to output info about this agent’s activity.



58
59
60
# File 'lib/rabbitmq/actors/patterns/master_workers/worker.rb', line 58

def initialize(queue_name:, **opts)
  super(opts.merge(queue_name: queue_name, exclusive: false))
end