Class: Fluent::BufferizeOutput::PosKeeper
- Inherits:
-
Object
- Object
- Fluent::BufferizeOutput::PosKeeper
- Defined in:
- lib/fluent/plugin/out_bufferize.rb
Constant Summary collapse
- FILE_PERMISSION =
0644
- @@instances =
{}
Class Method Summary collapse
Instance Method Summary collapse
- #each(&block) ⇒ Object
- #increment ⇒ Object
-
#initialize(chunk) ⇒ PosKeeper
constructor
A new instance of PosKeeper.
- #remove ⇒ Object
Constructor Details
#initialize(chunk) ⇒ PosKeeper
Returns a new instance of PosKeeper.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/fluent/plugin/out_bufferize.rb', line 19
#
# Builds a position keeper for +chunk+.
#
# The processed-message counter starts at zero. If the chunk responds to
# +path+, a companion file named "<chunk path>.pos" is opened (and created
# when missing); the integer on its first line, if any, becomes the
# starting counter. Otherwise the counter lives only in memory.
#
# @param chunk [#unique_id] buffer chunk; may also respond to +path+
def initialize(chunk)
  @id = chunk.unique_id
  @chunk = chunk
  @count = 0

  # Chunks without a path get no companion file.
  unless chunk.respond_to?(:path)
    @type = :mem
    return
  end

  @path = chunk.path + ".pos"
  @io = File.open(@path, File::CREAT | File::RDWR, FILE_PERMISSION)
  # Unbuffered writes, so each saved position reaches the file at once.
  @io.sync = true

  # A freshly created file is empty, in which case the counter stays at 0.
  saved = @io.gets
  @count = saved.to_i if saved
  @type = :file
end
Class Method Details
.get(chunk) ⇒ Object
10 11 12 13 |
# File 'lib/fluent/plugin/out_bufferize.rb', line 10
#
# Returns the PosKeeper registered for +chunk+, creating and registering
# one on first use. Keepers are keyed by the chunk's +unique_id+.
#
# @param chunk [#unique_id] buffer chunk to look up
# @return [PosKeeper] the keeper associated with the chunk
def self.get(chunk)
  key = chunk.unique_id
  # `||=` evaluates to the stored keeper, whether cached or just built.
  @@instances[key] ||= PosKeeper.new(chunk)
end
.remove(chunk) ⇒ Object
15 16 17 |
# File 'lib/fluent/plugin/out_bufferize.rb', line 15
#
# Drops the keeper registered for +chunk+ from the instance registry.
#
# @param chunk [#unique_id] buffer chunk whose keeper should be forgotten
# @return [PosKeeper, nil] the removed keeper, or nil if none was registered
def self.remove(chunk)
  key = chunk.unique_id
  @@instances.delete(key)
end
Instance Method Details
#each(&block) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/fluent/plugin/out_bufferize.rb', line 38
#
# Iterates over the messages of the chunk that have not been processed yet.
#
# The first @count messages are skipped, then every remaining
# (tag, time, record) triple is yielded. The counter is advanced only after
# the block returns, so an exception raised by the block leaves the
# position at the failing message. Once the end of the chunk is reached the
# position data is removed.
#
# @yield [tag, time, record] one unpacked message at a time
def each(&block)
  @chunk.open do |io|
    unpacker = MessagePack::Unpacker.new(io)
    begin
      $log.debug "Bufferize: skip first #{@count} messages" if @count > 0
      # A zero (or negative) counter makes this a no-op.
      @count.times { unpacker.skip }

      loop do
        tag, time, record = unpacker.read
        yield(tag, time, record)
        increment
      end
    rescue EOFError
      # End of chunk data: normal termination of the read loop.
    end
  end
  remove
end
#increment ⇒ Object
61 62 63 64 65 66 67 |
# File 'lib/fluent/plugin/out_bufferize.rb', line 61
#
# Advances the processed-message counter by one. For file-backed keepers
# the new value is also written at the start of the position file.
def increment
  @count += 1
  return unless @type == :file

  # Overwrite from the beginning; the counter never shrinks, so the new
  # text always covers the previous one.
  @io.seek(0, IO::SEEK_SET)
  @io.puts(@count)
end
#remove ⇒ Object
69 70 71 72 73 74 |
# File 'lib/fluent/plugin/out_bufferize.rb', line 69
#
# Releases the position data. For file-backed keepers this closes the
# position file handle and deletes the file; in-memory keepers need no
# cleanup.
#
# Safe to call more than once: an already-closed handle is left alone and
# an already-deleted file is ignored instead of raising Errno::ENOENT.
def remove
  return unless @type == :file

  @io.close unless @io.closed?
  begin
    File.unlink(@path)
  rescue Errno::ENOENT
    # The file is already gone, which is the state we want.
  end
end