Module: Carnivore

Defined in:
lib/carnivore.rb,
lib/carnivore/utils.rb,
lib/carnivore/errors.rb,
lib/carnivore/runner.rb,
lib/carnivore/source.rb,
lib/carnivore/message.rb,
lib/carnivore/version.rb,
lib/carnivore/callback.rb,
lib/carnivore/container.rb,
lib/carnivore/supervisor.rb,
lib/carnivore/source/test.rb,
lib/carnivore/spec_helper.rb,
lib/carnivore/utils/params.rb,
lib/carnivore/utils/failure.rb,
lib/carnivore/utils/logging.rb,
lib/carnivore/source_container.rb,
lib/carnivore/utils/message_registry.rb

Overview

Message consumer and processor

Defined Under Namespace

Modules: Utils

Classes: Callback, Container, Error, Message, Source, Supervisor

Constant Summary collapse

VERSION =

Current version of library

Gem::Version.new('0.3.10')

Class Method Summary collapse

Class Method Details

.configure { ... } ⇒ self

Add configuration to Carnivore

Yields:

  • block of configuration

Returns:

  • (self)


33
34
35
36
37
# File 'lib/carnivore/runner.rb', line 33

# Add configuration to Carnivore
#
# The block is evaluated with a freshly created Container instance as
# `self`; the same instance is also passed as the block argument.
#
# @yield block of configuration
# @yieldparam container [Container] the instance the block is evaluated against
# @return [self]
def configure(&block)
  Container.new.tap do |container|
    container.instance_exec(container, &block)
  end
  self
end

.configure!(*args) ⇒ Bogo::Config

Sets the global configuration

Parameters:

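  • args (Array)

    first element is the configuration file or directory path (String); the flag symbols :verify (reuse an already-set configuration without raising) and :force (replace an already-set configuration) may also be passed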

Returns:

  • (Bogo::Config)


10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/carnivore/runner.rb', line 10

# Sets the global configuration (the Carnivore::Config constant)
#
# When Carnivore::Config is already defined the call raises unless
# :verify or :force is among the arguments. :force removes the existing
# constant so it is rebuilt; :verify leaves the existing value in place.
#
# @param args [Array] first element is handed to Bogo::Config.new, unless
#   it is a Symbol, in which case a new Smash is used instead
# @return [Bogo::Config] the value of Carnivore::Config
# @raise [RuntimeError] configuration already set and neither flag given
def configure!(*args)
  if defined?(Carnivore::Config)
    forced = args.include?(:force)
    unless args.include?(:verify) || forced
      raise 'Global configuration has already been set!'
    end
    # Drop the old constant so the definition step below runs again
    Carnivore.send(:remove_const, :Config) if forced
  end
  unless defined?(Carnivore::Config)
    seed = args.first
    # A leading Symbol is a flag, not configuration content
    seed = Smash.new if seed.is_a?(Symbol)
    Carnivore.const_set(:Config, Bogo::Config.new(seed))
  end
  Carnivore::Config
end

.start! ⇒ Object

Start the Carnivore subsystem



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/carnivore/runner.rb', line 40

# Start the Carnivore subsystem
#
# Verifies the global configuration, builds the supervisor, registers
# every source with it (one thread per source, all joined before
# continuing), starts the sources that have not disabled :auto_process,
# and then polls the supervisor until it is no longer alive.
#
# A dead supervisor is terminated (best effort) and, after a 10 second
# pause, the whole sequence is retried. Any other exception is logged and
# the supervisor, if one was built, is terminated before returning.
#
# @return [Object] result of the final expression evaluated
def start!
  supervisor = nil
  # Explicit begin block: `retry` must re-run from here so `supervisor`
  # keeps its previous value until a new one has been built.
  begin
    require 'carnivore/supervisor'
    configure!(:verify)
    supervisor = Carnivore::Supervisor.build!
    Celluloid::Logger.info 'Initializing all registered sources.'
    initializers = Source.sources.map do |source|
      Thread.new do
        name = source.source_hash[:name]
        klass = source.klass
        supervisor.supervise_as(name, klass, source.source_hash.dup)
      end
    end
    # Wait for every source registration before enabling processing
    initializers.each(&:join)
    Celluloid::Logger.info 'Source initializations complete. Enabling message processing.'
    Source.sources.each do |source|
      next unless source.source_hash.fetch(:auto_process, true)
      supervisor[source.source_hash[:name]].start!
    end
    loop do
      # Sleep in short intervals so the supervisor can be checked
      # periodically. Once it is no longer alive, raise so the rescue
      # below can clean up and attempt a restart.
      while supervisor.alive?
        sleep(Carnivore::Config.get(:carnivore, :supervisor, :poll) || 5)
      end
      Celluloid::Logger.error 'Carnivore supervisor has died!'
      raise Carnivore::Error::DeadSupervisor
    end
  rescue Carnivore::Error::DeadSupervisor
    Celluloid::Logger.warn 'Received dead supervisor exception. Attempting to restart.'
    begin
      supervisor.terminate
    rescue => cleanup_error
      # Termination failures during restart cleanup are only logged
      Celluloid::Logger.debug "Exception raised during supervisor termination (restart cleanup): #{cleanup_error}"
    end
    Celluloid::Logger.debug 'Pausing restart for 10 seconds to prevent restart thrashing cycles'
    sleep 10
    retry
  rescue Exception => e
    summary = "#{e.class}: #{e}"
    Celluloid::Logger.warn "Exception type encountered forcing shutdown - #{summary}"
    Celluloid::Logger.debug "Shutdown exception info: #{summary}\n#{e.backtrace.join("\n")}"
    # Gracefully shut down
    supervisor.terminate if supervisor
  end
end