Class: WendelinClient
- Inherits:
-
Object
- Object
- WendelinClient
- Defined in:
- lib/fluent/plugin/wendelin_client.rb
Overview
class representing a Wendelin client
Instance Method Summary collapse
-
#ingest(reference, data_chunk) ⇒ Object
ingest
data_chunkto a stream referenced asreference. -
#initialize(streamtool_uri, credentials, log) ⇒ WendelinClient
constructor
streamtool_uri- URI pointing to portal_input_data_stream “mountpoint”credentials# => _, ‘password’ => _ TODO change to certificatelog- logger to use.
Constructor Details
#initialize(streamtool_uri, credentials, log) ⇒ WendelinClient
streamtool_uri - URI pointing to portal_input_data_stream “mountpoint” credentials # => _, ‘password’ => _ TODO change to certificate log - logger to use
29 30 31 32 33 |
# File 'lib/fluent/plugin/wendelin_client.rb', line 29 def initialize(streamtool_uri, credentials, log) @streamtool_uri = streamtool_uri @credentials = credentials @log = log end |
Instance Method Details
#ingest(reference, data_chunk) ⇒ Object
ingest data_chunk to a stream referenced as reference
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/fluent/plugin/wendelin_client.rb', line 37 def ingest(reference, data_chunk) uri = URI("#{@streamtool_uri}/ingest?reference=#{reference}") req = Net::HTTP::Post.new(uri) if @credentials.has_key?('user') req.basic_auth @credentials['user'], @credentials['password'] end # TODO ensure content-type is 'raw', e.g. this way # (but then querystring ?reference=... is lost) # req.body = data_chunk # req.content_type = 'application/octet-stream' req.set_form_data('data_chunk' => data_chunk) @log.on_trace do @log.trace '>>> REQUEST' @log.trace "method\t=> #{req.method}" @log.trace "path\t=> #{req.path}" @log.trace "uri\t=> #{req.uri}" @log.trace "body\t=> #{req.body}" @log.trace "body_stream\t=> #{req.body_stream}" req.each {|h| @log.trace "#{h}:\t#{req[h]}"} @log.trace end begin # TODO keep connection open (so that every new ingest does not do # full connect again) res = Net::HTTP.start(uri.hostname, uri.port, :use_ssl => (uri.scheme == 'https'), # NOTE = "do not check server cert" # TODO move this out to conf parameters :verify_mode => OpenSSL::SSL::VERIFY_NONE, # Net::HTTP default open timeout is infinity, which results # in thread hang forever if other side does not fully # establish connection. Default read_timeout is 60 seconds. # We go safe way and make sure all timeouts are defined. :ssl_timeout => 60, :open_timeout => 60, :read_timeout => 60, ) do |http| http.request(req) end rescue # some http/ssl/other connection error @log.warn "HTTP ERROR:" raise else @log.on_trace do @log.trace '>>> RESPONSE' res.each {|h| @log.trace "#{h}:\t#{res[h]}"} @log.trace "code\t=> #{res.code}" @log.trace "msg\t=> #{res.message}" @log.trace "class\t=> #{res.class}" @log.trace "body:", res.body end if res.kind_of?(Net::HTTPSuccess) # res.code is 2XX #@log.info "ingested ok" else @log.warn "FAIL:" res.value end end end |