Class: Fluent::EverySenseOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::EverySenseOutput
show all
- Includes:
- EverySenseProxy
- Defined in:
- lib/fluent/plugin/out_everysense.rb
Overview
EverySenseOutput output data to EverySense server this module assumes the input format follows everysense output specification
Instance Method Summary
collapse
#create_session, #create_session_request, #delete_session, #delete_session_request, #error_handler, #get_messages, #get_messages_params, #get_messages_request, included, #put_message, #put_message_request, #shutdown_proxy, #start_proxy, #target_path, #valid_session?
Instance Method Details
#avg(chunk) ⇒ Object
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
|
# File 'lib/fluent/plugin/out_everysense.rb', line 153
def avg(chunk)
device_data_list = []
chunk.msgpack_each do |tag, time, device_data|
device_data_list << transform_device_data(device_data)
end
avg_device_data = device_data_list[0]
device_data_list[1..-1].each do |device_data|
avg_device_data.each_with_index do |avg_sensor_data, i|
avg_sensor_data[:data][:at].to_i += device_data[i][:data][:at].to_i
avg_sensor_data[:data][:value] += device_data[i][:data][:value]
end
end
avg_device_data.each_with_index do |avg_sensor_data, i|
avg_sensor_data[:data][:at] = Time.at((avg_sensor_data[:data][:at] / device_data_list.size).to_i)
avg_sensor_data[:data][:value] = force_type(avg_sensor_data[:data][:value] / device_data_list.size)
end
$log.debug avg_device_data.to_json
put_message(avg_device_data.to_json)
end
|
This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/fluent/plugin/out_everysense.rb', line 49
def configure(conf)
super
configure_formatter(conf)
@sensor_hash = {}
@sensor.each do |sensor|
if sensor.to_h[:input_name].nil?
@sensor_hash[sensor.to_h[:output_name]] = sensor.to_h
else
@sensor_hash[sensor.to_h[:input_name]] = sensor.to_h
end
end
$log.debug @sensor_hash.inspect
end
|
66
67
68
69
|
# File 'lib/fluent/plugin/out_everysense.rb', line 66
def configure_formatter(conf)
end
|
#force_type(value) ⇒ Object
83
84
85
86
87
88
89
90
91
92
93
94
|
# File 'lib/fluent/plugin/out_everysense.rb', line 83
def force_type(value)
case @type_of_value
when "Integer"
return value.to_i
when "Float"
return value.to_f
when "String"
return value.to_s
else
return value.to_i
end
end
|
78
79
80
81
|
# File 'lib/fluent/plugin/out_everysense.rb', line 78
def format(tag, time, record)
$log.debug "tag: #{tag}, time: #{time}, record: #{record}"
[tag, time, record].to_msgpack
end
|
#get_output_name_by_index(index) ⇒ Object
131
132
133
|
# File 'lib/fluent/plugin/out_everysense.rb', line 131
def get_output_name_by_index(index)
@sensor_hash[@sensor_hash.keys[index]][:output_name]
end
|
#get_output_name_by_name(input_name) ⇒ Object
127
128
129
|
# File 'lib/fluent/plugin/out_everysense.rb', line 127
def get_output_name_by_name(input_name)
@sensor_hash[input_name][:output_name]
end
|
#max(chunk) ⇒ Object
175
176
177
|
# File 'lib/fluent/plugin/out_everysense.rb', line 175
def max(chunk)
end
|
#min(chunk) ⇒ Object
179
180
181
|
# File 'lib/fluent/plugin/out_everysense.rb', line 179
def min(chunk)
end
|
#shutdown ⇒ Object
This method is called when shutting down. Shutdown the thread and close sockets or files here.
199
200
201
202
|
# File 'lib/fluent/plugin/out_everysense.rb', line 199
def shutdown
shutdown_proxy
super
end
|
#start ⇒ Object
This method is called when starting. Open sockets or files here.
73
74
75
76
|
# File 'lib/fluent/plugin/out_everysense.rb', line 73
def start
super
start_proxy
end
|
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
# File 'lib/fluent/plugin/out_everysense.rb', line 135
def transform_device_data(device_data)
if device_data[0]["sensor_name"].nil?
return device_data.map.with_index do |sensor_data, i|
if !get_output_name_by_index(i).nil?
transform_sensor_data(sensor_data, get_output_name_by_index(i))
end
end.compact
end
device_data.map do |sensor_data|
if @sensor_hash.keys.include?(sensor_data["sensor_name"])
transform_sensor_data(sensor_data, get_output_name_by_name(sensor_data["sensor_name"]))
end
end.compact
end
|
output message format of EverySense is as follows
[
{
"data": {
"at":"2016-04-14 17:15:00 +0900",
"unit":"degree Celsius",
"value":23
},
"sensor_name":"FESTIVAL_Test1_Sensor"
},
{
"data": {
"at":"2016-04-14 17:15:00 +0900",
"unit":"%RH",
"value":30
},
"sensor_name":"FESTIVAL_Test1_Sensor2"
}
]
116
117
118
119
120
121
122
123
124
125
|
# File 'lib/fluent/plugin/out_everysense.rb', line 116
def transform_sensor_data(sensor_data, output_name)
{
data: {
at: Time.parse(sensor_data["data"]["at"]),
unit: sensor_data["data"]["unit"],
value: force_type(sensor_data["data"]["value"])
},
sensor_name: output_name
}
end
|
#write(chunk) ⇒ Object
183
184
185
186
187
188
189
190
191
192
193
194
195
|
# File 'lib/fluent/plugin/out_everysense.rb', line 183
def write(chunk)
case @aggr_type
when "none"
chunk.msgpack_each do |tag, time, record|
put_message(transform_device_data(record["json"]).to_json)
end
when "avg", "max", "min"
method(@aggr_type).call(chunk)
else
raise NotImplementedError, "specified aggr_type is not implemented."
end
end
|