Class: Fluent::FileBuffer

Inherits:
BasicBuffer show all
Defined in:
lib/fluent/plugin/buf_file.rb,
lib/fluent/test/input_test.rb

Constant Summary collapse

PATH_MATCH =

Dots are used as separators in many places, so dots in keys have to be escaped.
/^([-_.%0-9a-zA-Z]*)\.(b|q)([0-9a-fA-F]{1,32})$/
@@buffer_paths =
{}

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from BasicBuffer

#clear!, #emit, #enable_parallel, #keys, #pop, #push, #queue_size, #shutdown, #storable?, #total_queued_chunk_size, #write_chunk

Methods inherited from Buffer

#clear!, #emit, #keys, #pop, #push, #shutdown

Methods included from Configurable

#config, included, lookup_type, register_type

Constructor Details

#initialize ⇒ FileBuffer

Returns a new instance of FileBuffer.



85
86
87
88
89
90
# File 'lib/fluent/plugin/buf_file.rb', line 85

# Creates the buffer: loads the 'uri' library, runs the parent class's
# initializer, and stores a fresh URI::Parser in @uri_parser.
def initialize
  require 'uri' # loaded when an instance is constructed, before super runs
  super

  @uri_parser = URI::Parser.new
end

Instance Attribute Details

‘symlink_path’ is currently used only by out_file, which is why it is an attr_accessor rather than a config_param. See: github.com/fluent/fluentd/pull/181



100
101
102
# File 'lib/fluent/plugin/buf_file.rb', line 100

# Reader for the @symlink_path instance variable.
#
# @return [Object, nil] the value stored in @symlink_path (nil if it was never assigned)
def symlink_path
  @symlink_path
end

Class Method Details

.clear_buffer_paths ⇒ Object



25
26
27
# File 'lib/fluent/test/input_test.rb', line 25

# Resets the class-level registry of buffer paths by rebinding
# @@buffer_paths to a new, empty Hash.
# NOTE(review): this definition lives in lib/fluent/test/input_test.rb, so it
# is presumably intended for test isolation only — confirm.
#
# @return [Hash] the new empty hash
def self.clear_buffer_paths
  @@buffer_paths = {}
end

Instance Method Details

#before_shutdown(out) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
# File 'lib/fluent/plugin/buf_file.rb', line 195

# Drains the buffer ahead of shutdown, but only when @flush_at_shutdown is set.
#
# Inside a single synchronize block, every key currently in @map is handed to
# push, then pop(out) is called repeatedly until it returns a falsy value.
#
# @param out [Object] forwarded unchanged to every pop call
# @return [Object, nil] nil when flushing is disabled, otherwise whatever
#   synchronize returns
def before_shutdown(out)
  return unless @flush_at_shutdown

  synchronize do
    @map.each_key { |chunk_key| push(chunk_key) }
    # Keep popping until pop reports there is nothing left.
    nil while pop(out)
  end
end

#chunk_identifier_in_path(path) ⇒ Object



176
177
178
179
180
181
# File 'lib/fluent/plugin/buf_file.rb', line 176

# Extracts the part of a chunk file path that lies between the configured
# buffer path prefix and suffix.
#
# @param path [String] full path of a chunk file
# @return [String, nil] the text between @buffer_path_prefix and
#   @buffer_path_suffix; nil when the path is shorter than the prefix
def chunk_identifier_in_path(path)
  head_length = @buffer_path_prefix.length
  tail_length = @buffer_path_suffix.length

  # A negative end index counts from the tail: -(tail_length + 1) is the last
  # character that precedes the suffix.
  path[head_length..-(tail_length + 1)]
end

#configure(conf) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/fluent/plugin/buf_file.rb', line 102

# Applies configuration, claims the buffer path for this plugin, and derives
# the prefix/suffix used to build chunk file names.
#
# The path is recorded in the class-level @@buffer_paths registry together
# with the plugin type read from conf ('@type', falling back to 'type').
#
# When @buffer_path contains '*', the text before the first '*' becomes the
# prefix and the text after it the suffix. Otherwise the prefix is the path
# followed by "." and the suffix is ".log".
#
# @param conf [#[]] configuration object; read for '@type' / 'type'
# @raise [ConfigError] when the same buffer_path is already registered
def configure(conf)
  super

  plugin_type = conf['@type'] || conf['type']
  if @@buffer_paths.key?(@buffer_path)
    raise ConfigError, "Other '#{@@buffer_paths[@buffer_path]}' plugin already use same buffer_path: type = #{plugin_type}, buffer_path = #{@buffer_path}"
  end
  @@buffer_paths[@buffer_path] = plugin_type

  star_at = @buffer_path.index('*')
  if star_at.nil?
    @buffer_path_prefix = @buffer_path + "."
    @buffer_path_suffix = ".log"
  else
    @buffer_path_prefix = @buffer_path[0...star_at]
    @buffer_path_suffix = @buffer_path[(star_at + 1)..-1]
  end
end

#enqueue(chunk) ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
# File 'lib/fluent/plugin/buf_file.rb', line 183

# Renames a chunk's file to its queued name.
#
# The identifier part of the chunk's path (the text between the buffer path
# prefix and suffix) is parsed with PATH_MATCH. The new path keeps the encoded
# key and the hex suffix from the old name but uses ".q" as the state marker.
#
# @param chunk [#path, #mv] chunk to rename; chunk.mv receives the new path
# @return [Object] whatever chunk.mv returns
# @raise [RuntimeError] when the chunk's path does not match PATH_MATCH
def enqueue(chunk)
  path = chunk.path
  identifier_part = chunk_identifier_in_path(path)

  m = PATH_MATCH.match(identifier_part)
  # Without a match there is no hex suffix to build the queued name from.
  # Fail with a clear message instead of a NoMethodError on nil.
  raise "cannot enqueue chunk: unexpected buffer file path '#{path}'" unless m

  encoded_key = m[1]
  tsuffix = m[3]
  npath = "#{@buffer_path_prefix}#{encoded_key}.q#{tsuffix}#{@buffer_path_suffix}"

  chunk.mv(npath)
end

#new_chunk(key) ⇒ Object



130
131
132
133
134
135
# File 'lib/fluent/plugin/buf_file.rb', line 130

# Creates a new FileBufferChunk for the given key.
#
# The key is encoded with encode_key and passed to make_path with the "b"
# marker; make_path yields the file path and a suffix, and the suffix is
# converted to the chunk's unique id with tsuffix_to_unique_id.
#
# @param key [Object] chunk key, passed as-is to the chunk and to encode_key
# @return [FileBufferChunk] chunk created with mode "a+" and the current
#   @symlink_path
def new_chunk(key)
  chunk_path, hex_suffix = make_path(encode_key(key), "b")
  FileBufferChunk.new(key, chunk_path, tsuffix_to_unique_id(hex_suffix), "a+", @symlink_path)
end

#resume ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/fluent/plugin/buf_file.rb', line 137

# Rebuilds buffer state from the chunk files that match the configured
# prefix and suffix.
#
# Each file whose identifier part matches PATH_MATCH becomes a
# FileBufferChunk: "b" files are opened with mode "a+" and go into the map,
# "q" files are opened with mode "r" and go into the queue. Files that do not
# match are ignored. Both groups are ordered by the hex suffix read as a
# base-16 integer; when several "b" chunks share a key, the one sorted last
# ends up in the map.
#
# @return [Array(Array<FileBufferChunk>, Hash)] the queue (ordered chunks)
#   followed by the map (chunk key => chunk)
def resume
  staged = []   # [timestamp, chunk] pairs for "b" files
  enqueued = [] # [timestamp, chunk] pairs for "q" files

  Dir.glob("#{@buffer_path_prefix}*#{@buffer_path_suffix}") do |path|
    matched = PATH_MATCH.match(chunk_identifier_in_path(path))
    next unless matched

    key = decode_key(matched[1])
    hex_suffix = matched[3]
    timestamp = hex_suffix.to_i(16)
    unique_id = tsuffix_to_unique_id(hex_suffix)

    case matched[2]
    when 'b'
      staged << [timestamp, FileBufferChunk.new(key, path, unique_id, "a+")]
    when 'q'
      enqueued << [timestamp, FileBufferChunk.new(key, path, unique_id, "r")]
    end
  end

  # Later entries overwrite earlier ones that share a key.
  map = staged.sort_by(&:first).each_with_object({}) do |(_, chunk), by_key|
    by_key[chunk.key] = chunk
  end

  queue = enqueued.sort_by(&:first).map(&:last)

  [queue, map]
end

#start ⇒ Object



121
122
123
124
# File 'lib/fluent/plugin/buf_file.rb', line 121

# Ensures the directory that will hold the buffer files exists, then runs the
# parent class's start.
#
# "path" is appended to the prefix only so File.dirname has a file name to
# strip; the directory is created with DEFAULT_DIR_PERMISSION.
#
# @return [Object] whatever the parent start returns
def start
  buffer_dir = File.dirname(@buffer_path_prefix + "path")
  FileUtils.mkdir_p(buffer_dir, mode: DEFAULT_DIR_PERMISSION)
  super
end