Class: DRbQS::Server
- Inherits:
-
Object
- Object
- DRbQS::Server
- Defined in:
- lib/drbqs/server.rb
Overview
When we set both empty_queue_hook and task_generator, empty_queue_hook is prior to task_generator.
Constant Summary collapse
- WAIT_TIME_NODE_EXIT =
3
- WAIT_TIME_NODE_FINALIZE =
10
- WAIT_TIME_NEW_RESULT =
1
Instance Attribute Summary collapse
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
Instance Method Summary collapse
-
#add_hook(key, name = nil, &block) ⇒ Object
key
is :empty_queue or :finish_exit. - #add_task_generator(task_generator) ⇒ Object
- #delete_hook(key, name = nil) ⇒ Object
- #exit ⇒ Object
-
#initialize(opts = {}) ⇒ Server
constructor
:port Set the port of server.
- #set_file_transfer(directory, opts = {}) ⇒ Object
- #set_finalization_task(task) ⇒ Object
- #set_initialization_task(task) ⇒ Object
- #set_signal_trap ⇒ Object
- #start ⇒ Object
- #test_exec(opts = {}) ⇒ Object
- #test_task_generator(opts = {}) ⇒ Object
- #transfer_directory ⇒ Object
- #wait ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ Server
:port
Set the port of server.
:acl
Set the ACL instance.
:log_file
Set the path of log files.
:log_level
Set the level of logging.
:check_alive
Set the time interval of checking alive nodes.
:finish_exit
Exit programs in finish_hook.
:signal_trap
Set trapping signal.
:scp_user
Set user of scp.
:scp_host
Set host of scp.
:file_directory
Set the setting of file directory.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/drbqs/server.rb', line 56 def initialize(opts = {}) @port = opts[:port] || ROOT_DEFAULT_PORT @acl = acl_init(opts[:acl]) @ts = { :message => Rinda::TupleSpace.new, :queue => Rinda::TupleSpace.new, :result => Rinda::TupleSpace.new, :transfer => nil } @logger = DRbQS::Utils.create_logger(opts[:log_file], opts[:log_level]) @message = MessageServer.new(@ts[:message], @logger) @queue= QueueServer.new(@ts[:queue], @ts[:result], @logger) @check_alive = CheckAlive.new(opts[:check_alive]) @task_generator = [] hook_init(opts[:finish_exit]) set_signal_trap if opts[:signal_trap] @finalization_task = nil @transfer_setting = get_transfer_setting(opts[:scp_host], opts[:scp_user], opts[:file_directory]) end |
Instance Attribute Details
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
34 35 36 |
# File 'lib/drbqs/server.rb', line 34 def queue @queue end |
Instance Method Details
#add_hook(key, name = nil, &block) ⇒ Object
key
is :empty_queue or :finish_exit. &block takes self as an argument.
162 163 164 |
# File 'lib/drbqs/server.rb', line 162 def add_hook(key, name = nil, &block) @hook.add(key, name, &block) end |
#add_task_generator(task_generator) ⇒ Object
127 128 129 |
# File 'lib/drbqs/server.rb', line 127 def add_task_generator(task_generator) @task_generator << task_generator end |
#delete_hook(key, name = nil) ⇒ Object
166 167 168 |
# File 'lib/drbqs/server.rb', line 166 def delete_hook(key, name = nil) @hook.delete(key, name) end |
#exit ⇒ Object
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/drbqs/server.rb', line 190 def exit if @finalization_task @message.send_finalization(@finalization_task) wait_time = WAIT_TIME_NODE_FINALIZE else @message.send_exit wait_time = WAIT_TIME_NODE_EXIT end until @message.node_not_exist? sleep(wait_time) check_connection(true) end @logger.info("History of tasks") { "\n" + @queue.all_logs } if @logger Kernel.exit end |
#set_file_transfer(directory, opts = {}) ⇒ Object
212 213 214 215 216 217 |
# File 'lib/drbqs/server.rb', line 212 def set_file_transfer(directory, opts = {}) user = opts[:user] || @transfer_setting[:user] || ENV['USER'] host = opts[:host] || @transfer_setting[:host] || 'localhost' @ts[:transfer] = DRbQS::Transfer.new(user, host, directory) @logger.info("File transfer") { @ts[:transfer].information } if @logger end |
#set_finalization_task(task) ⇒ Object
156 157 158 |
# File 'lib/drbqs/server.rb', line 156 def set_finalization_task(task) @finalization_task = task end |
#set_initialization_task(task) ⇒ Object
152 153 154 |
# File 'lib/drbqs/server.rb', line 152 def set_initialization_task(task) @message.set_initialization(task) end |
#set_signal_trap ⇒ Object
206 207 208 209 210 |
# File 'lib/drbqs/server.rb', line 206 def set_signal_trap Signal.trap(:TERM) do self.exit end end |
#start ⇒ Object
107 108 109 110 111 112 113 114 115 |
# File 'lib/drbqs/server.rb', line 107 def start if @transfer_setting[:set] && @transfer_setting[:directory] && !@ts[:transfer] set_file_transfer(@transfer_setting[:directory]) end DRb.install_acl(@acl) if @acl uri = "druby://:#{@port}" DRb.start_service(uri, @ts) @logger.info("Start DRb service") { uri } if @logger end |
#test_exec(opts = {}) ⇒ Object
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 |
# File 'lib/drbqs/server.rb', line 269 def test_exec(opts = {}) task_generator_init dummy_client = DRbQS::Client.new(nil, :log_file => $stdout, :log_level => opts[:log_level]) dummy_task_client = DRbQS::TaskClient.new(nil, @ts[:queue], nil) if @ts[:transfer] dummy_client.instance_variable_set(:@transfer, DRbQS::TransferTest.new(@ts[:transfer].directory)) end num = 0 start_profile if opts[:profile] loop do exec_hook if ary = dummy_task_client.get_task task_id, marshal_obj, method_sym, args = ary result = dummy_client.instance_eval { execute_task(marshal_obj, method_sym, args) } @queue.exec_task_hook(self, task_id, result) end num += 1 if opts[:limit] && num >= opts[:limit] break end end finish_profile if opts[:profile] if @finalization_task args = @finalization_task.drb_args(nil)[1..-1] dummy_client.instance_eval { execute_task(*args) } end exec_finish_hook end |
#test_task_generator(opts = {}) ⇒ Object
298 299 300 301 302 303 304 305 |
# File 'lib/drbqs/server.rb', line 298 def test_task_generator(opts = {}) task_generator_init @task_generator.each_with_index do |t, i| puts "Test task generator [#{i}]" set_num, task_num = t.debug_all_tasks(opts) puts "Create: task sets #{set_num}, all tasks #{task_num}" end end |
#transfer_directory ⇒ Object
76 77 78 |
# File 'lib/drbqs/server.rb', line 76 def transfer_directory @ts[:transfer] && @ts[:transfer].directory end |
#wait ⇒ Object
240 241 242 243 244 245 246 247 248 249 250 251 252 |
# File 'lib/drbqs/server.rb', line 240 def wait task_generator_init loop do check_connection count_results = @queue.get_result(self) exec_hook @logger.debug("Calculating tasks: #{@queue.calculating_task_number}") if @logger if count_results <= 1 sleep(WAIT_TIME_NEW_RESULT) end end end |