Class: Fluent::HttpFileUploadOutput
- Inherits: BufferedOutput
- Inheritance chain: Object → BufferedOutput → Fluent::HttpFileUploadOutput
- Includes:
- SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/out_http_file_upload.rb
Defined Under Namespace
Classes: StatDummy
Constant Summary collapse
- SUPPORTED_COMPRESSION_TYPES =
['gzip']
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #upload(io, filename) ⇒ Object
- #write(chunk) ⇒ Object
- #write_gzip(chunk) ⇒ Object
- #write_plain(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/fluent/plugin/out_http_file_upload.rb', line 53

# Prepares the plugin from its configuration: builds and configures the
# record formatter, creates the HTTP client, and checks that the external
# +gzip+ command can be run when gzip compression is selected.
#
# @param conf [Object] configuration passed on to +super+ and to the formatter
# @return [nil]
# @raise [Fluent::ConfigError] if +@compress+ is 'gzip' and the +gzip+
#   command cannot be executed
def configure(conf)
  super

  @formatter = Plugin.new_formatter(@format)
  @formatter.configure(conf)

  @client = HTTPClient.new(agent_name: @user_agent, default_header: @headers)
  # Left disabled; enable to send the client's debug output to stderr:
  # @client.debug_dev = $stderr
  @client.ssl_config.verify_mode = @ssl_verify_mode if @uri.start_with?("https://")

  # The availability probe only runs when gzip compression was requested.
  if @compress == 'gzip' && !system('gzip -h > /dev/null 2>&1')
    raise Fluent::ConfigError, "gzip command unavailable"
  end
end
#format(tag, time, record) ⇒ Object
70 71 72 |
# File 'lib/fluent/plugin/out_http_file_upload.rb', line 70

# Formats a single event by handing it to the formatter held in +@formatter+.
#
# @param tag [Object] event tag, passed through unchanged
# @param time [Object] event time, passed through unchanged
# @param record [Object] event record, passed through unchanged
# @return [Object] whatever +@formatter.format+ returns
def format(tag, time, record)
  formatter = @formatter
  formatter.format(tag, time, record)
end
#upload(io, filename) ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/fluent/plugin/out_http_file_upload.rb', line 113

# POSTs +io+ to +@uri+ under the form key +@param_name+, together with any
# extra form fields from +@parameters+ (the file entry wins on a key clash).
#
# Any 2xx status is treated as success. Other statuses are logged as errors;
# only 5xx statuses raise, so client-side (4xx) failures are logged and the
# method returns normally.
#
# @param io [#size] readable object holding the payload; its singleton class
#   gains +path+ and +lstat+ overrides
# @param filename [String] name reported to the HTTP client via +io.path+
# @return [void]
# @raise [RuntimeError] when the server answers with a 5xx status
def upload(io, filename)
  stat_dummy = StatDummy.new(io.size)
  io.singleton_class.class_eval do
    # override path to feed specified filename to httpclient
    define_method(:path) { filename }
    # override lstat to return chunk size only (lstat doesn't work for chunk buffer file)
    define_method(:lstat) { stat_dummy }
  end

  postdata = @parameters.merge(@param_name => io)
  res = @client.post(@uri, postdata)
  status = res.status

  # Accept the whole 2xx range: 201/204 are successful uploads too and must
  # not be reported as failures.
  if (200..299).cover?(status)
    log.info "upload success with code #{status}" # TODO: make this `debug`
    return
  end

  log.error "failed to upload", uri: @uri, code: status, content: res.content
  raise "failed to upload with ServerError. retrying." if (500..599).cover?(status)
end
#write(chunk) ⇒ Object
74 75 76 77 78 79 80 81 |
# File 'lib/fluent/plugin/out_http_file_upload.rb', line 74

# Sends a buffer chunk to the upload endpoint, gzip-compressed when
# +@compress+ is 'gzip' and unmodified for any other setting.
#
# @param chunk [Object] buffer chunk handed to #write_gzip or #write_plain
# @return [Object] the result of the selected writer
def write(chunk)
  return write_gzip(chunk) if @compress == 'gzip'

  write_plain(chunk)
end
#write_gzip(chunk) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/fluent/plugin/out_http_file_upload.rb', line 90

# Compresses a buffer chunk with the external +gzip+ command and uploads the
# result.
#
# The upload name is +@filename+ expanded by +Time#strftime+ for the current
# time, with '.gz' appended. Chunks that expose +path+ are compressed straight
# from that file; other chunks are first spooled to a temp file via +write_to+.
#
# @param chunk [#path, #write_to] buffer chunk to compress and upload
# @return [void]
# @raise [RuntimeError] when the gzip command fails or cannot be started
def write_gzip(chunk)
  filename = Time.now.strftime(@filename) + '.gz'

  spool = nil
  source_path =
    if chunk.respond_to?(:path)
      chunk.path
    else
      spool = Tempfile.new('chunk-gzip-temp-http_file_upload')
      chunk.write_to(spool)
      spool.close
      spool.path
    end

  gzipped = Tempfile.new('gzip-temp-http_file_upload')
  begin
    gzipped.close
    # Argument-vector form: no shell is involved, so paths containing spaces
    # or shell metacharacters are passed to gzip verbatim. '--' keeps a path
    # that starts with '-' from being read as an option.
    unless system('gzip', '-c', '--', source_path, out: gzipped.path)
      log.warn "failed to execute gzip command: exit code '#{$?}'"
      # Do not upload the empty/truncated output as if it were the chunk;
      # raise instead, as #upload does for server errors it wants retried.
      raise "failed to gzip chunk. retrying."
    end
    gzipped.open
    upload(gzipped, filename)
  ensure
    # Remove the compressed temp file now, even when the upload raised.
    gzipped.close!
  end
  nil
ensure
  spool.close! if spool
end
#write_plain(chunk) ⇒ Object
83 84 85 86 87 88 |
# File 'lib/fluent/plugin/out_http_file_upload.rb', line 83

# Uploads a buffer chunk without compression.
#
# The upload name is +@filename+ expanded by +Time#strftime+ for the current
# time; the chunk is opened and the yielded IO is passed to #upload.
#
# @param chunk [#open] buffer chunk that yields an IO from +open+
# @return [Object] whatever +chunk.open+ returns
def write_plain(chunk)
  upload_name = Time.now.strftime(@filename)
  chunk.open { |io| upload(io, upload_name) }
end