Class: DRbQS::QueueServer

Inherits:
Object
  • Object
show all
Includes:
HistoryUtils
Defined in:
lib/drbqs/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from HistoryUtils

time_to_string

Constructor Details

#initialize(queue, result, logger = nil) ⇒ QueueServer

Returns a new instance of QueueServer.



10
11
12
13
14
15
16
17
18
# File 'lib/drbqs/queue.rb', line 10

# Sets up the bookkeeping state of a queue server.
#
# @param queue  [Object] stored in @queue (presumably the Rinda tuple space
#   that tasks are written to -- confirm against queue_task)
# @param result [Object] stored in @result; the other methods take
#   :accept / :result tuples from it
# @param logger [Object, nil] optional logger; when nil nothing is logged
def initialize(queue, result, logger = nil)
  @queue, @result, @logger = queue, result, logger
  # Last task ID handed out; #add increments it before use, so IDs start at 1.
  @task_id = 0
  # task ID => task object, kept until the task's hook has been executed.
  @cache = {}
  # node ID => array of task IDs accepted by that node (created on demand).
  @calculating = Hash.new { |table, node_id| table[node_id] = [] }
  @history = DRbQS::History.new
end

Instance Attribute Details

#calculating ⇒ Object (readonly)

Returns the value of attribute calculating.



8
9
10
# File 'lib/drbqs/queue.rb', line 8

# Read-only accessor for the table of tasks currently being calculated:
# a Hash mapping a node ID to the Array of task IDs accepted by that node.
def calculating
  @calculating
end

#history ⇒ Object (readonly)

Returns the value of attribute history.



8
9
10
# File 'lib/drbqs/queue.rb', line 8

# Read-only accessor for the DRbQS::History instance created by the
# constructor; task events are recorded in it through its #set method.
def history
  @history
end

Instance Method Details

#add(task) ⇒ Object

The task's &hook takes two arguments: a QueueServer object and the result of the task. Returns the task ID (for debugging).



27
28
29
30
31
32
33
34
# File 'lib/drbqs/queue.rb', line 27

# Registers a task under a freshly allocated ID and puts it on the queue.
#
# The task is stored in @cache under the new ID, handed to queue_task and
# recorded in the history as an :add event.
#
# @param task [Object] the task to enqueue
# @return [Integer] the ID assigned to the task
def add(task)
  new_id = (@task_id += 1)
  @logger.info("New task: #{new_id}") if @logger
  @cache[new_id] = task
  queue_task(new_id)
  @history.set(new_id, :add)
  new_id
end

#all_logs ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/drbqs/queue.rb', line 121

# Renders the whole task history as a human-readable string.
#
# Each task contributes a "Task <id>" header followed by one indented line
# per event. :calculate and :result events additionally show the node ID
# stored at index 2 of the event; events of any other unknown kind are
# skipped.
#
# @return [String] the formatted log (empty when the history has no entries)
def all_logs
  log = ''
  @history.each do |task_id, events|
    log << "Task #{task_id}\n"
    events.each do |ev|
      kind = ev[1]
      suffix =
        if [:add, :requeue, :hook].include?(kind)
          ''
        elsif [:calculate, :result].include?(kind)
          " (node #{ev[2]})"
        end
      next unless suffix
      log << "  #{time_to_string(ev[0])}\t#{kind}#{suffix}\n"
    end
  end
  log
end

#calculating_task_number ⇒ Object



104
105
106
# File 'lib/drbqs/queue.rb', line 104

# Counts the tasks currently being calculated, summed over all nodes.
#
# @return [Integer] total number of task IDs held in @calculating
def calculating_task_number
  @calculating.each_value.inject(0) { |total, task_ids| total + task_ids.size }
end

#empty? ⇒ Boolean

Returns true if the queue is empty, and false otherwise. The method can return true even while some tasks are still being calculated.

Returns:

  • (Boolean)


111
112
113
# File 'lib/drbqs/queue.rb', line 111

# Tells whether no task is waiting in the queue.
#
# Every cached task is either queued or being calculated, so the queue is
# empty exactly when the number of cached tasks equals the number of tasks
# being calculated.
#
# @return [Boolean]
def empty?
  @cache.size == calculating_task_number
end

#exec_task_hook(main_server, task_id, result) ⇒ Object



75
76
77
78
79
80
81
82
83
84
# File 'lib/drbqs/queue.rb', line 75

# Removes a finished task from the cache and runs its hook, if it has one.
#
# When the task ID is not cached an error is logged (if a logger is set) and
# nothing else happens. When the task has a hook, a :hook event is recorded
# in the history before the hook is called.
#
# @param main_server [Object] passed to the hook as its first argument
# @param task_id [Integer] ID of the task whose result arrived
# @param result [Object] passed to the hook as its second argument
# @return [Object, nil] the hook's return value, or nil when the task has no
#   hook; for an uncached task, whatever the logger's #error returns (nil
#   without a logger)
def exec_task_hook(main_server, task_id, result)
  task = @cache.delete(task_id)
  if !task
    @logger.error("Task #{task_id} is not cached.") if @logger
  elsif (hook = task.hook)
    @history.set(task_id, :hook)
    hook.call(main_server, result)
  end
end

#finished? ⇒ Boolean

Returns true if no tasks remain, either in the queue or being calculated; otherwise returns false.

Returns:

  • (Boolean)


117
118
119
# File 'lib/drbqs/queue.rb', line 117

# Tells whether every task has been fully processed, i.e. no task is left
# in the cache (tasks are removed from it when their hook is executed).
#
# @return [Boolean]
def finished?
  @cache.empty?
end

#get_accept_signal ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/drbqs/queue.rb', line 36

# Collects every pending accept signal without blocking.
#
# Repeatedly takes [:accept, task_id, node_id] tuples from @result with a
# timeout of 0. Each signal is recorded in @calculating under its node ID and
# in the history as a :calculate event. The loop ends when the take raises
# Rinda::RequestExpiredError, i.e. when no matching tuple is left.
#
# The template matches on Integer rather than Fixnum: Fixnum was deprecated
# in Ruby 2.4 and removed in Ruby 3.2, and Integer matches the same IDs on
# every Ruby version.
#
# @return [Integer] number of accept signals processed by this call
def get_accept_signal
  count = 0
  begin
    loop do
      # The leading :accept tag is only needed for matching.
      _sym, task_id, node_id = @result.take([:accept, Integer, Integer], 0)
      count += 1
      @calculating[node_id] << task_id
      @history.set(task_id, :calculate, node_id)
      @logger.info("Accept: task #{task_id} by node #{node_id}.") if @logger
    end
  rescue Rinda::RequestExpiredError
    # Expected way out of the loop: nothing more to take right now.
    @logger.debug("Accept: #{count} signals.") if @logger
  end
  count
end

#get_result(main_server) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/drbqs/queue.rb', line 86

# Collects every pending task result without blocking and runs the hooks.
#
# On each iteration the pending accept signals are processed first, then one
# [:result, task_id, node_id, result] tuple is taken from @result with a
# timeout of 0. For each result a :result event is recorded in the history,
# the task is removed from the calculating list of its node and the task's
# hook is executed. The loop ends when the take raises
# Rinda::RequestExpiredError, i.e. when no result tuple is left.
#
# The template matches on Integer rather than Fixnum: Fixnum was deprecated
# in Ruby 2.4 and removed in Ruby 3.2, and Integer matches the same IDs on
# every Ruby version.
#
# @param main_server [Object] forwarded to exec_task_hook
# @return [Integer] number of results processed by this call
def get_result(main_server)
  count = 0
  begin
    loop do
      get_accept_signal
      # nil in the template is a wildcard: the result may be any object.
      _sym, task_id, node_id, result = @result.take([:result, Integer, Integer, nil], 0)
      count += 1
      @history.set(task_id, :result, node_id)
      @logger.info("Get: result of #{task_id} from node #{node_id}.") if @logger
      delete_task_id(node_id, task_id)
      exec_task_hook(main_server, task_id, result)
    end
  rescue Rinda::RequestExpiredError
    # Expected way out of the loop: nothing more to take right now.
    @logger.debug("Get: #{count} results.") if @logger
  end
  count
end

#requeue_for_deleted_node_id(deleted) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/drbqs/queue.rb', line 52

# Puts the tasks of removed nodes back on the queue.
#
# For every node ID in +deleted+ that has tasks registered in @calculating,
# each task is queued again, a :requeue event is recorded in the history and
# the node's entry is dropped from @calculating.
#
# Presence is tested with key? rather than by reading @calculating[node_id]:
# the Hash is built with a default block that creates an empty array for any
# unknown key, so a plain lookup is always truthy and would add an entry for
# nodes that never accepted a task.
#
# @param deleted [Enumerable<Integer>] IDs of the nodes that were removed
# @return [Object] +deleted+ itself (the receiver of #each)
def requeue_for_deleted_node_id(deleted)
  deleted.each do |node_id|
    next unless @calculating.key?(node_id)

    @calculating[node_id].each do |task_id|
      queue_task(task_id)
      @history.set(task_id, :requeue)
      @logger.info("Requeue: task #{task_id}.") if @logger
    end
    @calculating.delete(node_id)
  end
end