Class: Fluent::Plugin::EverySenseInput

Inherits:
Input
  • Object
show all
Includes:
EverySenseProxy
Defined in:
lib/fluent/plugin/in_everysense.rb

Instance Method Summary collapse

Methods included from EverySenseProxy

#create_session, #create_session_request, #delete_session, #delete_session_request, #error_handler, #get_messages, #get_messages_params, #get_messages_request, #put_message, #put_message_request, #shutdown_proxy, #start_proxy, #target_path, #valid_session?

Instance Method Details

#configure(conf) ⇒ Object

The format of a message received from EverySense is as follows:

[

[
  {
    "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "sensor_name": "collection_data_1",
    "data_class_name": "AirTemperature",
    "data": {
      "at": "2016-05-12 21:38:52 UTC",
      "memo": null,
      "value": 23,
      "unit": "degree Celsius"
    }
  },
  {
    "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
    "sensor_name": "collection_data_2",
    "data_class_name": "AirHygrometer",
    "data": {
      "at": "2016-05-12 21:38:52 UTC",
      "memo": null,
      "value": 30,
      "unit": "%RH"
    }
  }
]

]



70
71
72
73
# File 'lib/fluent/plugin/in_everysense.rb', line 70

# Configures the plugin instance.
#
# Runs the inherited configuration first (via +super+) and then builds the
# parser by delegating to #configure_parser with the same +conf+.
#
# @param conf [Object] configuration object; passed unchanged to the parent
#   implementation and to #configure_parser
def configure(conf)
  super
  configure_parser(conf)
end

#configure_parser(conf) ⇒ Object



75
76
77
78
79
# File 'lib/fluent/plugin/in_everysense.rb', line 75

# Builds the parser used for incoming payloads and stores it in @parser.
#
# The configuration is first passed through +compat_parameters_convert+ for
# the :parser type, then the first 'parse' element of +conf+ is handed to
# +parser_create+.
#
# @param conf [Object] configuration object responding to +elements+
# @return [Object] the parser instance that was assigned to @parser
def configure_parser(conf)
  compat_parameters_convert(conf, :parser)
  @parser = parser_create(conf: conf.elements('parse').first)
end

#emit(messages) ⇒ Object

Converted record for emission [

{"farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
 "device":
  [
    {
      "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
      "sensor_name": "collection_data_1",
      "data_class_name": "AirTemperature",
      "data": {
        "at": "2016-05-12 21:38:52 UTC",
        "memo": null,
        "value": 23,
        "unit": "degree Celsius"
      }
    },
    {
      "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
      "sensor_name": "collection_data_2",
      "data_class_name": "AirHygrometer",
      "data": {
        "at": "2016-05-12 21:38:52 UTC",
        "memo": null,
        "value": 30,
        "unit": "%RH"
      }
    }
  ]
}

]



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/fluent/plugin/in_everysense.rb', line 135

# Parses a raw payload with @parser and emits the result to the router
# under @tag.
#
# Two record shapes are accepted:
# * an Array whose first element is an Array (multiple devices): every inner
#   array becomes one event in a Fluent::MultiEventStream, emitted with
#   +router.emit_stream+;
# * an Array whose first element is a Hash (single device): the whole array
#   is emitted as one event with +router.emit+.
#
# Every emitted event has the form
# <tt>{ farm_uuid: <first sensor's "farm_uuid">, device: <sensor array> }</tt>
# and is timestamped by #parse_time applied to the device's first sensor.
# The time yielded by the parser itself is not used.
#
# Any other shape raises Fluent::Plugin::Parser::ParserError internally.
# StandardError (including that ParserError) is logged and swallowed, so a
# malformed payload never propagates to the caller. Non-StandardError
# exceptions (signals, SystemExit, NoMemoryError, ...) are deliberately NOT
# rescued and propagate.
#
# @param messages [Object] raw payload passed to +@parser.parse+
# @return [Object] unspecified; callers should not rely on it
def emit(messages)
  # Only the record is needed; the parser-supplied time is discarded.
  _time, record = @parser.parse(messages) { |time, parsed| [time, parsed] }
  head = record.is_a?(Array) ? record[0] : nil

  case head
  when Array # Multiple devices case
    stream = Fluent::MultiEventStream.new
    record.each do |device|
      # use timestamp of the first sensor (device[0])
      stream.add(parse_time(device[0]), {
        farm_uuid: device[0]["farm_uuid"],
        device: device
      })
    end
    router.emit_stream(@tag, stream)
  when Hash # Single device case
    # use timestamp of the first sensor (head)
    router.emit(@tag, parse_time(head), {
      farm_uuid: head["farm_uuid"],
      device: record
    })
  else # The other case
    raise Fluent::Plugin::Parser::ParserError, "Unexpected input format."
  end
rescue StandardError => e
  log.error error: e.to_s
  log.debug_backtrace(e.backtrace)
end

#parse_time(record) ⇒ Object



96
97
98
99
100
101
102
103
# File 'lib/fluent/plugin/in_everysense.rb', line 96

# Derives the event time for one sensor record.
#
# Reads <tt>record["data"]["at"]</tt>. When that value is present it is
# converted with +@parser.parse_time+; when it is nil a debug message is
# logged and the current Fluent::EventTime is returned instead.
#
# @param record [Hash] sensor record containing a "data" hash
# @return [Object] result of +@parser.parse_time+ or +Fluent::EventTime.now+
def parse_time(record)
  at = record["data"]["at"]
  return @parser.parse_time(at) unless at.nil?

  log.debug "Since time_key field is nil, Fluent::EventTime.now is used."
  Fluent::EventTime.now
end

#shutdown ⇒ Object



167
168
169
170
# File 'lib/fluent/plugin/in_everysense.rb', line 167

# Stops the plugin.
#
# Calls +shutdown_proxy+ (listed among the methods included from
# EverySenseProxy) first, then delegates to the inherited shutdown via +super+.
def shutdown
  shutdown_proxy
  super
end

#start ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/fluent/plugin/in_everysense.rb', line 81

# Starts the plugin.
#
# Runs the inherited start (via +super+), calls +start_proxy+, then registers
# a timer named :in_everysense with +timer_execute+ using @polling_interval.
# On each tick the timer block calls +get_messages+ and passes the result to
# #emit, unless it is nil or empty.
#
# StandardError raised while polling or emitting is logged and swallowed
# inside the timer block. Non-StandardError exceptions (signals, SystemExit,
# NoMemoryError, ...) are deliberately not rescued.
def start
  super
  start_proxy
  timer_execute(:in_everysense, @polling_interval) do
    # Explicit begin/rescue keeps this compatible with Rubies that do not
    # allow a bare rescue inside do/end blocks.
    begin
      messages = get_messages
      emit(messages) unless messages.nil? || messages.empty?
    rescue StandardError => e
      log.error error: e.to_s
      log.debug_backtrace(e.backtrace)
    end
  end
end