Class: DRbQS::Node::TaskClient

Inherits:
Object
  • Object
show all
Defined in:
lib/drbqs/node/task_client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(node_number, queue, result, group, logger = DRbQS::Misc::LoggerDummy.new) ⇒ TaskClient

Returns a new instance of TaskClient.



6
7
8
9
10
11
12
13
14
# File 'lib/drbqs/node/task_client.rb', line 6

# Builds a task client for one node.
#
# @param node_number [Object] identifier of this node; stored as-is
# @param queue [Object] object tasks are taken from (must respond to +take+)
# @param result [Object] object signals and results are written to (must respond to +write+)
# @param group [Array, nil] groups to try before the default group; nil becomes an empty array
# @param logger [Object] logger receiving +info+ messages
def initialize(node_number, queue, result, group, logger = DRbQS::Misc::LoggerDummy.new)
  # Collaborators handed in by the caller.
  @node_number, @queue, @result = node_number, queue, result
  @group = group || []
  @logger = logger
  # Local buffers: tasks waiting to be processed and results waiting to be sent.
  @task_queue, @result_queue = Queue.new, Queue.new
end

Instance Attribute Details

#group ⇒ Object (readonly)

Returns the value of attribute group.



4
5
6
# File 'lib/drbqs/node/task_client.rb', line 4

# Returns the value of @group as stored by the constructor, which substitutes
# an empty array when the given group is nil. #get_task tries each of these
# groups in order before falling back to DRbQS::Task::DEFAULT_GROUP.
def group
  @group
end

#node_number ⇒ Object (readonly)

Returns the value of attribute node_number.



4
5
6
# File 'lib/drbqs/node/task_client.rb', line 4

# Returns the node number passed to the constructor. The same value is
# included in the :accept and :result tuples this client writes.
def node_number
  @node_number
end

Instance Method Details

#add_new_task(num) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/drbqs/node/task_client.rb', line 61

# Fetches up to +num+ tasks via #get_task, writes an :accept tuple for each
# one to the result object, and puts each task on the local task queue.
# Stops early as soon as #get_task returns nothing.
#
# @param num [Integer] maximum number of tasks to fetch
# @return [Array, nil] IDs of the tasks that were accepted, or nil when none was obtained
def add_new_task(num)
  accepted_ids = []
  num.times do
    ary = get_task
    break unless ary
    # The first element of the task array is the task ID.
    task_id = ary[0]
    @logger.info("Send accept signal: node #{@node_number} calculating #{task_id}")
    # The accept tuple is written before the task is queued locally.
    @result.write([:accept, task_id, @node_number])
    queue_task(ary)
    accepted_ids << task_id
  end
  accepted_ids.empty? ? nil : accepted_ids
end

#dequeue_task ⇒ nil, Array

Returns nil if @task_queue is empty; otherwise returns an array [task_id, obj, method_name, args].

Returns:

  • (nil, Array)

    nil if @task_queue is empty; otherwise an array [task_id, obj, method_name, args].



36
37
38
39
40
41
42
# File 'lib/drbqs/node/task_client.rb', line 36

# Removes and returns the next entry of the local task queue.
#
# @return [nil, Array] nil when @task_queue is empty, otherwise the dequeued entry
def dequeue_task
  # The modifier form evaluates to nil when the queue is empty.
  @task_queue.deq unless @task_queue.empty?
end

#dump_result_queue ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/drbqs/node/task_client.rb', line 93

# Drains the result queue and serializes the collected result values.
# Task IDs are discarded; only the result part of each entry is kept.
#
# @return [String, nil] Marshal dump of the array of results, or nil when the queue was empty
def dump_result_queue
  collected = []
  until result_empty?
    _task_id, value = dequeue_result
    collected << value
  end
  collected.empty? ? nil : Marshal.dump(collected)
end

#get_task ⇒ Object



52
53
54
55
56
57
58
59
# File 'lib/drbqs/node/task_client.rb', line 52

# Looks for a task in each of this node's groups, in order, and falls back
# to the default group when none of them yields one.
#
# @return [Object, nil] whatever #get_task_by_group returns for the first
#   group that yields a task, or its result for DRbQS::Task::DEFAULT_GROUP
def get_task
  @group.each do |name|
    candidate = get_task_by_group(name)
    return candidate if candidate
  end
  get_task_by_group(DRbQS::Task::DEFAULT_GROUP)
end

#get_task_by_group(grp) ⇒ Object



44
45
46
47
48
49
50
# File 'lib/drbqs/node/task_client.rb', line 44

# Takes one task tuple of the given group from the queue, using a timeout
# of 0 so that the call gives up instead of waiting for a tuple to arrive.
#
# The template matches the task ID with Integer. Fixnum was deprecated in
# Ruby 2.4 and removed in Ruby 3.2; Integer matches every value Fixnum did.
#
# @param grp [Object] group placed in the first slot of the template
# @return [Array, nil] the taken tuple without its leading group element,
#   or nil when the take request expires
def get_task_by_group(grp)
  @queue.take([grp, Integer, nil, Symbol, nil], 0)[1..-1]
rescue Rinda::RequestExpiredError
  nil
end

#queue_result(task_id, result) ⇒ Object



89
90
91
# File 'lib/drbqs/node/task_client.rb', line 89

# Stores a finished task's result on the local result queue as a
# [task_id, result] pair.
#
# @param task_id [Object] ID of the task the result belongs to
# @param result [Object] value produced for that task
def queue_result(task_id, result)
  entry = [task_id, result]
  @result_queue.push(entry)
end

#queue_task(ary) ⇒ Object

Parameters:

  • ary (Array)

    An array of the form [task_id, obj, method_name, args]



30
31
32
# File 'lib/drbqs/node/task_client.rb', line 30

# Appends a task to the local task queue.
#
# @param ary [Array] task array of the form [task_id, obj, method_name, args]
def queue_task(ary)
  @task_queue.push(ary)
end

#result_empty? ⇒ Boolean

Returns:

  • (Boolean)


20
21
22
# File 'lib/drbqs/node/task_client.rb', line 20

# Reports whether the local result queue, which #queue_result fills,
# currently holds no entries.
#
# @return [Boolean] true when @result_queue is empty
def result_empty?
  @result_queue.empty?
end

#send_result ⇒ Object

Returns an array of the task IDs whose results were sent to the server, or nil if no results were queued.



78
79
80
81
82
83
84
85
86
87
# File 'lib/drbqs/node/task_client.rb', line 78

# Drains the local result queue, writing one :result tuple per entry to
# the result object and logging each send.
#
# @return [Array, nil] IDs of the tasks whose results were written, or nil
#   when the result queue was empty
def send_result
  delivered = []
  until result_empty?
    id, payload = dequeue_result
    @logger.info("Send result: #{id}") { payload.inspect }
    @result.write([:result, id, @node_number, payload])
    delivered << id
  end
  # The modifier form evaluates to nil when nothing was sent.
  delivered unless delivered.empty?
end

#task_empty? ⇒ Boolean

Returns:

  • (Boolean)


16
17
18
# File 'lib/drbqs/node/task_client.rb', line 16

# Reports whether the local task queue, which #queue_task fills and
# #dequeue_task drains, currently holds no entries.
#
# @return [Boolean] true when @task_queue is empty
def task_empty?
  @task_queue.empty?
end