Class: Fluent::TailExInput

Inherits:
TailInput
  • Object
show all
Includes:
Mixin::ConfigPlaceholders
Defined in:
lib/fluent/plugin/in_tail_ex.rb

Defined Under Namespace

Classes: TailExWatcher

Instance Method Summary collapse

Constructor Details

#initialize ⇒ TailExInput

Returns a new instance of TailExInput.



14
15
16
17
# File 'lib/fluent/plugin/in_tail_ex.rb', line 14

# Builds the plugin in a "not ready" state.
#
# The parent initializer runs first; @ready is then cleared so that #run
# declines to delegate to the parent until #start sets the flag.
def initialize
  super()
  @ready = false
end

Instance Method Details

#configure(conf) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/fluent/plugin/in_tail_ex.rb', line 19

# Configures the plugin, then prepares tag expansion and path refreshing.
#
# After the parent class has handled +conf+, a tag containing '*' is split
# around the wildcard into @tag_prefix and @tag_suffix so that #receive_lines
# can substitute a per-file fragment for it. A tag without '*' leaves both
# set to nil. Only the first two fields of the split are kept, so anything
# after a second '*' is ignored.
#
# @param conf the configuration object; forwarded unchanged to the parent
# @return the newly created refresh timer (also stored in @refresh_trigger)
def configure(conf)
  super
  if @tag.include?('*')
    @tag_prefix, @tag_suffix = @tag.split('*')
    # String#split drops trailing empty fields: "*" yields [] and "foo.*"
    # yields ["foo."]. Default BOTH halves to '' so that #receive_lines never
    # tries to concatenate nil.
    @tag_prefix ||= ''
    @tag_suffix ||= ''
  else
    @tag_prefix = nil
    @tag_suffix = nil
  end
  @watchers = {}
  # Timer that invokes #refresh_watchers; the second argument (true) is passed
  # through as-is -- presumably "repeat", confirm against TimerWatcher.
  @refresh_trigger = TailWatcher::TimerWatcher.new(@refresh_interval, true, &method(:refresh_watchers))
end

#expand_paths ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/fluent/plugin/in_tail_ex.rb', line 32

# Resolves the configured path patterns to the files that currently exist.
#
# Each entry of @paths is optionally run through Time#strftime (when
# @expand_date is set) and then expanded with Dir.glob. The combined result
# is de-duplicated: overlapping patterns would otherwise list a file twice,
# and #refresh_watchers would hand the duplicate to #start_watch, creating a
# second watcher for the same path and orphaning the first.
#
# @return [Array<String>] matching paths, each at most once, in first-seen order
def expand_paths
  now = Time.now
  @paths.flat_map { |pattern|
    Dir.glob(@expand_date ? now.strftime(pattern) : pattern)
  }.uniq
end

#receive_lines(lines, tag) ⇒ Object



80
81
82
83
84
85
# File 'lib/fluent/plugin/in_tail_ex.rb', line 80

# Receives lines from a watcher and forwards them to the parent class,
# first rewriting @tag when a wildcard tag was configured.
#
# When either @tag_prefix or @tag_suffix is set, @tag becomes
# prefix + tag + suffix. Interpolation is used so that a nil half is treated
# as empty; plain String#+ raised NoMethodError/TypeError when only one half
# was set (e.g. a configured tag of exactly "*").
#
# @param lines the lines read by the watcher; passed to the parent unchanged
# @param tag the per-file fragment substituted for the '*' in the tag
# @return whatever the parent implementation returns
def receive_lines(lines, tag)
  if @tag_prefix || @tag_suffix
    @tag = "#{@tag_prefix}#{tag}#{@tag_suffix}"
  end
  super(lines)
end

#refresh_watchers ⇒ Object



44
45
46
47
48
49
50
51
# File 'lib/fluent/plugin/in_tail_ex.rb', line 44

# Reconciles the set of active watchers with the files that currently match.
#
# Paths that are watched but no longer returned by #expand_paths are handed to
# #stop_watch; paths that are returned but not yet watched go to #start_watch.
# Neither method is called with an empty list.
def refresh_watchers
  current = expand_paths
  watched = @watchers.keys

  vanished = watched - current
  appeared = current - watched

  unless vanished.empty?
    stop_watch(vanished)
  end
  unless appeared.empty?
    start_watch(appeared)
  end
end

#run ⇒ Object



106
107
108
109
110
111
# File 'lib/fluent/plugin/in_tail_ex.rb', line 106

# Runs the parent implementation, but only once @ready has been set.
#
# Returning without running until then avoids a coolio error.
def run
  super if @ready
end

#shutdown ⇒ Object



98
99
100
101
102
103
104
# File 'lib/fluent/plugin/in_tail_ex.rb', line 98

# Shuts the plugin down by tearing down, in order: the refresh timer, every
# watcher, the event loop, the worker thread and the position file handle.
def shutdown
  # Stop the periodic refresh first so no watchers are added during teardown.
  @refresh_trigger.detach
  # Pass immediate=true so each watcher is closed without being given @loop.
  stop_watch(@watchers.keys, true)
  @loop.stop
  # Wait for the worker thread to finish before closing the position file.
  @thread.join
  # @pf_file is not assigned anywhere in this class; it is only closed when set.
  @pf_file.close if @pf_file
end

#start ⇒ Object



87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/in_tail_ex.rb', line 87

# Starts the plugin: lets the parent class start with no paths, then installs
# this class's own watchers and refresh timer and launches the worker thread.
def start
  # Hide the configured paths from the parent while it starts, keeping them in
  # a local -- presumably so the parent creates no watchers of its own
  # (NOTE(review): confirm against TailInput#start).
  paths, @paths = @paths, []
  super
  # Wait for the thread left in @thread after super (presumably started by the
  # parent). #run does nothing while @ready is false, so this is expected to
  # return promptly.
  @thread.join
  @paths = paths
  # Create watchers for every file that matches right now ...
  refresh_watchers
  # ... and keep the set current via the timer built in #configure.
  @refresh_trigger.attach(@loop)
  # From here on #run delegates to the parent; start the real worker thread.
  @ready = true
  @thread = Thread.new(&method(:run))
end

#start_watch(paths) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/fluent/plugin/in_tail_ex.rb', line 53

# Creates, attaches and registers a watcher for every given path.
#
# When a position file (@pf) is in use, the path's entry is looked up and
# passed to the watcher. If @read_all is set and the entry's recorded inode is
# 0, the entry is first updated to the file's current inode with position 0.
# Without a position file the watcher receives nil.
#
# @param paths [Array<String>] paths to begin watching
# @return [Array<String>] the +paths+ argument
def start_watch(paths)
  paths.each do |path|
    position_entry = nil
    if @pf
      position_entry = @pf[path]
      if @read_all && position_entry.read_inode == 0
        position_entry.update(File.stat(path).ino, 0)
      end
    end

    tail = TailExWatcher.new(path, @rotate_wait, position_entry, &method(:receive_lines))
    tail.attach(@loop)
    @watchers[path] = tail
  end
end

#stop_watch(paths, immediate = false) ⇒ Object



71
72
73
74
75
76
77
78
# File 'lib/fluent/plugin/in_tail_ex.rb', line 71

# Unregisters and closes the watchers for the given paths.
#
# Paths without a registered watcher are skipped. Each watcher's #close is
# given @loop, or nil when +immediate+ is truthy.
#
# @param paths [Array<String>] paths to stop watching
# @param immediate [Boolean] when truthy, close with nil instead of @loop
# @return [Array<String>] the +paths+ argument
def stop_watch(paths, immediate=false)
  paths.each do |path|
    watcher = @watchers.delete(path)
    next unless watcher

    if immediate
      watcher.close(nil)
    else
      watcher.close(@loop)
    end
  end
end