Class: DRbQS::QueueServer
- Inherits:
-
Object
- Object
- DRbQS::QueueServer
- Includes:
- HistoryUtils
- Defined in:
- lib/drbqs/queue.rb
Instance Attribute Summary collapse
-
#calculating ⇒ Object
readonly
Returns the value of attribute calculating.
-
#history ⇒ Object
readonly
Returns the value of attribute history.
Instance Method Summary collapse
-
#add(task) ⇒ Object
&hook takes two arguments: a QueueServer object and the result of the task.
- #all_logs ⇒ Object
- #calculating_task_number ⇒ Object
-
#empty? ⇒ Boolean
Returns true if the queue is empty.
- #exec_task_hook(main_server, task_id, result) ⇒ Object
-
#finished? ⇒ Boolean
Returns true if there are no tasks left, either in the queue or being calculated.
- #get_accept_signal ⇒ Object
- #get_result(main_server) ⇒ Object
-
#initialize(queue, result, logger = nil) ⇒ QueueServer
constructor
A new instance of QueueServer.
- #requeue_for_deleted_node_id(deleted) ⇒ Object
Methods included from HistoryUtils
Constructor Details
#initialize(queue, result, logger = nil) ⇒ QueueServer
Returns a new instance of QueueServer.
10 11 12 13 14 15 16 17 18 |
# File 'lib/drbqs/queue.rb', line 10
# Sets up a queue server with an empty task cache and a fresh history.
#
# @param queue  object stored as @queue
# @param result object stored as @result
# @param logger optional logger; when nil, the logging calls guarded by
#   `if @logger` in this class are skipped
def initialize(queue, result, logger = nil)
  @queue, @result, @logger = queue, result, logger
  @task_id = 0
  @cache = {}
  @history = DRbQS::History.new
  # Every node ID gets its own array the first time it is looked up.
  @calculating = Hash.new { |table, node_id| table[node_id] = [] }
end
Instance Attribute Details
#calculating ⇒ Object (readonly)
Returns the value of attribute calculating.
8 9 10 |
# File 'lib/drbqs/queue.rb', line 8
# Read-only accessor for the @calculating table.
#
# @return [Object] the value of @calculating
def calculating
  @calculating
end
#history ⇒ Object (readonly)
Returns the value of attribute history.
8 9 10 |
# File 'lib/drbqs/queue.rb', line 8
# Read-only accessor for the @history object.
#
# @return [Object] the value of @history
def history
  @history
end
Instance Method Details
#add(task) ⇒ Object
&hook takes two arguments: a QueueServer object and the result of the task. Returns the task ID (for debugging).
27 28 29 30 31 32 33 34 |
# File 'lib/drbqs/queue.rb', line 27
# Allocates the next task ID, caches the task under it, queues it and
# records an :add event in the history.
#
# @param task the task object to cache and enqueue
# @return the ID assigned to the task
def add(task)
  new_id = (@task_id += 1)
  @logger.info("New task: #{new_id}") if @logger
  @cache[new_id] = task
  queue_task(new_id)
  @history.set(new_id, :add)
  new_id
end
#all_logs ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/drbqs/queue.rb', line 121
# Renders the whole task history as a single string.
#
# Each task contributes a "Task <id>" header line followed by one line per
# recorded event. Events are read positionally: ev[0] is passed to
# time_to_string, ev[1] is the event type and ev[2] is the node ID printed
# for :calculate and :result events. Events of any other type are omitted.
#
# @return [String] the formatted log (empty when the history has no entries)
def all_logs
  # Start from an unfrozen copy: appending to a bare string literal raises
  # FrozenError once the file opts in to `# frozen_string_literal: true`.
  log = ''.dup
  @history.each do |task_id, events|
    log << "Task #{task_id}\n"
    events.each do |ev|
      case ev[1]
      when :add, :requeue, :hook
        log << " #{time_to_string(ev[0])}\t#{ev[1]}\n"
      when :calculate, :result
        log << " #{time_to_string(ev[0])}\t#{ev[1]} (node #{ev[2]})\n"
      end
    end
  end
  log
end
#calculating_task_number ⇒ Object
104 105 106 |
# File 'lib/drbqs/queue.rb', line 104
# Counts the tasks currently listed in @calculating, summed over all nodes.
#
# @return [Integer] total number of task IDs across every node's list
def calculating_task_number
  @calculating.each_value.inject(0) { |total, task_ids| total + task_ids.size }
end
#empty? ⇒ Boolean
Returns true if the queue is empty, and false otherwise. The method can return true even while some tasks are still being calculated.
111 112 113 |
# File 'lib/drbqs/queue.rb', line 111
# Tells whether every cached task is currently being calculated, i.e. no
# cached task is left over once the calculating ones are subtracted.
#
# @return [Boolean] true when the cache size equals calculating_task_number
def empty?
  waiting = @cache.size - calculating_task_number
  waiting.zero?
end
#exec_task_hook(main_server, task_id, result) ⇒ Object
75 76 77 78 79 80 81 82 83 84 |
# File 'lib/drbqs/queue.rb', line 75
# Removes a task from the cache and, if it has a hook, runs the hook.
#
# When the task ID is not in the cache an error is logged (if a logger is
# set) and nothing else happens. A :hook event is recorded in the history
# only when the task actually has a hook.
#
# @param main_server first argument handed to the hook
# @param task_id     ID of the task whose hook should run
# @param result      second argument handed to the hook
# @return the hook's return value, or nil when the task has no hook
def exec_task_hook(main_server, task_id, result)
  finished_task = @cache.delete(task_id)
  unless finished_task
    return @logger ? @logger.error("Task #{task_id} is not cached.") : nil
  end

  callback = finished_task.hook
  return nil unless callback

  @history.set(task_id, :hook)
  callback.call(main_server, result)
end
#finished? ⇒ Boolean
Returns true if there are no tasks left, either in the queue or being calculated, and false otherwise.
117 118 119 |
# File 'lib/drbqs/queue.rb', line 117
# Tells whether the task cache holds no entries at all.
#
# @return [Boolean] true when @cache is empty
def finished?
  @cache.empty?
end
#get_accept_signal ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/drbqs/queue.rb', line 36
# Drains every pending [:accept, task_id, node_id] tuple from @result.
#
# For each tuple the task ID is appended to the accepting node's list in
# @calculating and a :calculate event is recorded in the history. Draining
# stops when the take call raises Rinda::RequestExpiredError.
#
# @return [Integer] number of accept signals consumed by this call
def get_accept_signal
  count = 0
  begin
    loop do
      # Integer replaces Fixnum, which was deprecated in Ruby 2.4 and removed
      # in 3.2; it matches every ID that Fixnum matched.
      _sym, task_id, node_id = @result.take([:accept, Integer, Integer], 0)
      count += 1
      @calculating[node_id] << task_id
      @history.set(task_id, :calculate, node_id)
      @logger.info("Accept: task #{task_id} by node #{node_id}.") if @logger
    end
  rescue Rinda::RequestExpiredError
    @logger.debug("Accept: #{count} signals.") if @logger
  end
  count
end
#get_result(main_server) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/drbqs/queue.rb', line 86
# Drains every pending [:result, task_id, node_id, result] tuple from @result.
#
# Before each take, pending accept signals are processed via
# get_accept_signal. For every result a :result event is recorded, the task
# is handed to delete_task_id and its hook is run through exec_task_hook.
# Draining stops when the take call raises Rinda::RequestExpiredError.
#
# @param main_server object passed on to exec_task_hook
# @return [Integer] number of results consumed by this call
def get_result(main_server)
  count = 0
  begin
    loop do
      get_accept_signal
      # Integer replaces Fixnum, which was deprecated in Ruby 2.4 and removed
      # in 3.2; the trailing nil leaves the result slot unconstrained.
      _sym, task_id, node_id, result = @result.take([:result, Integer, Integer, nil], 0)
      count += 1
      @history.set(task_id, :result, node_id)
      @logger.info("Get: result of #{task_id} from node #{node_id}.") if @logger
      delete_task_id(node_id, task_id)
      exec_task_hook(main_server, task_id, result)
    end
  rescue Rinda::RequestExpiredError
    @logger.debug("Get: #{count} results.") if @logger
  end
  count
end
#requeue_for_deleted_node_id(deleted) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/drbqs/queue.rb', line 52
# Puts the tasks of removed nodes back into the queue.
#
# For every given node ID that has an entry in @calculating, each of its
# task IDs is queued again via queue_task and a :requeue event is recorded;
# the node's entry is then dropped. Node IDs without an entry are skipped.
#
# @param deleted enumerable of node IDs that are no longer available
# @return the value of `deleted.each` (the given collection for an Array)
def requeue_for_deleted_node_id(deleted)
  deleted.each do |node_id|
    # Test with key? rather than []: @calculating's default proc would
    # create (and return) an empty list for a node that was never seen.
    next unless @calculating.key?(node_id)

    @calculating[node_id].each do |task_id|
      queue_task(task_id)
      @history.set(task_id, :requeue)
      @logger.info("Requeue: task #{task_id}.") if @logger
    end
    @calculating.delete(node_id)
  end
end