Class: Filbunke::Daemon

Inherits:
Object
  • Object
show all
Defined in:
lib/filbunke/daemon.rb

Instance Method Summary collapse

Constructor Details

#initialize(config, local = false) ⇒ Daemon

Returns a new instance of Daemon.



4
5
6
7
8
9
# File 'lib/filbunke/daemon.rb', line 4

# Builds a daemon from an already-parsed configuration hash.
#
# Stores the configuration, starts with an empty client list, lets
# setup_clients! populate it, and finally writes the PID file unless the
# daemon runs in local mode.
#
# @param config [Hash] daemon configuration; config["pid_file"] is read here
# @param local [Boolean] forwarded to setup_clients!; when truthy no PID
#   file is written
def initialize(config, local = false)
  @clients = []
  @config = config
  setup_clients!(local)
  return if local

  write_pid!(config["pid_file"])
end

Instance Method Details

#process_is_alive(pid) ⇒ Object



66
67
68
# File 'lib/filbunke/daemon.rb', line 66

# Reports whether a process with the given PID currently exists.
#
# Sends signal 0 via Process.kill, which checks for the process without
# delivering a signal to it.
#
# @param pid [Integer] process ID to probe
# @return [Boolean] true when the process exists (even if it is owned by
#   another user), false when it does not exist or +pid+ is unusable
def process_is_alive(pid)
  !!Process.kill(0, pid)
rescue Errno::EPERM
  # Permission was denied, which means the target exists but we may not
  # signal it. It is still alive, so do not report it as dead.
  true
rescue StandardError
  # Errno::ESRCH (no such process) and invalid arguments such as nil keep
  # the original best-effort answer.
  false
end

#run! ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/filbunke/daemon.rb', line 40

# Forks one worker process per configured client and waits for all of them.
#
# Each worker keeps running while the parent process is alive: it calls
# update_files! on its client with the stored checkpoint, saves the
# checkpoint that call returns, then sleeps for the repository's run_every
# value. If a worker hits a StandardError it logs the failure, sends KILL
# to the parent process and exits with status 1.
def run!
  version_file = ::File.expand_path(::File.join(::File.dirname(__FILE__), "../../VERSION"))
  release = ::File.read(version_file).chomp
  @logger.log("Starting filbunked version #{release}")
  @parent_pid = Process.pid

  worker_pids = @clients.map do |client|
    fork do
      begin
        # A plain `while` is used on purpose: Kernel#loop would swallow a
        # StopIteration raised by the client instead of reporting it below.
        while process_is_alive(@parent_pid)
          last_checkpoint = checkpoint_for_repository(client.repository)
          new_checkpoint = client.update_files!(last_checkpoint)
          @logger.info "Update finished for #{client.repository.name} new checkpoint => #{new_checkpoint}"
          update_checkpoint_for_repository(client.repository, new_checkpoint)
          sleep client.repository.run_every
        end
      rescue => error
        report_lines = ["#{error.class} - #{error.message}", *error.backtrace]
        report = report_lines.join("\n\t")
        @logger.error("#{client.repository.name} Died..  #{report}")
        # Take the parent down as well; its liveness is what keeps the
        # other workers' loops running.
        Process.kill("KILL", @parent_pid)
        exit 1
      end
    end
  end

  worker_pids.each { |worker_pid| Process.wait(worker_pid) }
end

#setup_clients!(local = false) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/filbunke/daemon.rb', line 11

# Creates the logger and builds one Client per entry in
# @config["repositories"], appending each to @clients.
#
# For every repository, "run_every" and "batch_size" are resolved with the
# precedence repository config > top-level config > fallback (10 and 0
# respectively) and written back into the repository config before it is
# handed to Repository.new. Each callback's "num_callback_threads" and
# "num_callback_processes" are resolved the same way with the precedence
# callback > repository > top-level > 0.
#
# Callback files are required from @config["callback_path"]; the callback
# class is looked up by CamelCasing the callback name
# (e.g. "my_callback" -> MyCallback). Names may be Strings or Symbols.
#
# @param local [Boolean] passed through to FilbunkeLogger.new
# @raise [StandardError] re-raises any error hit while initializing a
#   repository, after reporting it to STDERR and to the logger
def setup_clients!(local = false)
  @logger = FilbunkeLogger.new(@config["log_file"], local, @config["log_level"])
  @logger.log("Initializing filbunked")
  @config["repositories"].each do |repository_name, repository_config|
    @logger.log("Initializing repository: #{repository_name}")
    begin
      repository_config["run_every"] = repository_config.fetch("run_every", @config.fetch("run_every", 10))
      repository_config["batch_size"] = repository_config.fetch("batch_size", @config.fetch("batch_size", 0))
      repository = Repository.new(repository_config)

      callbacks = []
      repository_config["callbacks"].each do |callback_name, callback_config|
        # Coerce once and use the same string for both the file name and the
        # class name, so Symbol keys work for the constant lookup too.
        callback_file = callback_name.to_s
        require ::File.join(@config["callback_path"], callback_file)

        %w[num_callback_threads num_callback_processes].each do |setting|
          inherited = repository_config.fetch(setting, @config.fetch(setting, 0))
          callback_config[setting] = callback_config.fetch(setting, inherited)
        end

        callback_class = Module.const_get(callback_file.split("_").map(&:capitalize).join)
        callbacks << callback_class.new(@logger, callback_config)
      end

      # `|| nil` keeps the original normalization of a false value to nil.
      failed_request_log_file_name = repository_config["failed_request_log_file_name"] || nil

      @clients << Client.new(repository, @logger, callbacks, failed_request_log_file_name)
    rescue => e
      msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
      STDERR.puts("Failed to initialize #{repository_name}; #{msg}")
      @logger.error("Failed to initialize #{repository_name}; #{msg}")
      raise
    end
  end
end