Class: Franz::Tail

Inherits:
Object
  • Object
show all
Defined in:
lib/franz/tail.rb

Overview

Tail receives low-level file events from a Watch and handles the actual reading of files, providing a stream of lines.

Constant Summary collapse

ERR_BUFFER_FULL =
1
ERR_INVALID_EVENT =
2
ERR_INCOMPLETE_READ =
3

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Tail

Start a new Tail thread in the background.

Parameters:

  • opts (Hash) (defaults to: {})

    a complex Hash for tail configuration

Options Hash (opts):

  • :input_config (InputConfig)

    shared Franz configuration



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/franz/tail.rb', line 21

# Start a new Tail thread in the background.
#
# @param opts [Hash] tail configuration
# @option opts [InputConfig] :input_config shared Franz configuration (required)
# @option opts [#shift] :watch_events source the background thread pulls
#   events from via +shift+ (defaults to an empty Array)
# @option opts [Object] :tail_events stored for later use (defaults to an empty Array)
# @option opts [Integer] :read_limit defaults to 10_240
# @option opts [Integer] :line_limit defaults to 10_240; handed to each
#   BufferedTokenizer as its second argument
# @option opts [Integer] :block_size defaults to 32_768
# @option opts [Hash] :cursors initial cursors state (defaults to an empty Hash)
# @option opts [Logger] :logger defaults to a Logger writing to STDOUT
# @raise [RuntimeError] if :input_config is missing or falsy
def initialize(opts = {})
  @ic = opts[:input_config] || raise('No input_config specified')

  @watch_events = opts[:watch_events] || []
  @tail_events  = opts[:tail_events]  || []

  # NOTE(review): the read_limit default was previously annotated "100 KiB",
  # but the value has always been 10 KiB -- confirm which one was intended.
  @read_limit = opts[:read_limit] || 10 * 1024 # 10 KiB
  @line_limit = opts[:line_limit] || 10 * 1024 # 10 KiB
  @block_size = opts[:block_size] || 32 * 1024 # 32 KiB
  @cursors    = opts[:cursors]    || {}
  @logger     = opts[:logger]     || Logger.new(STDOUT)

  # One newline-delimited tokenizer per key, created lazily on first access.
  @buffer = Hash.new do |tokenizers, key|
    tokenizers[key] = BufferedTokenizer.new("\n", @line_limit)
  end
  @stop = false

  # Keep handing shifted events to #handle until #stop flips the flag.
  @tail_thread = Thread.new do
    until @stop
      handle(watch_events.shift)
    end
  end

  log.debug(
    event: 'tail started',
    watch_events: watch_events,
    tail_events: tail_events,
    block_size: block_size
  )
end

Instance Method Details

#state ⇒ Object

Return the internal “cursors” state



60
61
62
# File 'lib/franz/tail.rb', line 60

# Return the internal "cursors" state.
#
# @return [Object] a shallow copy (+dup+) of +@cursors+, so adding or removing
#   entries on the returned object does not touch the internal one
def state
  @cursors.dup
end

#stop ⇒ Hash

Stop the Tail thread. Only the first call has any effect; subsequent calls simply return the current state.

Returns:

  • (Hash)

    internal “cursors” state



50
51
52
53
54
55
56
57
# File 'lib/franz/tail.rb', line 50

# Stop the Tail thread. Only the first call does any work; later calls
# just return the current state.
#
# @return [Hash] internal "cursors" state (a copy, see #state)
def stop
  return state if @stop

  @stop = true
  # Kill whichever threads actually exist. An explicit nil guard replaces the
  # former `rescue nil` modifier, which relied on swallowing the NoMethodError
  # raised by calling +kill+ on an unset thread and would also have hidden any
  # unrelated error.
  [@watch_thread, @tail_thread].each { |thread| thread.kill if thread }
  log.debug event: 'tail stopped'
  state
end