Class: Fluent::SixpackOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_sixpack.rb

Constant Summary collapse

# Maps a record type (as a Symbol) to the Sixpack API endpoint path.
# Frozen so this shared lookup table cannot be mutated at runtime.
SIXPACK_PATH =
{
  :participate => '/participate',
  :convert     => '/convert'
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize ⇒ SixpackOutput

Returns a new instance of SixpackOutput.



4
5
6
7
8
9
# File 'lib/fluent/plugin/out_sixpack.rb', line 4

# Runs the parent initializer, then loads the libraries the plugin relies
# on. The requires are deferred to instantiation time instead of file load.
def initialize
  super
  # Standard-library pieces used for building and sending requests.
  %w[net/http uri].each { |lib| require lib }
  # Provides Resolve::Hostname, which #configure instantiates.
  require 'resolve/hostname'
end

Instance Method Details

#configure(conf) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/fluent/plugin/out_sixpack.rb', line 47

# Applies the configuration and normalizes string settings into symbols.
#
# After the parent has processed +conf+:
# * @mode becomes :count or :modified for those exact strings, :gauge otherwise
# * @auth becomes :basic when @authentication is 'basic', :none otherwise
# * @resolver is set to a Resolve::Hostname using the system resolver
def configure(conf)
  super

  @mode = { 'count' => :count, 'modified' => :modified }.fetch(@mode, :gauge)
  @auth = @authentication == 'basic' ? :basic : :none
  @resolver = Resolve::Hostname.new(:system_resolver => true)
end

#connect_to ⇒ Object



104
105
106
107
# File 'lib/fluent/plugin/out_sixpack.rb', line 104

# Extracts the connection target from the configured Sixpack API URL.
#
# @return [Array] two elements: the host and the port parsed from @sixpackapi_url
def connect_to
  endpoint = URI.parse(@sixpackapi_url)
  [endpoint.host, endpoint.port]
end

#emit(tag, es, chain) ⇒ Object



234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/fluent/plugin/out_sixpack.rb', line 234

# Selects the events whose record type has an entry in SIXPACK_PATH, then
# either appends them to the background queue (when the poster thread
# exists) or posts them synchronously.
#
# @param tag the tag attached to every collected event
# @param es an enumerable yielding (time, record) pairs
# @param chain object whose #next is called once the events were handled
# @raise re-raises a synchronous posting error when @retry is set
def emit(tag, es, chain)
  events = []

  es.each do |time, record|
    # Go through to_s so a record without the type field (nil) is skipped
    # instead of aborting the whole batch with NoMethodError on nil.to_sym.
    # Comparing as strings also avoids symbolizing arbitrary record input.
    record_type = record[@key_record_type].to_s
    next unless SIXPACK_PATH.any? { |type, _path| type.to_s == record_type }

    events.push({:time => time, :tag => tag, :record => record})
  end

  if @thread
    # @queue is swapped out by the poster thread, so guard the append.
    @mutex.synchronize do
      @queue += events
    end
  else
    begin
      post_events(events)
    rescue => e
      log.warn "HTTP POST Error occures to sixpack server", :error_class => e.class, :error => e.message
      raise if @retry
    end
  end

  chain.next
end

#form_encode_params_convert(record) ⇒ Object



146
147
148
149
150
151
152
153
154
# File 'lib/fluent/plugin/out_sixpack.rb', line 146

# Builds the form-encoded query string for a conversion request.
#
# The experiment and client id fields are always included; the kpi field
# is appended only when the record carries a truthy value for it.
#
# @param record hash-like record read via the configured @key_* names
# @return [String] output of URI.encode_www_form
def form_encode_params_convert(record)
  query = {
    :experiment => record[@key_experiment],
    :client_id  => record[@key_client_id],
  }
  kpi = record[@key_kpi]
  query[:kpi] = kpi if kpi

  URI.encode_www_form(query)
end

#http_connection(host, port) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/fluent/plugin/out_sixpack.rb', line 109

# Creates a Net::HTTP client for the given endpoint without starting it.
#
# The host is translated through @resolver.getaddress before being handed
# to Net::HTTP. @timeout, when set, is used for both the open and read
# timeouts. When @ssl is set TLS is enabled, and certificate verification
# is switched off unless @verify_ssl is set.
#
# @param host host name passed to the resolver
# @param port port to connect to
# @return [Net::HTTP] the configured client
def http_connection(host, port)
  address = @resolver.getaddress(host)

  Net::HTTP.new(address, port).tap do |client|
    if @timeout
      client.open_timeout = @timeout
      client.read_timeout = @timeout
    end

    next unless @ssl

    client.use_ssl = true
    client.verify_mode = OpenSSL::SSL::VERIFY_NONE unless @verify_ssl
  end
end

#map_sixpack_path(record) ⇒ Object



124
125
126
# File 'lib/fluent/plugin/out_sixpack.rb', line 124

# Looks up the Sixpack API path for the record's type in SIXPACK_PATH.
#
# The previous body returned a bare `sixpack_path`, a name not defined in
# this method, and ignored +record+; the lookup now mirrors the one used
# by #map_sixpack_path_with_query.
#
# @param record hash-like record; its type is read from @key_record_type
# @return [String, nil] the endpoint path, or nil when the record type is
#   missing or has no entry in SIXPACK_PATH
def map_sixpack_path(record)
  # to_s first so a missing (nil) record type yields nil instead of raising.
  SIXPACK_PATH[record[@key_record_type].to_s.to_sym]
end

#map_sixpack_path_with_query(record) ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/fluent/plugin/out_sixpack.rb', line 128

# Maps a record to the Sixpack endpoint path and its form-encoded query.
#
# 'participate' records are encoded from the experiment, alternatives
# (a comma-separated string, sent as repeated parameters), alternative and
# client id fields; 'convert' records are delegated to
# #form_encode_params_convert.
#
# @param record hash-like record read via the configured @key_* names
# @return [Array] two elements: the endpoint path and the query string
# @raise [RuntimeError] when the record type is missing or unsupported
# @raise [NoMethodError] when a 'participate' record has no alternatives
#   value responding to #split
def map_sixpack_path_with_query(record)
  # Normalize through to_s: a missing (nil) type must reach the logged
  # `else` branch rather than blow up on nil.to_sym, and Symbol types are
  # treated like their String form.
  record_type = record[@key_record_type].to_s

  case record_type
  when 'participate'
    query = URI.encode_www_form({
              :experiment   => record[@key_experiment],
              :alternatives => record[@key_alternatives].split(','),
              :alternative  => record[@key_alternative],
              :client_id    => record[@key_client_id],
            })
    return SIXPACK_PATH[record_type.to_sym], query
  when 'convert'
    return SIXPACK_PATH[record_type.to_sym], form_encode_params_convert(record)
  else
    log.warn 'failed to map sixpack path and query'
    # Still a RuntimeError, as the former bare `raise` produced, but with a
    # message that identifies the offending value.
    raise "failed to map sixpack path and query: unsupported record type #{record[@key_record_type].inspect}"
  end
end

#post(event) ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/fluent/plugin/out_sixpack.rb', line 171

# Sends a single event to Sixpack over a one-shot connection.
#
# IOError, EOFError and SystemCallError raised while connecting or
# requesting are logged and swallowed; any other exception propagates.
# A warning is also logged whenever no successful (2xx) response was
# obtained, including after a swallowed error.
#
# @param event hash carrying the record under :record
def post(event)
  target = @sixpackapi_url
  response = nil

  begin
    host, port = connect_to
    request = post_request(event)
    client = http_connection(host, port)
    response = client.start { |session| session.request(request) }
  rescue IOError, EOFError, SystemCallError => e
    # server didn't respond
    log.warn "net/http GET raises exception: #{e.class}, '#{e.message}'"
  end

  return if response.is_a?(Net::HTTPSuccess)

  log.warn "failed to post to sixpack #{target}, record#{event[:record]}, code: #{response && response.code}"
end

#post_events(events) ⇒ Object



224
225
226
227
228
229
230
231
232
# File 'lib/fluent/plugin/out_sixpack.rb', line 224

# Dispatches a batch of events: through one shared keep-alive connection
# when @keepalive is set, otherwise one request per event via #post.
#
# @param events collection of events to send
def post_events(events)
  return post_keepalive(events) if @keepalive

  events.each { |event| post(event) }
end

#post_keepalive(events) ⇒ Object

[{:tag=>'',:name=>'',:value=>X}]


188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/fluent/plugin/out_sixpack.rb', line 188

# Sends all events over a single keep-alive connection, reconnecting after
# a connection-level failure.
#
# Each event is turned into a request by #post_request. A request that
# fails with IOError or SystemCallError is logged and dropped; the
# connection is closed and a new one is opened for the next request.
# Non-2xx responses are logged. Any other exception propagates, but the
# open connection is still closed first.
#
# @param events collection of events accepted by #post_request
# @return [nil]
def post_keepalive(events)
  return if events.size < 1

  # sixpack host/port is same for all events (host is from configuration)
  host, port = connect_to

  requests = events.map { |event| post_request(event) }

  # Best-effort close: skips connections that were never created or never
  # started, and ignores errors raised while closing a broken connection.
  close_quietly = lambda do |conn|
    begin
      conn.finish if conn && conn.started?
    rescue StandardError
      # ignore all errors for connection with error
    end
  end

  http = nil
  begin
    requests.each do |req|
      begin
        unless http
          http = http_connection(host, port)
          http.start
        end
        res = http.request(req)
        unless res and res.is_a?(Net::HTTPSuccess)
          log.warn "failed to post to sixpack: #{host}:#{port}#{req.path}, post_data: #{req.body} code: #{res && res.code}"
        end
      # EOFError is an IOError; Errno::ECONNRESET / Errno::ETIMEDOUT are
      # SystemCallErrors, so these two classes cover the former list.
      rescue IOError, SystemCallError => e
        log.warn "net/http keepalive POST raises exception: #{e.class}, '#{e.message}'"
        close_quietly.call(http)
        http = nil
      end
    end
  ensure
    # Runs on unrescued exceptions too, so the socket is never left open.
    close_quietly.call(http)
  end

  nil
end

#post_request(event) ⇒ Object



156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/fluent/plugin/out_sixpack.rb', line 156

# Builds the Net::HTTP::Get request for one event.
#
# The path and query come from #map_sixpack_path_with_query applied to the
# event's record and replace those of @sixpackapi_url. Basic auth
# credentials are attached when @auth is :basic, the Host header is set to
# the URL's host, and a Keep-Alive Connection header is added when
# @keepalive is set.
#
# @param event hash carrying the record under :record
# @return [Net::HTTP::Get] the prepared request
def post_request(event)
  target = URI.parse(@sixpackapi_url)
  path, query = map_sixpack_path_with_query(event[:record])
  target.path = path
  target.query = query

  Net::HTTP::Get.new(target.request_uri).tap do |request|
    request.basic_auth(@username, @password) if @auth == :basic
    request['Host'] = target.host
    request['Connection'] = 'Keep-Alive' if @keepalive
  end
end

#poster ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/fluent/plugin/out_sixpack.rb', line 85

# Body of the background thread: while @running is set, drains @queue and
# posts whatever was collected.
#
# Sleeps 0.2 seconds between checks while the queue is empty. The queue is
# swapped for a fresh array under @mutex so producers can keep appending.
# Errors raised by #post_events are logged and do not stop the loop.
def poster
  while @running
    if @queue.empty?
      sleep(0.2)
      next
    end

    # Take ownership of everything queued so far in one atomic swap.
    batch = @mutex.synchronize do
      pending = @queue
      @queue = []
      pending
    end
    next if batch.empty?

    begin
      post_events(batch)
    rescue => e
      log.warn "HTTP POST in background Error occures to sixpack server", :error_class => e.class, :error => e.message
    end
  end
end

#shutdown ⇒ Object



79
80
81
82
83
# File 'lib/fluent/plugin/out_sixpack.rb', line 79

# Stops the plugin: clears @running (the condition of the #poster loop),
# waits for the background thread if one exists, then runs the parent's
# shutdown.
def shutdown
  @running = false

  worker = @thread
  worker.join if worker

  super
end

#start ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/fluent/plugin/out_sixpack.rb', line 65

# Starts the plugin after the parent's start has run.
#
# Marks the plugin as running and resets the background state. When
# @background_post is set, creates the mutex and queue and launches a
# thread executing #poster; otherwise those three stay nil.
def start
  super

  @running = true
  @thread = @queue = @mutex = nil
  return unless @background_post

  # Mutex and queue must exist before the thread starts reading them.
  @mutex = Mutex.new
  @queue = []
  @thread = Thread.new(&method(:poster))
end