Class: Franz::Tail
- Inherits:
-
Object
- Object
- Franz::Tail
- Defined in:
- lib/franz/tail.rb
Overview
Tail receives low-level file events from a Watch and handles the actual reading of files, providing a stream of lines.
Constant Summary collapse
- ERR_BUFFER_FULL =
1
- ERR_INVALID_EVENT =
2
- ERR_INCOMPLETE_READ =
3
Instance Method Summary collapse
-
#initialize(opts = {}) ⇒ Tail
constructor
Start a new Tail thread in the background.
-
#state ⇒ Object
Return the internal “cursors” state.
-
#stop ⇒ Hash
Stop the Tail thread.
Constructor Details
#initialize(opts = {}) ⇒ Tail
Start a new Tail thread in the background.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/franz/tail.rb', line 24 def initialize opts={} @ic = opts[:input_config] || raise('No input_config specified') @watch_events = opts[:watch_events] || [] @tail_events = opts[:tail_events] || [] @read_limit = opts[:read_limit] || 16_384 # 16 KiB @block_size = opts[:block_size] || 32_768 # 32 KiB @cursors = opts[:cursors] || Hash.new @logger = opts[:logger] || Logger.new(STDOUT) @nil_read = Hash.new { |h,k| h[k] = false } @buffer = Hash.new { |h, k| h[k] = BufferedTokenizer.new("\n") } @stop = false @statz = opts[:statz] || Franz::Stats.new @statz.create :num_reads, 0 @statz.create :num_rotates, 0 @statz.create :num_deletes, 0 @tail_thread = Thread.new do handle(watch_events.shift) until @stop end log.debug \ event: 'tail started', watch_events: watch_events, tail_events: tail_events, block_size: block_size end |
Instance Method Details
#state ⇒ Object
Return the internal “cursors” state
68 69 70 |
# File 'lib/franz/tail.rb', line 68 def state return @cursors.dup end |
#stop ⇒ Hash
Stop the Tail thread. Effectively only once.
58 59 60 61 62 63 64 65 |
# File 'lib/franz/tail.rb', line 58 def stop return state if @stop @stop = true @watch_thread.kill rescue nil @tail_thread.kill rescue nil log.debug event: 'tail stopped' return state end |