Module: Fluent::PluginHelper::Storage
- Includes:
- Timer
- Defined in:
- lib/fluent/plugin_helper/storage.rb
Defined Under Namespace
Modules: StorageParams
Classes: PersistentWrapper, StorageState, SynchronizeWrapper
Constant Summary
Constants included
from EventLoop
EventLoop::EVENT_LOOP_RUN_DEFAULT_TIMEOUT
Constants included
from Thread
Thread::THREAD_DEFAULT_WAIT_SECONDS
Instance Attribute Summary collapse
Attributes included from Timer
#_timers
Attributes included from EventLoop
#_event_loop
Attributes included from Thread
#_threads
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Timer
#timer_execute, #timer_running?
Methods included from EventLoop
#event_loop_attach, #event_loop_running?, #event_loop_wait_until_start, #event_loop_wait_until_stop
Methods included from Thread
#thread_create, #thread_current_running?, #thread_exist?, #thread_running?, #thread_started?, #thread_wait_until_start, #thread_wait_until_stop
Instance Attribute Details
#_storages ⇒ Object
85
86
87
|
# File 'lib/fluent/plugin_helper/storage.rb', line 85
# Reader for the storage registry held in @_storages.
#
# The registry is reset to an empty Hash by #initialize and #terminate and is
# filled with usage => StorageState entries by #configure and #storage_create.
def _storages
  @_storages
end
|
Class Method Details
.included(mod) ⇒ Object
81
82
83
|
# File 'lib/fluent/plugin_helper/storage.rb', line 81
# Inclusion hook: whenever this helper is included somewhere, StorageParams is
# mixed into the including module/class as well.
def self.included(mod)
  mod.include(StorageParams)
end
|
Instance Method Details
#after_shutdown ⇒ Object
156
157
158
159
|
# File 'lib/fluent/plugin_helper/storage.rb', line 156
# Lifecycle hook: sends :after_shutdown to every registered storage (via
# #storage_operate) and only then hands over to the ancestors' implementation.
def after_shutdown
  storage_operate(:after_shutdown)
  super
end
|
#before_shutdown ⇒ Object
144
145
146
147
|
# File 'lib/fluent/plugin_helper/storage.rb', line 144
# Lifecycle hook: sends :before_shutdown to every registered storage (via
# #storage_operate) and only then hands over to the ancestors' implementation.
def before_shutdown
  storage_operate(:before_shutdown)
  super
end
|
#close ⇒ Object
161
162
163
164
|
# File 'lib/fluent/plugin_helper/storage.rb', line 161
# Lifecycle hook: flags every registered storage state as no longer running,
# sends :close to its storage (both through #storage_operate), and then hands
# over to the ancestors' implementation.
def close
  storage_operate(:close) do |state|
    state.running = false
  end
  super
end
|
#configure(conf) ⇒ Object
93
94
95
96
97
98
99
100
101
102
103
104
|
# File 'lib/fluent/plugin_helper/storage.rb', line 93
# Builds one storage per entry of @storage_configs and registers it under the
# section's usage.
#
# Each storage is instantiated through Plugin.new_storage with the section's
# :@type, configured with the section's corresponding config element, wrapped
# by #wrap_instance and stored as a StorageState that is not yet running.
#
# @param conf the plugin configuration; only forwarded to the ancestors here
# @raise [Fluent::ConfigError] when two sections share the same usage
def configure(conf)
  super

  @storage_configs.each do |section|
    usage = section.usage
    raise Fluent::ConfigError, "duplicated storages configured: #{usage}" if @_storages[usage]

    backend = Plugin.new_storage(section[:@type], parent: self)
    backend.configure(section.corresponding_config_element)
    wrapped = wrap_instance(backend)
    @_storages[usage] = StorageState.new(wrapped, false)
  end
end
|
#initialize ⇒ Object
87
88
89
90
91
|
# File 'lib/fluent/plugin_helper/storage.rb', line 87
# Sets up the helper's state after the ancestors' initialization:
# an empty storage registry and a flag recording that #start has not run yet.
def initialize
  super
  @_storages = {}
  @_storages_started = false
end
|
#shutdown ⇒ Object
149
150
151
152
153
154
|
# File 'lib/fluent/plugin_helper/storage.rb', line 149
# Lifecycle hook: for every registered storage, saves it first when its
# save_at_shutdown flag is set, sends it :shutdown (through #storage_operate),
# and finally hands over to the ancestors' implementation.
def shutdown
  storage_operate(:shutdown) do |state|
    if state.storage.save_at_shutdown
      state.storage.save
    end
  end
  super
end
|
#start ⇒ Object
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
# File 'lib/fluent/plugin_helper/storage.rb', line 106
# Lifecycle hook: after the ancestors have started, records that storages are
# started, then starts and loads every registered storage and marks its state
# as running.
#
# A storage with autosave enabled that is not persistent additionally gets a
# repeating :storage_autosave timer that saves it every autosave_interval.
# Save failures inside the timer are logged instead of being raised.
#
# NOTE(review): `type` in the log call is not defined in this block; it is
# presumably provided by the including plugin - confirm.
def start
  super

  @_storages_started = true
  @_storages.each_pair do |usage, state|
    backend = state.storage
    backend.start
    backend.load

    needs_autosave_timer = backend.autosave && !backend.persistent
    if needs_autosave_timer
      timer_execute(:storage_autosave, backend.autosave_interval, repeat: true) do
        begin
          # Dereference the state on every tick, exactly as the timer fires.
          state.storage.save
        rescue => e
          log.error "plugin storage failed to save its data", usage: usage, type: type, error: e
        end
      end
    end

    state.running = true
  end
end
|
#stop ⇒ Object
138
139
140
141
142
|
# File 'lib/fluent/plugin_helper/storage.rb', line 138
# Lifecycle hook: lets the ancestors stop first, then sends :stop to every
# registered storage (via #storage_operate).
def stop
  super
  storage_operate(:stop)
end
|
#storage_create(usage: '', type: nil, conf: nil, default_type: nil) ⇒ Object
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
# File 'lib/fluent/plugin_helper/storage.rb', line 32
# Returns the storage registered under +usage+, creating, configuring and
# registering a new one when none exists yet.
#
# The plugin type is resolved in this order: the +type+ argument, the "@type"
# entry of +conf+, then +default_type+. A plain Hash +conf+ may use String or
# Symbol keys; its keys are stringified before the "@type" lookup so that
# `conf: { :'@type' => 'local' }` is accepted just like `{ '@type' => 'local' }`.
#
# A newly created storage is started immediately when #start has already run
# (@_storages_started), and is registered as a StorageState whose running flag
# is false.
#
# @param usage key under which the storage is registered (defaults to '')
# @param type storage plugin type; takes precedence over conf and default_type
# @param conf [Fluent::Config::Element, Hash, nil] storage configuration
# @param default_type plugin type used when neither type nor conf is given
# @return the storage instance as returned by #wrap_instance
# @raise [Fluent::ConfigError] when conf is given without "@type" and type is nil
# @raise [ArgumentError] when no type can be determined, or conf is neither an
#   Element, a Hash nor nil
def storage_create(usage: '', type: nil, conf: nil, default_type: nil)
  registered = @_storages[usage]
  # An already registered storage is handed back as-is, whether or not its
  # state has been flagged as running.
  return registered.storage if registered

  # Stringify plain-hash keys up front so the "@type" lookup below sees the
  # same keys as the config element built from the hash. Elements are matched
  # before Hash in the case below, so they are left untouched here as well.
  if conf.is_a?(Hash) && !conf.is_a?(Fluent::Config::Element)
    conf = Hash[conf.map { |k, v| [k.to_s, v] }]
  end

  type = if type
           type
         elsif conf && conf.respond_to?(:[])
           raise Fluent::ConfigError, "@type is required in <storage>" unless conf['@type']
           conf['@type']
         elsif default_type
           default_type
         else
           raise ArgumentError, "BUG: both type and conf are not specified"
         end

  storage = Plugin.new_storage(type, parent: self)
  config = case conf
           when Fluent::Config::Element
             conf
           when Hash
             Fluent::Config::Element.new('storage', usage, conf, [])
           when nil
             Fluent::Config::Element.new('storage', usage, {}, [])
           else
             raise ArgumentError, "BUG: conf must be a Element, Hash (or unspecified), but '#{conf.class}'"
           end
  storage.configure(config)
  storage.start if @_storages_started

  state = StorageState.new(wrap_instance(storage), false)
  @_storages[usage] = state
  state.storage
end
|
#storage_operate(method_name, &block) ⇒ Object
127
128
129
130
131
132
133
134
135
136
|
# File 'lib/fluent/plugin_helper/storage.rb', line 127
# Applies one lifecycle step to every registered storage.
#
# For each usage/state pair the optional block is called with the state first,
# then +method_name+ is sent to the state's storage. A StandardError raised by
# either step is logged and does not stop the iteration over the remaining
# storages.
#
# @param method_name [Symbol] method to send to each storage
# @yieldparam state the StorageState being processed
def storage_operate(method_name, &block)
  @_storages.each do |usage, state|
    begin
      block.call(state) if block
      state.storage.send(method_name)
    rescue => e
      log.error "unexpected error while #{method_name}", usage: usage, storage: state.storage, error: e
    end
  end
end
|
#terminate ⇒ Object
166
167
168
169
170
|
# File 'lib/fluent/plugin_helper/storage.rb', line 166
# Lifecycle hook: sends :terminate to every registered storage (via
# #storage_operate), replaces the registry with a fresh empty Hash, and then
# hands over to the ancestors' implementation.
def terminate
  storage_operate(:terminate)
  @_storages = {}
  super
end
|
#wrap_instance(storage) ⇒ Object
172
173
174
175
176
177
178
179
180
181
182
|
# File 'lib/fluent/plugin_helper/storage.rb', line 172
# Chooses the wrapper, if any, that a storage needs before it is registered.
#
# - persistent and persistent_always?  => returned unwrapped
# - persistent otherwise               => wrapped in PersistentWrapper
# - not persistent, not synchronized?  => wrapped in SynchronizeWrapper
# - not persistent, synchronized?      => returned unwrapped
#
# @param storage the storage plugin instance
# @return the storage itself or a wrapper around it
def wrap_instance(storage)
  if storage.persistent
    return storage if storage.persistent_always?

    return PersistentWrapper.new(storage)
  end

  return storage if storage.synchronized?

  SynchronizeWrapper.new(storage)
end
|