Module: Pigeon

Defined in:
lib/pigeon.rb,
lib/pigeon/version.rb,
lib/pigeon/processor.rb,
lib/pigeon/publisher.rb,
lib/pigeon/configuration.rb,
lib/pigeon/models/outbox_message.rb,
lib/pigeon/models/adapters/rom_adapter.rb,
lib/pigeon/generators/rails/migration_generator.rb,
lib/pigeon/generators/hanami/migration_generator.rb,
lib/pigeon/models/adapters/active_record_adapter.rb

Defined Under Namespace

Modules: Generators, Models

Classes: Configuration, Error, MockProducer, Processor, Publisher

Constant Summary collapse

VERSION =
"0.1.0"

Class Method Summary collapse

Class Method Details

.config ⇒ Dry::Configurable::Config

Get the configuration

Returns:

  • (Dry::Configurable::Config)


30
31
32
# File 'lib/pigeon.rb', line 30

# Get the gem configuration.
#
# Delegates directly to Configuration.config.
#
# @return [Dry::Configurable::Config] the configuration object
def self.config
  Configuration.config
end

.configure {|config| ... } ⇒ Object

Configure the gem

Examples:

Pigeon.configure do |config|
  config.client_id = "my-application"
  config.kafka_brokers = ["kafka1:9092", "kafka2:9092"]
  config.max_retries = 5
end

Yields:

  • (config)

    Configuration instance



23
24
25
26
# File 'lib/pigeon.rb', line 23

# Configure the gem, then set up the Karafka producer if that has not
# happened yet.
#
# The block is optional. Once @karafka_initialized has been set, later calls
# still yield the configuration but do not run initialize_karafka again.
#
# @example
#   Pigeon.configure do |config|
#     config.client_id = "my-application"
#   end
#
# @yield [config] the object returned by Configuration.config
# @return [Object, nil] the result of initialize_karafka when it is invoked,
#   nil otherwise
def self.configure
  yield Configuration.config if block_given?
  return unless @karafka_initialized.nil?

  initialize_karafka
end

.create_outbox_message(attributes = {}) ⇒ Pigeon::Models::OutboxMessage

Create a new outbox message

Parameters:

  • attributes (Hash) (defaults to: {})

    Message attributes

Returns:

  • (Pigeon::Models::OutboxMessage)


118
119
120
# File 'lib/pigeon.rb', line 118

# Create a new outbox message.
#
# Forwards the attributes to the `create` method of whichever adapter
# outbox_message_adapter returns.
#
# @param attributes [Hash] message attributes (defaults to an empty hash)
# @return [Pigeon::Models::OutboxMessage] whatever the adapter's `create` returns
def self.create_outbox_message(attributes = {})
  outbox_message_adapter.create(attributes)
end

.find_outbox_message(id) ⇒ Pigeon::Models::OutboxMessage?

Find an outbox message by ID

Parameters:

  • id (String, Integer)

    Message ID

Returns:

  • (Pigeon::Models::OutboxMessage, nil)


125
126
127
# File 'lib/pigeon.rb', line 125

# Find an outbox message by ID.
#
# Forwards the lookup to the `find` method of whichever adapter
# outbox_message_adapter returns.
#
# @param id [String, Integer] message ID
# @return [Pigeon::Models::OutboxMessage, nil] whatever the adapter's `find` returns
def self.find_outbox_message(id)
  outbox_message_adapter.find(id)
end

.find_outbox_messages_by_status(status, limit = 100) ⇒ Array<Pigeon::Models::OutboxMessage>

Find outbox messages by status

Parameters:

  • status (String)

    Message status

  • limit (Integer) (defaults to: 100)

    Maximum number of messages to return

Returns:

  • (Array<Pigeon::Models::OutboxMessage>)


133
134
135
# File 'lib/pigeon.rb', line 133

# Find outbox messages by status.
#
# Forwards both arguments, positionally, to the `find_by_status` method of
# whichever adapter outbox_message_adapter returns.
#
# @param status [String] message status
# @param limit [Integer] maximum number of messages to return (defaults to 100)
# @return [Array<Pigeon::Models::OutboxMessage>] whatever the adapter returns
def self.find_outbox_messages_by_status(status, limit = 100)
  outbox_message_adapter.find_by_status(status, limit)
end

.find_outbox_messages_ready_for_retry(limit = 100) ⇒ Array<Pigeon::Models::OutboxMessage>

Find outbox messages ready for retry

Parameters:

  • limit (Integer) (defaults to: 100)

    Maximum number of messages to return

Returns:

  • (Array<Pigeon::Models::OutboxMessage>)


140
141
142
# File 'lib/pigeon.rb', line 140

# Find outbox messages ready for retry.
#
# Forwards the limit to the `find_ready_for_retry` method of whichever
# adapter outbox_message_adapter returns.
#
# @param limit [Integer] maximum number of messages to return (defaults to 100)
# @return [Array<Pigeon::Models::OutboxMessage>] whatever the adapter returns
def self.find_outbox_messages_ready_for_retry(limit = 100)
  outbox_message_adapter.find_ready_for_retry(limit)
end

.initialize_karafka ⇒ Karafka::Producer

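Initialize the Karafka producer. The result is memoized, so setup runs at most once. If Karafka setup raises, the error is logged and a MockProducer is used instead.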

Returns:

  • (Karafka::Producer)


36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/pigeon.rb', line 36

# Initialize the Karafka producer.
#
# The result is memoized: once @karafka_initialized is set, the stored
# producer is returned and setup is not run again.
#
# Setup copies client_id from the gem configuration, falls back to a
# "bootstrap.servers" entry built from config.kafka_brokers when
# config.karafka_config[:kafka] is missing or empty, and then applies every
# config.karafka_config entry for which the Karafka config exposes a setter.
#
# Any StandardError raised during setup is logged through config.logger and
# a MockProducer is stored instead, so this method does not propagate setup
# failures.
#
# @return [Object] Karafka.producer on success, a MockProducer otherwise
def self.initialize_karafka # rubocop:disable Metrics/AbcSize
  return @karafka_producer if @karafka_initialized

  # Configure Karafka
  begin
    Karafka::Setup::Config.setup do |karafka_config|
      karafka_config.client_id = config.client_id

      overrides = config.karafka_config
      kafka_settings = overrides[:kafka]
      use_default_kafka = !kafka_settings || kafka_settings.empty?

      # Set required Kafka configuration if not provided
      if use_default_kafka
        karafka_config.kafka = {
          "bootstrap.servers": config.kafka_brokers.join(",")
        }
      end

      # Apply any additional Karafka configuration
      overrides.each do |key, value|
        # A blank :kafka entry must not overwrite the default assigned above
        next if use_default_kafka && key == :kafka

        setter = "#{key}="
        karafka_config.public_send(setter, value) if karafka_config.respond_to?(setter)
      end
    end

    @karafka_initialized = true
    @karafka_producer = Karafka.producer
  rescue StandardError => e
    config.logger.error("Failed to initialize Karafka: #{e.message}")
    # Return a mock producer for testing
    @karafka_initialized = true
    @karafka_producer = MockProducer.new
  end

  @karafka_producer
end

.karafka_producer ⇒ Karafka::Producer

Get the Karafka producer instance

Returns:

  • (Karafka::Producer)


86
87
88
89
# File 'lib/pigeon.rb', line 86

# Get the Karafka producer instance, initializing it on first use.
#
# Once @karafka_initialized is set, the stored producer is returned without
# calling initialize_karafka again.
#
# @return [Object] the value held in @karafka_producer
def self.karafka_producer
  return @karafka_producer if @karafka_initialized

  initialize_karafka
  @karafka_producer
end

.outbox_message_adapter ⇒ Class

Get the appropriate outbox message adapter based on the framework

Returns:

  • (Class)

    Adapter class



105
106
107
108
109
110
111
112
113
# File 'lib/pigeon.rb', line 105

# Get the outbox message adapter that matches the loaded framework.
#
# Selection order:
# 1. ActiveRecord is defined: the ActiveRecord adapter
# 2. both ROM and Hanami are defined: the ROM adapter
# 3. otherwise: Models::OutboxMessage itself
#
# @return [Class] adapter class
def self.outbox_message_adapter
  return Models::Adapters::ActiveRecordAdapter if defined?(ActiveRecord)
  return Models::Adapters::RomAdapter if defined?(ROM) && defined?(Hanami)

  Models::OutboxMessage
end

.processor ⇒ Pigeon::Processor

Create a new processor instance

Returns:

  • (Pigeon::Processor)


99
100
101
# File 'lib/pigeon.rb', line 99

# Create a new processor instance.
#
# Nothing is cached: every call builds a fresh Processor.
#
# @return [Pigeon::Processor] a newly constructed processor
def self.processor
  Processor.new
end

.publisher ⇒ Pigeon::Publisher

Create a new publisher instance

Returns:

  • (Pigeon::Publisher)


93
94
95
# File 'lib/pigeon.rb', line 93

# Create a new publisher instance.
#
# Nothing is cached: every call builds a fresh Publisher.
#
# @return [Pigeon::Publisher] a newly constructed publisher
def self.publisher
  Publisher.new
end