Class: Fluent::Plugin::CloudwatchIngestInput::State
- Inherits: Object
  - Object
  - Fluent::Plugin::CloudwatchIngestInput::State
- Defined in: lib/fluent/plugin/in_cloudwatch_ingest.rb
Defined Under Namespace
Classes: LockFailed
Instance Attribute Summary
- #statefile ⇒ Object
  Returns the value of attribute statefile.
- #store ⇒ Object
  Returns the value of attribute store.
Instance Method Summary
- #close ⇒ Object
- #initialize(filepath, log) ⇒ State (constructor)
  A new instance of State.
- #prune(log_groups) ⇒ Object
- #save ⇒ Object
Constructor Details
#initialize(filepath, log) ⇒ State
Returns a new instance of State.
    # File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 254

    def initialize(filepath, log)
      @filepath = filepath
      @log = log
      @store = Hash.new { |h, k| h[k] = {} }

      if File.exist?(filepath)
        self.statefile = Pathname.new(@filepath).open('r+')
      else
        @log.warn("No state file #{statefile} Creating a new one.")
        begin
          self.statefile = Pathname.new(@filepath).open('w+')
          save
        rescue => boom
          @log.error("Unable to create new file #{statefile.path}: #{boom}")
        end
      end

      # Attempt to obtain an exclusive flock on the file and raise an
      # exception if we can't
      @log.info("Obtaining exclusive lock on state file #{statefile.path}")
      lockstatus = statefile.flock(File::LOCK_EX | File::LOCK_NB)
      raise CloudwatchIngestInput::State::LockFailed if lockstatus == false

      @store.merge!(Psych.safe_load(statefile.read))

      @log.info("Loaded #{@store.keys.size} groups from #{statefile.path}")
    end
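The constructor relies on `File#flock` with `File::LOCK_EX | File::LOCK_NB`, which returns `false` instead of blocking when the lock is already held. The following is a minimal sketch of that behaviour, not the plugin's code: the helper name `try_exclusive_lock` and the temporary file are hypothetical, and it assumes a POSIX system with a local filesystem, where two separate `open` calls on the same path contend for the same lock.

```ruby
require 'tempfile'

# Hypothetical helper (not part of the plugin): attempt an exclusive,
# non-blocking lock. File#flock returns 0 on success and false when
# LOCK_NB is set and the lock is already held through another handle.
def try_exclusive_lock(file)
  file.flock(File::LOCK_EX | File::LOCK_NB) != false
end

tmp = Tempfile.new('state')
first = File.open(tmp.path, 'r+')
second = File.open(tmp.path, 'r+')

first_locked = try_exclusive_lock(first)    # lock is free at this point
second_locked = try_exclusive_lock(second)  # already held through `first`

puts "first: #{first_locked}, second: #{second_locked}"

# Closing a handle releases its lock.
[first, second].each(&:close)
tmp.close!
```

In the constructor, the `false` case is what triggers `LockFailed`, so a second process pointed at the same state file fails at startup rather than waiting.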
Instance Attribute Details
#statefile ⇒ Object
Returns the value of attribute statefile.
    # File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 252

    def statefile
      @statefile
    end
#store ⇒ Object
Returns the value of attribute store.
    # File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 252

    def store
      @store
    end
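The constructor builds the store as `Hash.new { |h, k| h[k] = {} }`, so looking up an unknown key creates an empty nested hash for it. The sketch below illustrates that behaviour in isolation. The helper `new_store` and the group/stream/token layout of the values are assumptions made for illustration; the source only indicates that top-level keys are log groups.

```ruby
# Hypothetical helper mirroring how the constructor initialises @store.
def new_store
  Hash.new { |hash, key| hash[key] = {} }
end

store = new_store

# Nested assignment works without initialising the outer key first.
store['group-a']['stream-1'] = 'token-123'

# key? does not invoke the default block, so no entry is created here.
store.key?('group-b')

# A plain lookup does invoke it and leaves an empty hash behind.
store['group-b']

# merge! keeps the default block in place for later lookups.
store.merge!('group-c' => { 'stream-9' => 'token-456' })

puts store.keys.inspect
```

One consequence is that merely reading `store[name]` for an unseen group adds that group to the state written by the next `save`.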
Instance Method Details
#close ⇒ Object
    # File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 288

    def close
      statefile.close
    end
#prune(log_groups) ⇒ Object
    # File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 292

    def prune(log_groups)
      groups_before = @store.keys.size
      @store.delete_if { |k, _v| true unless log_groups.include?(k) }
      @log.info("Pruned #{groups_before - @store.keys.size} keys from store")

      # TODO: also prune streams as these are most likely to be transient
    end
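`prune` drops every top-level key that is not in `log_groups`. As an illustration only, the same filtering can be written with `Hash#keep_if`, which states the condition positively. The function `prune_store` and the sample group names are hypothetical and not part of the plugin.

```ruby
# Hypothetical stand-alone version of the pruning step: keep only the
# groups that are still being watched and report how many were removed.
def prune_store(store, log_groups)
  before = store.size
  store.keep_if { |group, _streams| log_groups.include?(group) }
  before - store.size
end

store = { 'app' => {}, 'db' => {}, 'retired' => {} }
removed = prune_store(store, %w[app db])

puts "Pruned #{removed} keys, remaining: #{store.keys.inspect}"
```

Both forms mutate the hash in place, so the caller's store shrinks without reassignment.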
#save ⇒ Object
    # File 'lib/fluent/plugin/in_cloudwatch_ingest.rb', line 281

    def save
      statefile.rewind
      statefile.write(Psych.dump(@store))
      @log.info("Saved state to #{statefile.path}")
      statefile.rewind
    end
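`save` rewinds the open file and writes the YAML dump over the existing contents. Rewinding and writing does not by itself shorten a file, so if a later dump is shorter than an earlier one (for example after `prune`), bytes from the old dump would appear to remain at the end. The sketch below shows one way to guard against that by truncating at the write position. `save_store` is a hypothetical helper and not the plugin's implementation.

```ruby
require 'psych'
require 'tempfile'

# Hypothetical variant of the save step: overwrite from the start of the
# file, then truncate so nothing from a longer previous dump is left over.
def save_store(file, store)
  file.rewind
  file.write(Psych.dump(store))
  file.flush
  file.truncate(file.pos)
  file.rewind
end

tmp = Tempfile.new('state')
File.open(tmp.path, 'r+') do |file|
  save_store(file, { 'group-a' => { 'stream-1' => 'x' * 200 } })
  save_store(file, { 'group-a' => {} })  # much shorter second dump
  puts Psych.safe_load(file.read).inspect
end
tmp.close!
```

The final `rewind` mirrors the original method, leaving the handle positioned at the start for the next read or write.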