Class: Deimos::Utils::InlineConsumer

Inherits:
Object
  • Object
show all
Defined in:
lib/deimos/utils/inline_consumer.rb

Overview

Class which can process/consume messages inline.

Constant Summary collapse

MAX_MESSAGE_WAIT_TIME =
1.second
MAX_TOPIC_WAIT_TIME =
10.seconds

Class Method Summary collapse

Class Method Details

.consume(topic:, frk_consumer:, num_messages: 10) ⇒ Object

Consume the last X messages from a topic.

Parameters:

  • topic (String)
  • frk_consumer (Class)
  • num_messages (Integer) (defaults to: 10)

    If this number is >= the number of messages in the topic, all messages will be consumed.



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/deimos/utils/inline_consumer.rb', line 109

def self.consume(topic:, frk_consumer:, num_messages: 10)
  listener = SeekListener.new(
    handler: frk_consumer,
    group_id: SecureRandom.hex,
    topic: topic,
    heartbeat_interval: 1
  )
  listener.num_messages = num_messages

  # Add the start_time and last_message_time attributes to the
  # consumer class so we can kill it if it's gone on too long
  class << frk_consumer
    attr_accessor :start_time, :last_message_time
  end

  subscribers = []
  subscribers << ActiveSupport::Notifications.
    subscribe('phobos.listener.process_message') do
      frk_consumer.last_message_time = Time.zone.now
    end
  subscribers << ActiveSupport::Notifications.
    subscribe('phobos.listener.start_handler') do
      frk_consumer.start_time = Time.zone.now
      frk_consumer.last_message_time = nil
    end
  subscribers << ActiveSupport::Notifications.
    subscribe('heartbeat.consumer.kafka') do
      if frk_consumer.last_message_time
        if Time.zone.now - frk_consumer.last_message_time > MAX_MESSAGE_WAIT_TIME
          raise Phobos::AbortError
        end
      elsif Time.zone.now - frk_consumer.start_time > MAX_TOPIC_WAIT_TIME
        Deimos.config.logger.error('Aborting - initial wait too long')
        raise Phobos::AbortError
      end
    end
  listener.start
  subscribers.each { |s| ActiveSupport::Notifications.unsubscribe(s) }
end

.get_messages_for(topic:, schema: nil, namespace: nil, key_config: nil, config_class: nil, num_messages: 10) ⇒ Array<Hash>

Get the last X messages from a topic. You can specify a subclass of Deimos::Consumer or Deimos::Producer, or provide the schema, namespace and key_config directly.

Parameters:

  • topic (String)
  • config_class (Class < Deimos::Consumer|Deimos::Producer>) (defaults to: nil)
  • schema (String) (defaults to: nil)
  • namespace (String) (defaults to: nil)
  • key_config (Hash) (defaults to: nil)
  • num_messages (Number) (defaults to: 10)

Returns:

  • (Array<Hash>)


82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/deimos/utils/inline_consumer.rb', line 82

def self.get_messages_for(topic:, schema: nil, namespace: nil, key_config: nil,
                          config_class: nil, num_messages: 10)
  if config_class
    MessageBankHandler.config_class = config_class
  elsif schema.nil? || key_config.nil?
    raise 'You must specify either a config_class or a schema, namespace and key_config!'
  else
    MessageBankHandler.class_eval do
      schema schema
      namespace namespace
      key_config key_config
      @decoder = nil
      @key_decoder = nil
    end
  end
  self.consume(topic: topic,
               frk_consumer: MessageBankHandler,
               num_messages: num_messages)
  messages = MessageBankHandler.total_messages
  messages.size <= num_messages ? messages : messages[-num_messages..-1]
end