Class: Fluent::HoopOutput
- Inherits:
-
TimeSlicedOutput
- Object
- TimeSlicedOutput
- Fluent::HoopOutput
- Includes:
- FluentExt::PlainTextFormatterMixin
- Defined in:
- lib/fluent/plugin/out_hoop.rb
Instance Attribute Summary
Attributes included from FluentExt::PlainTextFormatterMixin
#add_newline, #default_tag, #field_separator, #output_data_type, #output_include_tag, #output_include_time, #remove_prefix
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ HoopOutput
constructor
A new instance of HoopOutput.
- #path_format(chunk_key) ⇒ Object
- #record_to_string(record) ⇒ Object
- #send_data(path, data, retries = 0) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Methods included from FluentExt::PlainTextFormatterMixin
Constructor Details
#initialize ⇒ HoopOutput
Returns a new instance of HoopOutput.
223 224 225 226 227 |
# File 'lib/fluent/plugin/out_hoop.rb', line 223 def initialize super require 'net/http' require 'time' end |
Instance Method Details
#configure(conf) ⇒ Object
229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 |
# File 'lib/fluent/plugin/out_hoop.rb', line 229 def configure(conf) if conf['path'] if conf['path'].index('%S') conf['time_slice_format'] = '%Y%m%d%H%M%S' elsif conf['path'].index('%M') conf['time_slice_format'] = '%Y%m%d%H%M' elsif conf['path'].index('%H') conf['time_slice_format'] = '%Y%m%d%H' end end super unless /\A([a-zA-Z0-9][-a-zA-Z0-9.]*):(\d+)\Z/ =~ @hoop_server raise Fluent::ConfigError, "Invalid config value on hoop_server: '#{@hoop_server}', needs SERVER_NAME:PORT" end @host = $1 @port = $2.to_i unless @path.index('/') == 0 raise Fluent::ConfigError, "Path on hdfs MUST starts with '/', but '#{@path}'" end @conn = nil @header = {'Content-Type' => 'application/octet-stream'} @f_separator = case @field_separator when 'SPACE' then ' ' when 'COMMA' then ',' else "\t" end end |
#format(tag, time, record) ⇒ Object
289 290 291 292 |
# File 'lib/fluent/plugin/out_hoop.rb', line 289 def format(tag, time, record) time_str = @timef.format(time) time_str + @f_separator + tag + @f_separator + record_to_string(record) + @line_end end |
#path_format(chunk_key) ⇒ Object
294 295 296 |
# File 'lib/fluent/plugin/out_hoop.rb', line 294 def path_format(chunk_key) Time.strptime(chunk_key, @time_slice_format).strftime(@path) end |
#record_to_string(record) ⇒ Object
285 286 287 |
# File 'lib/fluent/plugin/out_hoop.rb', line 285 def record_to_string(record) record.to_json end |
#send_data(path, data, retries = 0) ⇒ Object
298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 |
# File 'lib/fluent/plugin/out_hoop.rb', line 298 def send_data(path, data, retries=0) conn = Net::HTTP.start(@host, @port) conn.read_timeout = 5 res = conn.request_put(path + "?op=append", data, ) if res.code == '401' res = conn.request_get("/?op=status&user.name=#{@username}") if res.code.to_i < 300 and res['Set-Cookie'] = {'Cookie' => res['Set-Cookie'].split(';')[0], 'Content-Type' => 'application/octet-stream'} else $log.error "Failed to update authorized cookie, code: #{res.code}, message: #{res.body}" raise Fluent::ConfigError, "Failed to update authorized cookie, code: #{res.code}, message: #{res.body}" end res = conn.request_put(path + "?op=append", data, ) end if res.code == '404' res = conn.request_post(path + "?op=create&overwrite=false", data, ) end if res.code == '500' if retries >= 3 raise StandardError, "failed to send_data with retry 3 times InternalServerError" end sleep 0.3 # yes, this is a magic number res = send_data(path, data, retries + 1) end conn.finish if res.code != '200' and res.code != '201' $log.warn "failed to write data to path: #{path}, code: #{res.code} #{res.message}" end res end |
#shutdown ⇒ Object
281 282 283 |
# File 'lib/fluent/plugin/out_hoop.rb', line 281 def shutdown super end |
#start ⇒ Object
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 |
# File 'lib/fluent/plugin/out_hoop.rb', line 260 def start super # okey, net/http has reconnect feature. see test_out_hoop_reconnect.rb conn = Net::HTTP.start(@host, @port) begin res = conn.request_get("/?op=status&user.name=#{@username}") if res.code.to_i < 300 and res['Set-Cookie'] = {'Cookie' => res['Set-Cookie'].split(';')[0], 'Content-Type' => 'application/octet-stream'} else $log.error "initalize request failed, code: #{res.code}, message: #{res.body}" raise Fluent::ConfigError, "initalize request failed, code: #{res.code}, message: #{res.body}" end rescue $log.error "failed to connect hoop server: #{@host} port #{@port}" raise end conn.finish $log.info "connected hoop server: #{@host} port #{@port}" end |
#write(chunk) ⇒ Object
329 330 331 332 333 334 335 336 337 338 |
# File 'lib/fluent/plugin/out_hoop.rb', line 329 def write(chunk) hdfs_path = path_format(chunk.key) begin send_data(hdfs_path, chunk.read) rescue $log.error "failed to communicate server, #{@host} port #{@port}, path: #{hdfs_path}" raise end hdfs_path end |