Class: Fluent::Plugin::SakuraIOInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_sakuraio.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



14
15
16
17
18
# File 'lib/fluent/plugin/in_sakuraio.rb', line 14

# Runs the superclass configuration and then builds the time parser that
# the parse_* methods use to convert message timestamps.
#
# @param conf the configuration object, passed through to the superclass via +super+
def configure(conf)
  super

  # NOTE(review): a nil format presumably makes Fluent::TimeParser fall back
  # to generic timestamp parsing -- confirm against the Fluentd version in use.
  @time_parser = Fluent::TimeParser.new(nil)
end

#ensure_reactor_running ⇒ Object



27
28
29
30
31
32
# File 'lib/fluent/plugin/in_sakuraio.rb', line 27

# Launches the EventMachine reactor on its own plugin thread, unless a
# reactor is already running in this process.
def ensure_reactor_running
  unless EM.reactor_running?
    thread_create(:in_sakuraio_reactor) { EM.run }
  end
end

#parse(text) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/fluent/plugin/in_sakuraio.rb', line 67

# Decodes one JSON message and turns it into zero or more event hashes.
#
# The message's 'type' field selects the handler ('connection', 'location'
# or 'channels'). Any other type is logged at debug level and produces no
# events.
#
# @param text the raw JSON text of a single message
# @return [Array<Hash>] event hashes as built by the parse_* handlers
def parse(text)
  message = Yajl::Parser.new.parse(text)
  events = []
  kind = message['type']

  case kind
  when 'connection'
    parse_connection(events, message)
  when 'location'
    parse_location(events, message)
  when 'channels'
    parse_channels(events, message)
  else
    log.debug "unknown type: #{kind}: #{text}"
  end

  events
end

#parse_channels(records, j) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/fluent/plugin/in_sakuraio.rb', line 115

# Appends one event hash per channel entry of a 'channels' message.
#
# Each event is tagged "<module>.channels.<channel>" and carries the module,
# channel number, type and value. The event time is the channel's own
# 'datetime' when present, otherwise the message-level 'datetime'.
#
# @param records [Array<Hash>] accumulator that the new events are pushed onto
# @param j [Hash] decoded message; reads 'module', 'datetime' and
#   'payload' => 'channels'
# @return [Array<Hash>] the same +records+ array
def parse_channels(records, j)
  message_time = @time_parser.parse(j['datetime'])
  tag = j['module']
  j['payload']['channels'].each do |c|
    # Only hand the parser a value when the channel actually has a timestamp.
    # Passing nil would make the parser raise instead of returning nil, so the
    # fallback to the message time would never be reached.
    raw_time = c['datetime']
    channel_time = raw_time && @time_parser.parse(raw_time)

    records.push(
      'tag' => tag + '.channels.' + c['channel'].to_s,
      'record' => {
        'module' => j['module'],
        'channel' => c['channel'],
        'type' => c['type'],
        'value' => c['value']
      },
      'time' => channel_time || message_time
    )
  end
  records
end

#parse_connection(records, j) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/fluent/plugin/in_sakuraio.rb', line 84

# Appends the event hash for a 'connection' message.
#
# The event is tagged "<module>.connection" and records the module id and
# the payload's 'is_online' flag, timestamped from the message 'datetime'.
#
# @param records [Array<Hash>] accumulator that the new event is appended to
# @param j [Hash] decoded message; reads 'module', 'datetime' and
#   'payload' => 'is_online'
# @return [Array<Hash>] the same +records+ array
def parse_connection(records, j)
  records << {
    'tag' => j['module'] + '.connection',
    'record' => {
      'module' => j['module'],
      'is_online' => j['payload']['is_online']
    },
    'time' => @time_parser.parse(j['datetime'])
  }
  records
end

#parse_location(records, j) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/in_sakuraio.rb', line 97

# Appends the event hash for a 'location' message, if it has a coordinate.
#
# The event is tagged "<module>.location" and records the module id plus the
# coordinate's latitude, longitude and range_m, timestamped from the message
# 'datetime'. Messages without a coordinate add nothing.
#
# @param records [Array<Hash>] accumulator that the new event is pushed onto
# @param j [Hash] decoded message; reads 'module', 'datetime' and
#   'payload' => 'coordinate'
# @return [Array<Hash>] the same +records+ array
def parse_location(records, j)
  c = j['payload']['coordinate']
  # A JSON null decodes to nil, not to the string 'null'. Without the nil
  # check the lookups below would raise NoMethodError. The literal 'null'
  # string is still skipped, as it was before.
  return records if c.nil? || c == 'null'

  records.push(
    'tag' => j['module'] + '.location',
    'record' => {
      'module' => j['module'],
      'latitude' => c['latitude'],
      'longitude' => c['longitude'],
      'range_m' => c['range_m']
    },
    'time' => @time_parser.parse(j['datetime'])
  )
  records
end

#run ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/in_sakuraio.rb', line 40

# Opens the WebSocket connection to @url and emits every parsed message to
# the router.
#
# All WebSocket work is scheduled with EM.next_tick so that it executes on
# the reactor thread rather than on the thread that calls this method.
def run
  EM.next_tick do
    # Build the client inside the tick as well. Creating it outside would
    # touch EventMachine from the calling thread, possibly before the
    # reactor has finished starting.
    client = Faye::WebSocket::Client.new(@url)

    client.on :open do
      log.info "sakuraio: starting websocket connection for #{@url}."
    end

    client.on :message do |event|
      log.debug "sakuraio: received message #{event.data}"
      records =
        begin
          parse(event.data)
        rescue StandardError => e
          # An exception escaping this callback would propagate into the
          # reactor loop, so one malformed message must not stop the
          # processing of later ones.
          log.error "sakuraio: failed to parse message: #{e.class}: #{e.message}"
          []
        end
      records.each do |r|
        router.emit(r['tag'], r['time'], r['record'])
      end
    end

    client.on :error do |event|
      log.warn "sakuraio: #{event.message}"
    end

    client.on :close do
      # Drop the reference to the closed client.
      client = nil
    end
  end
end

#shutdown ⇒ Object



34
35
36
37
38
# File 'lib/fluent/plugin/in_sakuraio.rb', line 34

# Stops the EventMachine reactor when one is running, then hands over to
# the superclass shutdown.
def shutdown
  if EM.reactor_running?
    EM.stop
  end

  super
end

#start ⇒ Object



20
21
22
23
24
25
# File 'lib/fluent/plugin/in_sakuraio.rb', line 20

# Starts the plugin: runs the superclass start, makes sure the EventMachine
# reactor is running, and then runs #run on a separate plugin thread.
def start
  super

  # The reactor is requested before the WebSocket thread is created.
  ensure_reactor_running
  thread_create(:in_sakuraio, &method(:run))
end