Class: Fluent::WebSocketOutput
- Inherits:
- Output
- Inheritance chain: Object → Output → Fluent::WebSocketOutput
- Defined in:
- lib/fluent/plugin/out_websocket.rb
Instance Method Summary
Instance Method Details
#configure(conf) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/fluent/plugin/out_websocket.rb', line 31

# Applies the configuration via super, then spawns a background thread
# (stored in $thread) that runs an EventMachine reactor hosting the
# WebSocket server on @host:@port.
#
# For every client that opens a connection, a forwarding proc is subscribed
# to $channel: messages are relayed with send_binary when @use_msgpack is
# truthy and with send otherwise. The subscription is removed again when
# the connection closes. No onmessage handler is registered.
#
# @param conf the configuration object, handed to super unchanged
# @return [Thread] the thread running the reactor
def configure(conf)
  super
  $thread = Thread.new do
    $log.trace "Started em-websocket thread."
    $log.info "WebSocket server #{@host}:#{@port} [msgpack: #{@use_msgpack}]"
    EM.run do
      EM::WebSocket.run(host: @host, port: @port) do |socket|
        socket.onopen do |_handshake|
          forward =
            if @use_msgpack
              proc { |payload| socket.send_binary(payload) }
            else
              proc { |payload| socket.send(payload) }
            end
          # Channel subscription changes are serialized through $lock.
          $lock.synchronize do
            subscription_id = $channel.subscribe(forward)
            $log.trace "WebSocket connection: ID #{subscription_id}"
            # Registered here because the handler needs the subscription id.
            socket.onclose do
              $log.trace "Connection closed: ID #{subscription_id}"
              $lock.synchronize { $channel.unsubscribe(subscription_id) }
            end
          end
        end
      end
    end
  end
end
#emit(tag, es, chain) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/fluent/plugin/out_websocket.rb', line 70

# Encodes every record of the event stream and pushes it onto $channel.
#
# Each payload is an array ending with the record, preceded by the time when
# @add_time is set and, before that, by the tag when @add_tag is set. The
# array is encoded with to_msgpack when @use_msgpack is truthy and with
# Yajl::Encoder otherwise.
#
# @param tag   tag of the events; leads each payload when @add_tag is set
# @param es    event stream; its #each yields (time, record) pairs
# @param chain output chain; advanced with #next before any record is sent
def emit(tag, es, chain)
  chain.next
  es.each do |time, record|
    payload = []
    payload << tag if @add_tag
    payload << time if @add_time
    payload << record
    encoded =
      if @use_msgpack
        payload.to_msgpack
      else
        Yajl::Encoder.encode(payload)
      end
    # Pushes to the shared channel are serialized through $lock.
    $lock.synchronize { $channel.push(encoded) }
  end
end
#shutdown ⇒ Object
63 64 65 66 67 68 |
# File 'lib/fluent/plugin/out_websocket.rb', line 63

# Stops the EventMachine reactor and kills the thread recorded in $thread
# (the one #configure starts).
#
# Safe to call when the server never came up or has already been shut down:
# the reactor is only stopped while it is running, and the thread is only
# killed when one was recorded.
def shutdown
  super
  # EM.stop raises when no reactor is running, so check first.
  EM.stop if EM.reactor_running?
  # Thread.kill raises TypeError for nil, i.e. when no thread was recorded.
  Thread.kill($thread) if $thread
  $log.trace "Killed em-websocket thread."
end
#start ⇒ Object
59 60 61 |
# File 'lib/fluent/plugin/out_websocket.rb', line 59

# Starts the output. Adds no behavior of its own; it only defers to super.
def start
  super
end