Class: Fluent::WebSocketOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_websocket.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/fluent/plugin/out_websocket.rb', line 31

# Runs the parent configuration step, then spawns a background thread that
# hosts an EventMachine reactor with a WebSocket server bound to @host:@port.
#
# Each client that completes the handshake is subscribed to $channel, so
# anything pushed onto the channel is sent to that client (as a binary frame
# when @use_msgpack is set, as a regular frame otherwise). The subscription is
# removed again when the connection closes.
#
# The thread is stored in the global $thread.
#
# @param conf the configuration element, consumed by the parent via `super`
def configure(conf)
  super
  $thread = Thread.new do
    $log.trace "Started em-websocket thread."
    $log.info "WebSocket server #{@host}:#{@port} [msgpack: #{@use_msgpack}]"
    EM.run do
      EM::WebSocket.run(:host => @host, :port => @port) do |socket|
        socket.onopen do |_handshake|
          # Pick the frame type once per connection.
          sender = if @use_msgpack
                     proc { |msg| socket.send_binary(msg) }
                   else
                     proc { |msg| socket.send(msg) }
                   end

          # $lock guards every access to the shared $channel.
          $lock.synchronize do
            subscription_id = $channel.subscribe(sender)
            $log.trace "WebSocket connection: ID " + subscription_id.to_s

            socket.onclose do
              $log.trace "Connection closed: ID " + subscription_id.to_s
              $lock.synchronize { $channel.unsubscribe(subscription_id) }
            end
          end

          # No onmessage handler is registered: messages sent by clients
          # are not processed.
        end
      end
    end
  end
end

#emit(tag, es, chain) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/fluent/plugin/out_websocket.rb', line 70

# Serializes every event in the stream and pushes it onto $channel.
#
# Each event becomes an array whose last element is the record, optionally
# preceded by the time (when @add_time is set) and then by the tag (when
# @add_tag is set), giving [tag, time, record] when both are enabled. The
# array is encoded with MessagePack when @use_msgpack is set, otherwise with
# Yajl.
#
# @param tag the tag of the event stream
# @param es the event stream; yields (time, record) pairs from #each
# @param chain the output chain; `next` is invoked on it before any event
#   is processed
def emit(tag, es, chain)
  chain.next
  es.each do |time, record|
    payload = [record]
    payload.unshift(time) if @add_time
    payload.unshift(tag) if @add_tag

    encoded = if @use_msgpack
                payload.to_msgpack
              else
                Yajl::Encoder.encode(payload)
              end

    # $lock guards every access to the shared $channel.
    $lock.synchronize { $channel.push(encoded) }
  end
end

#shutdown ⇒ Object



63
64
65
66
67
68
# File 'lib/fluent/plugin/out_websocket.rb', line 63

# Runs the parent shutdown step, then stops the EventMachine reactor and kills
# the server thread stored in $thread.
#
# Both steps are guarded so shutdown cannot fail half-way:
# * EM.stop raises when no reactor is running (e.g. the server thread already
#   died or never got as far as EM.run), so it is only called while the
#   reactor is up.
# * $thread is nil when no server thread was ever created, and killing nil
#   raises TypeError.
def shutdown
  super
  EM.stop if EM.reactor_running?
  return unless $thread

  $thread.kill
  $log.trace "Killed em-websocket thread."
end

#start ⇒ Object



59
60
61
# File 'lib/fluent/plugin/out_websocket.rb', line 59

# Delegates to the parent implementation and does no work of its own.
# The WebSocket server thread is created in #configure, not here.
def start
  super
end