Method: Franz::Agg#initialize
- Defined in:
- lib/franz/agg.rb
#initialize(opts = {}) ⇒ Agg
Start a new Agg, spawning its flush and capture threads in the background.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/franz/agg.rb', line 29

# Set up a new Agg and launch its two background threads.
#
# Every option is read with an `||` fallback, so a nil or false value is
# treated the same as a missing key.
#
# @param opts [Hash] configuration options
# @option opts [Object] :input_config required; stored in @ic
# @option opts [Object] :tail_events stored in @tail_events (default: [])
# @option opts [Object] :agg_events stored in @agg_events (default: [])
# @option opts [Object] :buffer_limit stored in @buffer_limit (default: 50)
# @option opts [Object] :flush_interval stored in @flush_interval (default: 10)
# @option opts [Object] :seqs stored in @seqs (default: empty Hash)
# @option opts [Object] :logger stored in @logger (default: Logger on STDOUT)
# @option opts [Object] :statz stored in @statz (default: Franz::Stats.new)
#
# @raise [RuntimeError] when :input_config is missing, nil or false
def initialize(opts = {})
  @ic = opts[:input_config] || raise('No input_config specified')

  # Event collections handed in by the caller.
  @tail_events = opts[:tail_events] || []
  @agg_events  = opts[:agg_events]  || []

  # Tunables and collaborators, each with a fallback.
  @buffer_limit   = opts[:buffer_limit]   || 50
  @flush_interval = opts[:flush_interval] || 10
  @seqs           = opts[:seqs]           || {}
  @logger         = opts[:logger]         || Logger.new(STDOUT)

  # Internal state: one lazily created Mutex per key, a fresh buffer, and
  # the flag both background loops poll.
  @lock   = Hash.new { |locks, key| locks[key] = Mutex.new }
  @buffer = Franz::Sash.new
  @stop   = false

  @statz = opts[:statz] || Franz::Stats.new
  @statz.create(:num_lines, 0)

  # Flusher: flush, sleep, repeat until stopped; then one last `flush true`.
  @t1 = Thread.new do
    until @stop
      flush
      sleep(flush_interval)
    end
    flush(true)
  end

  # Capturer: call `capture` repeatedly until stopped.
  @t2 = Thread.new do
    until @stop
      capture
    end
  end

  log.debug(
    event: 'agg started',
    tail_events: @tail_events,
    agg_events: @agg_events
  )
end