Class: Fluent::Plugin::SakuraIOInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::SakuraIOInput
- Defined in:
- lib/fluent/plugin/in_sakuraio.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #ensure_reactor_running ⇒ Object
- #parse(text) ⇒ Object
- #parse_channels(records, j) ⇒ Object
- #parse_connection(records, j) ⇒ Object
- #parse_location(records, j) ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
14 15 16 17 18 |
# File 'lib/fluent/plugin/in_sakuraio.rb', line 14 def configure(conf) super @time_parser = Fluent::TimeParser.new(nil) end |
#ensure_reactor_running ⇒ Object
27 28 29 30 31 32 |
# File 'lib/fluent/plugin/in_sakuraio.rb', line 27 def ensure_reactor_running return if EM.reactor_running? thread_create(:in_sakuraio_reactor) do EM.run end end |
#parse(text) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/fluent/plugin/in_sakuraio.rb', line 67 def parse(text) parser = Yajl::Parser.new j = parser.parse(text) records = [] case j['type'] when 'connection' then parse_connection(records, j) when 'location' then parse_location(records, j) when 'channels' then parse_channels(records, j) else log.debug "unknown type: #{j['type']}: #{text}" end records end |
#parse_channels(records, j) ⇒ Object
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/fluent/plugin/in_sakuraio.rb', line 115 def parse_channels(records, j) = @time_parser.parse(j['datetime']) tag = j['module'] j['payload']['channels'].each do |c| record = { 'tag' => tag + '.channels.' + c['channel'].to_s, 'record' => { 'module' => j['module'], 'channel' => c['channel'], 'type' => c['type'], 'value' => c['value'] }, 'time' => @time_parser.parse(c['datetime']) || } records.push(record) end records end |
#parse_connection(records, j) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/fluent/plugin/in_sakuraio.rb', line 84 def parse_connection(records, j) record = { 'tag' => j['module'] + '.connection', 'record' => { 'module' => j['module'], 'is_online' => j['payload']['is_online'] }, 'time' => @time_parser.parse(j['datetime']) } records.push(record) records end |
#parse_location(records, j) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/fluent/plugin/in_sakuraio.rb', line 97 def parse_location(records, j) c = j['payload']['coordinate'] if c != 'null' record = { 'tag' => j['module'] + '.location', 'record' => { 'module' => j['module'], 'latitude' => c['latitude'], 'longitude' => c['longitude'], 'range_m' => c['range_m'] }, 'time' => @time_parser.parse(j['datetime']) } records.push(record) end records end |
#run ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/fluent/plugin/in_sakuraio.rb', line 40 def run client = Faye::WebSocket::Client.new(@url) EM.next_tick do client.on :open do log.info "sakuraio: starting websocket connection for #{@url}." end client.on :message do |event| log.debug "sakuraio: received message #{event.data}" records = parse(event.data) unless records.empty? records.each do |r| router.emit(r['tag'], r['time'], r['record']) end end end client.on :error do |event| log.warn "sakuraio: #{event.message}" end client.on :close do client = nil end end end |
#shutdown ⇒ Object
34 35 36 37 38 |
# File 'lib/fluent/plugin/in_sakuraio.rb', line 34 def shutdown EM.stop if EM.reactor_running? super end |
#start ⇒ Object
20 21 22 23 24 25 |
# File 'lib/fluent/plugin/in_sakuraio.rb', line 20 def start super ensure_reactor_running thread_create(:in_sakuraio, &method(:run)) end |