Class: DRbQS::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/drbqs/server.rb

Overview

When both empty_queue_hook and task_generator are set, empty_queue_hook takes priority over task_generator.

Constant Summary collapse

WAIT_TIME_NODE_EXIT =
3
WAIT_TIME_NODE_FINALIZE =
10
WAIT_TIME_NEW_RESULT =
1

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Server

:port

Set the port of server.

:acl

Set the ACL instance.

:log_file

Set the path of log files.

:log_level

Set the level of logging.

:check_alive

Set the time interval of checking alive nodes.

:finish_exit

Exit programs in finish_hook.

:signal_trap

If true, trap the TERM signal so that the server exits when the signal is received.

:scp_user

Set user of scp.

:scp_host

Set host of scp.

:file_directory

Set the file directory (used together with :scp_user and :scp_host to build the transfer setting).


56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/drbqs/server.rb', line 56

# Builds a server from an options hash.
#
# Recognized option keys: :port, :acl, :log_file, :log_level, :check_alive,
# :finish_exit, :signal_trap, :scp_user, :scp_host and :file_directory.
# When :port is nil or missing, ROOT_DEFAULT_PORT is used.
def initialize(opts = {})
  @port = opts[:port] || ROOT_DEFAULT_PORT
  @acl = acl_init(opts[:acl])

  # One tuple space per channel; the :transfer slot starts out as nil.
  message_space = Rinda::TupleSpace.new
  queue_space = Rinda::TupleSpace.new
  result_space = Rinda::TupleSpace.new
  @ts = {
    :message => message_space,
    :queue => queue_space,
    :result => result_space,
    :transfer => nil
  }

  @logger = DRbQS::Utils.create_logger(opts[:log_file], opts[:log_level])
  @message = MessageServer.new(message_space, @logger)
  @queue = QueueServer.new(queue_space, result_space, @logger)
  @check_alive = CheckAlive.new(opts[:check_alive])
  @task_generator = []
  hook_init(opts[:finish_exit])
  # The TERM handler is only installed when the caller asks for it.
  set_signal_trap if opts[:signal_trap]
  @finalization_task = nil
  @transfer_setting = get_transfer_setting(opts[:scp_host], opts[:scp_user], opts[:file_directory])
end

Instance Attribute Details

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



34
35
36
# File 'lib/drbqs/server.rb', line 34

# Read-only accessor: returns the value held in @queue.
def queue
  @queue
end

Instance Method Details

#add_hook(key, name = nil, &block) ⇒ Object

key is :empty_queue or :finish_exit. &block takes self as an argument.



162
163
164
# File 'lib/drbqs/server.rb', line 162

# Registers a hook by forwarding key, name and the given block to @hook.add.
#
# According to the accompanying description, key is :empty_queue or
# :finish_exit and the block receives the server itself as its argument.
# Returns whatever @hook.add returns.
def add_hook(key, name = nil, &block)
  @hook.add(key, name, &block)
end

#add_task_generator(task_generator) ⇒ Object



127
128
129
# File 'lib/drbqs/server.rb', line 127

# Appends task_generator to the list of registered generators and returns
# that list.
def add_task_generator(task_generator)
  @task_generator.push(task_generator)
end

#delete_hook(key, name = nil) ⇒ Object



166
167
168
# File 'lib/drbqs/server.rb', line 166

# Removes a hook by forwarding key and name to @hook.delete.
# name defaults to nil; how @hook.delete treats a nil name is not visible here.
# Returns whatever @hook.delete returns.
def delete_hook(key, name = nil)
  @hook.delete(key, name)
end

#exit ⇒ Object



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/drbqs/server.rb', line 190

# Shuts the server down.
#
# If a finalization task is set it is sent to the nodes, otherwise an exit
# message is sent. The method then polls, sleeping between checks, until
# @message reports that no node exists any more, logs the task history
# (when a logger is present) and terminates the process via Kernel.exit.
def exit
  # The branch taken decides both the message sent and the polling interval.
  wait_time =
    if @finalization_task
      @message.send_finalization(@finalization_task)
      WAIT_TIME_NODE_FINALIZE
    else
      @message.send_exit
      WAIT_TIME_NODE_EXIT
    end

  until @message.node_not_exist?
    sleep(wait_time)
    check_connection(true)
  end

  if @logger
    @logger.info("History of tasks") { "\n" + @queue.all_logs }
  end
  Kernel.exit
end

#set_file_transfer(directory, opts = {}) ⇒ Object



212
213
214
215
216
217
# File 'lib/drbqs/server.rb', line 212

# Creates the DRbQS::Transfer object for directory and stores it in
# @ts[:transfer].
#
# User is taken from opts[:user], then @transfer_setting[:user], then
# ENV['USER']; host from opts[:host], then @transfer_setting[:host], then
# 'localhost'. When a logger is present the transfer information is logged.
# Returns the logger call's result, or nil when there is no logger.
def set_file_transfer(directory, opts = {})
  user = opts[:user] || @transfer_setting[:user] || ENV['USER']
  host = opts[:host] || @transfer_setting[:host] || 'localhost'

  transfer = DRbQS::Transfer.new(user, host, directory)
  @ts[:transfer] = transfer

  return unless @logger
  @logger.info("File transfer") { transfer.information }
end

#set_finalization_task(task) ⇒ Object



156
157
158
# File 'lib/drbqs/server.rb', line 156

# Stores task in @finalization_task and returns it.
def set_finalization_task(task)
  @finalization_task = task
end

#set_initialization_task(task) ⇒ Object



152
153
154
# File 'lib/drbqs/server.rb', line 152

# Hands task to @message.set_initialization and returns that call's result.
def set_initialization_task(task)
  @message.set_initialization(task)
end

#set_signal_trap ⇒ Object



206
207
208
209
210
# File 'lib/drbqs/server.rb', line 206

# Installs a TERM signal handler that calls #exit on this object.
# Returns what Signal.trap returns (the previously installed handler).
def set_signal_trap
  Signal.trap(:TERM) { self.exit }
end

#start ⇒ Object



107
108
109
110
111
112
113
114
115
# File 'lib/drbqs/server.rb', line 107

# Starts the DRb service that exposes @ts on "druby://:<port>".
#
# Before starting, file transfer is set up from @transfer_setting when it
# is flagged as set, has a directory, and no transfer object exists yet.
# The ACL is installed only when one is present.
def start
  transfer_pending = @transfer_setting[:set] && @transfer_setting[:directory] && !@ts[:transfer]
  set_file_transfer(@transfer_setting[:directory]) if transfer_pending

  DRb.install_acl(@acl) if @acl

  uri = "druby://:#{@port}"
  DRb.start_service(uri, @ts)
  if @logger
    @logger.info("Start DRb service") { uri }
  end
end

#test_exec(opts = {}) ⇒ Object



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/drbqs/server.rb', line 269

# Runs the server's tasks in-process with a dummy client, for testing.
#
# Options read from opts: :log_level (passed to the dummy client),
# :profile (wraps the loop in start_profile / finish_profile) and :limit
# (number of loop iterations after which the loop stops).
# After the loop the finalization task, if any, is executed by the dummy
# client, and exec_finish_hook is called; its result is returned.
def test_exec(opts = {})
  task_generator_init
  client = DRbQS::Client.new(nil, :log_file => $stdout, :log_level => opts[:log_level])
  task_client = DRbQS::TaskClient.new(nil, @ts[:queue], nil)

  transfer = @ts[:transfer]
  if transfer
    # Swap in the test transfer object pointing at the same directory.
    client.instance_variable_set(:@transfer, DRbQS::TransferTest.new(transfer.directory))
  end

  iterations = 0
  start_profile if opts[:profile]
  loop do
    exec_hook
    fetched = task_client.get_task
    if fetched
      task_id, marshal_obj, method_sym, args = fetched
      # instance_eval so execute_task is called with the client as self.
      result = client.instance_eval { execute_task(marshal_obj, method_sym, args) }
      @queue.exec_task_hook(self, task_id, result)
    end
    iterations += 1
    break if opts[:limit] && iterations >= opts[:limit]
  end
  finish_profile if opts[:profile]

  if @finalization_task
    # Drop the first element of drb_args; the rest is passed to execute_task.
    final_args = @finalization_task.drb_args(nil)[1..-1]
    client.instance_eval { execute_task(*final_args) }
  end
  exec_finish_hook
end

#test_task_generator(opts = {}) ⇒ Object



298
299
300
301
302
303
304
305
# File 'lib/drbqs/server.rb', line 298

# Exercises every registered task generator and prints a summary.
#
# For each generator, in registration order, its index is printed, then
# debug_all_tasks(opts) is called and the two values it returns are
# printed as the number of task sets and the number of tasks.
# Returns the list of task generators.
def test_task_generator(opts = {})
  task_generator_init
  @task_generator.each.with_index do |generator, index|
    puts "Test task generator [#{index}]"
    set_count, task_count = generator.debug_all_tasks(opts)
    puts "Create: task sets #{set_count}, all tasks #{task_count}"
  end
end

#transfer_directory ⇒ Object



76
77
78
# File 'lib/drbqs/server.rb', line 76

# Returns the directory of the current transfer object, or the falsy value
# stored in @ts[:transfer] when no transfer object has been set.
def transfer_directory
  transfer = @ts[:transfer]
  transfer && transfer.directory
end

#wait ⇒ Object



240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/drbqs/server.rb', line 240

# Main server loop.
#
# After initializing the task generators, each iteration checks messages
# and connections, collects results from the queue, runs the hooks, logs
# the number of calculating tasks (when a logger is present) and sleeps
# for WAIT_TIME_NEW_RESULT when few results were collected.
def wait
  task_generator_init
  loop do
    check_message
    check_connection
    collected = @queue.get_result(self)
    exec_hook
    if @logger
      @logger.debug("Calculating tasks: #{@queue.calculating_task_number}")
    end
    # NOTE(review): sleeps when zero or one result was collected;
    # confirm that the threshold of 1 (rather than 0) is intended.
    sleep(WAIT_TIME_NEW_RESULT) if collected <= 1
  end
end