Class: Fluent::KestrelInput
- Inherits:
-
Input
- Object
- Input
- Fluent::KestrelInput
- Defined in:
- lib/fluent/plugin/in_kestrel.rb
Instance Attribute Summary collapse
-
#kestrel ⇒ Object
readonly
Returns the value of attribute kestrel.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#initialize ⇒ KestrelInput
constructor
A new instance of KestrelInput.
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ KestrelInput
15 16 17 18 19 |
# File 'lib/fluent/plugin/in_kestrel.rb', line 15
# Builds a new KestrelInput.
#
# Runs the parent initializer first and then loads the two libraries this
# plugin relies on: `kestrel` and `time`.
def initialize
  super

  # Required at construction time rather than at file load time, so the
  # libraries are only pulled in once an instance is actually created.
  %w[kestrel time].each { |library| require library }
end
Instance Attribute Details
#kestrel ⇒ Object (readonly)
Returns the value of attribute kestrel.
4 5 6 |
# File 'lib/fluent/plugin/in_kestrel.rb', line 4
# Read-only accessor for the @kestrel instance variable.
#
# @return [Object, nil] whatever is currently stored in @kestrel
#   (nil while nothing has been assigned)
def kestrel
  @kestrel
end
Instance Method Details
#configure(conf) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/fluent/plugin/in_kestrel.rb', line 21
# Validates the plugin configuration and prepares per-instance state.
#
# Delegates to the parent first, then checks that @host, @queue and @tag are
# set, builds the time formatter, and stores a frozen hash of fetch options
# (:raw, :peek, :timeout).
#
# @param conf [Object] configuration object; forwarded unchanged to `super`
# @return [Hash] the frozen option hash
# @raise [ConfigError] when 'host'/'queue' or 'tag' has not been set
def configure(conf)
  super

  unless @queue && @host
    raise ConfigError, "[kestrel config error]:'host' and 'queue' option is required."
  end
  unless @tag
    raise ConfigError, "[kestrel config error]:'tag' option is required."
  end

  @timef = TimeFormatter.new(@time_format, @localtime)

  # NOTE(review): the assignment target was missing from the listing; it is
  # restored here as @options -- confirm the name against the original file.
  # Frozen because the same hash is meant to be reused, never modified.
  @options = { :raw => @raw, :peek => @peek, :timeout => @timeout }.freeze
end
#run ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/fluent/plugin/in_kestrel.rb', line 50
# Polling loop: fetches items from the Kestrel queue and emits them.
#
# Each iteration asks the client for one item from @queue. A truthy result is
# emitted under @tag with Engine.now as its time; a nil/false result makes the
# loop wait one second before asking again. The loop has no exit condition of
# its own. A StandardError raised anywhere inside it ends the method: the
# error is logged with its backtrace and not re-raised.
def run
  loop do
    # NOTE(review): the second argument was missing from the listing; restored
    # as @options (the raw/peek/timeout hash) -- confirm against the original.
    data = @kestrel.get(@queue, @options)

    if data
      Engine.emit(@tag, Engine.now, data)
    else
      # Nothing available: back off instead of polling in a tight loop.
      sleep 1
    end
  end
rescue => e
  $log.error "unexpected error.", :error => e.to_s
  $log.error_backtrace
end
#shutdown ⇒ Object
45 46 47 48 |
# File 'lib/fluent/plugin/in_kestrel.rb', line 45
# Stops the polling thread and then runs the parent shutdown.
#
# The worker's `run` loop has no exit condition, so joining it without
# stopping it first would block for as long as the loop keeps running. The
# thread is therefore killed before it is joined. Nothing is done to the
# thread when it was never created (i.e. @thread is nil).
#
# NOTE(review): killing the thread can interrupt it between fetching an item
# and emitting it -- confirm that this is acceptable for non-peek reads.
def shutdown
  if @thread
    @thread.kill
    # Wait until the thread has really finished before the parent shuts down.
    @thread.join
  end

  super
end
#start ⇒ Object
38 39 40 41 42 43 |
# File 'lib/fluent/plugin/in_kestrel.rb', line 38
# Starts the input.
#
# After the parent's start, creates the Kestrel client for "<host>:<port>"
# and launches a thread that executes #run.
#
# @return [Thread] the thread that was started
def start
  super

  address = @host + ":" + @port.to_s
  @kestrel = Kestrel::Client.new(address)
  @thread = Thread.new { run }
end