Class: Pwrake::Master

Inherits:
Object
  • Object
show all
Includes:
Option
Defined in:
lib/pwrake/master.rb

Constant Summary

Constants included from Option

Option::DEFAULT_CONFFILES

Instance Attribute Summary collapse

Attributes included from Option

#core_list, #counter, #logfile, #queue_class, #shell_class, #task_logger

Instance Method Summary collapse

Methods included from Option

#cwd_relative_if_under_home, #cwd_relative_to_home, #feedback_options, #finish_option, #format_time_pid, #init_logger, #init_option, #init_options, #init_pass_env, #mount_type, #mountpoint_of_cwd, #option_data, #parse_opt, #pwrake_options, #search_opts, #set_filesystem, #set_hosts, #setup_option

Constructor Details

#initializeMaster

Returns a new instance of Master.



24
25
26
27
28
29
30
# File 'lib/pwrake/master.rb', line 24

def initialize
  init_option    # Pwrake::Option
  setup_option   # Pwrake::Option
  @started = false
  @lock = Mutex.new
  @current_task_id = -1
end

Instance Attribute Details

#filesystemObject (readonly)

Returns the value of attribute filesystem.



21
22
23
# File 'lib/pwrake/master.rb', line 21

def filesystem
  @filesystem
end

#finish_queueObject (readonly)

Returns the value of attribute finish_queue.



19
20
21
# File 'lib/pwrake/master.rb', line 19

def finish_queue
  @finish_queue
end

#postprocessObject (readonly)

Returns the value of attribute postprocess.



22
23
24
# File 'lib/pwrake/master.rb', line 22

def postprocess
  @postprocess
end

#shell_setObject (readonly)

Returns the value of attribute shell_set.



20
21
22
# File 'lib/pwrake/master.rb', line 20

def shell_set
  @shell_set
end

#task_queueObject (readonly)

Returns the value of attribute task_queue.



18
19
20
# File 'lib/pwrake/master.rb', line 18

def task_queue
  @task_queue
end

Instance Method Details

#finishObject



45
46
47
48
49
50
51
# File 'lib/pwrake/master.rb', line 45

def finish
  Log.debug "-- Master#finish called"
  @task_queue.finish if @task_queue
  @threads.each{|t| t.join } if @threads
  @counter.print if @counter
  finish_option   # Pwrake::Option
end

#standard_exception_handlingObject

Provide standard execption handling for the given block.



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/pwrake/master.rb', line 100

def standard_exception_handling
  begin
    yield
  rescue SystemExit => ex
    # Exit silently with current status
    @task_queue.stop
    raise
  rescue OptionParser::InvalidOption => ex
    # Exit silently
    @task_queue.stop
    exit(false)
  rescue Exception => ex
    # Exit with error message
    name = "pwrake"
    $stderr.puts "#{name} aborted!"
    $stderr.puts ex.message
    if Rake.application.options.trace
      $stderr.puts ex.backtrace.join("\n")
    else
      $stderr.puts ex.backtrace.find {|str| str =~ /#{@rakefile}/ } || ""
      $stderr.puts "(See full trace by running task with --trace)"
    end
    @task_queue.stop
    exit(false)
  end
end

#startObject



32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/pwrake/master.rb', line 32

def start
  return if @task_queue
  timer = Timer.new("start_worker")
  @finish_queue = Queue.new
  @task_queue = @queue_class.new(@core_list)
  @shell_set = []
  @core_list.each_with_index do |h,i|
    @shell_set << @shell_class.new(h,@shell_opt)
  end
  start_threads
  timer.finish
end

#start_threadsObject



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/pwrake/master.rb', line 53

def start_threads
  Thread.abort_on_exception = true
  @threads = []
  t_intvl = Pwrake.application.pwrake_options['THREAD_CREATE_INTERVAL']
  @shell_set.each do |c|
    tc0 = Time.now
    @threads << Thread.new(c) do |conn|
      Pwrake.current_shell = conn
      t0 = Time.now
      conn.start
      t = Time.now - t0
      Log.info "-- worker[#{conn.id}] connect to #{conn.host}: %.3f sec" % t
      begin
        thread_loop(conn)
      ensure
        Log.info "-- worker[#{conn.id}] ensure : closing #{conn.host}"
        conn.finish
      end
    end
    t_sleep = t_intvl - (Time.now - tc0)
    sleep t_sleep if t_sleep > 0
  end
end

#task_id_counterObject



93
94
95
96
97
# File 'lib/pwrake/master.rb', line 93

def task_id_counter
  @lock.synchronize do
    @current_task_id += 1
  end
end

#thread_loop(conn, last = nil) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/pwrake/master.rb', line 77

def thread_loop(conn,last=nil)
  @task_queue.reserve(last) if last
  hint = (conn) ? conn.host : nil
  standard_exception_handling do
    while true
    time_start = Time.now
    t = @task_queue.deq(hint)
    break if !t
    time_deq = Time.now - time_start
      Log.debug "--- Master#thread_loop deq t=#{t.inspect} time=#{time_deq}sec"
      t.pw_invoke
      return if t == last
    end
  end
end