Class: Pwrake::Master
Constant Summary
Constants included from Option
Instance Attribute Summary collapse
-
#filesystem ⇒ Object
readonly
Returns the value of attribute filesystem.
-
#finish_queue ⇒ Object
readonly
Returns the value of attribute finish_queue.
-
#postprocess ⇒ Object
readonly
Returns the value of attribute postprocess.
-
#shell_set ⇒ Object
readonly
Returns the value of attribute shell_set.
-
#task_queue ⇒ Object
readonly
Returns the value of attribute task_queue.
Attributes included from Option
#core_list, #counter, #logfile, #queue_class, #shell_class, #task_logger
Instance Method Summary collapse
- #finish ⇒ Object
-
#initialize ⇒ Master
constructor
A new instance of Master.
-
#standard_exception_handling ⇒ Object
Provide standard exception handling for the given block.
- #start ⇒ Object
- #start_threads ⇒ Object
- #task_id_counter ⇒ Object
- #thread_loop(conn, last = nil) ⇒ Object
Methods included from Option
#cwd_relative_if_under_home, #cwd_relative_to_home, #feedback_options, #finish_option, #format_time_pid, #init_logger, #init_option, #init_options, #init_pass_env, #mount_type, #mountpoint_of_cwd, #option_data, #parse_opt, #pwrake_options, #search_opts, #set_filesystem, #set_hosts, #setup_option
Constructor Details
#initialize ⇒ Master
Returns a new instance of Master.
24 25 26 27 28 29 30 |
# File 'lib/pwrake/master.rb', line 24

# Builds a Master in its idle (not yet started) state.
#
# The Pwrake::Option hooks run first; after that the bookkeeping used by
# the rest of the class is initialised: the started flag, the mutex that
# guards the task id counter, and the counter itself. The counter starts
# at -1 because #task_id_counter increments before returning, so the first
# id handed out is 0.
def initialize
  init_option    # Pwrake::Option
  setup_option   # Pwrake::Option
  @started = false
  @lock = Mutex.new
  @current_task_id = -1
end
Instance Attribute Details
#filesystem ⇒ Object (readonly)
Returns the value of attribute filesystem.
21 22 23 |
# File 'lib/pwrake/master.rb', line 21

# Returns the value of attribute filesystem (nil until it has been set).
def filesystem
  @filesystem
end
#finish_queue ⇒ Object (readonly)
Returns the value of attribute finish_queue.
19 20 21 |
# File 'lib/pwrake/master.rb', line 19

# Returns the value of attribute finish_queue (nil until it has been set).
def finish_queue
  @finish_queue
end
#postprocess ⇒ Object (readonly)
Returns the value of attribute postprocess.
22 23 24 |
# File 'lib/pwrake/master.rb', line 22

# Returns the value of attribute postprocess (nil until it has been set).
def postprocess
  @postprocess
end
#shell_set ⇒ Object (readonly)
Returns the value of attribute shell_set.
20 21 22 |
# File 'lib/pwrake/master.rb', line 20

# Returns the value of attribute shell_set (nil until it has been set).
def shell_set
  @shell_set
end
#task_queue ⇒ Object (readonly)
Returns the value of attribute task_queue.
18 19 20 |
# File 'lib/pwrake/master.rb', line 18

# Returns the value of attribute task_queue (nil until it has been set).
def task_queue
  @task_queue
end
Instance Method Details
#finish ⇒ Object
45 46 47 48 49 50 51 |
# File 'lib/pwrake/master.rb', line 45

# Shuts the master down.
#
# Logs the call, then in order: finishes the task queue, joins every
# worker thread, prints the counter, and finally runs the Pwrake::Option
# teardown. Each of the first three steps is skipped when the
# corresponding instance variable was never set, so calling this on a
# master that was never started only runs finish_option.
def finish
  Log.debug "-- Master#finish called"
  @task_queue.finish if @task_queue
  @threads.each(&:join) if @threads
  @counter.print if @counter
  finish_option   # Pwrake::Option
end
#standard_exception_handling ⇒ Object
Provide standard exception handling for the given block.
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/pwrake/master.rb', line 100

# Provide standard exception handling for the given block.
#
# Yields to the block and returns its value when nothing is raised.
# Whenever an exception escapes the block, the task queue (if one exists)
# is stopped before control leaves this method:
#
# * SystemExit is re-raised unchanged (exit silently with current status).
# * OptionParser::InvalidOption exits with failure status, printing nothing.
# * Any other exception prints "pwrake aborted!" and the exception message
#   to $stderr, followed by either the full backtrace (when the Rake
#   application's trace option is on) or the first backtrace frame that
#   mentions @rakefile plus a hint about --trace; then exits with failure
#   status.
def standard_exception_handling
  yield
rescue SystemExit
  # Exit silently with current status
  @task_queue.stop if @task_queue
  raise
rescue OptionParser::InvalidOption
  # Exit silently
  @task_queue.stop if @task_queue
  exit(false)
rescue Exception => ex
  # Exit with error message
  name = "pwrake"
  $stderr.puts "#{name} aborted!"
  $stderr.puts ex.message
  if Rake.application.options.trace
    $stderr.puts ex.backtrace.join("\n")
  else
    # Match the rakefile name literally: characters such as "." in
    # "Rakefile.rb" must not act as regexp metacharacters.
    rakefile_pattern = /#{Regexp.escape(@rakefile.to_s)}/
    $stderr.puts ex.backtrace.find { |str| str =~ rakefile_pattern } || ""
    $stderr.puts "(See full trace by running task with --trace)"
  end
  @task_queue.stop if @task_queue
  exit(false)
end
#start ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/pwrake/master.rb', line 32

# Creates the queues and worker shells, then launches the worker threads.
#
# Does nothing when a task queue already exists, so repeated calls are
# harmless. Otherwise the whole start-up is wrapped in a "start_worker"
# Timer: a finish queue and a task queue (built from @queue_class with the
# core list) are created, one @shell_class instance is built per entry of
# @core_list, and #start_threads is called.
def start
  return if @task_queue

  timer = Timer.new("start_worker")
  @finish_queue = Queue.new
  @task_queue = @queue_class.new(@core_list)
  @shell_set = []
  @core_list.each do |h|
    @shell_set << @shell_class.new(h, @shell_opt)
  end
  start_threads
  timer.finish
end
#start_threads ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/pwrake/master.rb', line 53

# Spawns one worker thread per entry of @shell_set.
#
# Thread.abort_on_exception is switched on, so an unhandled error in any
# worker thread aborts the process. Each thread registers its connection
# as Pwrake.current_shell, starts it (logging the connect time), runs
# #thread_loop, and always closes the connection afterwards. Thread
# creation is paced: after spawning a thread this method sleeps for
# whatever is left of THREAD_CREATE_INTERVAL seconds.
def start_threads
  Thread.abort_on_exception = true
  @threads = []
  # NOTE(review): the accessor between `application.` and `[...]` was
  # missing from this listing; `pwrake_options` is restored from the
  # Pwrake::Option method list - confirm against lib/pwrake/master.rb.
  t_intvl = Pwrake.application.pwrake_options['THREAD_CREATE_INTERVAL']
  @shell_set.each do |c|
    tc0 = Time.now
    @threads << Thread.new(c) do |conn|
      Pwrake.current_shell = conn
      t0 = Time.now
      conn.start
      t = Time.now - t0
      Log.info "-- worker[#{conn.id}] connect to #{conn.host}: %.3f sec" % t
      begin
        thread_loop(conn)
      ensure
        Log.info "-- worker[#{conn.id}] ensure : closing #{conn.host}"
        conn.finish
      end
    end
    # Sleep only for the part of the interval not already spent above.
    t_sleep = t_intvl - (Time.now - tc0)
    sleep t_sleep if t_sleep > 0
  end
end
#task_id_counter ⇒ Object
93 94 95 96 97 |
# File 'lib/pwrake/master.rb', line 93

# Returns the next task id.
#
# The counter is incremented and read while holding @lock, so concurrent
# callers each receive a distinct value.
def task_id_counter
  @lock.synchronize { @current_task_id += 1 }
end
#thread_loop(conn, last = nil) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/pwrake/master.rb', line 77

# Worker loop: repeatedly dequeues a task and invokes it.
#
# conn - the worker connection; its host is passed to the queue as a
#        dequeue hint (nil when no connection is given).
# last - optional task; when given it is reserved on the queue up front
#        and the loop returns right after that task has been invoked.
#
# The loop also ends when the queue's deq returns nil/false. The whole
# loop runs inside #standard_exception_handling.
def thread_loop(conn, last = nil)
  @task_queue.reserve(last) if last
  hint = conn ? conn.host : nil
  standard_exception_handling do
    # `while true` rather than `loop do`: Kernel#loop would silently
    # swallow a StopIteration raised by a task.
    while true
      time_start = Time.now
      t = @task_queue.deq(hint)
      break unless t
      time_deq = Time.now - time_start
      Log.debug "--- Master#thread_loop deq t=#{t.inspect} time=#{time_deq}sec"
      t.pw_invoke
      return if t == last
    end
  end
end