Class: Fluent::Plugin::CalyptiaMonitoringInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_calyptia_monitoring.rb

Defined Under Namespace

Classes: CreateAgentError, UpdateAgentError

Constant Summary collapse

# Path requested on the Fluentd RPC endpoint to dump the running configuration.
RPC_CONFIG_DUMP_ENDPOINT =
"/api/config.getDump".freeze
# Passed as `default_type` when the agent storage is created in #configure.
DEFAULT_STORAGE_TYPE =
'local'.freeze
# Upper bound on the number of unsent metric buffers kept by #append_pendings.
DEFAULT_PENDING_METRICS_SIZE =
100
# HTTP status codes for which #add_metrics drops the metrics instead of
# queueing them for a later retry.
UNPROCESSABLE_HTTP_ERRORS =
[
  422, # Invalid Metrics
  410, # Agent Gone
  401, # Unauthorized
  400, # BadRequest
].freeze

Instance Method Summary collapse

Constructor Details

#initialize ⇒ CalyptiaMonitoringInput

Returns a new instance of CalyptiaMonitoringInput.



64
65
66
67
68
69
# File 'lib/fluent/plugin/in_calyptia_monitoring.rb', line 64

# Sets up the in-memory state of the plugin: an empty queue of unsent metric
# buffers, the monitor used to guard that queue, and a blank configuration
# snapshot.
def initialize
  super
  @pending = []
  @monitor = Monitor.new
  @current_config = nil
end

Instance Method Details

#add_metrics(buffer) ⇒ Object



229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/fluent/plugin/in_calyptia_monitoring.rb', line 229

# Sends a metrics payload to the API, together with any previously queued
# (pending) payloads.
#
# Pending payloads are only removed from the queue once the outcome of the
# request is known: after a successful send, or when the server answers with
# one of UNPROCESSABLE_HTTP_ERRORS (in which case they are dropped on purpose).
# On any other failure the new buffer is queued exactly once so it is retried
# on the next call.
#
# @param buffer [String] metrics payload collected for this tick
# @return [Boolean] true when the payload was accepted, false otherwise
#   (including when no agent is registered in the storage yet)
def add_metrics(buffer)
  return false unless agent = @storage_agent.get(:agent)

  sent_pendings = 0
  begin
    # Snapshot the queue under the lock, but do not clear it yet: the entries
    # must survive a failed request.
    payload = @monitor.synchronize do
      sent_pendings = @pending.size
      sent_pendings.zero? ? buffer : (@pending + [buffer]).join
    end
    code, response = @api_requester.add_metrics(payload, agent["token"], agent["id"])
    if response && response["error"]
      if UNPROCESSABLE_HTTP_ERRORS.include?(code.to_i)
        log.warn "Sending metrics is failed and dropped metrics due to unprocessable on server. Error: `#{response["error"]}', Code: #{code}"
        # The server will never accept this payload; retrying it would fail forever.
        @monitor.synchronize { @pending = @pending.drop(sent_pendings) }
        return false
      end
      log.warn "Failed to send metrics. Error: `#{response["error"]}', Code: #{code}"
      append_pendings(buffer)
      return false
    end
  rescue => ex
    log.warn "Failed to send metrics: error = #{ex}, backtrace = #{ex.backtrace}"
    append_pendings(buffer)
    return false
  end
  # Only the entries that were part of this request are removed.
  @monitor.synchronize { @pending = @pending.drop(sent_pendings) }
  true
end

#append_pendings(buffer) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/fluent/plugin/in_calyptia_monitoring.rb', line 214

# Queues a metrics buffer that could not be delivered.
#
# The queue is capped at DEFAULT_PENDING_METRICS_SIZE entries; once the cap is
# reached the oldest entry is discarded (with a warning) to make room.
#
# @param buffer [String] metrics payload to keep for a later attempt
# @return [Array] the pending queue after the insertion
def append_pendings(buffer)
  @monitor.synchronize do
    if @pending.empty?
      @pending = [buffer]
    else
      if @pending.size >= DEFAULT_PENDING_METRICS_SIZE
        @pending = @pending.drop(1)
        log.warn "pending buffer is full. Dropped the first element from the pending buffer"
      end
      @pending << buffer
    end
  end
end

#check_config_sending_usability ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/fluent/plugin/in_calyptia_monitoring.rb', line 187

# Checks whether the running configuration can be fetched over RPC.
#
# Two conditions must hold: `rpc_endpoint` is set in the system configuration,
# and the config-dump endpoint answers with HTTP 200. A warning describing the
# missing setting is logged otherwise.
#
# @return [Boolean] true when the configuration dump is reachable
def check_config_sending_usability
  unless system_config.rpc_endpoint
    log.warn "This feature needs to enable RPC with `rpc_endpoint` on <system>."
    return false
  end

  return true if retrieve_config_from_rpc.code.to_i == 200

  log.warn "This feature needs to enable getDump RPC endpoint with `enable_get_dump` on <system>."
  false
end

#configure(conf) ⇒ Object



71
72
73
74
75
76
# File 'lib/fluent/plugin/in_calyptia_monitoring.rb', line 71

# Applies the plugin configuration and creates the storage used to persist
# the registered agent.
#
# @param conf [Object] configuration element; its first child named
#   'storage' (if any) is handed to the storage helper
def configure(conf)
  super

  storage_conf = conf.elements.find { |element| element.name == 'storage' }
  @storage_agent = storage_create(usage: 'calyptia_monitoring_agent',
                                  conf: storage_conf,
                                  default_type: DEFAULT_STORAGE_TYPE)
end

#create_agent(current_config) ⇒ Object



159
160
161
162
163
164
165
166
167
168
# File 'lib/fluent/plugin/in_calyptia_monitoring.rb', line 159

# Registers a new agent through the API and persists the returned agent data
# and machine id in the agent storage.
#
# @param current_config [String, nil] configuration text sent with the request
# @return [true] when the API answered with a 2xx status
# @raise [CreateAgentError] when the API answered with any other status
def create_agent(current_config)
  code, agent, machine_id = @api_requester.create_agent(current_config)
  unless code.to_s.start_with?("2")
    raise CreateAgentError, "Create agent is failed. Error: `#{agent["error"]}', code: #{code}"
  end

  @storage_agent.put(:agent, agent)
  @storage_agent.put(:machine_id, machine_id)
  true
end

#get_current_config_from_rpc ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
# File 'lib/fluent/plugin/in_calyptia_monitoring.rb', line 78

# Fetches the running configuration over RPC and returns it as text.
#
# The "conf" field of the RPC response is parsed as a Fluentd configuration;
# the children of its <ROOT> element are serialized and concatenated, which
# strips the outer <ROOT> wrapper.
#
# @return [String] configuration text without the enclosing <ROOT> element
def get_current_config_from_rpc
  dumped = Yajl.load(retrieve_config_from_rpc.body)["conf"]
  parsed = Fluent::Config.parse(dumped, '(supervisor)', '(RPC)', true)
  root = parsed.elements.find { |element| element.name == 'ROOT' }
  root.elements.map(&:to_s).join
end

#get_masked_conf_from_conf_file ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/fluent/plugin/in_calyptia_monitoring.rb', line 90

# Runs the bundled config dumper script against the configured Fluentd config
# file and returns whatever the script prints.
#
# The script is started through the child-process helper with
# FLUENT_CONFIG_PATH pointing at the file; its combined stdout/stderr is read
# as binary. After the helper returns a non-nil value, this method polls for
# up to 10 seconds until output has arrived.
#
# @return [String] output of the dumper, or "" when the config file does not
#   exist or nothing was read within the timeout
def get_masked_conf_from_conf_file
  return "" unless File.exist?(@cloud_monitoring.fluentd_conf_path) # check file existence.

  dumper_path = File.join(File.dirname(__FILE__), "calyptia_config_dumper.rb")
  # On Windows the script is passed as an argument to the Ruby interpreter;
  # elsewhere it is spawned directly (arguments is nil in that case).
  spawn_command, arguments = if Fluent.windows?
                               [::ServerEngine.ruby_bin_path, dumper_path]
                             else
                               [dumper_path]
                             end

  conf = ""
  # spawn_command must be assigned before this lambda so the closure can use
  # it to identify the program in the warnings below.
  callback = ->(status) {
    if status && status.success?
      #nop
    elsif status
      log.warn "config dumper exits with error code", prog: spawn_command, status: status.exitstatus, signal: status.termsig
    else
      log.warn "config dumper unexpectedly exits without exit status", prog: spawn_command
    end
  }

  retval = child_process_execute(:exec_calyptia_config_dumper, spawn_command, arguments: arguments, immediate: true,
                                 env: {"FLUENT_CONFIG_PATH" => @cloud_monitoring.fluentd_conf_path}, parallel: true, mode: [:read_with_stderr],
                                 on_exit_callback: callback) do |io|
    io.set_encoding(Encoding::ASCII_8BIT)
    conf = io.read
  end
  unless retval.nil?
    begin
      # The block above may run asynchronously; wait until it has produced output.
      Timeout.timeout(10) do
        sleep 0.1 until !conf.empty?
      end
    rescue Timeout::Error
      log.warn "cannot retrieve configuration contents on #{@cloud_monitoring.fluentd_conf_path} within 10 seconds."
    end
  end
  conf
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/fluent/plugin/in_calyptia_monitoring.rb', line 60

# Always answers true.
# NOTE(review): presumably the hook Fluentd consults to decide whether this
# plugin may run under multiple workers — confirm against the plugin base class.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#on_timer_send_metrics ⇒ Object



260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
# File 'lib/fluent/plugin/in_calyptia_monitoring.rb', line 260

# Timer callback: gathers the metric payloads reported by both monitor agents,
# concatenates them (plugin agent first, then buffer agent) and submits the
# result through #add_metrics.
#
# Nothing is sent when no payload was collected; a warning announcing the next
# attempt is logged when the submission fails.
def on_timer_send_metrics
  opts = {with_config: false, with_retry: false}
  buffer = ""
  [@monitor_agent, @monitor_agent_buffer].each do |source|
    source.plugins_info_all(opts).each do |record|
      record["metrics"].each_pair do |_name, payload|
        buffer += payload
      end
    end
  end

  if buffer.empty?
    log.debug "No initialized metrics is found. Trying to send cmetrics on the next tick."
  elsif !add_metrics(buffer)
    log.warn "Sending metrics is failed. Trying to send pending buffers in the next interval: #{@cloud_monitoring.rate}, next sending time: #{Time.now + @cloud_monitoring.rate}"
  end
end

#retrieve_config_from_rpc ⇒ Object



202
203
204
205
206
207
208
# File 'lib/fluent/plugin/in_calyptia_monitoring.rb', line 202

# Performs an HTTP GET of RPC_CONFIG_DUMP_ENDPOINT against the host and port
# taken from the system configuration's `rpc_endpoint`.
#
# @return [Object] the response returned by Net::HTTP#get
def retrieve_config_from_rpc
  endpoint = URI.parse("http://#{system_config.rpc_endpoint}")
  Net::HTTP.start(endpoint.host, endpoint.port) do |http|
    http.get(RPC_CONFIG_DUMP_ENDPOINT)
  end
end

#setup_agent(current_config) ⇒ Object



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/fluent/plugin/in_calyptia_monitoring.rb', line 170

# Makes sure an agent is registered for this instance.
#
# When both the agent record and the machine id are present in the storage,
# the existing agent is updated through the API; otherwise a new agent is
# created via #create_agent.
#
# @param current_config [String, nil] configuration text sent with the request
# @return [Object] true on a 2xx update, false (after logging a warning) on a
#   failed update, or the result of #create_agent
def setup_agent(current_config)
  agent = @storage_agent.get(:agent)
  # The machine id is only looked up when an agent record exists.
  machine_id = agent && @storage_agent.get(:machine_id)
  return create_agent(current_config) unless agent && machine_id

  code, body = @api_requester.update_agent(current_config, agent, machine_id)
  return true if code.to_s.start_with?("2")

  log.warn "Updating agent is failed. Error: #{Yajl.load(body)["error"]}, Code: #{code}"
  false
end

#shutdown ⇒ Object



210
211
212
# File 'lib/fluent/plugin/in_calyptia_monitoring.rb', line 210

# Delegates to the parent class's shutdown; no plugin-specific teardown is
# performed here.
def shutdown
  super
end

#start ⇒ Object

Raises:

  • (Fluent::ConfigError)


127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/fluent/plugin/in_calyptia_monitoring.rb', line 127

# Starts the plugin: verifies that the cmetrics metrics backend is enabled,
# builds the monitor agents and the API requester, captures the current
# configuration, registers (or updates) the agent and schedules the periodic
# metrics submission.
#
# The configuration is taken from the dumper script when `fluentd_conf_path`
# is set, otherwise from the RPC endpoint when it is usable. Rates below 30
# seconds are raised to 30 with a warning.
#
# @raise [Fluent::ConfigError] when the metrics backend is not "cmetrics"
# @raise [UpdateAgentError] when the agent could not be set up
def start
  super

  metrics_conf = system_config.metrics
  unless metrics_conf && metrics_conf[:@type] == "cmetrics"
    raise Fluent::ConfigError, "cmetrics plugin should be used to collect metrics on Calyptia Cloud"
  end

  @monitor_agent = Fluent::Plugin::CalyptiaMonitoringExtInput.new
  @monitor_agent_buffer = Fluent::Plugin::CalyptiaMonitoringBufferExtInput.new
  @api_requester = Fluent::Plugin::CalyptiaAPI::Requester.new(
    @cloud_monitoring.endpoint,
    @cloud_monitoring.api_key,
    log,
    fluentd_worker_id
  )

  @current_config = if @cloud_monitoring.fluentd_conf_path.nil?
                      get_current_config_from_rpc if check_config_sending_usability
                    else
                      get_masked_conf_from_conf_file
                    end

  if @cloud_monitoring.rate < 30
    log.warn "High frequency events ingestion is not supported. Set up 30s as ingestion interval"
    @cloud_monitoring[:rate] = 30
  end

  unless setup_agent(@current_config)
    raise UpdateAgentError, "Setup agent is failed. Something went wrong"
  end

  timer_execute(:in_calyptia_monitoring_send_metrics, @cloud_monitoring.rate, &method(:on_timer_send_metrics))
end