Class: Xcflushd::Flusher

Inherits:
Object
  • Object
show all
Defined in:
lib/xcflushd/flusher.rb

Constant Summary collapse

XcflushdError =
Class.new(StandardError)

Instance Method Summary collapse

Constructor Details

#initialize(reporter, authorizer, storage, auth_ttl, error_handler, logger, threads) ⇒ Flusher

Returns a new instance of Flusher.



11
12
13
14
15
16
17
18
19
# File 'lib/xcflushd/flusher.rb', line 11

def initialize(reporter, authorizer, storage, auth_ttl, error_handler, logger, threads)
  @reporter = reporter
  @authorizer = authorizer
  @storage = storage
  @auth_ttl = auth_ttl
  @error_handler = error_handler
  @logger = logger
  @thread_pool = Concurrent::FixedThreadPool.new(threads)
end

Instance Method Details

#flushObject

TODO: decide if we want to renew the authorizations every time.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/xcflushd/flusher.rb', line 34

def flush
  reports_to_flush = run_and_log_time('Getting the reports from Redis') do
    reports
  end

  run_and_log_time('Reporting to 3scale') { report(reports_to_flush) }

  # Ideally, we would like to ensure that once we start checking
  # authorizations, they have taken into account the reports that we just
  # performed. However, in 3scale, reports are asynchronous and the current
  # API does not provide a way to know whether a report has already been
  # processed.
  # For now, let's just wait a few seconds. This will greatly mitigate the
  # problem.
  run_and_log_time('Giving reports some time to be processed') do
    sleep(WAIT_TIME_REPORT_AUTH)
  end

  auths = run_and_log_time('Getting the auths from 3scale') do
    authorizations(reports_to_flush)
  end

  run_and_log_time('Renewing the auths in Redis') { renew(auths) }
end

#shutdownObject



21
22
23
# File 'lib/xcflushd/flusher.rb', line 21

def shutdown
  @thread_pool.shutdown
end

#terminateObject



29
30
31
# File 'lib/xcflushd/flusher.rb', line 29

def terminate
  @thread_pool.kill
end

#wait_for_termination(secs = nil) ⇒ Object



25
26
27
# File 'lib/xcflushd/flusher.rb', line 25

def wait_for_termination(secs = nil)
  @thread_pool.wait_for_termination(secs)
end