Class: Fluent::EverySenseInput
- Inherits:
-
Input
- Object
- Input
- Fluent::EverySenseInput
show all
- Includes:
- EverySenseProxy
- Defined in:
- lib/fluent/plugin/in_everysense.rb
Defined Under Namespace
Classes: EverySenseParser
Instance Method Summary
collapse
Methods included from EverySenseProxy: #create_session, #create_session_request, #delete_session, #delete_session_request, #error_handler, #get_messages, #get_messages_params, #get_messages_request, included, #put_message, #put_message_request, #shutdown_proxy, #start_proxy, #target_path, #valid_session?
Instance Method Details
#add_recv_time(record) ⇒ Object
84
85
86
87
88
89
90
91
|
# File 'lib/fluent/plugin/in_everysense.rb', line 84
# Optionally stamps a record with the time it was received.
#
# When @recv_time is truthy, returns the result of merging into +record+ one
# extra entry keyed by @recv_time_key whose value is the current wall-clock
# time in integer milliseconds since the Unix epoch. Otherwise +record+ itself
# is returned untouched.
#
# @param record [#merge] the event record (presumably a Hash -- confirm with the parser)
# @return [Object] the merged record, or +record+ when stamping is disabled
def add_recv_time(record)
  return record unless @recv_time

  now = Time.now
  # whole seconds scaled to ms, plus the sub-second part truncated to ms
  received_ms = now.to_i * 1000 + (now.usec / 1000)
  record.merge(@recv_time_key => received_ms)
end
|
#configure(conf) ⇒ Object
52
53
54
55
|
# File 'lib/fluent/plugin/in_everysense.rb', line 52
# Configures the plugin from +conf+.
#
# Delegates to the superclass implementation first and then builds the
# parser objects through #configure_parser with the same +conf+.
#
# @param conf the configuration object handed to the plugin
def configure(conf)
super
# parser setup runs after the superclass has processed conf
configure_parser(conf)
end
|
#configure_parser(conf) ⇒ Object
57
58
59
60
61
|
# File 'lib/fluent/plugin/in_everysense.rb', line 57
# Builds the parser chain used by #parse.
#
# Creates a parser for @format via Plugin.new_parser, stores it in @parser,
# configures it with +conf+, and wraps it in an EverySenseParser that is
# stored in @everysense_parser.
#
# @param conf the configuration object passed on to the format parser
# @return [EverySenseParser] the newly created wrapper
def configure_parser(conf)
  format_parser = Plugin.new_parser(@format)
  @parser = format_parser
  format_parser.configure(conf)
  @everysense_parser = EverySenseParser.new(@format, format_parser)
end
|
#emit(messages) ⇒ Object
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/fluent/plugin/in_everysense.rb', line 104
# Parses a batch of messages and emits every resulting event to the router
# under @tag.
#
# Ordinary errors (StandardError and subclasses) raised while parsing or
# emitting are logged and swallowed; the events of the batch that were not
# yet emitted are dropped. Non-StandardError exceptions (signals, SystemExit,
# NoMemoryError, ...) are deliberately NOT rescued so they can propagate.
#
# @param messages the raw messages handed to #parse
def emit(messages)
  parse(messages).each do |msg|
    router.emit(@tag, msg[:time], msg[:record])
  end
rescue StandardError => e
  $log.error :error => e.to_s
  $log.debug_backtrace(e.backtrace)
end
|
#parse(messages) ⇒ Object
93
94
95
96
97
98
99
100
101
102
|
# File 'lib/fluent/plugin/in_everysense.rb', line 93
# Runs +messages+ through @everysense_parser and converts each yielded
# (time, record) pair into a hash of the form {time: ..., record: ...}.
#
# When the parser yields a nil time, Fluent::Engine.now is substituted.
# The record is passed through #add_recv_time exactly once, so the record
# written to the debug log is the very same object that is returned (calling
# it separately for logging and for the result could yield two different
# receive timestamps).
#
# @param messages the raw messages to parse
# @return whatever @everysense_parser.parse returns for the given block
def parse(messages)
  @everysense_parser.parse(messages) do |time, record|
    if time.nil?
      $log.debug "Since time_key field is nil, Fluent::Engine.now is used."
      time = Fluent::Engine.now
    end
    stamped = add_recv_time(record)
    $log.debug "#{time}, #{stamped}"
    {time: time, record: stamped}
  end
end
|
#shutdown ⇒ Object
115
116
117
118
119
|
# File 'lib/fluent/plugin/in_everysense.rb', line 115
# Stops the polling thread (if one was started), shuts the proxy down and
# then delegates to the superclass.
#
# The thread is only killed when it exists: if #start never ran, or raised
# before creating the thread, @proxy_thread is nil and calling #kill on it
# would raise NoMethodError and skip the rest of the teardown.
def shutdown
  @proxy_thread.kill if @proxy_thread
  shutdown_proxy
  super
end
|
#start ⇒ Object
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/fluent/plugin/in_everysense.rb', line 63
# Starts the plugin: validates the tag, delegates to the superclass, starts
# the proxy and spawns the polling thread stored in @proxy_thread.
#
# The thread repeatedly calls get_messages, hands any non-empty result to
# #emit, and sleeps @polling_interval between iterations. Ordinary errors
# (StandardError) inside one iteration are logged and the loop continues
# after the usual sleep. Non-StandardError exceptions are not rescued, so
# they terminate the thread instead of being swallowed forever.
#
# @raise [StandardError] when @tag is nil
# @return [Thread] the polling thread
def start
  raise StandardError, "tag must be set before the input is started" if @tag.nil?
  super
  start_proxy
  @proxy_thread = Thread.new do
    loop do
      begin
        messages = get_messages
        $log.debug "get_messages: #{messages}"
        emit(messages) unless messages.nil? || messages.empty?
      rescue StandardError => e
        $log.error :error => e.to_s
        $log.debug(e.backtrace.join("\n"))
      end
      # single wait point, reached after both successful and failed polls
      sleep @polling_interval
    end
  end
end
|