Class: Franz::Watch
- Inherits: Object
  - Object
  - Franz::Watch
- Defined in: lib/franz/watch.rb
Overview
Watch works in tandem with Discover to maintain a list of known files and their status. Events are generated when a file is created, destroyed, or modified (including appended, truncated, and replaced).
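The event kinds named above can be told apart by comparing two snapshots of the same path taken on consecutive polls. The following is a minimal sketch of that idea, not Franz's actual implementation: the helper `classify_change` and the snapshot shape (`inode` and `size` keys, with `nil` meaning the file was absent) are assumptions made for illustration.

```ruby
# Hypothetical helper, not part of Franz: classify what happened to a file
# between two polls. Each snapshot is a Hash like { inode: 42, size: 100 },
# or nil if the file did not exist at that poll.
def classify_change(old, new)
  return :unchanged if old.nil? && new.nil?
  return :created   if old.nil?
  return :destroyed if new.nil?
  return :replaced  if old[:inode] != new[:inode] # a different file at the same path
  return :appended  if new[:size] > old[:size]
  return :truncated if new[:size] < old[:size]
  :unchanged
end

before = { inode: 42, size: 100 }

p classify_change(nil, before)                         # => :created
p classify_change(before, { inode: 42, size: 150 })    # => :appended
p classify_change(before, { inode: 42, size: 10 })     # => :truncated
p classify_change(before, { inode: 77, size: 100 })    # => :replaced
p classify_change(before, nil)                         # => :destroyed
```

A comparison based only on inode and size cannot see an in-place rewrite that leaves the size unchanged; a real watcher would likely need an additional signal such as the modification time.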
Instance Method Summary
- #initialize(opts = {}) ⇒ Watch (constructor)
  Start a new Watch thread in the background.
- #state ⇒ Object
  Return the internal “stats” state.
- #stop ⇒ Hash
  Stop the Watch thread.
Constructor Details
#initialize(opts = {}) ⇒ Watch
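Start a new Watch thread in the background. The :input_config option is required and its absence raises an error. The options :discoveries, :deletions, :watch_events, :play_catchup? (default true), :watch_interval (default 10, passed to sleep, so in seconds), :stats, :logger, and :statz are optional. When catch-up is enabled and :stats is non-empty, :cursors is read as well.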
    # File 'lib/franz/watch.rb', line 25

    def initialize opts={}
      @ic = opts[:input_config] || raise('No input_config specified')

      @discoveries  = opts[:discoveries]  || []
      @deletions    = opts[:deletions]    || []
      @watch_events = opts[:watch_events] || []

      @play_catchup   = opts[:play_catchup?].nil? ? true : opts[:play_catchup?]
      @watch_interval = opts[:watch_interval] || 10
      @stats          = opts[:stats] || Hash.new

      @logger = opts[:logger] || Logger.new(STDOUT)

      @statz = opts[:statz] || Franz::Stats.new
      @statz.create :num_watched

      # Make sure we're up-to-date by rewinding our old stats to our cursors
      if @play_catchup
        log.debug event: 'play catchup'
        stats.keys.each do |path|
          if @ic.type path
            stats[path][:size] = opts[:cursors][path] || 0
          else # we shouldn't be watching this file anymore
            stats.delete path
          end
        end
      end

      @stop = false
      @thread = Thread.new do
        until @stop
          until discoveries.empty?
            @stats[discoveries.shift] = nil
          end
          watch.each do |deleted|
            @stats.delete deleted
            deletions.push deleted
          end
          sleep watch_interval
        end
      end

      log.debug \
        event: 'watch started',
        discoveries: discoveries,
        deletions: deletions,
        watch_events: watch_events,
        watch_interval: watch_interval
    end
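The catch-up step in the constructor can be tried in isolation. The sketch below reproduces only that loop, under stated assumptions: `FakeInputConfig` is a hypothetical stand-in for the real input config (the listing only shows that it responds to `type` and returns something falsy for paths that are no longer configured), and `rewind` is a helper name invented here.

```ruby
# Hypothetical stand-in for the input config. Assumption: `type` returns a
# truthy value for configured paths and nil for everything else.
class FakeInputConfig
  def initialize(patterns)
    @patterns = patterns # e.g. { '/var/log/*.log' => :app }
  end

  def type(path)
    @patterns.each { |glob, type| return type if File.fnmatch?(glob, path) }
    nil
  end
end

# Hypothetical helper mirroring the catch-up loop: reset each known file's
# recorded size to its saved cursor (or 0), and drop files that are no
# longer configured.
def rewind(stats, cursors, input_config)
  stats.keys.each do |path| # iterate over a copy of the keys so deleting is safe
    if input_config.type(path)
      stats[path][:size] = cursors[path] || 0
    else
      stats.delete(path)
    end
  end
  stats
end

stats = {
  '/var/log/app.log' => { size: 500 },
  '/var/log/new.log' => { size: 80 },
  '/tmp/old.txt'     => { size: 10 }
}
cursors = { '/var/log/app.log' => 120 }
config  = FakeInputConfig.new('/var/log/*.log' => :app)

rewind(stats, cursors, config)

puts stats['/var/log/app.log'][:size] # 120 (rewound to the saved cursor)
puts stats['/var/log/new.log'][:size] # 0   (no cursor, so start from the beginning)
puts stats.key?('/tmp/old.txt')       # false (no longer configured)
```

Rewinding the recorded size makes the next poll see the file as having grown since the cursor, so the unread portion is reported again. Unlike this sketch, the listed constructor indexes `opts[:cursors]` without a default, so callers who pass non-empty `:stats` with catch-up enabled presumably need to pass `:cursors` too.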
Instance Method Details
#state ⇒ Object
Return a shallow copy of the internal “stats” state.
    # File 'lib/franz/watch.rb', line 88

    def state
      return @stats.dup
    end
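Because `state` returns `@stats.dup`, the copy is shallow: the outer Hash is new, but any nested per-file Hashes are shared with the original. The sketch below shows the consequence on a plain Hash shaped like the stats used elsewhere on this page (that shape is an assumption), along with `deep_copy`, a hypothetical helper for callers who want a fully independent snapshot.

```ruby
# Hypothetical helper, not part of Franz: a fully independent copy via
# Marshal round-tripping. Works for plain data (Hashes, Strings, Integers,
# Symbols, nil), not for objects such as Procs or IO handles.
def deep_copy(hash)
  Marshal.load(Marshal.dump(hash))
end

stats    = { '/var/log/app.log' => { size: 100 } }
snapshot = stats.dup

# Top-level changes to the copy do not reach the original...
snapshot['/tmp/extra.log'] = { size: 1 }
puts stats.key?('/tmp/extra.log')      # false

# ...but nested Hashes are shared, so this change does.
snapshot['/var/log/app.log'][:size] = 0
puts stats['/var/log/app.log'][:size]  # 0

# A deep copy is isolated at every level.
isolated = deep_copy(stats)
isolated['/var/log/app.log'][:size] = 999
puts stats['/var/log/app.log'][:size]  # still 0
```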
#stop ⇒ Hash
Stop the Watch thread and return the current state. Only the first call has any effect; later calls just return the state again.
    # File 'lib/franz/watch.rb', line 79

    def stop
      return state if @stop
      @stop = true
      @thread.kill
      log.debug event: 'watch stopped'
      return state
    end
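The guard on `@stop` is what makes a second call harmless. The pattern can be exercised without Franz using a small stand-in; `Poller` below is a hypothetical class written for this illustration, and unlike the listing above it also joins the thread after killing it, so that the returned state can no longer change.

```ruby
# Hypothetical stand-in for a background worker with an idempotent stop.
class Poller
  def initialize(interval = 0.01)
    @ticks = 0
    @stop  = false
    @thread = Thread.new do
      until @stop
        @ticks += 1
        sleep interval
      end
    end
  end

  # Snapshot of the worker's progress.
  def state
    { ticks: @ticks }
  end

  # Stop the worker. Only the first call does any work; later calls
  # return the same final state.
  def stop
    return state if @stop
    @stop = true
    @thread.kill
    @thread.join # addition in this sketch: wait until the thread is really gone
    state
  end
end

poller = Poller.new
sleep 0.05
first  = poller.stop
second = poller.stop
puts first == second # true
```

Setting the flag before killing the thread means the loop would also exit on its own at the next iteration; the kill simply avoids waiting out the remaining sleep.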