Class: Fluent::Plugin::WendelinOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_wendelin.rb

Direct Known Subclasses

WendelinOutputFormatted

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/fluent/plugin/out_wendelin.rb', line 56

# Validates the plugin configuration and builds the Wendelin client.
#
# @param conf the configuration element, forwarded to the parent via `super`
# @raise [Fluent::ConfigError] when @chunk_key_tag is not set, i.e. the
#   buffer chunk keys do not include 'tag'
def configure(conf)
    super

    # Chunks must be keyed by tag; refuse to start otherwise.
    raise Fluent::ConfigError, "buffer chunk key must include 'tag' for wendelin output" unless @chunk_key_tag

    # Credentials are only passed along when a user is configured.
    credentials = @user ? { 'user' => @user, 'password' => @password } : {}

    @wendelin = WendelinClient.new(
        @streamtool_uri, credentials, @log,
        @ssl_timeout, @open_timeout,
        @read_timeout, @keep_alive_timeout
    )
end

#shutdown ⇒ Object



78
79
80
81
# File 'lib/fluent/plugin/out_wendelin.rb', line 78

# Shutdown hook: currently only delegates to the parent implementation.
def shutdown
    super
    # TODO: no plugin-specific shutdown work is implemented yet.
end

#start ⇒ Object



73
74
75
76
# File 'lib/fluent/plugin/out_wendelin.rb', line 73

# Start hook: currently only delegates to the parent implementation.
def start
    super
    # TODO: no plugin-specific start-up work is implemented yet.
end

#write(chunk) ⇒ Object

Use normal “Synchronous Buffer” - write out records from a buffer chunk for a tag.



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/fluent/plugin/out_wendelin.rb', line 85

def write(chunk)
 return if chunk.empty?
    # NOTE if this fail and raises -> it will unroll to Output#try_flush
    # which detects errors and retries outputting logs up to retry maxcount
    # times and aborts outputting current logs if all try fail.
    #
    # This way, we don't need to code rescue here.

    # NOTE tag is 1, and chunk stores an event stream, usually [] of
    # (timestamp, record) in msgpack, but it general it could be arbitrary
    # data - we send it as-is.
    data_chunk = chunk.read()

    # for input_stream_ref use tag as-is - it will be processed/translated
    # further on server by Wendelin
    reference = chunk..tag

    if @use_keep_alive
      @wendelin.ingest_with_keep_alive(reference, data_chunk)
    else
      @wendelin.ingest(reference, data_chunk)
    end
end