Class: Franz::Input
- Inherits:
- Object
- Defined in:
- lib/franz/input.rb
Overview
File input for Franz. Really, the only input for Franz, so I hope you like it.
Instance Method Summary
- #checkpoint ⇒ Object
  Write a checkpoint file given the current state.
- #initialize(opts = {}) ⇒ Input (constructor)
  Start a new input in the background.
- #state ⇒ Object
  Return a compact representation of internal state.
- #stop ⇒ Hash
  Stop everything.
Constructor Details
#initialize(opts = {}) ⇒ Input
Start a new input in the background. We'll generate a stream of events by watching the filesystem for changes (Franz::Discover and Franz::Watch), tailing files (Franz::Tail), and aggregating what was read into events (Franz::Agg). Any state saved in the most recent non-empty checkpoint file is restored first.
# File 'lib/franz/input.rb', line 27

def initialize opts={}
  opts = {
    checkpoint: 'franz.*.checkpoint',
    checkpoint_interval: 30,
    logger: Logger.new(STDOUT),
    output: [],
    input: {
      ignore_before: 0,
      discover_bound: 10_000,
      watch_bound: 1_000,
      tail_bound: 1_000,
      discover_interval: nil,
      watch_interval: nil,
      eviction_interval: nil,
      flush_interval: nil,
      buffer_limit: nil,
      line_limit: nil,
      read_limit: nil,
      play_catchup?: nil,
      configs: []
    }
  }.deep_merge!(opts)

  @logger = opts[:logger]
  @statz = opts[:statz] || Franz::Stats.new

  @checkpoint_interval = opts[:checkpoint_interval]
  @checkpoint_path = opts[:checkpoint].sub('*', '%d')
  @checkpoint_glob = opts[:checkpoint]

  # The checkpoint contains a Marshalled Hash with a compact representation of
  # stateful inputs to various Franz streaming classes (e.g. the "known" option
  # to Franz::Discover). This state file is generated automatically every time
  # the input exits (see below) and also at regular intervals.
  checkpoints = Dir[@checkpoint_glob].sort_by { |path| File.mtime path }
  checkpoints = checkpoints.reject { |path| File.zero? path }
  last_checkpoint_path = checkpoints.pop
  state = nil
  unless last_checkpoint_path.nil?
    last_checkpoint = File.read(last_checkpoint_path)
    state = Marshal.load last_checkpoint
    log.info \
      event: 'input checkpoint loaded',
      checkpoint: last_checkpoint_path
  end

  state = state || {}
  known = state.keys
  stats, cursors, seqs = {}, {}, {}
  known.each do |path|
    cursor = state[path].delete :cursor
    seq = state[path].delete :seq
    cursors[path] = cursor unless cursor.nil?
    seqs[path] = seq unless seq.nil?
    stats[path] = state[path]
  end

  discoveries = possibly_bounded_queue opts[:input][:discover_bound]
  deletions = possibly_bounded_queue opts[:input][:discover_bound]
  watch_events = possibly_bounded_queue opts[:input][:watch_bound]
  tail_events = possibly_bounded_queue opts[:input][:tail_bound]

  ic = InputConfig.new opts[:input][:configs], @logger

  @disover = Franz::Discover.new \
    input_config: ic,
    discoveries: discoveries,
    deletions: deletions,
    discover_interval: opts[:input][:discover_interval],
    ignore_before: opts[:input][:ignore_before],
    logger: @logger,
    known: known,
    statz: @statz

  @watch = Franz::Watch.new \
    input_config: ic,
    discoveries: discoveries,
    deletions: deletions,
    watch_events: watch_events,
    watch_interval: opts[:input][:watch_interval],
    play_catchup?: opts[:input][:play_catchup?],
    logger: @logger,
    stats: stats,
    cursors: cursors,
    statz: @statz

  @tail = Franz::Tail.new \
    input_config: ic,
    watch_events: watch_events,
    tail_events: tail_events,
    block_size: opts[:input][:block_size],
    line_limit: opts[:input][:line_limit],
    read_limit: opts[:input][:read_limit],
    logger: @logger,
    cursors: cursors,
    statz: @statz

  @agg = Franz::Agg.new \
    input_config: ic,
    tail_events: tail_events,
    agg_events: opts[:output],
    flush_interval: opts[:input][:flush_interval],
    buffer_limit: opts[:input][:buffer_limit],
    logger: @logger,
    seqs: seqs,
    statz: @statz

  @stop = false
  @t = Thread.new do
    until @stop
      checkpoint
      sleep @checkpoint_interval
    end
  end

  log.info event: 'input started'
end
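The restore step at the top of the constructor can be tried in isolation. What follows is a minimal sketch, not Franz's own code: `load_latest_checkpoint` and `split_state` are hypothetical helper names, and the sample state (including the `:size` key) is made up for illustration.

```ruby
require 'tmpdir'

# Hypothetical helper mirroring the constructor's restore step: take the
# newest non-empty file matching the glob and unmarshal it. Returns an empty
# Hash when nothing usable is found. (Uses File.binread rather than File.read
# so the marshalled bytes are read untouched.)
def load_latest_checkpoint(glob)
  candidates = Dir[glob].sort_by { |path| File.mtime(path) }
  candidates = candidates.reject { |path| File.zero?(path) }
  latest = candidates.pop
  return {} if latest.nil?
  Marshal.load(File.binread(latest))
end

# Hypothetical helper: split the per-path state into the three hashes the
# constructor hands to Watch (stats, cursors), Tail (cursors) and Agg (seqs).
# Works on copies, so the caller's state is left untouched.
def split_state(state)
  stats, cursors, seqs = {}, {}, {}
  state.each do |path, entry|
    entry = entry.dup
    cursor = entry.delete(:cursor)
    seq = entry.delete(:seq)
    cursors[path] = cursor unless cursor.nil?
    seqs[path] = seq unless seq.nil?
    stats[path] = entry
  end
  [stats, cursors, seqs]
end

Dir.mktmpdir do |dir|
  # Made-up sample state for one tailed file.
  saved = { '/var/log/app.log' => { size: 120, cursor: 120, seq: 7 } }
  File.binwrite(File.join(dir, 'franz.100.checkpoint'), Marshal.dump(saved))
  File.write(File.join(dir, 'franz.200.checkpoint'), '') # empty, so skipped

  state = load_latest_checkpoint(File.join(dir, 'franz.*.checkpoint'))
  stats, cursors, seqs = split_state(state)
  puts "known paths: #{state.keys.inspect}"
  puts "cursor: #{cursors['/var/log/app.log']}, seq: #{seqs['/var/log/app.log']}"
  puts "remaining stat keys: #{stats['/var/log/app.log'].keys.inspect}"
end
```

Empty files are rejected before the newest one is picked, so an empty checkpoint file (for example, one left behind by an interrupted write) does not mask an older, usable one.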
Instance Method Details
#checkpoint ⇒ Object
Write a checkpoint file given the current state. If the checkpoint path contains a glob (`*`), Franz writes a new file named with the current Unix timestamp and deletes older checkpoints, keeping only the two most recent files.
# File 'lib/franz/input.rb', line 177

def checkpoint
  path = if @checkpoint_path == @checkpoint_glob
    File.open(@checkpoint_path, 'w') { |f| f.write Marshal.dump(state) }
    @checkpoint_path
  else
    old_checkpoints = Dir[@checkpoint_glob].sort_by { |p| File.mtime p }
    path = @checkpoint_path % Time.now
    File.open(path, 'w') { |f| f.write Marshal.dump(state) }
    old_checkpoints.pop # Keep last two checkpoints
    old_checkpoints.map { |c| FileUtils.rm c }
    path
  end
  log.debug \
    event: 'input checkpoint saved',
    path: path
end
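The rotation in the glob branch can be sketched on its own. This is an illustration under assumptions, not the library's method: `write_rotating_checkpoint` is a hypothetical name, the `now:` keyword exists only so the demo can use fixed timestamps, and the demo pins each file's mtime so the ordering does not depend on filesystem timestamp resolution.

```ruby
require 'fileutils'
require 'tmpdir'

# Hypothetical stand-alone version of the rotation: write a new timestamped
# checkpoint, spare the newest pre-existing file, and delete the rest.
def write_rotating_checkpoint(glob, state, now: Time.now)
  template = glob.sub('*', '%d')
  previous = Dir[glob].sort_by { |path| File.mtime(path) } # oldest first
  path = format(template, now.to_i)
  File.binwrite(path, Marshal.dump(state))
  previous.pop                               # keep the newest older file
  previous.each { |old| FileUtils.rm(old) }  # drop everything before it
  path
end

Dir.mktmpdir do |dir|
  glob = File.join(dir, 'franz.*.checkpoint')
  base = Time.at(1_700_000_000)
  3.times do |i|
    stamp = base + i
    written = write_rotating_checkpoint(glob, { run: i }, now: stamp)
    File.utime(stamp, stamp, written) # pin mtime for a deterministic order
  end
  # Two files remain: the ones stamped 1700000001 and 1700000002.
  puts Dir[glob].map { |path| File.basename(path) }.sort
end
```

The list of older files is gathered before the new file is written, so the new file is never a deletion candidate; together with the one spared file, that leaves two checkpoints on disk.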
#state ⇒ Object
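Return a compact representation of internal state: the per-path state from Franz::Watch, with the Franz::Tail cursor and Franz::Agg seq for each path merged in under `:cursor` and `:seq`.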
# File 'lib/franz/input.rb', line 162

def state
  stats = @watch.state
  cursors = @tail.state
  seqs = @agg.state
  stats.keys.each do |path|
    stats[path] ||= {}
    stats[path][:cursor] = cursors.fetch(path, nil)
    stats[path][:seq] = seqs.fetch(path, nil)
  end
  return stats
end
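To make the shape of the returned Hash concrete, here is a small sketch of the same merge on plain hashes. `merge_state` is a hypothetical name and the sample values are invented; unlike the method above, this version builds a new Hash instead of modifying the Watch state in place.

```ruby
# Hypothetical stand-alone version of the merge: one entry per path known to
# the watcher, annotated with that path's cursor and seq (nil when absent).
def merge_state(stats, cursors, seqs)
  stats.each_with_object({}) do |(path, stat), merged|
    merged[path] = (stat || {}).merge(
      cursor: cursors.fetch(path, nil),
      seq: seqs.fetch(path, nil)
    )
  end
end

# Invented sample data standing in for @watch.state, @tail.state, @agg.state.
stats   = { '/var/log/a.log' => { size: 10 }, '/var/log/b.log' => nil }
cursors = { '/var/log/a.log' => 10, '/var/log/orphan.log' => 99 }
seqs    = { '/var/log/a.log' => 3 }

merge_state(stats, cursors, seqs).each do |path, entry|
  puts "#{path}: cursor=#{entry[:cursor].inspect} seq=#{entry[:seq].inspect}"
end
```

Only paths present in the watcher's state appear in the result, so a cursor or seq for a path the watcher does not know about (like `orphan.log` here) is not carried into the checkpoint.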
#stop ⇒ Hash
Stop everything and return the final state. This drains all the queues and waits for auxiliary threads (e.g. eviction) to complete full intervals, so it may take tens of seconds, depending on your configuration. Calling it again after the input has stopped simply returns the state.
# File 'lib/franz/input.rb', line 150

def stop
  return state if @stop
  @stop = true
  @t.join
  @watch.stop
  @tail.stop
  @agg.stop
  log.info event: 'input stopped'
  return state
end
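The flag-and-join pattern used for the checkpoint thread explains part of the delay: the thread only notices the flag after its current sleep ends. A minimal sketch of that pattern, assuming nothing about Franz's other classes (`PeriodicTask` is a hypothetical name used only here):

```ruby
# Hypothetical class showing the same loop shape as the checkpoint thread:
# do some work, sleep for an interval, repeat until a stop flag is set.
class PeriodicTask
  def initialize(interval, &work)
    @stop = false
    @runs = 0
    @thread = Thread.new do
      until @stop
        work.call
        @runs += 1
        sleep interval
      end
    end
  end

  # Idempotent stop: the first call sets the flag and waits for the thread to
  # finish its current sleep; later calls return immediately.
  def stop
    return @runs if @stop
    @stop = true
    @thread.join
    @runs
  end
end

ticks = Queue.new
task = PeriodicTask.new(0.05) { ticks << :tick }
ticks.pop                 # block until the task has run at least once
first = task.stop         # may wait up to one interval for the sleep to end
second = task.stop        # returns at once with the same value
puts "runs=#{first}, repeated stop returned the same value: #{first == second}"
```

With a 30-second `checkpoint_interval`, the same shape means the join alone can take up to 30 seconds, before the Watch, Tail, and Agg stages are stopped in turn.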