Class: Fluent::Plugin::GrowthForecastOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_growthforecast.rb

Constant Summary collapse

# Default graph path templates, keyed by the normalized `tag_for` mode.
# #configure picks DEFAULT_GRAPH_PATH[@tag_for] when no graph_path is given.
# Frozen so the shared table cannot be modified by accident at runtime.
DEFAULT_GRAPH_PATH =
{
  ignore: '${service}/${section}/${key_name}',
  service: '${tag}/${section}/${key_name}',
  section: '${service}/${tag}/${key_name}',
  name_prefix: '${service}/${section}/${tag}_${key_name}',
}.freeze

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/fluent/plugin/out_growthforecast.rb', line 91

# Validates the plugin parameters and converts them into the internal forms
# used by the other methods of this class.
#
# After a successful call:
# * @name_keys and @graphs are Arrays (split on ','); @name_key_pattern is a Regexp
# * @mode is :count, :modified or :gauge (any other value falls back to :gauge)
# * @tag_for is :ignore, :section, :service or :name_prefix (the fallback)
# * @graph_path is DEFAULT_GRAPH_PATH[@tag_for] when it was not configured
# * @removed_prefix_string / @removed_length are set when @remove_prefix is given
# * @auth is :basic or :none, and @resolver is a Resolve::Hostname instance
#
# @param conf the configuration object; only passed on to super here
# @raise [Fluent::ConfigError] when a parameter is missing, malformed or
#   conflicts with another parameter
def configure(conf)
  super

  unless @chunk_key_tag
    raise Fluent::ConfigError, "configure buffer chunk_keys with tag"
  end

  # Whole-string anchors (\A, \z): line anchors (^, $) and \Z would accept
  # values with embedded or trailing newlines, which cannot form a valid URL.
  if @gfapi_url !~ %r{/api/\z}
    raise Fluent::ConfigError, "gfapi_url must end with /api/"
  end
  if !@graph_path.nil? && @graph_path !~ %r{\A[^/]+/[^/]+/[^/]+\z}
    raise Fluent::ConfigError, "graph_path must be like '${service}/${section}/${tag}_${key_name}'"
  end

  # Exactly one of name_keys / name_key_pattern must be given.
  if @name_keys.nil? && @name_key_pattern.nil?
    raise Fluent::ConfigError, "missing both of name_keys and name_key_pattern"
  end
  if !@name_keys.nil? && !@name_key_pattern.nil?
    raise Fluent::ConfigError, "cannot specify both of name_keys and name_key_pattern"
  end
  if !@graphs.nil? && @name_keys.nil?
    raise Fluent::ConfigError, "graphs must be specified with name_keys"
  end

  @name_keys = @name_keys.split(',') if @name_keys
  @name_key_pattern = Regexp.new(@name_key_pattern) if @name_key_pattern
  @graphs = @graphs.split(',') if @graphs

  # graphs are matched to name_keys by position, so the lists must line up.
  if @name_keys && @graphs && @name_keys.size != @graphs.size
    raise Fluent::ConfigError, "sizes of name_keys and graphs do not match"
  end

  @mode = case @mode
          when 'count' then :count
          when 'modified' then :modified
          else
            :gauge
          end

  @tag_for = case @tag_for
             when 'ignore' then :ignore
             when 'section' then :section
             when 'service' then :service
             else
               :name_prefix
             end

  if @graph_path.nil?
    # The default templates take section/service from the configuration
    # unless the tag is substituted in that position.
    if @tag_for != :section && @section.nil?
      raise Fluent::ConfigError, "section parameter is needed when tag_for is not 'section'"
    end
    if @tag_for != :service && @service.nil?
      raise Fluent::ConfigError, "service parameter is needed when tag_for is not 'service'"
    end
    @graph_path = DEFAULT_GRAPH_PATH[@tag_for]
  end

  if @remove_prefix
    # Precomputed for the prefix check in #placeholder_mapping.
    @removed_prefix_string = @remove_prefix + '.'
    @removed_length = @removed_prefix_string.length
  end

  @auth = case @authentication
          when 'basic' then :basic
          else
            :none
          end
  @resolver = Resolve::Hostname.new(system_resolver: true)
end

#connect_to(tag, name) ⇒ Object



192
193
194
195
# File 'lib/fluent/plugin/out_growthforecast.rb', line 192

# Determines where to connect for the graph identified by tag and name.
#
# The URL is built by #format_url and parsed with URI.
#
# @return [Array] two elements: the URL's host and its port
def connect_to(tag, name)
  endpoint = URI.parse(format_url(tag, name))
  [endpoint.host, endpoint.port]
end

#escape(param) ⇒ Object



182
183
184
185
# File 'lib/fluent/plugin/out_growthforecast.rb', line 182

# URL-encodes param with CGI.escape.
#
# @param param the value to encode; nil (or false) is allowed
# @return [String, nil] the encoded string, or nil when param is nil/false
def escape(param)
  CGI.escape(param) if param
end

#format_url(tag, name) ⇒ Object



187
188
189
190
# File 'lib/fluent/plugin/out_growthforecast.rb', line 187

# Builds the full API URL for one graph.
#
# Every `${...}` placeholder (lowercase letters and underscores) in
# @graph_path is looked up in the hash returned by #placeholder_mapping;
# the expanded path is then appended to @gfapi_url.
#
# @return [String] @gfapi_url followed by the expanded graph path
def format_url(tag, name)
  substitutions = placeholder_mapping(tag, name)
  @gfapi_url + @graph_path.gsub(/(\${[_a-z]+})/, substitutions)
end

#gf_events(tag, time, record) ⇒ Object



284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# File 'lib/fluent/plugin/out_growthforecast.rb', line 284

# Converts one record into the list of values to post.
#
# With @name_keys set, each listed key whose value in the record is truthy
# yields one event, named after the graph at the same position in @graphs
# when @graphs is set, otherwise after the key itself. Without @name_keys,
# every record key that matches @name_key_pattern and has a truthy value
# yields one event named after the key.
#
# @param time not used by this method
# @return [Array<Hash>] hashes with the keys :tag, :name and :value
def gf_events(tag, time, record)
  if @name_keys
    @name_keys.each_with_index.each_with_object([]) do |(key, position), collected|
      value = record[key]
      next unless value
      collected << {tag: tag, name: (@graphs ? @graphs[position] : key), value: value}
    end
  else # for name_key_pattern
    matching = record.keys.select { |key| @name_key_pattern.match(key) && record[key] }
    matching.map { |key| {tag: tag, name: key, value: record[key]} }
  end
end

#gf_events_from_es(tag, es) ⇒ Object



302
303
304
305
306
307
308
# File 'lib/fluent/plugin/out_growthforecast.rb', line 302

# Collects the events of every (time, record) pair yielded by es.each.
#
# @param es an object whose #each yields time and record
# @return [Array] the concatenated results of #gf_events, in iteration order
def gf_events_from_es(tag, es)
  [].tap do |collected|
    es.each { |time, record| collected.concat(gf_events(tag, time, record)) }
  end
end

#http_connection(host, port) ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/fluent/plugin/out_growthforecast.rb', line 197

# Creates a Net::HTTP object for the given host and port (it is not started here).
#
# The host name is first resolved through @resolver, and the connection
# targets the resolved address. @timeout, when set, is used as both the
# open and the read timeout. With @ssl, TLS is enabled, and certificate
# verification is turned off unless @verify_ssl is set.
#
# @return [Net::HTTP]
def http_connection(host, port)
  Net::HTTP.new(@resolver.getaddress(host), port).tap do |conn|
    if @timeout
      conn.open_timeout = @timeout
      conn.read_timeout = @timeout
    end
    if @ssl
      conn.use_ssl = true
      conn.verify_mode = OpenSSL::SSL::VERIFY_NONE unless @verify_ssl
    end
  end
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


166
167
168
# File 'lib/fluent/plugin/out_growthforecast.rb', line 166

# Always returns true.
# NOTE(review): presumably the hook Fluentd consults to decide whether this
# plugin may run with multiple workers — confirm against the Fluentd plugin API.
def multi_workers_ready?
  true
end

#placeholder_mapping(tag, name) ⇒ Object



174
175
176
177
178
179
180
# File 'lib/fluent/plugin/out_growthforecast.rb', line 174

# Builds the placeholder => value table used to expand the graph path.
#
# When @remove_prefix is set and the tag either starts with
# "<prefix>." followed by more characters, or equals the prefix itself,
# the first @removed_length characters are cut from the tag. For a tag equal
# to the prefix that slice starts past the end of the string and yields nil.
# All four values are passed through #escape.
#
# @return [Hash] values for '${service}', '${section}', '${tag}', '${key_name}'
def placeholder_mapping(tag, name)
  if @remove_prefix
    has_prefix = tag.start_with?(@removed_prefix_string) && tag.length > @removed_length
    tag = tag[@removed_length..-1] if has_prefix || tag == @remove_prefix
  end

  {
    '${service}' => escape(@service),
    '${section}' => escape(@section),
    '${tag}' => escape(tag),
    '${key_name}' => escape(name),
  }
end

#post(tag, name, value) ⇒ Object



227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/fluent/plugin/out_growthforecast.rb', line 227

# Posts a single value with a fresh HTTP connection.
#
# IOError, EOFError and SystemCallError raised while connecting or sending
# are logged as a warning and not re-raised. A second warning is logged
# whenever no response was obtained or the response is not a
# Net::HTTPSuccess.
def post(tag, name, value)
  url = format_url(tag, name)

  res = begin
    host, port = connect_to(tag, name)
    req = post_request(tag, name, value)
    http_connection(host, port).start { |client| client.request(req) }
  rescue IOError, EOFError, SystemCallError => e
    # server didn't respond
    log.warn "net/http POST raises exception: #{e.class}, '#{e.message}'"
    nil
  end

  return if res && res.is_a?(Net::HTTPSuccess)

  log.warn "failed to post to growthforecast: #{url}, number: #{value}, code: #{res && res.code}"
end

#post_events(events) ⇒ Object



274
275
276
277
278
279
280
281
282
# File 'lib/fluent/plugin/out_growthforecast.rb', line 274

# Sends the given events: all of them through #post_keepalive when
# @keepalive is set, otherwise one #post call per event.
#
# @param events [Array<Hash>] hashes with the keys :tag, :name and :value
def post_events(events)
  return post_keepalive(events) if @keepalive

  events.each { |event| post(event[:tag], event[:name], event[:value]) }
end

#post_keepalive(events) ⇒ Object

events: an array of hashes of the form [{:tag=>'', :name=>'', :value=>X}]


244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/fluent/plugin/out_growthforecast.rb', line 244

# Posts all events, reusing one started HTTP connection for as long as it
# keeps working.
#
# Host and port are taken from the first event only. IOError and
# SystemCallError (which include EOFError, Errno::ECONNRESET and
# Errno::ETIMEDOUT) are logged; the broken connection is dropped and a new
# one is opened for the next request. Any other exception propagates to the
# caller, but the open connection is still closed first.
#
# @param events [Array<Hash>] hashes with the keys :tag, :name and :value
# @return [nil]
def post_keepalive(events) # [{:tag=>'',:name=>'',:value=>X}]
  return if events.empty?

  # gf host/port is same for all events (host is from configuration)
  host, port = connect_to(events.first[:tag], events.first[:name])

  requests = events.map { |e| post_request(e[:tag], e[:name], e[:value]) }

  http = nil
  begin
    requests.each do |req|
      begin
        unless http
          http = http_connection(host, port)
          http.start
        end
        res = http.request(req)
        unless res && res.is_a?(Net::HTTPSuccess)
          log.warn "failed to post to growthforecast: #{host}:#{port}#{req.path}, post_data: #{req.body} code: #{res && res.code}"
        end
      rescue IOError, SystemCallError => e
        log.warn "net/http keepalive POST raises exception", error: e
        http.finish rescue nil # ignore all errors for connection with error
        http = nil
      end
    end
  ensure
    # Runs on normal completion and when an exception not rescued above
    # (e.g. a timeout error) escapes, so the connection is never left open.
    if http
      http.finish rescue nil
    end
  end
  nil
end

#post_request(tag, name, value) ⇒ Object



212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/fluent/plugin/out_growthforecast.rb', line 212

# Builds the POST request for one value.
#
# The request path is the path of the URL from #format_url and the Host
# header is that URL's host. Basic authentication is added when @auth is
# :basic, and a Keep-Alive Connection header when @keepalive is set. The
# value is sent as form field 'number', converted with to_f when
# @enable_float_number is set and with to_i otherwise; 'mode' carries @mode.
#
# @return [Net::HTTP::Post]
def post_request(tag, name, value)
  target = URI.parse(format_url(tag, name))

  Net::HTTP::Post.new(target.path).tap do |request|
    request.basic_auth(@username, @password) if @auth == :basic
    request['Host'] = target.host
    request['Connection'] = 'Keep-Alive' if @keepalive
    number = @enable_float_number ? value.to_f : value.to_i
    request.set_form_data({'number' => number, 'mode' => @mode.to_s})
  end
end

#prefer_buffered_processing ⇒ Object



170
171
172
# File 'lib/fluent/plugin/out_growthforecast.rb', line 170

# Returns the value of @background_post.
# NOTE(review): presumably Fluentd uses this to choose between the buffered
# #write path (truthy) and the direct #process path — confirm against the
# Fluentd plugin API.
def prefer_buffered_processing
  @background_post
end

#process(tag, es) ⇒ Object



310
311
312
313
314
315
316
317
# File 'lib/fluent/plugin/out_growthforecast.rb', line 310

# Posts the events of an event stream immediately.
#
# Errors raised while posting are logged and swallowed; errors raised while
# extracting events from the stream are not rescued here.
def process(tag, es)
  pending = gf_events_from_es(tag, es)
  begin
    post_events(pending)
  rescue StandardError => error
    log.warn "HTTP POST Error occurs to growthforecast server, ignored (use background_post for retries)", error: error
  end
end

#write(chunk) ⇒ Object



319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/fluent/plugin/out_growthforecast.rb', line 319

# Posts every event contained in a buffer chunk.
#
# The tag is read from the chunk's metadata. Errors raised while posting
# are logged, and re-raised only when @retry is set.
#
# @param chunk an object providing metadata.tag and an #each that yields
#   time and record
def write(chunk)
  tag = chunk.metadata.tag
  events = []
  chunk.each do |time, record|
    events.concat(gf_events(tag, time, record))
  end
  begin
    post_events(events)
  rescue => e
    log.warn "HTTP POST Error occures to growthforecast server", error: e
    # Re-raise the current exception when retries are enabled.
    raise if @retry
  end
end