Class: Skynet

Inherits:
Object
  • Object
show all
Includes:
SkynetDebugger
Defined in:
lib/skynet/version.rb,
lib/skynet.rb,
lib/skynet/skynet_job.rb,
lib/skynet/skynet_task.rb,
lib/skynet/skynet_config.rb,
lib/skynet/skynet_logger.rb,
lib/skynet/skynet_worker.rb,
lib/skynet/mapreduce_test.rb,
lib/skynet/skynet_console.rb,
lib/skynet/skynet_manager.rb,
lib/skynet/skynet_message.rb,
lib/skynet/skynet_launcher.rb,
lib/skynet/skynet_partitioners.rb,
lib/skynet/skynet_message_queue.rb,
lib/skynet/skynet_guid_generator.rb,
lib/skynet/message_queue_adapters/mysql.rb,
lib/skynet/message_queue_adapters/message_queue_adapter.rb,
lib/skynet/message_queue_adapters/tuple_space.rb,
lib/skynet/skynet_tuplespace_server.rb

Overview

FIXME: should be a module

Defined Under Namespace

Modules: ConsoleHelper, GuidGenerator, Loggable, VERSION Classes: AbstractClassError, ActiveRecordAsync, AsyncJob, Config, ConnectionError, Console, Error, InvalidMessage, Job, Logger, Manager, MapreduceTest, Message, MessageQueue, MessageQueueAdapter, Partitioners, ProfileCountTest, QueueTimeout, RequestExpiredError, Task, TaskIterator, TuplespaceServer, UniqueDBNumGenerator, Worker, WorkerStatusMessage, WorkerVersionMessage

Constant Summary collapse

CONFIG =
{
  :ENABLE                               => true,
  :SOLO                                 => false,
  :APP_ROOT                             => nil,
  :SKYNET_LOG_DIR                       => nil,
  :SKYNET_PID_DIR                       => nil,
  :SKYNET_PID_FILE                      => "skynet.pid",
  :SKYNET_LOG_FILE                      => "skynet.log",
  :SKYNET_MANAGER_STATS_FILE            => "skynet_manager_stats.txt",
  :SKYNET_LOG_LEVEL                     => Logger::ERROR,
  :SKYNET_LOCAL_MANAGER_PORT            => "40000",
  :MESSAGE_QUEUE_ADAPTER                => ("Skynet::MessageQueueAdapter::TupleSpace" || "Skynet::MessageQueueAdapter::Mysql"),
  :TS_USE_RINGSERVER                    => true,
  :TS_DRBURI                            => "druby://localhost:47647",   # If you do not use RINGSERVER, you must specifiy the DRBURI
  :TS_SERVER_HOSTS                      => ["localhost:7647"],
  :TS_SERVER_START_DELAY                => 10,
  # :MYSQL_QUEUE_DATABASE                 => "skynet_queue",
  :MYSQL_TEMPERATURE_CHANGE_SLEEP       => 40,
  :MYSQL_MESSAGE_QUEUE_TAPLE            => "skynet_message_queues",
  :MYSQL_MESSAGE_QUEUE_TEMP_CHECK_DELAY => 40,
  :MYSQL_NEXT_TASK_TIMEOUT              => 60,
  :MYSQL_ADAPTER                        => "mysql",
  :MYSQL_HOST                           => "localhost",
  :MYSQL_DATABASE                       => nil, # 'skynet'
  :MYSQL_USERNAME                       => nil,
  :MYSQL_PASSWORD                       => "",
  :NUMBER_OF_WORKERS                    => 4,
  :WORKER_CHECK_DELAY                   => 5,
  :WORKER_MAX_MEMORY                    => 500,
  :WORKER_MAX_PROCESSED                 => 1000,
  :WORKER_VERSION_CHECK_DELAY           => 30,
  # :GUID_GENERATOR                     => nil,
  :PERCENTAGE_OF_TASK_ONLY_WORKERS      => 0.7,
  :PERCENTAGE_OF_MASTER_ONLY_WORKERS    => 0.2,
  :MAX_RETRIES                          => 6,
  :DEFAULT_MASTER_RETRY                 => 0,
  :DEFAULT_MAP_RETRY                    => 3,
  :DEFAULT_REDUCE_RETRY                 => 3,
  :DEFAULT_KEEP_MAP_TASKS               => 1,
  :DEFAULT_KEEP_REDUCE_TASKS            => 1,
  :MESSAGE_QUEUES                       => ['first', 'second', 'third', 'fourth', 'fifth', 'sixth', 'seventh', 'eighth', 'nineth']
}

Class Method Summary collapse

Methods included from SkynetDebugger

#args_pp, #debug, #debug_header, #error, #fatal, included, #info, #log, #printlog, #stderr, #stdout, #warn

Class Method Details

.close_consoleObject



82
83
84
85
86
# File 'lib/skynet.rb', line 82

def self.close_console
  STDIN.reopen "/dev/null"
  STDOUT.reopen "/dev/null", "a"
  STDERR.reopen STDOUT 
end

.close_files(from = 3, to = 50) ⇒ Object

close open file descriptors starting with STDERR+1



75
76
77
78
79
80
# File 'lib/skynet.rb', line 75

def self.close_files(from=3, to=50)
  close_console
  (from .. to).each do |fd|
    IO.for_fd(fd).close rescue nil
   end
end

.configObject



70
71
72
# File 'lib/skynet/skynet_config.rb', line 70

def self.config
  Skynet::Config.new
end

.configure(config = {}) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/skynet/skynet_config.rb', line 56

def self.configure(config={})
  old_config = CONFIG.dup
  config.each {|k,v| CONFIG[k] = v}
  Skynet::Logger.log = nil
  if block_given?
    ret = yield
    CONFIG.keys.each do |key|
      CONFIG.delete(key)
    end
    old_config.each {|k,v| CONFIG[k] = v}
    ret
  end
end

.fork_and_exec(command) ⇒ Object

kinda like system() but gives me back a pid



50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/skynet.rb', line 50

def self.fork_and_exec(command)
  sleep 0.01  # remove contention on manager drb object
  log = Skynet::Logger.get
  debug "executing /bin/sh -c \"#{command}\""
  pid = safefork do
    close_files
    exec("/bin/sh -c \"#{command}\"")
    exit
  end
  Process.detach(pid)
  pid
end

.new(options = {}) ⇒ Object



36
37
38
39
# File 'lib/skynet/skynet_launcher.rb', line 36

def self.new(options={})
  warn("Skynet.new is deprecated, please use Skynet.start instead")
  start(options)
end

.process_alive?(worker_pid) ⇒ Boolean

Returns:

  • (Boolean)


88
89
90
91
92
93
# File 'lib/skynet.rb', line 88

def self.process_alive?(worker_pid)
  Process.kill(0,worker_pid)
  return true
rescue Errno::ESRCH => e
  return false
end

.safefork(&block) ⇒ Object



63
64
65
66
67
68
69
70
71
# File 'lib/skynet.rb', line 63

def self.safefork (&block)
  @fork_tries ||= 0
  fork(&block)
rescue Errno::EWOULDBLOCK
  raise if @fork_tries >= 20
  @fork_tries += 1
  sleep 5
  retry
end

.silentObject



46
47
48
49
50
51
52
53
54
# File 'lib/skynet/skynet_config.rb', line 46

def self.silent
  if block_given?
    Skynet.configure(:SKYNET_LOG_LEVEL => 10) do
      yield
    end
  else
    raise Error.new("Please provide a block to Skynet.silent")
  end
end

.solo(config = {}) ⇒ Object

Raises:



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/skynet/skynet_config.rb', line 74

def self.solo(config = {}) 
  raise Skynet::Error.new("You provide a code block to Skynet.solo") unless block_given?
  result = nil
  Skynet::Logger.log = nil
  begin
    config[:ENABLE]              = true
    config[:SOLO]                = true
    config[:SKYNET_LOG_FILE]   ||= STDOUT
    config[:SKYNET_LOG_LEVEL]  ||= Logger::ERROR
    configure(config) do
      result = yield
    end
  rescue Exception => e
    error "Something bad happened #{e.inspect} #{e.backtrace.join("\n")}"
  end
  return result
end

.start(options = {}) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/skynet/skynet_launcher.rb', line 4

def self.start(options={})
  begin
    mq = Skynet::MessageQueue.new
  rescue Skynet::ConnectionError
    if Skynet::MessageQueue.adapter == :tuplespace
      ts_port = Skynet::CONFIG[:TS_SERVER_HOSTS].first.split(':').last
      # puts "trying to make ts skynet_tuplespace_server --port=#{ts_port} --logfile=#{Skynet.config.logfile_location} --piddir=#{Skynet.config.skynet_pid_dir} --use_ringserver=#{Skynet.config.ts_use_ringserver} --drburi=#{Skynet.config.ts_drburi} start"
      cmd = "skynet_tuplespace_server  --port=#{ts_port} --logfile=#{Skynet.config.logfile_location} --piddir=#{Skynet.config.skynet_pid_dir} --use_ringserver=#{Skynet.config.ts_use_ringserver} --drburi=#{Skynet.config.ts_drburi} start"
      pid = fork do
        exec(cmd)
      end
      sleep Skynet::CONFIG[:TS_SERVER_START_DELAY]
    end
  end

  options[:script_path] = Skynet::CONFIG[:LAUNCHER_PATH]
  
  if ARGV.detect {|a| a == 'console' }
    ARGV.delete('console')
    Skynet::Console.start
  elsif options[:worker_type] or ARGV.detect {|a| a =~ /worker_type/ }
    Skynet::Worker.start(options)
  else
    if ARGV.include?('stop')
      Skynet::Manager.stop(options)
    else
      options["daemonize"] = true if ARGV.include?('start')      
      Skynet::Manager.start(options)
    end
  end
end