Class: Fluent::Plugin::EverySenseInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::EverySenseInput
- Includes:
- EverySenseProxy
- Defined in:
- lib/fluent/plugin/in_everysense.rb
Instance Method Summary collapse
-
#configure(conf) ⇒ Object
The format of messages received from EverySense is as follows.
- #configure_parser(conf) ⇒ Object
-
#emit(messages) ⇒ Object
Converted record for emission: [ {"farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", "device": [ { "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", "sensor_name": "collection_data_1", "data_class_name": "AirTemperature", "data": { "at": "2016-05-12 21:38:52 UTC", "memo": null, "value": 23, "unit": "degree Celsius" } }, { "farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", "sensor_name": "collection_data_2", "data_class_name": "AirHygrometer", "data": { "at": "2016-05-12 21:38:52 UTC", "memo": null, "value": 30, "unit": "%RH" } } ] } ].
- #parse_time(record) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Methods included from EverySenseProxy
#create_session, #create_session_request, #delete_session, #delete_session_request, #error_handler, #get_messages, #get_messages_params, #get_messages_request, #put_message, #put_message_request, #shutdown_proxy, #start_proxy, #target_path, #valid_session?
Instance Method Details
#configure(conf) ⇒ Object
The format of messages received from EverySense is as follows:
[
[
{
"farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"sensor_name": "collection_data_1",
"data_class_name": "AirTemperature",
"data": {
"at": "2016-05-12 21:38:52 UTC",
"memo": null,
"value": 23,
"unit": "degree Celsius"
}
},
{
"farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"sensor_name": "collection_data_2",
"data_class_name": "AirHygrometer",
"data": {
"at": "2016-05-12 21:38:52 UTC",
"memo": null,
"value": 30,
"unit": "%RH"
}
}
]
]
70 71 72 73 |
# File 'lib/fluent/plugin/in_everysense.rb', line 70
#
# Configures the plugin: runs the parent class's configuration first, then
# sets up the parser through configure_parser.
#
# @param conf the configuration object; it is forwarded to the parent class
#   (via super) and to configure_parser
def configure(conf)
  super
  configure_parser(conf)
end
#configure_parser(conf) ⇒ Object
75 76 77 78 79 |
# File 'lib/fluent/plugin/in_everysense.rb', line 75
#
# Builds the parser used by this plugin and stores it in @parser.
#
# compat_parameters_convert is applied to conf for the :parser type first;
# the parser is then created from the first 'parse' element of conf.
#
# @param conf the configuration object; must respond to elements('parse')
def configure_parser(conf)
  compat_parameters_convert(conf, :parser)
  @parser = parser_create(conf: conf.elements('parse').first)
end
#emit(messages) ⇒ Object
Converted record for emission: [
{"farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"device":
[
{
"farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"sensor_name": "collection_data_1",
"data_class_name": "AirTemperature",
"data": {
"at": "2016-05-12 21:38:52 UTC",
"memo": null,
"value": 23,
"unit": "degree Celsius"
}
},
{
"farm_uuid": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
"sensor_name": "collection_data_2",
"data_class_name": "AirHygrometer",
"data": {
"at": "2016-05-12 21:38:52 UTC",
"memo": null,
"value": 30,
"unit": "%RH"
}
}
]
}
]
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/fluent/plugin/in_everysense.rb', line 135
#
# Parses the received messages and emits the result to the router under @tag.
#
# The parsed record is handled according to its shape:
# * Array whose first element is an Array (multiple devices): one event per
#   device is added to a Fluent::MultiEventStream, which is emitted with
#   router.emit_stream.
# * Array whose first element is a Hash (single device): a single event is
#   emitted with router.emit.
# Every event has the form { farm_uuid: ..., device: [...] }; farm_uuid and
# the event time (via parse_time) are taken from the device's first sensor.
#
# Any other shape raises Fluent::Plugin::Parser::ParserError. That error, like
# any other StandardError raised while parsing or emitting, is logged and not
# propagated to the caller.
#
# @param messages the raw payload handed to @parser.parse
def emit(messages)
  # Only the record is used: event times come from the sensor data itself,
  # so the time yielded by the parser is discarded.
  _time, record = @parser.parse(messages) { |time, parsed| [time, parsed] }

  first = record.is_a?(Array) ? record[0] : nil
  case first
  when Array
    # Multiple devices case
    stream = Fluent::MultiEventStream.new
    record.each do |sensors|
      # use timestamp of the first sensor (sensors[0])
      stream.add(parse_time(sensors[0]), {
        farm_uuid: sensors[0]["farm_uuid"],
        device: sensors
      })
    end
    router.emit_stream(@tag, stream)
  when Hash
    # Single device case
    # use timestamp of the first sensor (record[0])
    router.emit(@tag, parse_time(first), {
      farm_uuid: first["farm_uuid"],
      device: record
    })
  else
    # The other case
    raise Fluent::Plugin::Parser::ParserError, "Unexpected input format."
  end
rescue StandardError => e
  # StandardError rather than Exception, so that signals and SystemExit are
  # not swallowed here.
  log.error error: e.to_s
  log.debug_backtrace(e.backtrace)
end
#parse_time(record) ⇒ Object
96 97 98 99 100 101 102 103 |
# File 'lib/fluent/plugin/in_everysense.rb', line 96
#
# Determines the event time for one sensor record.
#
# The timestamp is read from record["data"]["at"] and converted with
# @parser.parse_time. When that value is nil -- or when the record has no
# "data" entry at all -- the current time (Fluent::EventTime.now) is used
# instead and a debug message is logged.
#
# @param record [Hash] a single sensor record
# @return the value of @parser.parse_time for the timestamp, or
#   Fluent::EventTime.now when no timestamp is available
def parse_time(record)
  # Guard against a missing "data" entry so such a record falls back to the
  # current time instead of raising NoMethodError on nil.
  data = record["data"]
  timestamp = data && data["at"]

  if timestamp.nil?
    log.debug "Since time_key field is nil, Fluent::EventTime.now is used."
    Fluent::EventTime.now
  else
    @parser.parse_time(timestamp)
  end
end
#shutdown ⇒ Object
167 168 169 170 |
# File 'lib/fluent/plugin/in_everysense.rb', line 167
#
# Shuts the plugin down: calls shutdown_proxy first, then hands over to the
# parent class's shutdown.
def shutdown
  shutdown_proxy
  super
end
#start ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/fluent/plugin/in_everysense.rb', line 81
#
# Starts the plugin: runs the parent class's start, calls start_proxy, and
# registers a timer (timer_execute, every @polling_interval) that fetches
# messages and passes them to emit.
#
# Nil or empty results are skipped. Any StandardError raised while fetching or
# emitting is logged and not propagated out of the timer block.
def start
  #raise StandardError.new if @tag.nil?
  super
  start_proxy
  timer_execute(:in_everysense, @polling_interval) do
    begin
      # NOTE(review): the extracted listing had lost the identifiers on these
      # two lines; `messages = get_messages` is reconstructed from the
      # EverySenseProxy method list -- confirm against the source file.
      messages = get_messages
      emit(messages) unless messages.nil? || messages.empty?
    rescue StandardError => e
      # StandardError rather than Exception, so that signals and SystemExit
      # are not swallowed inside the timer.
      log.error error: e.to_s
      log.debug_backtrace(e.backtrace)
    end
  end
end