Class: Fluent::Plugin::SwiftOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::SwiftOutput
- Defined in:
- lib/fluent/plugin/out_swift.rb
Constant Summary collapse
- DEFAULT_FORMAT_TYPE =
'out_file'
- MAX_HEX_RANDOM_LENGTH =
16
Instance Method Summary collapse
- #configure(config) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ SwiftOutput
constructor
A new instance of SwiftOutput.
- #multi_workers_ready? ⇒ Boolean
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ SwiftOutput
Returns a new instance of SwiftOutput.
61 62 63 64 |
# File 'lib/fluent/plugin/out_swift.rb', line 61 def initialize super self.uuid_flush_enabled = false end |
Instance Method Details
#configure(config) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/fluent/plugin/out_swift.rb', line 66 def configure(config) compat_parameters_convert(config, :buffer, :formatter, :inject) super if auth_url.empty? raise Fluent::ConfigError, 'auth_url parameter or OS_AUTH_URL variable not defined' end if auth_user.empty? raise Fluent::ConfigError, 'auth_user parameter or OS_USERNAME variable not defined' end if auth_api_key.empty? raise Fluent::ConfigError, 'auth_api_key parameter or OS_PASSWORD variable not defined' end if hex_random_length > MAX_HEX_RANDOM_LENGTH raise Fluent::ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}" end unless index_format =~ /^%(0\d*)?[dxX]$/ raise Fluent::ConfigError, 'index_format parameter should follow `%[flags][width]type`. `0` is the only supported flag, and is mandatory if width is specified. `d`, `x` and `X` are supported types' end self.ext, self.mime_type = case store_as when 'gzip' then ['gz', 'application/x-gzip'] when 'lzo' then begin Open3.capture3('lzop -V') rescue Errno::ENOENT raise ConfigError, "'lzop' utility must be in PATH for LZO compression" end ['lzo', 'application/x-lzop'] when 'json' then ['json', 'application/json'] else ['txt', 'text/plain'] end self.formatter = formatter_create self.swift_object_key_format = configure_swift_object_key_format self.swift_object_chunk_buffer = {} end |
#format(tag, time, record) ⇒ Object
133 134 135 136 |
# File 'lib/fluent/plugin/out_swift.rb', line 133 def format(tag, time, record) new_record = inject_values_to_record(tag, time, record) formatter.format(tag, time, new_record) end |
#multi_workers_ready? ⇒ Boolean
107 108 109 |
# File 'lib/fluent/plugin/out_swift.rb', line 107 def multi_workers_ready? true end |
#start ⇒ Object
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/fluent/plugin/out_swift.rb', line 111 def start Excon.defaults[:ssl_verify_peer] = ssl_verify begin self.storage = Fog::Storage.new( provider: 'OpenStack', openstack_auth_url: auth_url, openstack_username: auth_user, openstack_api_key: auth_api_key, openstack_tenant: auth_tenant, openstack_region: auth_region ) rescue StandardError => e raise "Can't call Swift API. Please check your ENV OS_*, your credentials or `auth_url` configuration. Error: #{e.inspect}" end if swift_account storage.change_account(swift_account) end check_container super end |
#write(chunk) ⇒ Object
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# File 'lib/fluent/plugin/out_swift.rb', line 138 def write(chunk) i = 0 = chunk. previous_path = nil begin swift_object_chunk_buffer[chunk.unique_id] ||= { '%{hex_random}' => hex_random(chunk: chunk) } values_for_swift_object_key_pre = { '%{path}' => path, '%{file_extension}' => ext } # rubocop:disable Style/FormatString values_for_swift_object_key_post = { '%{index}' => sprintf(index_format, i) }.merge!(swift_object_chunk_buffer[chunk.unique_id]) # rubocop:enable Style/FormatString if uuid_flush_enabled values_for_swift_object_key_post['%{uuid_flush}'] = uuid_random end swift_path = swift_object_key_format.gsub(/%{[^}]+}/) do |matched_key| values_for_swift_object_key_pre.fetch(matched_key, matched_key) end swift_path = extract_placeholders(swift_path, ) swift_path = swift_path.gsub(/%{[^}]+}/, values_for_swift_object_key_post) $log.info("File flushing: #{swift_path}") if i.positive? && (swift_path == previous_path) if overwrite log.warn("File: #{swift_path} already exists, but will overwrite!") break else raise "Duplicated path is generated. Use %{index} in swift_object_key_format: Path: #{swift_path}" end end i += 1 previous_path = swift_path end while check_object_exists(object: swift_path) tmp = Tempfile.new('swift-') tmp.binmode begin if store_as == 'gzip' w = Zlib::GzipWriter.new(tmp) chunk.write_to(w) w.close elsif store_as == 'lzo' w = Tempfile.new('chunk-tmp') chunk.write_to(w) w.close tmp.close system "lzop -qf1 -o #{tmp.path} #{w.path}" else chunk.write_to(tmp) tmp.close end File.open(tmp.path) do |file| storage.put_object( swift_container, swift_path, file, content_type: mime_type ) swift_object_chunk_buffer.delete(chunk.unique_id) end ensure begin tmp.close(true) rescue StandardError nil end begin w.close rescue StandardError nil end begin w.unlink rescue StandardError nil end end end |