Module: Carnivore
- Defined in:
- lib/carnivore.rb,
lib/carnivore/utils.rb,
lib/carnivore/errors.rb,
lib/carnivore/logger.rb,
lib/carnivore/runner.rb,
lib/carnivore/source.rb,
lib/carnivore/message.rb,
lib/carnivore/version.rb,
lib/carnivore/callback.rb,
lib/carnivore/container.rb,
lib/carnivore/supervisor.rb,
lib/carnivore/source/test.rb,
lib/carnivore/spec_helper.rb,
lib/carnivore/utils/params.rb,
lib/carnivore/utils/failure.rb,
lib/carnivore/utils/logging.rb,
lib/carnivore/source_container.rb,
lib/carnivore/utils/message_registry.rb
Overview
Message consumer and processor
Defined Under Namespace
Modules: Utils
Classes: Callback, Container, Error, Message, Source, Supervisor
Constant Summary
- Logger = Zoidberg.logger
- FORCE_SHUTDOWN_AFTER = 8
  Number of seconds to wait for clean shutdown
- VERSION = Gem::Version.new('1.0.6')
  Current version of library
Class Method Summary
- .configure { ... } ⇒ self
  Add configuration to Carnivore.
- .configure!(*args) ⇒ Bogo::Config
  Sets the global configuration.
- .start! ⇒ Object
  Start the Carnivore subsystem.
- .uuid ⇒ Object
Class Method Details
.configure { ... } ⇒ self
Add configuration to Carnivore
37 38 39 40 41 |
# File 'lib/carnivore/runner.rb', line 37
# Add configuration to Carnivore.
#
# A new Container instance is created and the given block is evaluated
# with that instance as `self`; the same instance is also passed to the
# block as its only argument.
#
# @yield [container] configuration block run in the container's context
# @return [self]
def configure(&block)
  Container.new.tap do |container|
    container.instance_exec(container, &block)
  end
  self
end
.configure!(*args) ⇒ Bogo::Config
Sets the global configuration
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/carnivore/runner.rb', line 14
# Sets the global configuration (the Carnivore::Config constant).
#
# When no configuration is defined yet, one is built with Bogo::Config.new
# from the first argument; if that first argument is a Symbol (i.e. only a
# flag was passed) an empty Smash is used instead.
#
# When a configuration is already defined:
# * with :verify in args, the existing configuration is returned untouched
# * with :force in args, the existing constant is removed and rebuilt
# * with neither flag, an error is raised
#
# @param args [Array] optional configuration source followed by/or the
#   flags :verify and :force
# @return [Bogo::Config] the value of Carnivore::Config
# @raise [RuntimeError] if already configured and neither flag is given
def configure!(*args)
  if defined?(Carnivore::Config)
    forced = args.include?(:force)
    unless args.include?(:verify) || forced
      raise 'Global configuration has already been set!'
    end
    # Drop the old constant so the block below rebuilds it.
    Carnivore.send(:remove_const, :Config) if forced
  end

  unless defined?(Carnivore::Config)
    initial = args.first
    # A leading Symbol is a flag, not configuration data.
    initial = Smash.new if initial.is_a?(Symbol)
    Carnivore.const_set(:Config, Bogo::Config.new(initial))
  end

  Carnivore::Config
end
.start! ⇒ Object
Start the Carnivore subsystem
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/carnivore/runner.rb', line 44
# Start the Carnivore subsystem.
#
# Sequence, as implemented below:
# 1. Ensure a global configuration exists (`configure!(:verify)`) and build
#    a supervisor.
# 2. Register every entry of `Source.sources` with the supervisor, one
#    thread per source, and wait for all registrations to finish.
# 3. Run `run_setup` then `run_connect` on each supervised source.
# 4. Call `start!` on each source unless its `:auto_process` setting is
#    falsy (defaults to true).
# 5. Block, polling `supervisor.alive?` and periodically forcing GC.
#
# This method does not return normally: the polling loop only ends by
# raising. A dead supervisor triggers cleanup, a 10 second pause and a
# full `retry` of the sequence. Any other exception triggers shutdown
# (supervisor terminated, remaining threads killed after a grace period)
# and is then re-raised to the caller.
#
# @raise [Exception] the exception that forced shutdown
def start!
  supervisor = nil
  begin
    require 'carnivore/supervisor'
    configure!(:verify)
    supervisor = Carnivore::Supervisor.build!
    Carnivore::Logger.info 'Initializing all registered sources.'
    # Collect one registration thread per source, then join them all so
    # every source is supervised before setup begins.
    [].tap do |register|
      Source.sources.each do |source|
        register << Thread.new do
          supervisor.supervise_as(
            source.source_hash[:name],
            source.klass,
            source.source_hash.dup
          )
        end
      end
    end.map(&:join)
    Carnivore::Logger.info 'Source initializations complete. Running setup and establishing connections.'
    Source.sources.each do |source|
      supervisor[source.source_hash[:name]].run_setup
      supervisor[source.source_hash[:name]].run_connect
    end
    Carnivore::Logger.info 'Sources setup and connected. Enabling message processing.'
    Source.sources.each do |source|
      if(source.source_hash.fetch(:auto_process, true))
        supervisor[source.source_hash[:name]].start!
      end
    end
    loop do
      # We do a sleep loop so we can periodically check on the
      # supervisor and ensure it is still alive. If it has died,
      # raise exception to allow cleanup and restart attempt
      # GC interval is compared against whole-second timestamps; default 30.
      gc_interval = Carnivore::Config.fetch(:carnivore, :garbage_interval, 30)
      gc_last = Time.now.to_i
      while(supervisor.alive?)
        # Poll period comes from configuration, falling back to 5.
        sleep Carnivore::Config.get(:carnivore, :supervisor, :poll) || 5
        if(gc_interval && (Time.now.to_i - gc_last) > gc_interval)
          Carnivore::Logger.debug 'Starting interval forced garbage collection from runner'
          GC.start
          gc_last = Time.now.to_i
        end
      end
      Carnivore::Logger.error 'Carnivore supervisor has died!'
      raise Carnivore::Error::DeadSupervisor.new
    end
  rescue Carnivore::Error::DeadSupervisor
    Carnivore::Logger.warn "Received dead supervisor exception. Attempting to restart."
    begin
      supervisor.terminate if supervisor.alive?
    rescue => e
      # Best-effort cleanup: a failure here is logged and the restart proceeds.
      Carnivore::Logger.debug "Exception raised during supervisor termination (restart cleanup): #{e}"
    end
    Carnivore::Logger.debug "Pausing restart for 10 seconds to prevent restart thrashing cycles"
    sleep 10
    # Re-runs the whole begin block, including building a new supervisor.
    retry
  rescue Exception => e
    # Catches everything not handled above (Exception is the root class),
    # performs shutdown, then re-raises at the end of this clause.
    Carnivore::Logger.warn "Exception type encountered forcing shutdown - #{e.class}: #{e}"
    Carnivore::Logger.debug "Shutdown exception info: #{e.class}: #{e}\n#{e.backtrace.join("\n")}"
    Zoidberg.signal_shutdown = true
    supervisor.terminate if supervisor
    # NOTE(review): "teminated" below is a typo in the emitted log message.
    Carnivore::Logger.debug 'Carnivore supervisor has been teminated!'
    # Grace period: wait in 0.1s steps until only one thread remains or
    # FORCE_SHUTDOWN_AFTER seconds have been counted down.
    timeout = Carnivore::FORCE_SHUTDOWN_AFTER
    until(timeout <= 0 || Thread.list.size == 1)
      timeout -= 0.1
      sleep(0.1)
    end
    # Kill every thread still listed, except the current one.
    Thread.list.each do |thread|
      next if Thread.current == thread
      Carnivore::Logger.warn "Force killing live thread for shutdown: #{thread.inspect}"
      thread.kill
    end
    raise
  end
end
.uuid ⇒ Object
22 23 24 |
# File 'lib/carnivore.rb', line 22 def self.uuid Zoidberg.uuid end |