Class: Katello::EventMonitor::PollerThread

Inherits:
Object
  • Object
show all
Defined in:
app/services/katello/event_monitor/poller_thread.rb

Overview

TODO: Move this class to app/lib/katello/event_daemon/services with other service definitions

Constant Summary collapse

SLEEP_INTERVAL =
3

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger = nil) ⇒ PollerThread

Returns a new instance of PollerThread.



30
31
32
33
34
# File 'app/services/katello/event_monitor/poller_thread.rb', line 30

# Sets up a poller with both event counters at zero.
#
# @param logger [Object, nil] logger to write to; when it is nil (or false)
#   the 'katello/katello_events' logger is fetched from ::Foreman::Logging
def initialize(logger = nil)
  @logger =
    if logger
      logger
    else
      ::Foreman::Logging.logger('katello/katello_events')
    end
  # Counters reported by #status; both start at zero.
  @failed_count = @processed_count = 0
end

Class Method Details

.close ⇒ Object



13
14
15
16
17
18
# File 'app/services/katello/event_monitor/poller_thread.rb', line 13

# Closes the current poller instance, if there is one, and forgets it.
#
# Does nothing when no instance has been set.
#
# @return [nil]
def self.close
  current = instance
  return unless current

  current.close
  self.instance = nil
end

.initialize(logger = nil) ⇒ Object



9
10
11
# File 'app/services/katello/event_monitor/poller_thread.rb', line 9

# Creates the class-level poller instance unless one is already set.
#
# @param logger [Object, nil] passed to .new only when a new instance is built
# @return [Object] the existing instance, or the newly created one
def self.initialize(logger = nil)
  existing = instance
  return existing if existing

  self.instance = new(logger)
end

.run ⇒ Object



20
21
22
23
24
# File 'app/services/katello/event_monitor/poller_thread.rb', line 20

# Entry point: makes sure an instance exists, resets the event queue's
# in-progress state, then starts polling on that instance.
#
# @return [Object] whatever the instance's #poll_for_events returns
def self.run
  initialize
  # Reset happens before polling starts.
  ::Katello::EventQueue.reset_in_progress
  poller = instance
  poller.poll_for_events
end

.status ⇒ Object



26
27
28
# File 'app/services/katello/event_monitor/poller_thread.rb', line 26

# Status of the current poller instance.
#
# @return [Object, nil] the instance's #status, or nil when no instance is set
def self.status
  poller = instance
  poller.nil? ? nil : poller.status
end

Instance Method Details

#close ⇒ Object



36
37
38
39
# File 'app/services/katello/event_monitor/poller_thread.rb', line 36

# Logs that the monitor is stopping and kills the polling thread if one
# has been started.
#
# @return [Thread, nil] the killed thread, or nil when no thread exists
def close
  @logger.info("Stopping Katello Event Monitor")
  return if @thread.nil?

  @thread.kill
end

#poll_for_events ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'app/services/katello/event_monitor/poller_thread.rb', line 85

# Starts the background thread that drains the Katello event queue.
#
# Each pass runs inside the Rails executor and a DB-connection wrapper,
# handles every event returned by EventQueue.next_event until it returns
# nil, then sleeps SLEEP_INTERVAL seconds before the next pass.
#
# Any StandardError that escapes the loop is treated as fatal: its message
# and backtrace are logged and the class-level close is invoked.
#
# @return [Thread] the newly started polling thread (also kept in @thread)
def poll_for_events
  @thread = Thread.new do
    @logger.info("Polling Katello Event Queue")
    loop do
      Rails.application.executor.wrap do
        Katello::Util::Support.with_db_connection(@logger) do
          # Drain the queue; a nil event means nothing is pending right now.
          until (event = ::Katello::EventQueue.next_event).nil?
            run_event(event)
            @processed_count += 1
          end
        end
      end

      sleep SLEEP_INTERVAL
    end
  rescue => e
    @logger.error(e.message)
    # Log the backtrace as run_event does, so a fatal error can be traced.
    @logger.error(e.backtrace.join("\n")) if e.backtrace
    @logger.error("Fatal error in Katello Event Monitor")
    # NOTE(review): this runs on the polling thread itself, and the instance
    # #close kills @thread - presumably code after that kill does not run
    # on this path; confirm.
    self.class.close
  end
end

#run_event(event) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'app/services/katello/event_monitor/poller_thread.rb', line 53

# Handles one queued event inside a Katello::Logging.time block.
#
# The event's handler is created and run as the anonymous admin user. A
# StandardError raised by the handler is counted in @failed_count and logged
# (message and backtrace) instead of propagating. Whether or not the handler
# failed, the event is rescheduled when the handler asks for a retry, and the
# matching queue entries are cleared afterwards.
#
# The data hash yielded by Katello::Logging.time gets :type, :object_id,
# :expired and :rescheduled; the last two are set to true when the
# reschedule attempt expired or succeeded.
#
# @param event [Object] queue record responding to event_type, object_id
#   and created_at
# @return [Object] whatever Katello::Logging.time returns
def run_event(event)
  Katello::Logging.time("katello event handled") do |data|
    data[:type] = event.event_type
    data[:object_id] = event.object_id
    data[:expired] = false
    data[:rescheduled] = false

    event_instance = nil
    begin
      ::User.as_anonymous_admin do
        event_instance = ::Katello::EventQueue.create_instance(event)
        event_instance.run
      end
    rescue => e
      @failed_count += 1
      @logger.error("event_queue_error: type=#{event.event_type}, object_id=#{event.object_id}")
      @logger.error(e.message)
      @logger.error(e.backtrace.join("\n"))
    ensure
      # event_instance is still nil if create_instance itself raised; try
      # then yields nil and no reschedule is attempted.
      if event_instance.try(:retry)
        result = ::Katello::EventQueue.reschedule_event(event)
        if result == :expired
          # Record the outcome so the timing data matches the warning below.
          data[:expired] = true
          @logger.warn("event_queue_event_expired: type=#{event.event_type} object_id=#{event.object_id}")
        elsif !result.nil?
          data[:rescheduled] = true
          @logger.warn("event_queue_rescheduled: type=#{event.event_type} object_id=#{event.object_id}")
        end
      end
      ::Katello::EventQueue.clear_events(event.event_type, event.object_id, event.created_at)
    end
  end
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


41
42
43
# File 'app/services/katello/event_monitor/poller_thread.rb', line 41

# Reports whether the polling thread is alive.
#
# The value is truthy/falsy rather than strictly boolean: a live thread
# yields its Thread#status string, while a missing, finished or failed
# thread yields false.
#
# @return [String, false]
def running?
  return false if @thread.nil?

  @thread.status || false
end

#status ⇒ Object



45
46
47
48
49
50
51
# File 'app/services/katello/event_monitor/poller_thread.rb', line 45

# Snapshot of the poller's counters and thread state.
#
# @return [Hash] with :processed_count, :failed_count and :running (the
#   value of #running?)
def status
  report = {}
  report[:processed_count] = @processed_count
  report[:failed_count] = @failed_count
  report[:running] = running?
  report
end