Class: Franz::Agg

Inherits:
Object
Defined in:
lib/franz/agg.rb

Overview

Agg mostly aggregates Tail events by applying the multiline filter, but it also applies the “host” and “type” fields. In short, it does all the post-processing after a line has been retrieved from a file.

Constant Summary

@@host = Socket.gethostname

We’ll apply the hostname to all events
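As a rough illustration of what applying the “host” and “type” fields could look like, the sketch below stamps a hash-shaped event with a hostname that is looked up once. The `stamp` helper, the `HOST` constant, and the event layout are assumptions made for this example; they are not taken from the Franz source.

```ruby
require 'socket'

# Hypothetical helper, not part of Franz: the hostname is resolved once
# and reused for every event, as the class-level @@host suggests.
HOST = Socket.gethostname

# Return a copy of the event with :host and :type added.
def stamp(event, type)
  event.merge(host: HOST, type: type)
end

event = stamp({ message: 'hello' }, 'syslog')
```

Resolving the hostname once avoids a system call per event, which matters when every line read from a file becomes an event.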

Constructor Details

#initialize(opts = {}) ⇒ Agg

Start the Agg threads in the background: one flushes the buffer periodically, the other captures incoming events.

Parameters:

  • opts (Hash) (defaults to: {})

    options for the aggregator

Options Hash (opts):

  • :input_config (InputConfig)

    shared Franz configuration

  • :tail_events (Queue) — default: []

    “input” queue from Tail

  • :agg_events (Queue) — default: []

    “output” queue

  • :flush_interval (Integer) — default: 10

    seconds between flushes

  • :seqs (Hash<Path,Fixnum>) — default: {}

    internal “seqs” state

  • :logger (Logger) — default: Logger.new(STDOUT)

    logger to use



# File 'lib/franz/agg.rb', line 29

def initialize opts={}
  @ic = opts[:input_config] || raise('No input_config specified')

  @tail_events = opts[:tail_events] || []
  @agg_events  = opts[:agg_events]  || []

  @buffer_limit   = opts[:buffer_limit]   || 50
  @flush_interval = opts[:flush_interval] || 10
  @seqs           = opts[:seqs]           || Hash.new
  @logger         = opts[:logger]         || Logger.new(STDOUT)

  @lock   = Hash.new { |h,k| h[k] = Mutex.new }
  @buffer = Franz::Sash.new
  @stop   = false

  @statz = opts[:statz] || Franz::Stats.new
  @statz.create :num_lines, 0

  @t1 = Thread.new do
    until @stop
      flush
      sleep flush_interval
    end
    flush true
  end

  @t2 = Thread.new do
    capture until @stop
  end

  log.debug \
    event: 'agg started',
    tail_events: @tail_events,
    agg_events: @agg_events
end
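The two-thread layout in the constructor (a periodic flusher plus a capturer) can be sketched on its own. `MiniAgg` below is a hypothetical stand-in written for this page, not `Franz::Agg`: its buffering and flushing logic are assumptions, and only the thread structure mirrors the listing above.

```ruby
# Hypothetical stand-in for illustration only; not Franz::Agg itself.
class MiniAgg
  def initialize(input:, output:, flush_interval: 0.05)
    @input, @output, @flush_interval = input, output, flush_interval
    @buffer = []
    @lock   = Mutex.new
    @stop   = false

    # Flusher: drain the buffer periodically, then once more on shutdown.
    @t1 = Thread.new do
      until @stop
        flush
        sleep @flush_interval
      end
      flush
    end

    # Capturer: block on the input queue and buffer whatever arrives.
    @t2 = Thread.new do
      until @stop
        line = @input.pop
        @lock.synchronize { @buffer << line }
      end
    end
  end

  def stop
    return if @stop
    @stop = true
    @t2.kill # the capturer may be blocked in Queue#pop, so it is killed
    @t1.join # the flusher is allowed to finish its final flush
  end

  private

  # Move everything buffered so far onto the output queue.
  def flush
    lines = @lock.synchronize do
      taken = @buffer.dup
      @buffer.clear
      taken
    end
    lines.each { |line| @output << line }
  end
end

input  = Queue.new
output = Queue.new
agg = MiniAgg.new(input: input, output: output)
input << 'line one'
input << 'line two'
sleep 0.2
agg.stop
```

Joining the flusher rather than killing it is what guarantees that lines still sitting in the buffer at shutdown reach the output queue.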

Instance Attribute Details

#seqs ⇒ Object (readonly)

Returns the value of attribute seqs.



# File 'lib/franz/agg.rb', line 18

def seqs
  @seqs
end

Instance Method Details

#state ⇒ Object

Return a copy of the internal “seqs” state



# File 'lib/franz/agg.rb', line 78

def state
  return @seqs.dup
end
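Because `state` returns `@seqs.dup`, callers get a shallow copy rather than the live hash. The snippet below shows the effect with a plain hash; the path and sequence values are made up for the example.

```ruby
# Made-up data standing in for the internal "seqs" hash.
seqs  = { '/var/log/app.log' => 3 }
state = seqs.dup

# Changes to the copy do not reach the original hash.
state['/var/log/other.log'] = 7
state['/var/log/app.log']   = 99

original_keys  = seqs.keys
original_value = seqs['/var/log/app.log']
```

The copy is shallow: the hash itself is duplicated, but any mutable objects stored as values would still be shared between the two.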

#stop ⇒ Hash

Stop the Agg threads. Only the first call has any effect; later calls just return the state.

Returns:

  • (Hash)

    internal “seqs” state



# File 'lib/franz/agg.rb', line 68

def stop
  return state if @stop
  @stop = true
  @t2.kill
  @t1.join
  log.debug event: 'agg stopped'
  return state
end
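The guard on `@stop` makes repeated calls harmless. A minimal sketch of that behaviour follows, using a hypothetical `Stoppable` class with a `shutdowns` counter added purely so the effect can be observed; neither exists in Franz.

```ruby
# Hypothetical minimal class; only the stop/state shape mirrors the listing.
class Stoppable
  attr_reader :shutdowns

  def initialize
    @seqs      = { '/var/log/app.log' => 3 }
    @stop      = false
    @shutdowns = 0
    @worker    = Thread.new { sleep 0.01 until @stop }
  end

  def state
    @seqs.dup
  end

  def stop
    return state if @stop # later calls skip the shutdown work entirely
    @stop = true
    @worker.join
    @shutdowns += 1
    state
  end
end

s = Stoppable.new
first  = s.stop
second = s.stop
```

Both calls return an equal hash, but the shutdown work runs only once, so `stop` is safe to call from several cleanup paths.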