Class: Fluent::EverySenseOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
EverySenseProxy
Defined in:
lib/fluent/plugin/out_everysense.rb

Overview

EverySenseOutput outputs data to the EverySense server. This module assumes that the input format follows the EverySense output specification.

Instance Method Summary collapse

Methods included from EverySenseProxy

#create_session, #create_session_request, #delete_session, #delete_session_request, #error_handler, #get_messages, #get_messages_params, #get_messages_request, included, #put_message, #put_message_request, #shutdown_proxy, #start_proxy, #target_path, #valid_session?

Instance Method Details

#avg(chunk) ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/fluent/plugin/out_everysense.rb', line 153

# Averages all device readings contained in +chunk+ and sends a single
# averaged message through put_message.
#
# Every record in the chunk is passed through transform_device_data; the
# readings are then averaged per sensor position (index in the array):
# the timestamps are averaged as epoch seconds and the values are averaged
# and passed through force_type.
#
# NOTE(review): #write passes record["json"] to transform_device_data in the
# "none" branch, while this method passes the record itself -- confirm which
# shape the chunk records really have.
# NOTE(review): the value average uses `/` on the summed value, so it is an
# integer division when the values are Integers.
#
# @param chunk [#msgpack_each] buffer chunk yielding (tag, time, record)
# @return [Object, nil] the result of put_message, or nil for an empty chunk
def avg(chunk)
  device_data_list = []
  chunk.msgpack_each do |_tag, _time, device_data|
    device_data_list << transform_device_data(device_data)
  end
  # Nothing to average (and nothing to send) for an empty chunk.
  return if device_data_list.empty?

  sample_count = device_data_list.size
  # The first record doubles as the accumulator.
  avg_device_data = device_data_list.first
  # Sum timestamps as epoch seconds: Time objects can neither be added to
  # each other nor divided by an Integer.
  avg_device_data.each do |avg_sensor_data|
    avg_sensor_data[:data][:at] = avg_sensor_data[:data][:at].to_i
  end
  device_data_list.drop(1).each do |device_data|
    avg_device_data.each_with_index do |avg_sensor_data, i|
      avg_sensor_data[:data][:at] += device_data[i][:data][:at].to_i
      avg_sensor_data[:data][:value] += device_data[i][:data][:value]
    end
  end
  avg_device_data.each do |avg_sensor_data|
    # average time
    avg_sensor_data[:data][:at] = Time.at(avg_sensor_data[:data][:at] / sample_count)
    # average value
    avg_sensor_data[:data][:value] = force_type(avg_sensor_data[:data][:value] / sample_count)
  end
  payload = avg_device_data.to_json
  $log.debug payload
  put_message(payload)
end

#configure(conf) ⇒ Object

This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/fluent/plugin/out_everysense.rb', line 49

# Runs the superclass configuration, then builds @sensor_hash from the
# @sensor entries. Each entry is stored as a Hash (via to_h) under its
# :input_name, or under its :output_name when :input_name is nil.
#
# @param conf [Object] configuration handed over by the framework
def configure(conf)
  super
  configure_formatter(conf)
  @sensor_hash = {}
  @sensor.each do |sensor|
    settings = sensor.to_h
    lookup_key = settings[:input_name].nil? ? settings[:output_name] : settings[:input_name]
    @sensor_hash[lookup_key] = settings
  end
  $log.debug @sensor_hash.inspect
end

#configure_formatter(conf) ⇒ Object



66
67
68
69
# File 'lib/fluent/plugin/out_everysense.rb', line 66

# Formatter setup hook called from #configure.
# Currently a no-op: the formatter initialisation below is commented out,
# so +conf+ is ignored and nil is returned.
def configure_formatter(conf)
  #@formatter = Plugin.new_formatter(@format)
  #@formatter.configure(conf)
end

#force_type(value) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/fluent/plugin/out_everysense.rb', line 83

# Converts +value+ according to @type_of_value: "Float" => to_f,
# "String" => to_s, and "Integer" or anything else => to_i.
#
# @param value [#to_i, #to_f, #to_s] raw sensor value
# @return [Integer, Float, String] the converted value
def force_type(value)
  case @type_of_value
  when "Float" then value.to_f
  when "String" then value.to_s
  else value.to_i # covers "Integer" as well as any unknown setting
  end
end

#format(tag, time, record) ⇒ Object



78
79
80
81
# File 'lib/fluent/plugin/out_everysense.rb', line 78

# Logs the event at debug level and serialises it for buffering as a
# msgpack-encoded [tag, time, record] triple.
#
# @return [Object] the result of Array#to_msgpack
def format(tag, time, record)
  debug_line = "tag: #{tag}, time: #{time}, record: #{record}"
  $log.debug debug_line
  event = [tag, time, record]
  event.to_msgpack
end

#get_output_name_by_index(index) ⇒ Object



131
132
133
# File 'lib/fluent/plugin/out_everysense.rb', line 131

# Returns the :output_name of the sensor at position +index+ in
# @sensor_hash (insertion order), or nil when there is no sensor at that
# position. Callers test the result for nil, so an out-of-range index must
# not raise.
#
# @param index [Integer] position of the sensor entry
# @return [Object, nil] the configured output name, or nil
def get_output_name_by_index(index)
  sensor_conf = @sensor_hash.values[index]
  sensor_conf.nil? ? nil : sensor_conf[:output_name]
end

#get_output_name_by_name(input_name) ⇒ Object



127
128
129
# File 'lib/fluent/plugin/out_everysense.rb', line 127

# Returns the :output_name stored in @sensor_hash under +input_name+, or
# nil when no entry exists for that name (instead of raising on nil).
#
# @param input_name [Object] key used in @sensor_hash
# @return [Object, nil] the configured output name, or nil
def get_output_name_by_name(input_name)
  sensor_conf = @sensor_hash[input_name]
  sensor_conf.nil? ? nil : sensor_conf[:output_name]
end

#max(chunk) ⇒ Object



175
176
177
# File 'lib/fluent/plugin/out_everysense.rb', line 175

# Aggregation hook for aggr_type "max". Not implemented yet: the body is
# empty, so +chunk+ is ignored and nil is returned.
# NOTE(review): #write dispatches here when @aggr_type is "max", which means
# the chunk is consumed without anything being sent -- confirm whether this
# should raise or be implemented.
def max(chunk)
  # TODO
end

#min(chunk) ⇒ Object



179
180
181
# File 'lib/fluent/plugin/out_everysense.rb', line 179

# Aggregation hook for aggr_type "min". Not implemented yet: the body is
# empty, so +chunk+ is ignored and nil is returned.
# NOTE(review): #write dispatches here when @aggr_type is "min", which means
# the chunk is consumed without anything being sent -- confirm whether this
# should raise or be implemented.
def min(chunk)
  # TODO
end

#shutdown ⇒ Object

This method is called when shutting down. Shutdown the thread and close sockets or files here.



199
200
201
202
# File 'lib/fluent/plugin/out_everysense.rb', line 199

# Calls shutdown_proxy first, then hands over to the superclass shutdown.
# NOTE(review): if shutdown_proxy raises, super is never reached.
def shutdown
  shutdown_proxy
  super
end

#start ⇒ Object

This method is called when starting. Open sockets or files here.



73
74
75
76
# File 'lib/fluent/plugin/out_everysense.rb', line 73

# Runs the superclass start logic first, then calls start_proxy.
def start
  super
  start_proxy
end

#transform_device_data(device_data) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/fluent/plugin/out_everysense.rb', line 135

# Converts one device record (an array of sensor readings) into the output
# format, renaming each sensor to its configured output name.
#
# * If the first reading has no "sensor_name", readings are matched to the
#   configured sensors by position; readings beyond the number of configured
#   sensors, or whose sensor has no output name, are dropped.
# * Otherwise readings are matched by "sensor_name"; readings whose name is
#   not a key of @sensor_hash are dropped.
#
# @param device_data [Array<Hash>, nil] sensor readings with string keys
# @return [Array<Hash>] transformed readings; empty for nil or empty input
def transform_device_data(device_data)
  # device_data[0] would be nil below; there is nothing to transform anyway.
  return [] if device_data.nil? || device_data.empty?

  if device_data[0]["sensor_name"].nil?
    # if first input data does not include sensor_name,
    # output_names are used in the specified order.
    # Only as many readings as there are configured sensors can be mapped.
    mappable = device_data.first(@sensor_hash.size)
    by_position = mappable.each_with_index.map do |sensor_data, i|
      output_name = get_output_name_by_index(i)
      transform_sensor_data(sensor_data, output_name) unless output_name.nil?
    end
    return by_position.compact
  end

  by_name = device_data.map do |sensor_data|
    input_name = sensor_data["sensor_name"]
    next unless @sensor_hash.key?(input_name)

    transform_sensor_data(sensor_data, get_output_name_by_name(input_name))
  end
  by_name.compact
end

#transform_sensor_data(sensor_data, output_name) ⇒ Object

output message format of EverySense is as follows

[

{
  "data": {
    "at":"2016-04-14 17:15:00 +0900",
    "unit":"degree Celsius",
    "value":23
  },
  "sensor_name":"FESTIVAL_Test1_Sensor"
},
{
  "data": {
    "at":"2016-04-14 17:15:00 +0900",
    "unit":"%RH",
    "value":30
  },
  "sensor_name":"FESTIVAL_Test1_Sensor2"
}

]



116
117
118
119
120
121
122
123
124
125
# File 'lib/fluent/plugin/out_everysense.rb', line 116

# Builds one output reading from +sensor_data+, replacing the sensor name
# with +output_name+. The "at" string is parsed with Time.parse and the
# value is passed through force_type; the unit is copied as is.
#
# @param sensor_data [Hash] reading with a "data" hash ("at", "unit", "value")
# @param output_name [Object] sensor name to emit
# @return [Hash] { data: { at:, unit:, value: }, sensor_name: }
def transform_sensor_data(sensor_data, output_name)
  measured_at = Time.parse(sensor_data["data"]["at"])
  unit = sensor_data["data"]["unit"]
  value = force_type(sensor_data["data"]["value"])
  reading = { at: measured_at, unit: unit, value: value }
  { data: reading, sensor_name: output_name }
end

#write(chunk) ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/fluent/plugin/out_everysense.rb', line 183

# Flushes a buffer chunk according to @aggr_type:
# * "none" - every record's "json" entry is transformed and sent on its own;
# * "avg", "max", "min" - the whole chunk is handed to the method of that name;
# * anything else raises NotImplementedError.
#
# @param chunk [#msgpack_each] buffer chunk yielding (tag, time, record)
# @raise [NotImplementedError] for an unsupported @aggr_type
def write(chunk)
  if @aggr_type == "none"
    chunk.msgpack_each do |_tag, _time, record|
      device_data = transform_device_data(record["json"])
      put_message(device_data.to_json)
    end
  elsif %w[avg max min].include?(@aggr_type)
    # Dispatch to the aggregation method named after the setting.
    send(@aggr_type, chunk)
  else
    raise NotImplementedError, "specified aggr_type is not implemented."
  end
end