Class: Fluent::BufferizeOutput::PosKeeper

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/out_bufferize.rb

Constant Summary collapse

@@instances =
{}

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(chunk) ⇒ PosKeeper

Returns a new instance of PosKeeper.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/fluent/plugin/out_bufferize.rb', line 17

def initialize(chunk)
  @id = chunk.unique_id
  @count = 0
  @chunk = chunk

  if chunk.respond_to? :path
    @path = chunk.path +  ".pos"
    mode = File::CREAT | File::RDWR
    perm = DEFAULT_FILE_PERMISSION
    @io = File.open(@path, mode, perm)
    @io.sync = true
    line = @io.gets
    @count = line ? line.to_i : 0
    @type = :file
  else
    @type = :mem
  end
end

Class Method Details

.get(chunk) ⇒ Object



8
9
10
11
# File 'lib/fluent/plugin/out_bufferize.rb', line 8

def self.get(chunk)
  @@instances[chunk.unique_id] ||= PosKeeper.new(chunk)
  @@instances[chunk.unique_id]
end

.remove(chunk) ⇒ Object



13
14
15
# File 'lib/fluent/plugin/out_bufferize.rb', line 13

def self.remove(chunk)
  @@instances.delete(chunk.unique_id)
end

Instance Method Details

#each(&block) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/fluent/plugin/out_bufferize.rb', line 36

def each(&block)
  @chunk.open do |io|
    u = MessagePack::Unpacker.new(io)
    begin
      if @count > 0
        $log.debug "Bufferize: skip first #{@count} messages" 
        @count.times do
          u.skip
        end
      end

      loop do
        tag, time, record = u.read
        yield(tag, time, record)
        increment
      end

    rescue EOFError
    end
  end
  remove
end

#incrementObject



59
60
61
62
63
64
65
# File 'lib/fluent/plugin/out_bufferize.rb', line 59

def increment
  @count += 1
  if @type == :file
    @io.seek(0, IO::SEEK_SET)
    @io.puts(@count)
  end
end

#removeObject



67
68
69
70
71
72
# File 'lib/fluent/plugin/out_bufferize.rb', line 67

def remove
  if @type == :file
    @io.close unless @io.closed?
    File.unlink(@path)
  end
end