Class: Fluent::PullForwardInput
- Inherits: Input
- Ancestor chain: Object → Input → Fluent::PullForwardInput
- Defined in:
- lib/fluent/plugin/in_pull_forward.rb
Constant Summary collapse
- DEFAULT_PULLFORWARD_LISTEN_PORT =
24280
Instance Attribute Summary collapse
-
#hostname_resolver ⇒ Object
readonly
Returns the value of attribute hostname_resolver.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #fetch(server) ⇒ Object
- #fetcher ⇒ Object
-
#initialize ⇒ PullForwardInput
constructor
A new instance of PullForwardInput.
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ PullForwardInput
21 22 23 24 25 26 27 28 |
# File 'lib/fluent/plugin/in_pull_forward.rb', line 21
# Creates a new PullForwardInput.
#
# Runs the parent initializer first, then loads the libraries this plugin
# uses. The requires happen here, at instantiation time, in a fixed order.
def initialize
  super
  %w[resolve/hostname net/http net/https openssl yajl].each { |lib| require lib }
end
Instance Attribute Details
#hostname_resolver ⇒ Object (readonly)
Returns the value of attribute hostname_resolver.
19 20 21 |
# File 'lib/fluent/plugin/in_pull_forward.rb', line 19
# Read-only accessor for the @hostname_resolver instance variable.
#
# @return [Object, nil] whatever is stored in @hostname_resolver; nil if that
#   instance variable has not been assigned
def hostname_resolver; @hostname_resolver; end
Instance Method Details
#configure(conf) ⇒ Object
35 36 37 38 39 40 41 42 43 44 |
# File 'lib/fluent/plugin/in_pull_forward.rb', line 35
# Applies the plugin configuration.
#
# After the parent class has processed +conf+, this selects the OpenSSL peer
# verification mode from @allow_self_signed_certificate and creates the
# hostname resolver that #fetch uses to look up server addresses.
#
# @param conf the configuration object, forwarded unchanged to the parent
#   implementation through +super+
# @return [Object] the newly created resolver
def configure(conf)
  super

  # Self-signed certificates cannot pass peer verification, so verification
  # is switched off entirely when they are explicitly allowed.
  @verify_mode = if @allow_self_signed_certificate
                   OpenSSL::SSL::VERIFY_NONE
                 else
                   OpenSSL::SSL::VERIFY_PEER
                 end

  # Store the resolver under both names: #fetch reads @resolver, while the
  # public #hostname_resolver reader returns @hostname_resolver. Assigning
  # only @resolver left the reader permanently nil.
  @resolver = Resolve::Hostname.new(:system_resolver => true)
  @hostname_resolver = @resolver
end
#fetch(server) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/fluent/plugin/in_pull_forward.rb', line 74
# Pulls events from a single PullForward server and emits them.
#
# Resolves the server's host name, issues an HTTPS GET for '/' with basic
# authentication, parses the response body with Yajl, groups the resulting
# (tag, time, record) entries by tag and emits one event stream per tag.
#
# I/O errors, system call errors, timeouts and TLS errors raised by the
# request, as well as any error raised while parsing the body, are logged as
# warnings; in those cases nothing is emitted.
#
# @param server object providing +host+, +port+, +username+ and +password+
# @return [Hash, nil] the tag => event stream hash that was emitted, or nil
#   when no body could be fetched or parsed
def fetch(server)
  body = nil

  begin
    address = @resolver.getaddress(server.host)
    https = Net::HTTP.new(address, server.port)
    https.open_timeout = @timeout
    https.read_timeout = @timeout
    https.use_ssl = true
    https.verify_mode = @verify_mode

    req = Net::HTTP::Get.new('/')
    req.basic_auth server.username, server.password

    res = https.start { https.request(req) }

    # nil.is_a?(...) is false, so a missing response also lands in the
    # warning branch.
    if res.is_a?(Net::HTTPSuccess)
      body = res.body
    else
      log.warn "failed to GET from Fluentd PullForward: #{server.host}, #{address}:#{server.port}, by #{res.class}"
    end
  rescue IOError, EOFError, SystemCallError, Timeout::Error, OpenSSL::SSL::SSLError => e
    # Timeouts and TLS failures are expected for an unreachable or
    # misconfigured server; letting them escape would terminate the thread
    # that runs the polling loop.
    log.warn "net/http GET raised an exception: #{e.class}, '#{e.message}'"
  end

  return unless body

  data = nil
  begin
    data = Yajl::Parser.parse(body)
  rescue => e
    # maybe parse error
    log.warn "an error occured for parse of transferred content: #{e.class}, '#{e.message}'"
  end
  return unless data

  # Group entries by tag so each tag is emitted once as a single stream.
  bundle = {}
  data.each do |tag, time, record|
    bundle[tag] ||= Fluent::MultiEventStream.new
    bundle[tag].add(time, record)
  end
  bundle.each do |tag, es|
    Fluent::Engine.emit_stream(tag, es)
  end
end
#fetcher ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/fluent/plugin/in_pull_forward.rb', line 58
# Polling loop: while @running is set, fetches from every configured server
# each time the fetch interval has elapsed, checking once per second.
#
# @running is re-checked before every individual fetch and again before
# sleeping, so the loop stops soon after the flag is cleared.
#
# @return [nil]
def fetcher
  due_at = Time.now

  while @running
    if Time.now >= due_at
      @servers.each do |target|
        fetch(target) if @running
      end
      # The next round is scheduled relative to the end of this one.
      due_at = Time.now + @fetch_interval
    end

    return unless @running

    sleep 1
  end
end
#shutdown ⇒ Object
52 53 54 55 56 |
# File 'lib/fluent/plugin/in_pull_forward.rb', line 52
# Stops the plugin: runs the parent shutdown, clears the @running flag so
# the polling loop exits, and waits for the polling thread to finish.
#
# @return [Thread, nil] the joined thread, or nil when no thread was started
def shutdown
  super

  @running = false
  # @thread is only assigned in #start; guard the join so that a shutdown
  # without a preceding start does not raise NoMethodError on nil.
  @thread.join if @thread
end
#start ⇒ Object
46 47 48 49 50 |
# File 'lib/fluent/plugin/in_pull_forward.rb', line 46
# Starts the plugin: runs the parent start, sets the @running flag and
# launches the thread that executes the #fetcher polling loop.
#
# @return [Thread] the polling thread
def start
  super

  # The flag must be set before the thread starts, because #fetcher exits
  # as soon as it sees @running unset.
  @running = true
  @thread = Thread.new { fetcher }
end