Module: Fluent::PluginHelper::Storage

Includes:
Timer
Defined in:
lib/fluent/plugin_helper/storage.rb

Defined Under Namespace

Modules: StorageParams
Classes: PersistentWrapper, StorageState, SynchronizeWrapper

Constant Summary

Constants included from EventLoop

EventLoop::EVENT_LOOP_RUN_DEFAULT_TIMEOUT

Constants included from Thread

Thread::THREAD_DEFAULT_WAIT_SECONDS

Instance Attribute Summary collapse

Attributes included from Timer

#_timers

Attributes included from EventLoop

#_event_loop

Attributes included from Thread

#_threads

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Timer

#timer_execute, #timer_running?

Methods included from EventLoop

#event_loop_attach, #event_loop_running?, #event_loop_wait_until_start, #event_loop_wait_until_stop

Methods included from Thread

#thread_create, #thread_current_running?, #thread_exist?, #thread_running?, #thread_started?, #thread_wait_until_start, #thread_wait_until_stop

Instance Attribute Details

#_storages ⇒ Object (readonly)

for tests



85
86
87
# File 'lib/fluent/plugin_helper/storage.rb', line 85

# Returns the internal registry of storages (usage => storage state).
# Exposed for tests.
def _storages
  @_storages
end

Class Method Details

.included(mod) ⇒ Object



81
82
83
# File 'lib/fluent/plugin_helper/storage.rb', line 81

# Hook invoked with the module/class (+mod+) that mixes this helper in.
# It additionally mixes StorageParams into that same +mod+.
def self.included(mod)
  mod.include(StorageParams)
end

Instance Method Details

#after_shutdown ⇒ Object



156
157
158
159
# File 'lib/fluent/plugin_helper/storage.rb', line 156

# Lifecycle hook: calls +after_shutdown+ on every registered storage
# (through storage_operate), then continues the chain with +super+.
def after_shutdown
  storage_operate(:after_shutdown)
  super
end

#before_shutdown ⇒ Object



144
145
146
147
# File 'lib/fluent/plugin_helper/storage.rb', line 144

# Lifecycle hook: calls +before_shutdown+ on every registered storage
# (through storage_operate), then continues the chain with +super+.
def before_shutdown
  storage_operate(:before_shutdown)
  super
end

#close ⇒ Object



161
162
163
164
# File 'lib/fluent/plugin_helper/storage.rb', line 161

# Lifecycle hook: for every registered storage, flags its state as no longer
# running and then calls +close+ on the storage; finally defers to +super+.
def close
  storage_operate(:close) do |state|
    state.running = false
  end
  super
end

#configure(conf) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/fluent/plugin_helper/storage.rb', line 93

# Builds one storage per configured storage section.
#
# After +super+, every entry of @storage_configs is turned into a storage
# plugin instance (type taken from the section's :@type), configured with the
# section's corresponding config element, wrapped via wrap_instance and
# registered under the section's usage in a not-yet-running state.
#
# Raises Fluent::ConfigError when two sections share the same usage.
def configure(conf)
  super

  @storage_configs.each do |section|
    usage = section.usage
    raise Fluent::ConfigError, "duplicated storages configured: #{usage}" if @_storages[usage]

    plugin = Plugin.new_storage(section[:@type], parent: self)
    plugin.configure(section.corresponding_config_element)
    @_storages[usage] = StorageState.new(wrap_instance(plugin), false)
  end
end

#initialize ⇒ Object



87
88
89
90
91
# File 'lib/fluent/plugin_helper/storage.rb', line 87

# Sets up the helper's bookkeeping after +super+: an empty storage registry
# and a flag recording that the storages have not been started yet.
def initialize
  super
  @_storages = {} # usage => storage_state
  @_storages_started = false
end

#shutdown ⇒ Object



149
150
151
152
153
154
# File 'lib/fluent/plugin_helper/storage.rb', line 149

# Lifecycle hook: for every registered storage, saves it first when the
# storage asks for save_at_shutdown, then calls +shutdown+ on it; finally
# defers to +super+.
def shutdown
  storage_operate(:shutdown) do |state|
    store = state.storage
    store.save if store.save_at_shutdown
  end
  super
end

#start ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/fluent/plugin_helper/storage.rb', line 106

# Starts and loads every registered storage and schedules autosave where
# requested.
#
# After +super+, the helper is flagged as started (storage_create consults
# that flag to start storages it creates afterwards). Each storage is then
# started and loaded, and its state is marked as running. A repeating
# :storage_autosave timer is registered only for storages that have autosave
# enabled and are not persistent.
def start
  super

  @_storages_started = true
  @_storages.each_pair do |usage, state|
    store = state.storage
    store.start
    store.load

    if store.autosave && !store.persistent
      timer_execute(:storage_autosave, store.autosave_interval, repeat: true) do
        begin
          state.storage.save
        rescue => e
          # A failed autosave is logged instead of propagating out of the timer callback.
          # The previous code logged `type: type`, but no `type` local or method is
          # visible in this helper, so the handler itself would raise NameError and
          # hide the real failure. The storage's class is reported instead.
          log.error "plugin storage failed to save its data", usage: usage, type: state.storage.class, error: e
        end
      end
    end
    state.running = true
  end
end

#stop ⇒ Object



138
139
140
141
142
# File 'lib/fluent/plugin_helper/storage.rb', line 138

# Lifecycle hook: runs +super+ first, then calls +stop+ on every registered
# storage (through storage_operate).
def stop
  super
  # timer stops automatically in super
  storage_operate(:stop)
end

#storage_create(usage: '', type: nil, conf: nil, default_type: nil) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/fluent/plugin_helper/storage.rb', line 32

# Returns the storage registered under +usage+, creating and registering it
# on first use.
#
# usage::        key of the storage in the registry (default: '')
# type::         storage plugin type; takes precedence over the other sources
# conf::         a Fluent::Config::Element, a Hash (keys are stringified), or nil;
#                when +type+ is not given, its '@type' entry selects the plugin
# default_type:: fallback type when neither +type+ nor +conf+ provides one
#
# Raises Fluent::ConfigError when +conf+ is consulted for the type but has no
# '@type'; raises ArgumentError when no type can be determined at all or when
# +conf+ is neither an Element, a Hash nor nil.
def storage_create(usage: '', type: nil, conf: nil, default_type: nil)
  existing = @_storages[usage]
  # An entry already exists: whether it is running or merely created
  # (not loaded / started yet), the same storage instance is handed back.
  return existing.storage if existing

  storage_type =
    if type
      type
    elsif conf && conf.respond_to?(:[])
      conf['@type'] || raise(Fluent::ConfigError, "@type is required in <storage>")
    elsif default_type
      default_type
    else
      raise ArgumentError, "BUG: both type and conf are not specified"
    end

  # The plugin is instantiated before +conf+ is validated, as before.
  plugin = Plugin.new_storage(storage_type, parent: self)

  element =
    case conf
    when Fluent::Config::Element
      conf
    when Hash
      # in code, programmer may use symbols as keys, but Element needs strings
      string_keyed = conf.each_with_object({}) { |(key, value), acc| acc[key.to_s] = value }
      Fluent::Config::Element.new('storage', usage, string_keyed, [])
    when nil
      Fluent::Config::Element.new('storage', usage, {}, [])
    else
      raise ArgumentError, "BUG: conf must be a Element, Hash (or unspecified), but '#{conf.class}'"
    end

  plugin.configure(element)
  # Storages created after the helper has started are started right away.
  plugin.start if @_storages_started

  state = StorageState.new(wrap_instance(plugin), false)
  @_storages[usage] = state
  state.storage
end

#storage_operate(method_name, &block) ⇒ Object



127
128
129
130
131
132
133
134
135
136
# File 'lib/fluent/plugin_helper/storage.rb', line 127

# Applies a lifecycle operation to every registered storage.
#
# For each (usage, state) pair the optional block is called with the state
# first, then +method_name+ is sent to the state's storage. Any StandardError
# raised by either step is logged and the iteration moves on to the next
# storage.
def storage_operate(method_name, &block)
  @_storages.each do |usage, state|
    begin
      yield(state) if block
      state.storage.send(method_name)
    rescue => err
      log.error "unexpected error while #{method_name}", usage: usage, storage: state.storage, error: err
    end
  end
end

#terminate ⇒ Object



166
167
168
169
170
# File 'lib/fluent/plugin_helper/storage.rb', line 166

# Lifecycle hook: calls +terminate+ on every registered storage (through
# storage_operate), empties the registry, then defers to +super+.
def terminate
  storage_operate(:terminate)
  # Drop every storage state; the registry is empty from here on.
  @_storages = {}
  super
end

#wrap_instance(storage) ⇒ Object



172
173
174
175
176
177
178
179
180
181
182
# File 'lib/fluent/plugin_helper/storage.rb', line 172

# Chooses the wrapper (if any) that the given storage needs.
#
# * persistent and persistent_always?  -> the storage itself
# * persistent otherwise               -> PersistentWrapper around it
# * not persistent, not synchronized?  -> SynchronizeWrapper around it
# * not persistent, synchronized?      -> the storage itself
def wrap_instance(storage)
  if storage.persistent
    return storage if storage.persistent_always?
    return PersistentWrapper.new(storage)
  end

  return storage if storage.synchronized?

  SynchronizeWrapper.new(storage)
end