Class: Fluent::Plugin::LevelDBStorage

Inherits:
Storage
  • Object
show all
Defined in:
lib/fluent/plugin/storage_leveldb.rb

Constant Summary collapse

DEFAULT_DIR_MODE =
0755

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ LevelDBStorage

Returns a new instance of LevelDBStorage.



21
22
23
24
25
# File 'lib/fluent/plugin/storage_leveldb.rb', line 21

# Builds a storage instance: runs the parent initializer, then starts with an
# empty in-memory Hash in @store (the Hash that get/put/fetch/delete/update
# operate on).
def initialize
  super

  @store = {}
end

Instance Attribute Details

#leveldb ⇒ Object (readonly)

Exposed for tests.



19
20
21
# File 'lib/fluent/plugin/storage_leveldb.rb', line 19

# Reader for @leveldb, the LevelDB::DB handle opened in #configure.
# Exposed for tests.
#
# @return [Object] the value of @leveldb (nil until #configure has assigned it)
def leveldb
  @leveldb
end

#store ⇒ Object (readonly)

Exposed for tests.



19
20
21
# File 'lib/fluent/plugin/storage_leveldb.rb', line 19

# Reader for @store, the in-memory Hash that backs get/put/fetch/delete/update.
# Exposed for tests.
#
# @return [Object] the value of @store
def store
  @store
end

Instance Method Details

#configure(conf) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/fluent/plugin/storage_leveldb.rb', line 27

# Resolves where the LevelDB database lives, checks that the location is
# usable, opens the database and validates any value already stored under
# @root_key.
#
# Path resolution:
# * @path is an existing regular file -> configuration error.
# * @path is an existing directory    -> the database is placed at
#   "<path>/worker<fluentd_worker_id>/storage.db" and multi-worker use is
#   flagged as available.
# * @path is set but does not exist   -> it is used as given.
# * @path is unset                    -> falls back to the owner's plugin root
#   dir ("storage.<arg>.db" when the section has a non-empty argument,
#   otherwise "storage.db") and multi-worker use is flagged as available.
#
# @param conf the <storage> section; apart from being handed to super, only
#   conf.arg is read here
# @raise [Fluent::ConfigError] when no path can be determined, the path is a
#   regular file, the location is not readable/writable, or the stored value
#   cannot be parsed or is not a JSON object
def configure(conf)
  super

  if @path
    if File.file?(@path)
      raise Fluent::ConfigError, "path for <storage> should be a directory."
    elsif File.directory?(@path)
      @path = File.join(@path, "worker#{fluentd_worker_id}", "storage.db")
      @multi_workers_available = true
    end
  elsif (root_dir = owner.plugin_root_dir)
    basename = (conf.arg && !conf.arg.empty?) ? "storage.#{conf.arg}.db" : "storage.db"
    @path = File.join(root_dir, basename)
    @multi_workers_available = true
  else
    raise Fluent::ConfigError, "path for <storage> is required."
  end

  dir = File.dirname(@path)
  FileUtils.mkdir_p(dir, mode: @dir_mode) unless Dir.exist?(dir)
  if File.exist?(@path)
    raise Fluent::ConfigError, "Plugin storage path '#{@path}' is not readable/writable" unless File.readable?(@path) && File.writable?(@path)
  else
    raise Fluent::ConfigError, "Directory is not writable for plugin storage file '#{@path}'" unless File.stat(dir).writable?
  end

  @leveldb = LevelDB::DB.new(@path)
  object = @leveldb.get(@root_key)
  # Nothing stored under the root key yet: there is nothing to validate.
  return unless object

  begin
    data = Yajl::Parser.parse(object)
  rescue => e
    log.error "failed to read data from plugin leveldb storage", path: @path, error: e
    raise Fluent::ConfigError, "Unexpected error: failed to read data from plugin leveldb storage: '#{@path}'"
  end

  # Checked outside the rescue above so this specific message reaches the
  # user instead of being replaced by the generic "Unexpected error" one.
  raise Fluent::ConfigError, "Invalid contents (not object) in plugin leveldb storage: '#{@path}'" unless data.is_a?(Hash)
end

#delete(key) ⇒ Object



108
109
110
# File 'lib/fluent/plugin/storage_leveldb.rb', line 108

# Removes an entry from the in-memory store.
#
# @param key [#to_s] entry name; converted with #to_s before the lookup
# @return [Object, nil] whatever @store.delete returns for that name
def delete(key)
  entry_name = key.to_s
  @store.delete(entry_name)
end

#fetch(key, defval) ⇒ Object



100
101
102
# File 'lib/fluent/plugin/storage_leveldb.rb', line 100

# Looks up an entry in the in-memory store, falling back to a default.
#
# @param key [#to_s] entry name; converted with #to_s before the lookup
# @param defval [Object] value returned when the name is not present
# @return [Object] the stored value, or defval when the name is absent
def fetch(key, defval)
  entry_name = key.to_s
  @store.fetch(entry_name, defval)
end

#get(key) ⇒ Object



96
97
98
# File 'lib/fluent/plugin/storage_leveldb.rb', line 96

# Reads an entry from the in-memory store.
#
# @param key [#to_s] entry name; converted with #to_s before the lookup
# @return [Object, nil] the result of @store[name]
def get(key)
  entry_name = key.to_s
  @store[entry_name]
end

#load ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/fluent/plugin/storage_leveldb.rb', line 70

# Replaces the in-memory store with the JSON object saved under @root_key.
#
# The current store is left untouched when nothing has been saved yet, when
# the saved value is not a JSON object (logged as an error), or when reading
# or parsing raises (logged as an error, not re-raised).
def load
  json_string = @leveldb.get(@root_key)
  # No value under the root key (e.g. a fresh database): keep the current
  # store rather than handing nil to the parser and logging a spurious
  # load failure.
  return unless json_string

  json = Yajl::Parser.parse(json_string)
  unless json.is_a?(Hash)
    log.error "broken content for plugin storage (Hash required: ignored)", type: json.class
    log.debug "broken content", content: json_string
    return
  end
  @store = json
rescue => e
  log.error "failed to load data for plugin storage from leveldb", path: @path, error: e
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


66
67
68
# File 'lib/fluent/plugin/storage_leveldb.rb', line 66

# Reports the @multi_workers_available flag. #configure sets it to true when
# it derives a per-worker path from an existing directory or falls back to the
# owner's plugin root dir.
#
# NOTE(review): #configure does not assign the flag when @path is given but
# does not exist yet, so this presumably returns nil (falsy) in that case
# unless the flag is initialized elsewhere - confirm.
#
# @return [Boolean, nil] the current value of @multi_workers_available
def multi_workers_ready?
  @multi_workers_available
end

#put(key, value) ⇒ Object



104
105
106
# File 'lib/fluent/plugin/storage_leveldb.rb', line 104

# Stores a value in the in-memory store.
#
# @param key [#to_s] entry name; converted with #to_s before storing
# @param value [Object] value to associate with the name
# @return [Object] the value that was assigned
def put(key, value)
  entry_name = key.to_s
  @store[entry_name] = value
end

#save ⇒ Object



85
86
87
88
89
90
91
92
93
94
# File 'lib/fluent/plugin/storage_leveldb.rb', line 85

# Encodes the in-memory store as JSON and writes it under @root_key from
# within a @leveldb.batch block. Any StandardError raised while encoding or
# writing is logged and not re-raised.
def save
  encoded = Yajl::Encoder.encode(@store)
  @leveldb.batch { @leveldb.put(@root_key, encoded) }
rescue => e
  log.error "failed to save data for plugin storage to leveldb", path: @path, error: e
end

#update(key, &block) ⇒ Object



112
113
114
# File 'lib/fluent/plugin/storage_leveldb.rb', line 112

# Replaces an entry with the result of calling the block on its current value.
#
# @param key [#to_s] entry name; converted with #to_s before the lookup
# @yieldparam current [Object, nil] the result of @store[name] before the update
# @return [Object] the block's result, which is now stored under the name
def update(key, &block)
  entry_name = key.to_s
  @store[entry_name] = block.call(@store[entry_name])
end