Module: Carnivore

Defined in:
lib/carnivore.rb,
lib/carnivore/utils.rb,
lib/carnivore/errors.rb,
lib/carnivore/logger.rb,
lib/carnivore/runner.rb,
lib/carnivore/source.rb,
lib/carnivore/message.rb,
lib/carnivore/version.rb,
lib/carnivore/callback.rb,
lib/carnivore/container.rb,
lib/carnivore/supervisor.rb,
lib/carnivore/source/test.rb,
lib/carnivore/spec_helper.rb,
lib/carnivore/utils/params.rb,
lib/carnivore/utils/failure.rb,
lib/carnivore/utils/logging.rb,
lib/carnivore/source_container.rb,
lib/carnivore/utils/message_registry.rb

Overview

Message consumer and processor

Defined Under Namespace

Modules: Utils Classes: Callback, Container, Error, Message, Source, Supervisor

Constant Summary collapse

Logger =
Zoidberg.logger
VERSION =

Current version of library

Gem::Version.new('1.0.2')

Class Method Summary collapse

Class Method Details

.configure { ... } ⇒ self

Add configuration to Carnivore

Yields:

  • block of configuration

Returns:

  • (self)


33
34
35
36
37
# File 'lib/carnivore/runner.rb', line 33

def configure(&block)
  mod = Container.new
  mod.instance_exec(mod, &block)
  self
end

.configure!(*args) ⇒ Bogo::Config

Sets the global configuration

Parameters:

  • path (String)

    configuration file or directory

Returns:

  • (Bogo::Config)


10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/carnivore/runner.rb', line 10

def configure!(*args)
  if(defined?(Carnivore::Config))
    if(!args.include?(:verify) && !args.include?(:force))
      raise 'Global configuration has already been set!'
    end
    if(args.include?(:force))
      Carnivore.send(:remove_const, :Config)
    end
  end
  unless(defined?(Carnivore::Config))
    Carnivore.const_set(:Config,
      Bogo::Config.new(
        args.first.is_a?(Symbol) ? Smash.new : args.first
      )
    )
  end
  Carnivore::Config
end

.start!Object

Start the Carnivore subsystem



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/carnivore/runner.rb', line 40

def start!
  supervisor = nil
  begin
    require 'carnivore/supervisor'
    configure!(:verify)
    supervisor = Carnivore::Supervisor.build!
    Carnivore::Logger.info 'Initializing all registered sources.'
    [].tap do |register|
      Source.sources.each do |source|
        register << Thread.new do
          supervisor.supervise_as(
            source.source_hash[:name],
            source.klass,
            source.source_hash.dup
          )
        end
      end
    end.map(&:join)
    Carnivore::Logger.info 'Source initializations complete. Running setup and establishing connections.'
    Source.sources.each do |source|
      supervisor[source.source_hash[:name]].run_setup
      supervisor[source.source_hash[:name]].run_connect
    end
    Carnivore::Logger.info 'Sources setup and connected. Enabling message processing.'
    Source.sources.each do |source|
      if(source.source_hash.fetch(:auto_process, true))
        supervisor[source.source_hash[:name]].start!
      end
    end
    loop do
      # We do a sleep loop so we can periodically check on the
      # supervisor and ensure it is still alive. If it has died,
      # raise exception to allow cleanup and restart attempt
      gc_interval = Carnivore::Config.fetch(:carnivore, :garbage_interval, 30)
      gc_last = Time.now.to_i
      while(supervisor.alive?)
        sleep Carnivore::Config.get(:carnivore, :supervisor, :poll) || 5
        if(gc_interval && (Time.now.to_i - gc_last) > gc_interval)
          Carnivore::Logger.debug 'Starting interval forced garbage collection from runner'
          GC.start
          gc_last = Time.now.to_i
        end
      end
      Carnivore::Logger.error 'Carnivore supervisor has died!'
      raise Carnivore::Error::DeadSupervisor.new
    end
  rescue Carnivore::Error::DeadSupervisor
    Carnivore::Logger.warn "Received dead supervisor exception. Attempting to restart."
    begin
      supervisor.terminate if supervisor.alive?
    rescue => e
      Carnivore::Logger.debug "Exception raised during supervisor termination (restart cleanup): #{e}"
    end
    Carnivore::Logger.debug "Pausing restart for 10 seconds to prevent restart thrashing cycles"
    sleep 10
    retry
  rescue Exception => e
    Carnivore::Logger.warn "Exception type encountered forcing shutdown - #{e.class}: #{e}"
    Carnivore::Logger.debug "Shutdown exception info: #{e.class}: #{e}\n#{e.backtrace.join("\n")}"
    supervisor.terminate if supervisor
    raise
    # Gracefully shut down
  end
end

.uuidObject



22
23
24
# File 'lib/carnivore.rb', line 22

def self.uuid
  Zoidberg.uuid
end