Module: Emque::Consuming::Application

Defined in:
lib/emque/consuming/application.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(descendant) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/emque/consuming/application.rb', line 16

# Hook run when this module is included into +descendant+.
#
# Registers +descendant+ as the current Emque::Consuming application, then
# equips it with the Core class-level API, the Helpers instance methods,
# readers for the error tracker and manager, and private visibility for the
# internal lifecycle methods.
#
# @param descendant [Module] the class including this module
def self.included(descendant)
  # Lifecycle helpers that must not be part of the descendant's public API.
  internal_methods = [
    :ensure_adapter_is_configured!,
    :initialize_error_tracker,
    :initialize_manager,
    :log_prefix,
    :handle_shutdown
  ]

  Emque::Consuming.application = descendant

  descendant.class_eval do
    extend(Emque::Consuming::Core)
    include(Emque::Consuming::Helpers)

    attr_reader(:error_tracker, :manager)
    private(*internal_methods)
  end
end

Instance Method Details

#auto_shutdown_enabled ⇒ Object

private



74
75
76
# File 'lib/emque/consuming/application.rb', line 74

# Reports the +auto_shutdown+ setting from the application config.
#
# @return [Object] whatever +config.auto_shutdown+ returns
def auto_shutdown_enabled
  settings = config
  settings.auto_shutdown
end

#ensure_adapter_is_configured! ⇒ Object



78
79
80
81
82
83
# File 'lib/emque/consuming/application.rb', line 78

# Verifies that an adapter has been set on the config.
#
# @raise [AdapterConfigurationError] when +config.adapter+ is nil
# @return [nil] when an adapter is present
def ensure_adapter_is_configured!
  # Only a nil adapter counts as "not configured".
  return unless config.adapter.nil?

  raise AdapterConfigurationError,
        "Adapter not found! use config.set_adapter(name, options)"
end

#handle_shutdown ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/emque/consuming/application.rb', line 85

# Reports that the error limit was exceeded and notifies shutdown handlers.
#
# Collects a context hash (error tracker limit, expiration and occurrences,
# runner status, and configuration), logs it at error level through
# Emque::Consuming.logger, then calls every handler registered in
# Emque::Consuming.config.shutdown_handlers with that same hash.
#
# @return [Object] the value returned by iterating the shutdown handlers
def handle_shutdown
  # Keys are inserted in a fixed order so the logged hash reads consistently.
  context = {}
  context[:limit] = error_tracker.limit
  context[:expiration] = error_tracker.expiration
  context[:occurrences] = error_tracker.occurrences
  context[:status] = runner.status.to_h
  context[:configuration] = config.to_h

  ["Error limit exceeded... shutting down", context].each do |entry|
    Emque::Consuming.logger.error(entry)
  end

  Emque::Consuming.config.shutdown_handlers.each { |hook| hook.call(context) }
end

#initialize ⇒ Object



30
31
32
33
34
35
36
37
38
39
# File 'lib/emque/consuming/application.rb', line 30

# Builds the application instance.
#
# Registers the new object as the class-level +instance+, logs an
# "initializing" line, verifies an adapter is configured, and then creates
# the manager followed by the error tracker.
#
# @raise [AdapterConfigurationError] propagated from
#   +ensure_adapter_is_configured!+ when no adapter is set
def initialize
  # Publish the instance first so the steps below run with it registered.
  self.class.instance = self

  logger.info("#{log_prefix}: initializing")

  # Fail before any collaborators are built if the adapter is missing.
  ensure_adapter_is_configured!

  initialize_manager
  initialize_error_tracker
end

#initialize_error_tracker ⇒ Object



102
103
104
105
106
107
# File 'lib/emque/consuming/application.rb', line 102

# Creates the error tracker from the configured expiration and limit and
# stores it in +@error_tracker+.
#
# @return [Emque::Consuming::ErrorTracker] the newly created tracker
def initialize_error_tracker
  tracker_class = Emque::Consuming::ErrorTracker
  expiration = config.error_expiration
  limit = config.error_limit

  # Options are passed inline (not as a hash variable) so the call works
  # whether the constructor takes an options hash or keyword arguments.
  @error_tracker = tracker_class.new(:expiration => expiration, :limit => limit)
end

#initialize_manager ⇒ Object



109
110
111
# File 'lib/emque/consuming/application.rb', line 109

# Instantiates the manager class supplied by the configured adapter and
# stores it in +@manager+.
#
# @return [Object] the new manager instance
def initialize_manager
  manager_class = config.adapter.manager
  @manager = manager_class.new
end

#log_prefix ⇒ Object



113
114
115
# File 'lib/emque/consuming/application.rb', line 113

# Builds the label placed in front of this application's log lines: the
# capitalized configured app name followed by " Application".
#
# @return [String] e.g. "Demo Application" for an app named "demo"
def log_prefix
  app_label = config.app_name.capitalize
  "#{app_label} Application"
end

#notice_error(context) ⇒ Object



41
42
43
44
# File 'lib/emque/consuming/application.rb', line 41

# Records an error with the error tracker and then re-evaluates whether the
# error limit has been reached.
#
# @param context [Object] passed unchanged to the tracker's +notice_error_for+
# @return [Object] the result of +verify_error_status+
def notice_error(context)
  tracker = error_tracker
  tracker.notice_error_for(context)

  verify_error_status
end

#restart ⇒ Object



46
47
48
49
50
51
# File 'lib/emque/consuming/application.rb', line 46

# Restarts the application: stops it, builds a fresh manager, clears the
# recorded error occurrences, and starts again.
#
# @return [Object] the result of +start+
def restart
  stop

  # A new manager replaces the one that was just stopped.
  initialize_manager

  # Start the new run with no recorded occurrences.
  recorded = error_tracker.occurrences
  recorded.clear

  start
end

#start ⇒ Object



53
54
55
56
# File 'lib/emque/consuming/application.rb', line 53

# Logs a "starting" line and starts the manager through its +async+ proxy.
#
# @return [Object] whatever the proxy's +start+ returns
def start
  logger.info("#{log_prefix}: starting")

  async_manager = manager.async
  async_manager.start
end

#stop ⇒ Object



58
59
60
61
# File 'lib/emque/consuming/application.rb', line 58

# Logs a "stopping" line and stops the manager.
#
# @return [Object] whatever the manager's +stop+ returns
def stop
  logger.info("#{log_prefix}: stopping")

  active_manager = manager
  active_manager.stop
end

#verify_error_status ⇒ Object



63
64
65
66
67
68
69
70
# File 'lib/emque/consuming/application.rb', line 63

# Shuts the runner down when the error limit has been reached and automatic
# shutdown is enabled; otherwise does nothing.
#
# @return [Object, nil] the result of +runner.stop+ when a shutdown is
#   triggered, nil otherwise
def verify_error_status
  # The auto-shutdown setting is only consulted once the limit is reached.
  return unless error_tracker.limit_reached? && auto_shutdown_enabled

  handle_shutdown
  runner.stop
end