Class: Fluent::WebSocketOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_websocket.rb

Instance Method Summary collapse

Instance Method Details

#buffer(data) ⇒ Object



124
125
126
127
128
129
# File 'lib/fluent/plugin/out_websocket.rb', line 124

# Remembers an encoded message so it can be replayed later.
#
# Does nothing when @buffered_messages is zero or negative. Otherwise the
# message is appended and the buffer is trimmed to the newest
# @buffered_messages entries.
#
# @param data [Object] the already-encoded message to remember
# @return [Array, nil] the trimmed buffer when trimming happened, else nil
def buffer(data)
  capacity = @buffered_messages
  return unless capacity > 0

  @buffer.push(data)
  overflow = @buffer.length - capacity
  # Keep only the most recent `capacity` messages.
  @buffer = @buffer.last(capacity) if overflow > 0
end

#configure(conf) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/fluent/plugin/out_websocket.rb', line 33

# Configures the plugin and starts the em-websocket server in a background
# thread (stored in @thread).
#
# For every client that opens a connection:
# * the handshake path, query and origin are logged;
# * the query is checked with #doAuth; rejected clients receive the text
#   "Unauthorized" and are not subscribed;
# * accepted clients are subscribed to $channel and unsubscribed again when
#   the socket closes;
# * the messages currently held in @buffer are replayed to the new client.
#
# The replay goes through the same callback as live messages. In msgpack
# mode the buffer holds msgpack binaries, which must be sent with
# send_binary and must not be handed to #sendMsg (it parses JSON).
#
# @param conf [Object] plugin configuration, passed on to the superclass
# @return [Thread] the thread running the EventMachine reactor
def configure(conf)
  super
  @thread = Thread.new do
    $log.trace "Started em-websocket thread."
    $log.info "WebSocket server #{@host}:#{@port} [msgpack: #{@use_msgpack}]"
    EM.run {
      EM::WebSocket.run(:host => @host, :port => @port) do |ws|
        ws.onopen { |handshake|
          $log.info "WebSocket opened #{{
            :path => handshake.path,
            :query => handshake.query,
            :origin => handshake.origin,
          }}"
          if doAuth(handshake.query)
            callback = @use_msgpack ? proc{|msg| ws.send_binary(msg)} : proc{|msg| sendMsg(handshake.query, ws, msg)}
            # Subscribe and replay under the same lock that #emit takes
            # around $channel.push.
            $lock.synchronize do
              sid = $channel.subscribe callback
              $log.trace "WebSocket connection: ID " + sid.to_s
              ws.onclose {
                $log.trace "Connection closed: ID " + sid.to_s
                $lock.synchronize do
                  $channel.unsubscribe(sid)
                end
              }
              # @buffer is only assigned in #start, so tolerate a client
              # that connects before then.
              (@buffer || []).each do |msg|
                callback.call(msg)
              end
            end
          else
            ws.send("Unauthorized")
          end

          #ws.onmessage { |msg|
          #}
        }
      end
    }
  end
end

#doAuth(query) ⇒ Object



73
74
75
76
77
78
79
80
81
# File 'lib/fluent/plugin/out_websocket.rb', line 73

# Decides whether a connecting client is allowed to subscribe.
#
# Access is granted when no @token is configured, or when the query holds
# a "token" entry equal to @token. The outcome is logged at trace level.
#
# @param query [Hash] query parameters from the WebSocket handshake
# @return [Boolean] true when the client is authorized, false otherwise
def doAuth(query)
  token_ok = @token.nil? || (query.key?("token") && @token == query["token"])

  unless token_ok
    $log.trace "Auth failed"
    return false
  end

  $log.trace "Auth OK"
  true
end

#emit(tag, es, chain) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/fluent/plugin/out_websocket.rb', line 110

# Encodes every event in the stream and broadcasts it to subscribers.
#
# Each event becomes an array ending in the record, with the time prepended
# when @add_time is set and the tag prepended before that when @add_tag is
# set. The array is serialized with msgpack when @use_msgpack is set and
# with Yajl otherwise. The result is handed to #buffer and then pushed onto
# $channel while holding $lock.
#
# @param tag [String] tag of the event stream
# @param es [#each] event stream yielding (time, record) pairs
# @param chain [#next] output chain; advanced before any event is processed
def emit(tag, es, chain)
  chain.next
  es.each do |time, record|
    payload = [record]
    payload.unshift(time) if @add_time
    payload.unshift(tag) if @add_tag

    encoded =
      if @use_msgpack
        payload.to_msgpack
      else
        Yajl::Encoder.encode(payload)
      end

    buffer(encoded)
    $lock.synchronize { $channel.push encoded }
  end
end

#sendMsg(filters, ws, msg) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/out_websocket.rb', line 83

# Sends a JSON-encoded message to one client if it passes that client's
# filters.
#
# The message is expected to be a JSON array whose last element is the
# record, which is the shape #emit builds with or without the optional tag
# and time. Reading the record from the end means every combination of
# add_tag / add_time is delivered, not only two-element payloads.
#
# Every filter entry must match a record field with the same key and an
# equal value. The 'token' entry is skipped, since it is only used for
# authentication. Payloads that are not an array ending in a Hash are
# silently ignored.
#
# @param filters [Hash] query parameters supplied by the client
# @param ws [#send] the client's WebSocket connection
# @param msg [String] JSON-encoded message as produced by #emit
# @return [Object, nil] result of ws.send when sent, nil otherwise
def sendMsg(filters, ws, msg)
  payload = Yajl::Parser.new.parse(msg)
  return unless payload.is_a?(Array)

  # The record is always the final element; tag and time come before it.
  record = payload.last
  return unless record.is_a?(Hash)

  accepted = filters.all? do |key, value|
    key == 'token' || (record.key?(key) && record[key] == value)
  end

  ws.send(msg) if accepted
end

#shutdown ⇒ Object



103
104
105
106
107
108
# File 'lib/fluent/plugin/out_websocket.rb', line 103

# Stops the WebSocket server when the plugin shuts down.
#
# After the superclass shutdown, the EventMachine reactor is stopped and the
# server thread held in @thread is killed.
#
# EM.stop raises when the reactor is not running (for instance when the
# server thread has already died). It is therefore only called while the
# reactor is up, so shutdown always reaches the thread cleanup. The kill is
# skipped when @thread is unset.
def shutdown
  super
  EM.stop if EM.reactor_running?
  Thread::kill(@thread) if @thread
  $log.trace "Killed em-websocket thread."
end

#start ⇒ Object



98
99
100
101
# File 'lib/fluent/plugin/out_websocket.rb', line 98

# Resets the replay buffer to an empty array, then delegates to the
# superclass implementation of start.
#
# @return [Object] whatever the superclass start returns
def start
  @buffer = Array.new
  super
end