Class: Rubix::Sender
Instance Attribute Summary collapse
-
#applications ⇒ Object
The applications used to create items.
-
#host ⇒ Object
The host the Sender will send data for.
-
#host_groups ⇒ Object
The hostgroups used to create this host.
-
#settings ⇒ Object
A Hash of options.
-
#templates ⇒ Object
The templates used to create this host.
Instance Method Summary collapse
- #alive? ⇒ Boolean
- #confirm_settings ⇒ Object
-
#initialize(settings) ⇒ Sender
constructor
Initialization.
- #initialize_applications ⇒ Object
- #initialize_host ⇒ Object
- #initialize_hostgroups ⇒ Object
- #initialize_templates ⇒ Object
-
#looks_like_json?(line) ⇒ Boolean
Does the line look like it might be JSON?.
-
#process_file(path) ⇒ Object
Process each line of the file at
path
. -
#process_file_handle(f) ⇒ Object
Process each line of a given file handle
f
. - #process_line(line) ⇒ Object
-
#process_line_of_json_in_new_pipe(line) ⇒ Object
Parse and send a single
line
of JSON input to the Zabbix server. -
#process_line_of_tsv_in_this_pipe(line) ⇒ Object
Parse and send a single
line
of TSV input to the Zabbix server. -
#process_pipe ⇒ Object
Process each line read from the pipe.
-
#process_stdin ⇒ Object
Process each line of standard input.
-
#process_zabbix_sender_output(key, text) ⇒ Object
Parse the
text
output byzabbix_sender
. -
#run ⇒ Object
Actions.
-
#send(key, value, timestamp) ⇒ Object
Send the
value
forkey
at the giventimestamp
to the Zabbix server.
Methods included from Logs
#debug, #error, #fatal, #info, #log_name, #warn
Constructor Details
#initialize(settings) ⇒ Sender
Initialization
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/rubix/sender.rb', line 28 def initialize settings @settings = settings confirm_settings self.host = Host.new(:name => settings['host']) @log_name = "PIPE #{host.name}" if settings['fast'] info("Forwarding...") if settings['verbose'] else initialize_hostgroups initialize_templates initialize_host initialize_applications info("Forwarding...") if settings['verbose'] && host.exists? end end |
Instance Attribute Details
#applications ⇒ Object
The applications used to create items.
22 23 24 |
# File 'lib/rubix/sender.rb', line 22 def applications @applications end |
#host ⇒ Object
The host the Sender will send data for.
13 14 15 |
# File 'lib/rubix/sender.rb', line 13 def host @host end |
#host_groups ⇒ Object
The hostgroups used to create this host.
16 17 18 |
# File 'lib/rubix/sender.rb', line 16 def host_groups @host_groups end |
#settings ⇒ Object
A Hash of options.
10 11 12 |
# File 'lib/rubix/sender.rb', line 10 def settings @settings end |
#templates ⇒ Object
The templates used to create this host.
19 20 21 |
# File 'lib/rubix/sender.rb', line 19 def templates @templates end |
Instance Method Details
#alive? ⇒ Boolean
44 45 46 |
# File 'lib/rubix/sender.rb', line 44 def alive? settings['fast'] || host.exists? end |
#confirm_settings ⇒ Object
72 73 74 75 76 77 |
# File 'lib/rubix/sender.rb', line 72 def confirm_settings raise ConnectionError.new("Must specify a Zabbix server to send data to.") unless settings['server'] raise Error.new("Must specify the path to a local configuraiton file") unless settings['configuration_file'] && File.file?(settings['configuration_file']) raise ConnectionError.new("Must specify the name of a host to send data for.") unless settings['host'] raise ValidationError.new("Must define at least one host group.") if settings['host_groups'].nil? || settings['host_groups'].empty? end |
#initialize_applications ⇒ Object
68 69 70 |
# File 'lib/rubix/sender.rb', line 68 def initialize_applications self.applications = (settings['applications'] || '').split(',').flatten.compact.map { |app_name| Application.find_or_create_by_name_and_host(app_name, host) } end |
#initialize_host ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/rubix/sender.rb', line 56 def initialize_host unless host.exists? host.host_groups = host_groups host.templates = templates host.create end # if settings['verbose'] # puts "Forwarding data for Host '#{settings['host']}' (#{host_id}) from #{settings['pipe']} to #{settings['server']}" # puts "Creating Items in Application '#{settings['application']}' (#{application_id}) at #{settings['api_server']} as #{settings['username']}" # end end |
#initialize_hostgroups ⇒ Object
48 49 50 |
# File 'lib/rubix/sender.rb', line 48 def initialize_hostgroups self.host_groups = settings['host_groups'].split(',').flatten.compact.map { |group_name | HostGroup.find_or_create_by_name(group_name.strip) } end |
#initialize_templates ⇒ Object
52 53 54 |
# File 'lib/rubix/sender.rb', line 52 def initialize_templates self.templates = (settings['templates'] || '').split(',').flatten.compact.map { |template_name | Template.find_or_create_by_name(template_name.strip) } end |
#looks_like_json?(line) ⇒ Boolean
Does the line look like it might be JSON?
251 252 253 |
# File 'lib/rubix/sender.rb', line 251 def looks_like_json? line line =~ /^\s*\{/ end |
#process_file(path) ⇒ Object
Process each line of the file at path
.
99 100 101 102 103 |
# File 'lib/rubix/sender.rb', line 99 def process_file path f = File.new(path) process_file_handle(f) f.close end |
#process_file_handle(f) ⇒ Object
Process each line of a given file handle f
.
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/rubix/sender.rb', line 127 def process_file_handle f begin line = f.readline rescue EOFError line = nil end while line process_line(line) begin # FIXME -- this call to File#readline blocks and doesn't let # stuff like SIGINT (generated from Ctrl-C on a keyboard, # say) take affect. line = f.readline rescue EOFError line = nil end end end |
#process_line(line) ⇒ Object
146 147 148 149 150 151 152 |
# File 'lib/rubix/sender.rb', line 146 def process_line line if looks_like_json?(line) process_line_of_json_in_new_pipe(line) else process_line_of_tsv_in_this_pipe(line) end end |
#process_line_of_json_in_new_pipe(line) ⇒ Object
Parse and send a single line
of JSON input to the Zabbix server. The JSON must have a key data
in order to be processed. The value of ‘data’ should be an Array of Hashes each with a key
and value
.
This ZabbixPipe’s settings will be merged with the remainder of the JSON hash. This allows sending values for ‘host2’ to an instance of ZabbixPipe already set up to receive for ‘host1’.
This is useful for sending data for keys from multiple hosts
Example of expected input:
{
'data': [
{'key': 'foo.bar.baz', 'value': 10},
{'key': 'snap.crackle.pop', 'value': 8 }
]
}
Or when sending for another host:
{
'hostname': 'shazaam',
'application': 'silly',
'data': [
{'key': 'foo.bar.baz', 'value': 10},
{'key': 'snap.crackle.pop', 'value': 8 }
]
}
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 |
# File 'lib/rubix/sender.rb', line 207 def process_line_of_json_in_new_pipe line begin json = JSON.parse(line) rescue JSON::ParserError => e error("Malformed JSON") return end data = json.delete('data') unless data && data.is_a?(Array) error("A line of JSON input must a have an Array key 'data'") return end if json.empty? # If there are no other settings then the daughter will be the # same as the parent -- so just use 'self'. daughter_pipe = self else # We merge the settings from 'self' with whatever else is # present in the line. begin daughter_pipe = self.class.new(settings.stringify_keys.merge(json)) return unless daughter_pipe.alive? rescue Error => e error(e.) return end end data.each do |point| key = point['key'] value = point['value'] unless key && value warn("The elements of the 'data' Array must be Hashes with a 'key' and a 'value'") next end tsv_line = [key, value].map(&:to_s).join("\t") daughter_pipe.process_line(tsv_line) end end |
#process_line_of_tsv_in_this_pipe(line) ⇒ Object
Parse and send a single line
of TSV input to the Zabbix server. The line will be split at tabs and expects either
a) two columns: an item key and a value
b) three columns: an item key, a value, and a timestamp
Unexpected input will cause an error to be logged.
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/rubix/sender.rb', line 161 def process_line_of_tsv_in_this_pipe line parts = line.strip.split("\t") case parts.size when 2 = Time.now key, value = parts when 3 key, value = parts[0..1] = Time.parse(parts.last) else error("Each line of input must be a tab separated row consisting of 2 columns (key, value) or 3 columns (timestamp, key, value)") return end send(key, value, ) end |
#process_pipe ⇒ Object
Process each line read from the pipe.
111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/rubix/sender.rb', line 111 def process_pipe # We want to open this pipe in non-blocking read mode b/c # otherwise this process becomes hard to kill. f = File.new(settings['pipe'], (File::RDONLY | File::NONBLOCK)) while true process_file_handle(f) # In non-blocking mode, an EOFError from f.readline doesn't mean # there's no more data to read, just that there's no more data # right *now*. If we sleep for a bit there might be more data # coming down the pipe. sleep settings['pipe_read_sleep'] end f.close end |
#process_stdin ⇒ Object
Process each line of standard input.
106 107 108 |
# File 'lib/rubix/sender.rb', line 106 def process_stdin process_file_handle($stdin) end |
#process_zabbix_sender_output(key, text) ⇒ Object
Parse the text
output by zabbix_sender
.
290 291 292 293 294 295 296 297 298 |
# File 'lib/rubix/sender.rb', line 290 def process_zabbix_sender_output key, text return unless settings['verbose'] lines = text.strip.split("\n") return if lines.size < 1 status_line = lines.first status_line =~ /Processed +(\d+) +Failed +(\d+) +Total +(\d+)/ processed, failed, total = $1, $2, $3 warn("Failed to write #{failed} values to key '#{key}'") if failed.to_i != 0 end |
#run ⇒ Object
Actions
83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/rubix/sender.rb', line 83 def run return unless alive? case when settings['pipe'] process_pipe when settings.rest.size > 0 settings.rest.each do |path| process_file(path) end else process_stdin end exit(0) end |
#send(key, value, timestamp) ⇒ Object
Send the value
for key
at the given timestamp
to the Zabbix server.
If the key
doesn’t exist for this local agent’s host, it will be added.
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 |
# File 'lib/rubix/sender.rb', line 260 def send key, value, item = Item.new(:key => key, :host => host, :applications => applications, :value_type => Item.value_type_from_value(value)) unless settings['fast'] || item.exists? return unless item.create # There is a time lag of about 15-30 seconds between (successfully) # creating an item on the Zabbix server and having the Zabbix accept # new data for that item. # # If it is crucial that *every single* data point be written, dial # up this sleep period. The first data point for a new key will put # the wrapper to sleep for this period of time, in hopes that the # Zabbix server will catch up and be ready to accept new data # points. # # If you don't care that you're going to lose the first few data # points you send to Zabbix, then don't worry about it. sleep settings['create_item_sleep'] end command = "#{settings['sender']} --config #{settings['configuration_file']} --zabbix-server #{settings['server']} --host #{settings['host']} --key #{key} --value '#{value}'" process_zabbix_sender_output(key, `#{command}`) # command = "zabbix_sender --config #{configuration_file} --zabbix-server #{server} --input-file - --with-timestamps" # open(command, 'w') do |zabbix_sender| # zabbix_sender.write([settings['host'], key, timestamp.to_i, value].map(&:to_s).join("\t")) # zabbix_sender.close_write # process_zabbix_sender_output(zabbix_sender.read) # end end |