Class: Fluent::BufferizeOutput::PosKeeper
- Inherits:
-
Object
- Object
- Fluent::BufferizeOutput::PosKeeper
- Defined in:
- lib/fluent/plugin/out_bufferize.rb
Constant Summary collapse
- @@instances =
{}
Class Method Summary collapse
- .get(chunk) ⇒ Object
- .remove(chunk) ⇒ Object
Instance Method Summary collapse
- #each(&block) ⇒ Object
- #increment ⇒ Object
-
#initialize(chunk) ⇒ PosKeeper
constructor
A new instance of PosKeeper.
- #remove ⇒ Object
Constructor Details
#initialize(chunk) ⇒ PosKeeper
Returns a new instance of PosKeeper.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/fluent/plugin/out_bufferize.rb', line 17

# Sets up position tracking for +chunk+.
#
# The chunk and its +unique_id+ are stored and the counter starts at 0.
# When the chunk responds to +path+, a companion file named
# "<chunk.path>.pos" is opened read/write (created if missing) with
# synchronous writes, and the counter is restored from its first line
# (0 when the file is empty). Otherwise the counter is kept in memory only.
#
# @param chunk [Object] must respond to +unique_id+; +path+ is optional
def initialize(chunk)
  @id = chunk.unique_id
  @count = 0
  @chunk = chunk

  # No backing path: the position lives only in this object.
  unless chunk.respond_to?(:path)
    @type = :mem
    return
  end

  @path = chunk.path + ".pos"
  @io = File.open(@path, File::CREAT | File::RDWR, DEFAULT_FILE_PERMISSION)
  @io.sync = true

  # A previously stored position, if any, is on the first line.
  stored = @io.gets
  @count = stored ? stored.to_i : 0
  @type = :file
end
Class Method Details
.get(chunk) ⇒ Object
8 9 10 11 |
# File 'lib/fluent/plugin/out_bufferize.rb', line 8

# Returns the PosKeeper registered for +chunk+, creating and registering
# one on first use. Instances are keyed by +chunk.unique_id+.
#
# @param chunk [Object] must respond to +unique_id+
# @return [PosKeeper] the keeper registered under the chunk's unique id
def self.get(chunk)
  key = chunk.unique_id
  # `||=` evaluates to the stored value, so no second lookup is needed.
  @@instances[key] ||= PosKeeper.new(chunk)
end
.remove(chunk) ⇒ Object
13 14 15 |
# File 'lib/fluent/plugin/out_bufferize.rb', line 13

# Drops the registry entry stored under +chunk.unique_id+.
#
# Only the registry entry is deleted here; nothing else is done to the
# removed object.
#
# @param chunk [Object] must respond to +unique_id+
# @return [Object, nil] the deleted entry, or nil when none was registered
def self.remove(chunk)
  key = chunk.unique_id
  @@instances.delete(key)
end
Instance Method Details
#each(&block) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/fluent/plugin/out_bufferize.rb', line 36

# Yields the chunk's records that have not been counted yet.
#
# The chunk is opened and read through a MessagePack unpacker. The first
# +@count+ entries are skipped, then every following entry is destructured
# into (tag, time, record) and yielded. The counter is advanced only after
# the block returns. An EOFError raised anywhere inside the read loop ends
# the iteration, after which #remove is called. Any other exception
# propagates and #remove is not called.
#
# @yield [tag, time, record] the three leading elements of each entry read
# @return [Object] whatever #remove returns
def each(&block)
  @chunk.open do |stream|
    unpacker = MessagePack::Unpacker.new(stream)
    begin
      # Fast-forward past entries already counted.
      if @count > 0
        $log.debug "Bufferize: skip first #{@count} messages"
        @count.times { unpacker.skip }
      end

      loop do
        tag, time, record = unpacker.read
        yield(tag, time, record)
        # Count the entry only once the block has returned normally.
        increment
      end
    rescue EOFError
      # Presumably raised by the unpacker once the chunk is exhausted;
      # treated as the normal end of iteration.
    end
  end
  remove
end
#increment ⇒ Object
59 60 61 62 63 64 65 |
# File 'lib/fluent/plugin/out_bufferize.rb', line 59

# Advances the counter by one. For file-backed keepers the new value is
# also written as a line at the start of the position file (the file is
# overwritten in place, not truncated).
#
# @return [nil]
def increment
  @count += 1
  return unless @type == :file

  # Seek back to the beginning (whence defaults to IO::SEEK_SET).
  @io.seek(0)
  @io.puts(@count)
end
#remove ⇒ Object
67 68 69 70 71 72 |
# File 'lib/fluent/plugin/out_bufferize.rb', line 67

# Releases the position file of a file-backed keeper: closes the handle if
# it is still open and deletes the file. Does nothing for in-memory keepers.
#
# Safe to call more than once: a position file that is already gone is
# ignored instead of raising Errno::ENOENT.
#
# @return [Integer, nil] result of File.unlink when a file was deleted,
#   nil otherwise
def remove
  return unless @type == :file

  @io.close unless @io.closed?
  begin
    File.unlink(@path)
  rescue Errno::ENOENT
    # Already deleted (earlier #remove call or external cleanup).
    nil
  end
end