Class: Fluent::PullForwardOutput
- Inherits: BufferedOutput
  - Object
  - BufferedOutput
  - Fluent::PullForwardOutput
- Includes:
- Mixin::Certificate, Mixin::ConfigPlaceholders
- Defined in:
- lib/fluent/plugin/out_pull_forward.rb
Defined Under Namespace
Classes: HtpasswdDummy
Constant Summary collapse
- DEFAULT_PULLFORWARD_LISTEN_PORT = 24280
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #dequeue_chunks ⇒ Object
- #format(tag, time, record) ⇒ Object
- #initialize ⇒ PullForwardOutput (constructor): A new instance of PullForwardOutput.
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ PullForwardOutput
Returns a new instance of PullForwardOutput.
42 43 44 |
# File 'lib/fluent/plugin/out_pull_forward.rb', line 42
#
# Builds a new PullForwardOutput. Nothing plugin-specific is set up here;
# construction is handed straight to the superclass.
#
# @return [PullForwardOutput] a new instance of PullForwardOutput
def initialize
  super
end
Instance Method Details
#configure(conf) ⇒ Object
50 51 52 53 54 55 |
# File 'lib/fluent/plugin/out_pull_forward.rb', line 50
#
# Lets the superclass process the configuration, then rejects a setup in
# which @users ended up empty.
#
# @param conf the configuration object, passed through to the superclass
# @raise [Fluent::ConfigError] when @users contains no entries
def configure(conf)
  super

  # Checked only after `super` has run; presumably that is what fills
  # @users from the <user> sections — confirm against the plugin's
  # config definitions.
  raise Fluent::ConfigError, "no <user> sections specified" if @users.empty?
end
#dequeue_chunks ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/fluent/plugin/out_pull_forward.rb', line 130
#
# Collects every object that can be unpacked from the chunks yielded by
# @buffer.pull_chunks and serializes the collection with #to_json.
#
# @return the result of calling #to_json on the array of unpacked objects
def dequeue_chunks
  decoder = MessagePack::Unpacker.new
  events = []

  @buffer.pull_chunks do |chunk|
    # Empty chunks contribute nothing; only non-empty ones are decoded.
    unless chunk.empty?
      # One chunk may hold several packed objects; feed_each yields each one.
      # Presumably these are the [tag, time, record] arrays written by
      # #format — confirm against the buffer implementation.
      decoder.feed_each(chunk.read) { |entry| events.push(entry) }
    end
  end

  events.to_json
end
#format(tag, time, record) ⇒ Object
126 127 128 |
# File 'lib/fluent/plugin/out_pull_forward.rb', line 126
#
# Serializes one event as a three-element array packed with #to_msgpack.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the value returned by Array#to_msgpack for [tag, time, record]
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
#run ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/fluent/plugin/out_pull_forward.rb', line 78
#
# Builds an SSL-enabled WEBrick HTTP server on @bind:@port and starts it.
# The single handler mounted at '/' requires HTTPS and HTTP Basic
# authentication against the configured @users, then answers with the
# JSON produced by #dequeue_chunks.
#
# The server is stored in @server so that #shutdown can stop it.
def run
  cert, key = self.certificate
  realm = "Fluentd fluent-plugin-pullforward server"

  # Separate loggers so authentication and server verbosity can be set
  # independently via @auth_loglevel and @server_loglevel.
  logger = $log
  auth_logger = Fluent::PluginLogger.new(logger)
  auth_logger.level = @auth_loglevel
  server_logger = Fluent::PluginLogger.new(logger)
  server_logger.level = @server_loglevel

  # Register every configured user under the realm used by the authenticator.
  auth_db = HtpasswdDummy.new
  @users.each do |user|
    auth_db.set_passwd(realm, user.username, user.password)
  end
  authenticator = WEBrick::HTTPAuth::BasicAuth.new(
    :UserDB => auth_db,
    :Realm => realm,
    :Logger => Fluent::PullForward::WEBrickLogger.new(auth_logger),
  )

  @server = WEBrick::HTTPServer.new(
    :BindAddress => @bind,
    :Port => @port,
    # :DocumentRoot => '.',
    :Logger => Fluent::PullForward::WEBrickLogger.new(server_logger),
    :AccessLog => [],
    :SSLEnable => true,
    :SSLCertificate => cert,
    :SSLPrivateKey => key
  )

  @server.mount_proc('/') do |req, res|
    unless req.ssl?
      raise WEBrick::HTTPStatus::Forbidden, "pullforward plugin does not permit non-HTTPS requests"
    end

    if req.path != '/'
      raise WEBrick::HTTPStatus::NotFound, "valid path is only '/'"
    end

    # Authenticate before touching the buffer, so the chunks are only
    # dequeued if this call does not raise.
    authenticator.authenticate(req, res)

    res.content_type = 'application/json'
    res.body = dequeue_chunks()
  end

  log.info "listening pullforward socket on #{@bind}:#{@port}"
  @server.start
end
#shutdown ⇒ Object
62 63 64 65 66 |
# File 'lib/fluent/plugin/out_pull_forward.rb', line 62
#
# Stops the HTTP server (if one was created) and terminates the thread
# held in @thread, waiting for it to finish.
#
# Both @server and @thread are nil-checked, so calling this method when
# neither has been assigned is a no-op rather than a NoMethodError on nil.
def shutdown
  @server.stop if @server

  if @thread
    # Kill as well as stop: @server is assigned inside the thread, so it
    # may still be nil here while the thread is already running.
    @thread.kill
    @thread.join
  end
end
#start ⇒ Object
57 58 59 60 |
# File 'lib/fluent/plugin/out_pull_forward.rb', line 57
#
# Runs the superclass start-up, then launches #run on a new thread and
# keeps that thread in @thread.
#
# @return [Thread] the newly created thread
def start
  super
  @thread = Thread.new { run }
end