Class: Fluent::Plugin::TailInput::GroupWatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_tail/group_watch.rb

Defined Under Namespace

Classes: FileCounter

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(rate_period = 60, limit = -1) ⇒ GroupWatcher

Returns a new instance of GroupWatcher.



140
141
142
143
144
# File 'lib/fluent/plugin/in_tail/group_watch.rb', line 140

# Builds a watcher that starts out with no tracked paths.
#
# @param rate_period [Numeric] length of the window that
#   #limit_time_period_reached? compares the elapsed reading time against
# @param limit [Integer] line budget that #limit_lines_reached? divides across
#   all tracked paths; the default of -1 is negative, which that method
#   treats as "never reached"
def initialize(rate_period = 60, limit = -1)
  @rate_period = rate_period
  @current_paths = {}
  @limit = limit
end

Instance Attribute Details

#current_paths ⇒ Object

Returns the value of attribute current_paths.



133
134
135
# File 'lib/fluent/plugin/in_tail/group_watch.rb', line 133

# Returns the hash of tracked paths held in @current_paths.
#
# Keys are the paths handed to #add; values are the FileCounter entries that
# #add creates for them.
#
# @return [Hash] the live hash itself, not a copy
def current_paths
  @current_paths
end

#limit ⇒ Object

Returns the value of attribute limit.



133
134
135
# File 'lib/fluent/plugin/in_tail/group_watch.rb', line 133

# Returns the line budget stored in @limit by the constructor.
#
# #limit_lines_reached? interprets 0 as "always reached", a negative value as
# "never reached", and a positive value as a budget divided across all
# tracked paths.
#
# @return [Integer] the configured limit (-1 when the constructor default is used)
def limit
  @limit
end

#number_lines_read ⇒ Object

Returns the value of attribute number_lines_read.



133
134
135
# File 'lib/fluent/plugin/in_tail/group_watch.rb', line 133

# Returns the watcher-level @number_lines_read instance variable.
#
# NOTE(review): none of the methods visible in this file assign this variable;
# per-path line counts are kept on the FileCounter entries inside
# @current_paths instead. Unless a writer defined elsewhere sets it, this
# returns nil — confirm whether the attribute is still needed.
#
# @return [Object, nil] whatever was last stored in @number_lines_read
def number_lines_read
  @number_lines_read
end

#rate_period ⇒ Object

Returns the value of attribute rate_period.



133
134
135
# File 'lib/fluent/plugin/in_tail/group_watch.rb', line 133

# Returns the window length stored in @rate_period by the constructor.
#
# #limit_time_period_reached? compares the result of #time_spent_reading
# against this value, so it shares the unit of Fluent::Clock.now
# (presumably seconds — TODO confirm).
#
# @return [Numeric] the configured period (60 when the constructor default is used)
def rate_period
  @rate_period
end

#start_reading_time ⇒ Object

Returns the value of attribute start_reading_time.



133
134
135
# File 'lib/fluent/plugin/in_tail/group_watch.rb', line 133

# Returns the watcher-level @start_reading_time instance variable.
#
# NOTE(review): none of the methods visible in this file assign this variable;
# per-path start times are kept on the FileCounter entries inside
# @current_paths instead. Unless a writer defined elsewhere sets it, this
# returns nil — confirm whether the attribute is still needed.
#
# @return [Object, nil] whatever was last stored in @start_reading_time
def start_reading_time
  @start_reading_time
end

Instance Method Details

#add(path) ⇒ Object



146
147
148
# File 'lib/fluent/plugin/in_tail/group_watch.rb', line 146

# Starts tracking +path+ with a fresh counter: zero lines read and no start
# time yet. Calling this for a path that is already tracked replaces its
# existing counter.
#
# @param path [Object] key to track (presumably a file path string)
# @return [FileCounter] the newly created counter
def add(path)
  @current_paths.store(path, FileCounter.new(0, nil))
end

#delete(path) ⇒ Object



158
159
160
# File 'lib/fluent/plugin/in_tail/group_watch.rb', line 158

# Stops tracking +path+ by removing its entry from @current_paths.
#
# @param path [Object] key previously registered with #add
#   (presumably a file path string)
# @return [Object, nil] the removed entry, or nil when +path+ was not tracked
def delete(path)
  @current_paths.delete(path)
end

#include?(path) ⇒ Boolean

Returns:

  • (Boolean)


150
151
152
# File 'lib/fluent/plugin/in_tail/group_watch.rb', line 150

# Tells whether +path+ is currently tracked, i.e. present as a key in
# @current_paths.
#
# @param path [Object] key to look up
# @return [Boolean]
def include?(path)
  @current_paths.include?(path)
end

#limit_lines_reached?(path) ⇒ Boolean

Returns:

  • (Boolean)


183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/fluent/plugin/in_tail/group_watch.rb', line 183

# Decides whether reading from +path+ should be throttled.
#
# Outcomes, checked in this order:
# * +path+ is not tracked, or the limit is 0  -> true
# * the limit is negative                     -> false
# * the path has read fewer lines than its share of the limit
#   (@limit integer-divided by the number of tracked paths) -> false
# * the share is used up and #limit_time_period_reached? is true -> true
# * otherwise the path's counter is reset and the result is false
#
# NOTE(review): the last two outcomes go through #time_spent_reading, which
# subtracts the counter's start_reading_time. #add and #reset_counter leave
# that field nil, so callers presumably must have invoked
# #update_reading_time first — confirm, especially when @limit is smaller
# than the number of tracked paths and the share becomes 0.
#
# @param path [Object] key to check (presumably a file path string)
# @return [Boolean] true when the caller should stop reading for now
def limit_lines_reached?(path)
  return true if !include?(path) || @limit == 0
  return false if @limit < 0

  per_path_quota = @limit / size
  return false if @current_paths[path].number_lines_read < per_path_quota

  # Quota used up: throttle only while #limit_time_period_reached? holds.
  return true if limit_time_period_reached?(path)

  # The period check no longer holds, so start counting afresh.
  reset_counter(path)
  false
end

#limit_time_period_reached?(path) ⇒ Boolean

Returns:

  • (Boolean)


179
180
181
# File 'lib/fluent/plugin/in_tail/group_watch.rb', line 179

# Tells whether +path+ is still inside its rate period.
#
# Despite the name, this is true while the time spent reading is *less than*
# @rate_period, and false once that much time or more has elapsed.
#
# @param path [Object] tracked key whose reading time is examined
# @return [Boolean]
def limit_time_period_reached?(path)
  elapsed = time_spent_reading(path)
  elapsed < @rate_period
end

#reset_counter(path) ⇒ Object



170
171
172
173
# File 'lib/fluent/plugin/in_tail/group_watch.rb', line 170

# Puts the counter of +path+ back to its initial state: no start time and
# zero lines read.
#
# @param path [Object] tracked key; an untracked key raises NoMethodError
#   because its lookup yields nil
# @return [Integer] 0, the value of the final assignment
def reset_counter(path)
  counter = @current_paths[path]
  counter.start_reading_time = nil
  counter.number_lines_read = 0
end

#size ⇒ Object



154
155
156
# File 'lib/fluent/plugin/in_tail/group_watch.rb', line 154

# Number of paths currently tracked.
#
# @return [Integer] count of entries in @current_paths
def size
  @current_paths.length
end

#time_spent_reading(path) ⇒ Object



175
176
177
# File 'lib/fluent/plugin/in_tail/group_watch.rb', line 175

# Time elapsed since reading of +path+ started, measured with
# Fluent::Clock.now.
#
# NOTE(review): the counter's start_reading_time is nil after #add and
# #reset_counter; the subtraction then fails, so callers presumably set it
# through #update_reading_time beforehand — confirm.
#
# @param path [Object] tracked key
# @return [Numeric] current clock reading minus the recorded start time
def time_spent_reading(path)
  now = Fluent::Clock.now
  started_at = @current_paths[path].start_reading_time
  now - started_at
end

#to_s ⇒ Object



199
200
201
# File 'lib/fluent/plugin/in_tail/group_watch.rb', line 199

# Human-readable summary: the inherited #to_s text followed by the tracked
# paths, the rate period and the limit.
#
# @return [String]
def to_s
  details = " current_paths: #{@current_paths} rate_period: #{@rate_period} limit: #{@limit}"
  super + details
end

#update_lines_read(path, value) ⇒ Object



166
167
168
# File 'lib/fluent/plugin/in_tail/group_watch.rb', line 166

# Adds +value+ to the number of lines recorded as read for +path+.
#
# @param path [Object] tracked key; an untracked key raises NoMethodError
#   because its lookup yields nil
# @param value [Integer] number of lines to add
# @return [Integer] the updated total for +path+
def update_lines_read(path, value)
  counter = @current_paths[path]
  counter.number_lines_read = counter.number_lines_read + value
end

#update_reading_time(path) ⇒ Object



162
163
164
# File 'lib/fluent/plugin/in_tail/group_watch.rb', line 162

# Records when reading of +path+ started, unless a start time is already
# set. The clock is consulted only when a new start time has to be stored.
#
# @param path [Object] tracked key; an untracked key raises NoMethodError
#   because its lookup yields nil
# @return [Object] the start time in effect after the call
def update_reading_time(path)
  counter = @current_paths[path]
  already_started = counter.start_reading_time
  return already_started if already_started

  counter.start_reading_time = Fluent::Clock.now
end