Class: Fluent::HttpFileUploadOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_http_file_upload.rb

Defined Under Namespace

Classes: StatDummy

Constant Summary collapse

SUPPORTED_COMPRESSION_TYPES =
['gzip']

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/fluent/plugin/out_http_file_upload.rb', line 53

# Applies the plugin configuration: builds the record formatter, prepares the
# HTTP client, and verifies that the external compressor is usable.
#
# conf - configuration element handed over by the caller; it is passed on to
#        the parent implementation (via bare `super`) and to the formatter.
#
# Raises Fluent::ConfigError when gzip compression is selected but the `gzip`
# command cannot be run.
def configure(conf)
  super

  @formatter = Plugin.new_formatter(@format)
  @formatter.configure(conf)

  @client = HTTPClient.new(agent_name: @user_agent, default_header: @headers)
  # @client.debug_dev = $stderr
  # The SSL verification mode is only applied when the target is an https URI.
  @client.ssl_config.verify_mode = @ssl_verify_mode if @uri.start_with?("https://")

  # Probe for the gzip binary up front so a missing command is reported at
  # configuration time rather than on the first write.
  if @compress == 'gzip' && !system('gzip -h > /dev/null 2>&1')
    raise Fluent::ConfigError, "gzip command unavailable"
  end
end

#format(tag, time, record) ⇒ Object



70
71
72
# File 'lib/fluent/plugin/out_http_file_upload.rb', line 70

# Serializes one event by delegating to the formatter held in @formatter.
#
# tag    - event tag
# time   - event time
# record - event record
#
# Returns whatever the formatter's #format returns for the event.
def format(tag, time, record)
  formatted = @formatter.format(tag, time, record)
  formatted
end

#upload(io, filename) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/fluent/plugin/out_http_file_upload.rb', line 113

# Posts +io+ to @uri through @client, sending it under the @param_name key
# together with any extra @parameters.
#
# io       - readable object responding to #size; it gains singleton #path and
#            #lstat methods as a side effect of this call.
# filename - name reported to the HTTP client as the uploaded file's path.
#
# Any 2xx response counts as success (upload endpoints commonly answer with
# 201 or 204, not only 200). Other responses are logged as errors; of those,
# only 5xx responses raise, so that the caller can retry.
#
# Raises RuntimeError when the server answers with a 5xx status.
def upload(io, filename)
  stat_dummy = StatDummy.new(io.size)
  io.singleton_class.class_eval{
    define_method(:path){ filename }    # override path to feed specified filename to httpclient
    define_method(:lstat){ stat_dummy } # override lstat to return chunk size only (lstat doesn't work for chunk buffer file)
  }
  postdata = { @param_name => io }
  unless @parameters.empty?
    # The file entry is merged last so it wins over a same-named extra parameter.
    postdata = @parameters.merge(postdata)
  end
  res = @client.post(@uri, postdata)
  status = res.status
  if status.between?(200, 299)
    log.info "upload success with code #{status}" # TODO: make this `debug`
  else
    log.error "failed to upload", uri: @uri, code: status, content: res.content
    # Only server-side failures are raised; other statuses are logged and dropped.
    raise "failed to upload with ServerError. retrying." if status.between?(500, 599)
  end
end

#write(chunk) ⇒ Object



74
75
76
77
78
79
80
81
# File 'lib/fluent/plugin/out_http_file_upload.rb', line 74

# Sends a buffered chunk to the upload endpoint, choosing the compressed or
# the plain path according to @compress.
#
# chunk - buffer chunk to upload.
#
# Returns the result of the selected writer.
def write(chunk)
  return write_gzip(chunk) if @compress == 'gzip'

  # Any other setting (including none) uploads the chunk as-is.
  write_plain(chunk)
end

#write_gzip(chunk) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/fluent/plugin/out_http_file_upload.rb', line 90

# Compresses a chunk with the external `gzip` command and uploads the result
# under the strftime-expanded @filename plus a ".gz" suffix.
#
# chunk - buffer chunk; when it responds to #path that file is compressed
#         directly, otherwise its content is first spooled to a temp file
#         through #write_to.
#
# Raises RuntimeError when gzip fails, so that an empty or truncated archive
# is never uploaded in place of the chunk and the caller can retry.
def write_gzip(chunk)
  filename = Time.now.strftime(@filename) + '.gz'
  spool = nil # temp copy of the chunk, only used when the chunk has no path
  path = if chunk.respond_to?(:path)
           chunk.path
         else
           spool = Tempfile.new('chunk-gzip-temp-http_file_upload')
           chunk.write_to(spool)
           spool.close
           spool.path
         end
  tmp = Tempfile.new('gzip-temp-http_file_upload')
  tmp.close # file will be removed after GC
  # Argument-list form with an :out redirect: no shell is involved, so paths
  # containing spaces or shell metacharacters are passed through verbatim.
  unless system('gzip', '-c', path, out: tmp.path)
    log.warn "failed to execute gzip command: exit code '#{$?}'"
    raise "failed to compress chunk with gzip"
  end
  tmp.open
  upload(tmp, filename)
ensure
  # Plain #close only: #upload overrides #path on the object it is given, so
  # removal of the gzip temp file is left to Tempfile's finalizer rather than
  # to an unlink that could consult the overridden #path.
  tmp.close if tmp
  spool.close! if spool
end

#write_plain(chunk) ⇒ Object



83
84
85
86
87
88
# File 'lib/fluent/plugin/out_http_file_upload.rb', line 83

# Uploads a chunk without compression, named by expanding @filename with
# strftime against the current time.
#
# chunk - buffer chunk; its #open must yield a readable IO.
#
# Returns the value of chunk.open.
def write_plain(chunk)
  upload_name = Time.now.strftime(@filename)
  chunk.open { |stream| upload(stream, upload_name) }
end