Class: Fluent::EverySenseInput
- Inherits:
- Input
- Inheritance chain: Object → Input → Fluent::EverySenseInput
- Includes:
- EverySenseProxy
- Defined in:
- lib/fluent/plugin/in_everysense.rb
Defined Under Namespace
Classes: EverySenseParser
Instance Method Summary
collapse
Methods included from EverySenseProxy:
#create_session, #create_session_request, #delete_session, #delete_session_request, #error_handler, #get_messages, #get_messages_params, #get_messages_request, included, #put_message, #put_message_request, #shutdown_proxy, #start_proxy, #target_path, #valid_session?
Instance Method Details
#configure(conf) ⇒ Object
82
83
84
85
|
# File 'lib/fluent/plugin/in_everysense.rb', line 82
# Configures the plugin: delegates to the parent implementation first,
# then sets up the parser objects via #configure_parser.
#
# @param conf [Object] configuration handed to both the parent class and
#   #configure_parser
# @return [Object] whatever #configure_parser returns
def configure(conf)
  super(conf)
  configure_parser(conf)
end
|
#configure_parser(conf) ⇒ Object
87
88
89
90
91
|
# File 'lib/fluent/plugin/in_everysense.rb', line 87
# Builds the parser for @format, configures it with +conf+, and wraps it in
# an EverySenseParser.
#
# Sets @parser and @everysense_parser.
#
# @param conf [Object] configuration passed to the format parser's #configure
# @return [EverySenseParser] the newly created wrapper
def configure_parser(conf)
  format_parser = Plugin.new_parser(@format)
  @parser = format_parser
  format_parser.configure(conf)
  @everysense_parser = EverySenseParser.new(@format, format_parser)
end
|
#emit(messages) ⇒ Object
124
125
126
127
128
129
130
131
132
133
|
# File 'lib/fluent/plugin/in_everysense.rb', line 124
# Parses a batch of messages and emits every resulting event to the router.
#
# Each event is emitted under @tag with the event's own :time, and its
# :record is wrapped as {json: record}. Ordinary errors (StandardError) are
# logged and swallowed so that a bad batch does not propagate to the caller.
#
# @param messages [Object] raw messages, passed unchanged to #parse
def emit(messages)
  parse(messages).each do |msg|
    router.emit(@tag, msg[:time], {json: msg[:record]})
  end
rescue StandardError => e
  # Only StandardError is rescued: rescuing Exception would also swallow
  # SignalException, SystemExit, NoMemoryError and ScriptError, which must
  # be allowed to propagate.
  $log.error :error => e.to_s
  $log.debug_backtrace(e.backtrace)
end
|
#parse(messages) ⇒ Object
113
114
115
116
117
118
119
120
121
122
|
# File 'lib/fluent/plugin/in_everysense.rb', line 113
# Runs +messages+ through @everysense_parser and turns every yielded
# (time, record) pair into a {time:, record:} hash.
#
# When the yielded time is nil, Fluent::Engine.now is substituted and a
# debug line is logged.
#
# @param messages [Object] raw messages handed to @everysense_parser.parse
# @return [Object] whatever @everysense_parser.parse returns (#emit calls
#   #each on it and reads :time / :record from every element)
def parse(messages)
  @everysense_parser.parse(messages) do |parsed_time, record|
    event_time =
      if parsed_time.nil?
        $log.debug "Since time_key field is nil, Fluent::Engine.now is used."
        Fluent::Engine.now
      else
        parsed_time
      end
    { time: event_time, record: record }
  end
end
|
#shutdown ⇒ Object
135
136
137
138
139
|
# File 'lib/fluent/plugin/in_everysense.rb', line 135
# Stops the polling thread created by #start, shuts the proxy down, and
# then delegates to the parent implementation.
def shutdown
  # @proxy_thread is only assigned in #start; guard against shutdown being
  # invoked when the thread was never created, so that the remaining
  # cleanup (shutdown_proxy and super) still runs.
  @proxy_thread.kill if @proxy_thread
  shutdown_proxy
  super
end
|
#start ⇒ Object
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
# File 'lib/fluent/plugin/in_everysense.rb', line 93
# Starts the plugin: delegates to the parent implementation, starts the
# proxy, and spawns the polling thread (stored in @proxy_thread).
#
# The thread loops forever: it fetches messages with #get_messages, passes
# any non-nil, non-empty result to #emit, and then sleeps for
# @polling_interval. Ordinary errors (StandardError) are logged and the loop
# carries on after the usual sleep.
#
# @return [Thread] the polling thread
def start
  super
  start_proxy
  @proxy_thread = Thread.new do
    loop do
      begin
        messages = get_messages
        emit(messages) unless messages.nil? || messages.empty?
      rescue StandardError => e
        # Only StandardError is rescued: rescuing Exception would keep the
        # loop spinning on fatal errors (NoMemoryError, ScriptError, ...)
        # that cannot sensibly be retried.
        $log.error :error => e.to_s
        $log.debug(e.backtrace.join("\n"))
      end
      # Wait between polls on both the success and the failure path.
      sleep @polling_interval
    end
  end
end
|