Class: Fluent::WebsocketInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_websocket.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.



18
19
20
# File 'lib/fluent/plugin/in_websocket.rb', line 18

# Configuration hook: hands +conf+ straight to the parent implementation.
# This method adds no parameter handling or validation of its own.
#
# @param conf the configuration object supplied by the caller
# @return whatever the parent implementation returns
def configure(conf)
  super(conf)
end

#log_message(text) ⇒ Object



31
32
33
# File 'lib/fluent/plugin/in_websocket.rb', line 31

# Writes +text+ to the global +$log+ logger at info level.
#
# @param text the message to log
# @return whatever +$log.info+ returns
def log_message(text)
  $log.info(text)
end

#run ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/fluent/plugin/in_websocket.rb', line 35

# Starts the EventMachine reactor and a WebSocket server on @host:@port,
# and registers the per-connection callbacks. The onopen handler greets the
# client; onerror and onclose only log; onmessage is delegated to
# #handle_message.
def run
  EM.run do
    log_message "EM will be runned on #{@host}:#{@port}"
    EM::WebSocket.run(host: @host, port: @port) do |ws|
      ws.onopen do |handshake|
        log_message "WebSocket connection open"
        ws.send "Hello Client, you connected to #{handshake.path}"
      end

      ws.onerror do |e|
        log_message "error occured"
        log_message "error: #{e}"
      end

      ws.onclose do |e|
        log_message "Connection closed"
        log_message "reason: #{e}"
      end

      ws.onmessage { |msg| handle_message(ws, msg) }
    end
  end
end

# Handles one incoming WebSocket text message.
#
# The message is expected to be a JSON object of the form
# {"label": <string>, "record": <object>}. A well-formed message is emitted
# to the router with the current engine time and answered with "Pong: ...".
# Anything else is logged and answered with an "Error: ..." reply instead of
# raising out of the callback or emitting a nil tag/record.
#
# @param ws the connection object; only #send is called on it
# @param msg [String] raw message text received from the client
def handle_message(ws, msg)
  log_message "Recieved message: #{msg}"
  data = JSON.parse(msg)

  # Client input is untrusted: valid JSON may still be an array, a scalar,
  # or an object without the expected keys.
  unless data.is_a?(Hash) && data['label'].is_a?(String) && data['record'].is_a?(Hash)
    log_message "error: message must be a JSON object with a string 'label' and an object 'record'"
    ws.send 'Error: expected {"label": <string>, "record": <object>}'
    return
  end

  router.emit(data['label'], Engine.now, data['record'])
  ws.send "Pong: #{msg}"
rescue JSON::ParserError => e
  log_message "error: invalid JSON: #{e.message}"
  ws.send "Error: invalid JSON"
end

#shutdown ⇒ Object

This method is called when shutting down. Shutdown the thread and close sockets or files here.



66
67
68
69
70
71
# File 'lib/fluent/plugin/in_websocket.rb', line 66

# Shutdown hook: calls the parent implementation, stops the EventMachine
# reactor, and kills the worker thread stored in @thread.
#
# Both teardown steps are guarded so that shutdown does not itself raise
# when the reactor is not running or when no worker thread was created.
def shutdown
  super
  # Only stop the reactor when it is actually running.
  EM.stop if EM.reactor_running?
  # @thread is nil when no worker thread has been assigned.
  @thread.kill if @thread
  log_message 'closed EM and thread'
end

#start ⇒ Object

This method is called when starting. Open sockets or files and create a thread here.



24
25
26
27
28
29
# File 'lib/fluent/plugin/in_websocket.rb', line 24

# Start hook: calls the parent implementation, then runs #run on a new
# thread. The thread is kept in @thread.
#
# @return [Thread] the newly created worker thread
def start
  super
  @thread = Thread.new { run }
end