Method: Franz::Agg#initialize

Defined in:
lib/franz/agg.rb

#initialize(opts = {}) ⇒ Agg

Start a new Agg thread in the background.

Parameters:

  • opts (Hash) (defaults to: {})

    options for the aggregator

Options Hash (opts):

  • :input_config (InputConfig)

    shared Franz configuration

  • :tail_events (Queue) — default: []

    “input” queue from Tail

  • :agg_events (Queue) — default: []

    “output” queue

  • :flush_interval (Integer) — default: 5

    seconds between flushes

  • :seqs (Hash<Path,Fixnum>) — default: {}

    internal “seqs” state

  • :logger (Logger) — default: Logger.new(STDOUT)

    logger to use



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/franz/agg.rb', line 29

def initialize opts={}
  @ic = opts[:input_config] || raise('No input_config specified')

  @tail_events = opts[:tail_events] || []
  @agg_events  = opts[:agg_events]  || []

  @buffer_limit   = opts[:buffer_limit]   || 50
  @flush_interval = opts[:flush_interval] || 10
  @seqs           = opts[:seqs]           || Hash.new
  @logger         = opts[:logger]         || Logger.new(STDOUT)

  @lock   = Hash.new { |h,k| h[k] = Mutex.new }
  @buffer = Franz::Sash.new
  @stop   = false

  @statz = opts[:statz] || Franz::Stats.new
  @statz.create :num_lines, 0

  @t1 = Thread.new do
    until @stop
      flush
      sleep flush_interval
    end
    flush true
  end

  @t2 = Thread.new do
    capture until @stop
  end

  log.debug \
    event: 'agg started',
    tail_events: @tail_events,
    agg_events: @agg_events
end