Class: DRbQS::TaskClient

Inherits:
Object
  • Object
show all
Defined in:
lib/drbqs/task_client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(node_id, queue, result, logger = nil) ⇒ TaskClient

Returns a new instance of TaskClient.



5
6
7
8
9
10
11
12
13
# File 'lib/drbqs/task_client.rb', line 5

# Build a client for a single node.
#
# @param node_id identifier stored in @node_id
# @param queue   object stored in @queue (the source of tasks)
# @param result  object stored in @result (the destination of tuples)
# @param logger  optional logger; nil (the default) means no logging
def initialize(node_id, queue, result, logger = nil)
  @node_id, @queue, @result, @logger = node_id, queue, result, logger
  # No task is being calculated until one is queued.
  @calculating_task = nil
  # Separate local queues for pending tasks and for finished results.
  @task_queue, @result_queue = Queue.new, Queue.new
end

Instance Attribute Details

#calculating_task ⇒ Object (readonly)

Returns the value of attribute calculating_task.



3
4
5
# File 'lib/drbqs/task_client.rb', line 3

# Reader for the @calculating_task instance variable.
#
# @return the value currently held in @calculating_task
def calculating_task
  @calculating_task
end

#node_id ⇒ Object (readonly)

Returns the value of attribute node_id.



3
4
5
# File 'lib/drbqs/task_client.rb', line 3

# Reader for the @node_id instance variable.
#
# @return the value currently held in @node_id
def node_id
  @node_id
end

Instance Method Details

#add_new_task ⇒ Object



49
50
51
52
53
54
55
56
57
58
# File 'lib/drbqs/task_client.rb', line 49

# Fetch a new task, unless one is already being calculated.
#
# When a task tuple is available, an +[:accept, task_id, node_id]+ tuple is
# written to @result and the task is handed to +queue_task+.
#
# @return nil when a task is already in progress or none could be fetched;
#   otherwise whatever +queue_task+ returns
def add_new_task
  # Only one task at a time: do nothing while a task is still in progress.
  return if @calculating_task

  tuple = get_task
  return unless tuple

  task_id, obj, method_sym, args = tuple
  @logger.info("Send accept signal: node #{@node_id} calculating #{task_id}") if @logger
  @result.write([:accept, task_id, @node_id])
  queue_task(task_id, [obj, method_sym, args])
end

#calculating? ⇒ Boolean

Returns:

  • (Boolean)


15
16
17
# File 'lib/drbqs/task_client.rb', line 15

# Tell whether a task is currently recorded in @calculating_task.
#
# @return [Boolean] true when @calculating_task is neither nil nor false
def calculating?
  @calculating_task ? true : false
end

#dequeue_task ⇒ Object



37
38
39
# File 'lib/drbqs/task_client.rb', line 37

# Remove and return the oldest entry of the local task queue.
#
# @return the entry that was removed from @task_queue
def dequeue_task
  @task_queue.pop
end

#dump_result_queue ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
# File 'lib/drbqs/task_client.rb', line 73

# Drain every pending result and serialize them together.
#
# @return [String, nil] Marshal dump of the array of drained results, or
#   nil when there was nothing to drain
def dump_result_queue
  drained = []
  drained << dequeue_result until result_empty?
  drained.empty? ? nil : Marshal.dump(drained)
end

#get_task ⇒ Object



41
42
43
44
45
46
47
# File 'lib/drbqs/task_client.rb', line 41

# Try to take one task tuple from @queue with a timeout of 0.
#
# The template asks for a four-element tuple whose first element is an
# Integer and whose third element is a Symbol; the nil slots are presumably
# wildcards (Rinda template convention -- confirm against the queue in use).
#
# @return [Array, nil] the tuple that was taken, or nil when the take raised
#   Rinda::RequestExpiredError
def get_task
  # Integer, not Fixnum: Fixnum was deprecated in Ruby 2.4 and removed in
  # 3.2, where referencing it raises NameError. Integer also matches every
  # value Fixnum used to match.
  @queue.take([Integer, nil, Symbol, nil], 0)
rescue Rinda::RequestExpiredError
  nil
end

#queue_result(result) ⇒ Object



69
70
71
# File 'lib/drbqs/task_client.rb', line 69

# Append a result to the local result queue.
#
# @param result the value to enqueue on @result_queue
# @return whatever the enqueue call on @result_queue returns
def queue_result(result)
  @result_queue.push(result)
end

#queue_task(task_id, ary) ⇒ Object



32
33
34
35
# File 'lib/drbqs/task_client.rb', line 32

# Record +task_id+ as the task being calculated and enqueue its payload.
#
# @param task_id value stored in @calculating_task
# @param ary     payload appended to @task_queue
# @return whatever the enqueue call on @task_queue returns
def queue_task(task_id, ary)
  # Set the marker before enqueueing, as the original does.
  @calculating_task = task_id
  @task_queue.push(ary)
end

#result_empty? ⇒ Boolean

Returns:

  • (Boolean)


23
24
25
# File 'lib/drbqs/task_client.rb', line 23

# Tell whether the local result queue holds no entries.
#
# @return [Boolean] true when @result_queue has nothing queued
def result_empty?
  @result_queue.size.zero?
end

#send_result ⇒ Object



60
61
62
63
64
65
66
67
# File 'lib/drbqs/task_client.rb', line 60

# Send one pending result, if any, and clear the in-progress marker.
#
# Writes +[:result, task_id, node_id, value]+ to @result, where task_id is
# the current @calculating_task, then resets @calculating_task to nil.
#
# @return [nil]
def send_result
  return if result_empty?

  value = dequeue_result
  task_id = @calculating_task
  # The string argument goes in Logger's progname position; the block
  # supplies the message.
  @logger.info("Send result: #{task_id}") { value.inspect } if @logger
  @result.write([:result, task_id, @node_id, value])
  @calculating_task = nil
end

#task_empty? ⇒ Boolean

Returns:

  • (Boolean)


19
20
21
# File 'lib/drbqs/task_client.rb', line 19

# Tell whether the local task queue holds no entries.
#
# @return [Boolean] true when @task_queue has nothing queued
def task_empty?
  @task_queue.size.zero?
end