Class: Franz::Input

Inherits:
Object
  • Object
show all
Defined in:
lib/franz/input.rb

Overview

File input for Franz. Really, the only input for Franz, so I hope you like it.

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Input

Start a new input in the background. We’ll generate a stream of events by watching the filesystem for changes (Franz::Discover and Franz::Watch), tailing files (Franz::Tail), and generating events (Franz::Agg)

Parameters:

  • opts (Hash) (defaults to: {})

    options for the input

Options Hash (opts):

  • :input (Hash) — default: {}

    “input” configuration

  • :output (Queue) — default: []

    “output” queue

  • :checkpoint (Path) — default: 'franz.*.checkpoint'

    path to checkpoint file

  • :checkpoint_interval (Integer) — default: 30

    seconds between checkpoints

  • :logger (Logger) — default: Logger.new(STDOUT)

    logger to use



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/franz/input.rb', line 27

# Start a new input in the background.
#
# Merges +opts+ over the built-in defaults, restores per-path state from the
# most recently modified non-empty checkpoint file (if one matches), builds the
# Discover -> Watch -> Tail -> Agg pipeline connected by queues, and starts a
# thread that calls #checkpoint every +:checkpoint_interval+ seconds.
#
# @param opts [Hash] options for the input
# @option opts [Hash] :input ({}) "input" configuration; the recognised keys
#   are the ones listed in the defaults below, plus +:block_size+
# @option opts [Queue] :output ([]) "output" queue, handed to Franz::Agg as
#   its +agg_events+
# @option opts [String] :checkpoint ('franz.*.checkpoint') path to the
#   checkpoint file; the first "*" is replaced with "%d" to build the path
#   template used when writing
# @option opts [Integer] :checkpoint_interval (30) seconds between checkpoints
# @option opts [Logger] :logger (Logger.new(STDOUT)) logger to use
# @option opts [Object] :statz (Franz::Stats.new) stats collector passed to
#   every pipeline stage
def initialize opts={}
  opts = {
    checkpoint: 'franz.*.checkpoint',
    checkpoint_interval: 30,
    logger: Logger.new(STDOUT),
    output: [],
    input: {
      ignore_before: 0,
      discover_bound: 10_000,
      watch_bound: 1_000,
      tail_bound: 1_000,
      discover_interval: nil,
      watch_interval: nil,
      eviction_interval: nil,
      flush_interval: nil,
      buffer_limit: nil,
      line_limit: nil,
      read_limit: nil,
      play_catchup?: nil,
      configs: []
    }
  }.deep_merge!(opts)

  @logger = opts[:logger]
  @statz = opts[:statz] || Franz::Stats.new

  @checkpoint_interval = opts[:checkpoint_interval]
  @checkpoint_path     = opts[:checkpoint].sub('*', '%d')
  @checkpoint_glob     = opts[:checkpoint]

  # The checkpoint contains a Marshalled Hash with a compact representation of
  # stateful inputs to various Franz streaming classes (e.g. the "known" option
  # to Franz::Discover). It is written at regular intervals by the thread
  # started at the end of this constructor.
  # NOTE(review): the original comment also said the file is written whenever
  # the input exits; confirm where that happens, since it is not done here.
  checkpoints = Dir[@checkpoint_glob].sort_by { |path| File.mtime path }
  # Empty files carry no state, so they are never candidates for restoring.
  checkpoints = checkpoints.reject { |path| File.zero? path }
  last_checkpoint_path = checkpoints.pop # most recently modified survivor
  state = nil
  unless last_checkpoint_path.nil?
    last_checkpoint = File.read(last_checkpoint_path)
    # SECURITY: Marshal.load can instantiate arbitrary objects. This is only
    # safe while the checkpoint location is writable solely by trusted users.
    state = Marshal.load last_checkpoint
    log.info \
      event: 'input checkpoint loaded',
      checkpoint: last_checkpoint_path
  end

  # Split the restored per-path entries back into the three pieces the
  # pipeline stages expect: what remains after removing :cursor and :seq is
  # handed to Watch as its stats, cursors go to Watch and Tail, seqs to Agg.
  state ||= {}
  known = state.keys
  stats, cursors, seqs = {}, {}, {}
  known.each do |path|
    cursor        = state[path].delete :cursor
    seq           = state[path].delete :seq
    cursors[path] = cursor unless cursor.nil?
    seqs[path]    = seq    unless seq.nil?
    stats[path]   = state[path]
  end

  discoveries  = possibly_bounded_queue opts[:input][:discover_bound]
  deletions    = possibly_bounded_queue opts[:input][:discover_bound]
  watch_events = possibly_bounded_queue opts[:input][:watch_bound]
  tail_events  = possibly_bounded_queue opts[:input][:tail_bound]

  ic = InputConfig.new opts[:input][:configs], @logger

  @discover = Franz::Discover.new \
    input_config: ic,
    discoveries: discoveries,
    deletions: deletions,
    discover_interval: opts[:input][:discover_interval],
    ignore_before: opts[:input][:ignore_before],
    logger: @logger,
    known: known,
    statz: @statz
  # The discoverer used to be stored only under this misspelled name; keep it
  # as an alias so any code still reading @disover continues to work.
  @disover = @discover

  @watch = Franz::Watch.new \
    input_config: ic,
    discoveries: discoveries,
    deletions: deletions,
    watch_events: watch_events,
    watch_interval: opts[:input][:watch_interval],
    play_catchup?: opts[:input][:play_catchup?],
    logger: @logger,
    stats: stats,
    cursors: cursors,
    statz: @statz

  @tail = Franz::Tail.new \
    input_config: ic,
    watch_events: watch_events,
    tail_events: tail_events,
    block_size: opts[:input][:block_size],
    line_limit: opts[:input][:line_limit],
    read_limit: opts[:input][:read_limit],
    logger: @logger,
    cursors: cursors,
    statz: @statz

  @agg = Franz::Agg.new \
    input_config: ic,
    tail_events: tail_events,
    agg_events: opts[:output],
    flush_interval: opts[:input][:flush_interval],
    buffer_limit: opts[:input][:buffer_limit],
    logger: @logger,
    seqs: seqs,
    statz: @statz

  # Periodic checkpointing: runs until #stop flips @stop.
  @stop = false
  @t = Thread.new do
    until @stop
      checkpoint
      sleep @checkpoint_interval
    end
  end

  log.info event: 'input started'
end

Instance Method Details

#checkpoint ⇒ Object

Write a checkpoint file given the current state. If a glob is provided, Franz will write a checkpoint with the current Unix timestamp, keeping the last two files for posterity



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/franz/input.rb', line 177

# Write a checkpoint file holding the Marshal dump of the current #state.
#
# If the configured checkpoint path contains no "*" (so the path template and
# the glob are the same string), that single file is overwritten in place.
# Otherwise a new file named with the current Unix timestamp is written and
# older files matching the glob are pruned, leaving the new file plus the
# most recently modified previous one.
#
# @return [Object] whatever the logger's +debug+ call returns
def checkpoint
  # Serialize before opening the target: opening with 'w' truncates the file,
  # so a failure while building the state must not wipe an existing checkpoint.
  payload = Marshal.dump(state)

  if @checkpoint_path == @checkpoint_glob
    path = @checkpoint_path
    File.open(path, 'w') { |f| f.write payload }
  else
    path = @checkpoint_path % Time.now.to_i
    previous = Dir[@checkpoint_glob].sort_by { |file| File.mtime file }
    # Never treat the file we are about to (re)write as a pruning candidate,
    # even if a file with this name already exists.
    previous.delete path
    File.open(path, 'w') { |f| f.write payload }
    previous.pop # Keep the newest previous checkpoint alongside the new one
    previous.each { |stale| FileUtils.rm stale }
  end

  log.debug \
    event: 'input checkpoint saved',
    path: path
end

#state ⇒ Object

Return a compact representation of internal state



162
163
164
165
166
167
168
169
170
171
172
# File 'lib/franz/input.rb', line 162

# Return a compact representation of internal state.
#
# Starts from the Hash reported by the watcher and, for every path in it,
# records the tail's cursor under :cursor and the aggregator's sequence under
# :seq (nil when that component has no entry for the path). The watcher's
# Hash is updated in place and returned.
def state
  snapshot  = @watch.state
  positions = @tail.state
  sequences = @agg.state
  snapshot.keys.each do |file|
    entry = (snapshot[file] ||= {})
    entry[:cursor] = positions.fetch(file, nil)
    entry[:seq]    = sequences.fetch(file, nil)
  end
  snapshot
end

#stop ⇒ Hash

Stop everything. Has the effect of draining all the Queues and waiting on auxiliary threads (e.g. eviction) to complete full intervals, so it may ordinarily take tens of seconds, depending on your configuration.

Returns:

  • (Hash)

    compact internal state



150
151
152
153
154
155
156
157
158
159
# File 'lib/franz/input.rb', line 150

# Stop everything and report the final state.
#
# The first call sets the stop flag, joins the background thread held in @t,
# stops the watcher, tail and aggregator in that order, and logs the
# shutdown. Any later call skips all of that and just returns the state.
#
# @return [Hash] compact internal state (see #state)
def stop
  unless @stop
    @stop = true
    @t.join
    [@watch, @tail, @agg].each(&:stop)
    log.info event: 'input stopped'
  end
  state
end