Class: Fluent::Plugin::CloudwatchIngestInput::State

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_cloudwatch_ingest.rb

Defined Under Namespace

Classes: LockFailed

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(filepath, log) ⇒ State

Returns a new instance of State.



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 254

def initialize(filepath, log)
  @filepath = filepath
  @log = log
  @store = Hash.new { |h, k| h[k] = {} }

  if File.exist?(filepath)
    self.statefile = Pathname.new(@filepath).open('r+')
  else
    @log.warn("No state file #{statefile} Creating a new one.")
    begin
      self.statefile = Pathname.new(@filepath).open('w+')
      save
    rescue => boom
      @log.error("Unable to create new file #{statefile.path}: #{boom}")
    end
  end

  # Attempt to obtain an exclusive flock on the file and raise and
  # exception if we can't
  @log.info("Obtaining exclusive lock on state file #{statefile.path}")
  lockstatus = statefile.flock(File::LOCK_EX | File::LOCK_NB)
  raise CloudwatchIngestInput::State::LockFailed if lockstatus == false

  @store.merge!(Psych.safe_load(statefile.read))
  @log.info("Loaded #{@store.keys.size} groups from #{statefile.path}")
end

Instance Attribute Details

#statefileObject

Returns the value of attribute statefile.



252
253
254
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 252

def statefile
  @statefile
end

#storeObject

Returns the value of attribute store.



252
253
254
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 252

def store
  @store
end

Instance Method Details

#closeObject



288
289
290
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 288

def close
  statefile.close
end

#prune(log_groups) ⇒ Object



292
293
294
295
296
297
298
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 292

def prune(log_groups)
  groups_before = @store.keys.size
  @store.delete_if { |k, _v| true unless log_groups.include?(k) }
  @log.info("Pruned #{groups_before - @store.keys.size} keys from store")

  # TODO: also prune streams as these are most likely to be transient
end

#saveObject



281
282
283
284
285
286
# File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 281

def save
  statefile.rewind
  statefile.write(Psych.dump(@store))
  @log.info("Saved state to #{statefile.path}")
  statefile.rewind
end