Class: Fluent::EverySenseInput

Inherits:
Input
  • Object
show all
Includes:
EverySenseProxy
Defined in:
lib/fluent/plugin/in_everysense.rb

Defined Under Namespace

Classes: EverySenseParser

Instance Method Summary

Methods included from EverySenseProxy

#create_session, #create_session_request, #delete_session, #delete_session_request, #error_handler, #get_messages, #get_messages_params, #get_messages_request, included, #put_message, #put_message_request, #shutdown_proxy, #start_proxy, #target_path, #valid_session?

Instance Method Details

#configure(conf) ⇒ Object



82
83
84
85
# File 'lib/fluent/plugin/in_everysense.rb', line 82

# Configures the plugin, then builds the parser objects used for incoming
# messages.
#
# @param conf [Object] plugin configuration; passed to the parent
#   implementation and to #configure_parser
# @return [Object] whatever #configure_parser returns
def configure(conf)
  # The parent runs first, before the parser is built from @format.
  super(conf)
  configure_parser(conf)
end

#configure_parser(conf) ⇒ Object



87
88
89
90
91
# File 'lib/fluent/plugin/in_everysense.rb', line 87

# Builds the format parser and wraps it in an EverySenseParser.
#
# Sets @parser (the parser created for @format and configured with +conf+)
# and @everysense_parser (the wrapper built from @format and that parser).
#
# @param conf [Object] configuration handed to the parser's #configure
# @return [EverySenseParser] the newly created wrapper
def configure_parser(conf)
  # @parser is assigned before it is configured, so it stays set even if
  # configuring it raises.
  @parser = format_parser = Plugin.new_parser(@format)
  format_parser.configure(conf)
  @everysense_parser = EverySenseParser.new(@format, format_parser)
end

#emit(messages) ⇒ Object



124
125
126
127
128
129
130
131
132
133
# File 'lib/fluent/plugin/in_everysense.rb', line 124

# Parses the fetched messages and emits each resulting event to the router.
#
# Each event is emitted under @tag with its time, and with its record
# wrapped under a :json key. Ordinary errors raised while parsing or
# emitting are logged and swallowed, so a bad batch does not propagate to
# the caller.
#
# @param messages [Object] raw messages, handed unchanged to #parse
# @return [Object] the result of iterating the parsed events, or the
#   logger's return value when an error was rescued
def emit(messages)
  parse(messages).each do |msg|
    router.emit(@tag, msg[:time], {json: msg[:record]})
  end
rescue StandardError => e
  # Rescue StandardError only: process-level exceptions such as Interrupt,
  # SystemExit and NoMemoryError must not be silently swallowed here.
  $log.error :error => e.to_s
  $log.debug_backtrace(e.backtrace)
end

#parse(messages) ⇒ Object



113
114
115
116
117
118
119
120
121
122
# File 'lib/fluent/plugin/in_everysense.rb', line 113

# Runs the messages through @everysense_parser and turns each yielded
# (time, record) pair into a hash with :time and :record keys.
#
# When the yielded time is nil, Fluent::Engine.now is substituted and a
# debug line is logged.
#
# @param messages [Object] raw messages, handed to @everysense_parser.parse
# @return [Object] whatever @everysense_parser.parse returns; #emit iterates
#   it with #each and reads :time / :record from each element
def parse(messages)
  @everysense_parser.parse(messages) do |parsed_time, parsed_record|
    event_time = parsed_time
    # Only nil triggers the fallback; any other value is kept as is.
    if event_time.nil?
      $log.debug "Since time_key field is nil, Fluent::Engine.now is used."
      event_time = Fluent::Engine.now
    end
    { time: event_time, record: parsed_record }
  end
end

#shutdown ⇒ Object



135
136
137
138
139
# File 'lib/fluent/plugin/in_everysense.rb', line 135

# Stops the polling thread, shuts the proxy down, then runs the parent's
# shutdown.
#
# @return [Object] whatever the parent implementation returns
def shutdown
  # @proxy_thread is only set by #start. If #start never ran, or failed
  # before creating the thread, calling kill on nil would raise and skip
  # both shutdown_proxy and the parent's shutdown.
  @proxy_thread.kill if @proxy_thread
  shutdown_proxy
  super
end

#start ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/fluent/plugin/in_everysense.rb', line 93

# Starts the plugin: runs the parent's start, starts the proxy, and spawns
# the polling thread stored in @proxy_thread.
#
# The thread loops forever: it fetches messages with get_messages, emits
# them when the batch is neither nil nor empty, and then sleeps for
# @polling_interval. Any exception raised during an iteration is logged and
# followed by the same sleep, so the loop keeps polling.
#
# @return [Thread] the polling thread
def start
  super
  start_proxy
  @proxy_thread = Thread.new do
    loop do
      begin
        batch = get_messages
        emit(batch) unless batch.nil? || batch.empty?
        sleep @polling_interval
      rescue Exception => error
        # Catch-all on purpose: a failed poll is logged and retried after
        # the usual interval instead of ending the thread.
        $log.error :error => error.to_s
        $log.debug(error.backtrace.join("\n"))
        sleep @polling_interval
      end
    end
  end
end