Class: DRbQS::TaskClient
- Inherits:
-
Object
- Object
- DRbQS::TaskClient
- Defined in:
- lib/drbqs/task_client.rb
Instance Attribute Summary collapse
-
#calculating_task ⇒ Object
readonly
Returns the value of attribute calculating_task.
-
#node_id ⇒ Object
readonly
Returns the value of attribute node_id.
Instance Method Summary collapse
- #add_new_task ⇒ Object
- #calculating? ⇒ Boolean
- #dequeue_task ⇒ Object
- #dump_result_queue ⇒ Object
- #get_task ⇒ Object
-
#initialize(node_id, queue, result, logger = nil) ⇒ TaskClient
constructor
A new instance of TaskClient.
- #queue_result(result) ⇒ Object
- #queue_task(task_id, ary) ⇒ Object
- #result_empty? ⇒ Boolean
- #send_result ⇒ Object
- #task_empty? ⇒ Boolean
Constructor Details
#initialize(node_id, queue, result, logger = nil) ⇒ TaskClient
Returns a new instance of TaskClient.
5 6 7 8 9 10 11 12 13 |
# File 'lib/drbqs/task_client.rb', line 5 def initialize(node_id, queue, result, logger = nil) @node_id = node_id @queue = queue @result = result @calculating_task = nil @task_queue = Queue.new @result_queue = Queue.new @logger = logger end |
Instance Attribute Details
#calculating_task ⇒ Object (readonly)
Returns the value of attribute calculating_task.
3 4 5 |
# File 'lib/drbqs/task_client.rb', line 3 def calculating_task @calculating_task end |
#node_id ⇒ Object (readonly)
Returns the value of attribute node_id.
3 4 5 |
# File 'lib/drbqs/task_client.rb', line 3 def node_id @node_id end |
Instance Method Details
#add_new_task ⇒ Object
49 50 51 52 53 54 55 56 57 58 |
# File 'lib/drbqs/task_client.rb', line 49 def add_new_task unless @calculating_task if ary = get_task task_id, obj, method_sym, args = ary @logger.info("Send accept signal: node #{@node_id} caluclating #{task_id}") if @logger @result.write([:accept, task_id, @node_id]) queue_task(task_id, [obj, method_sym, args]) end end end |
#calculating? ⇒ Boolean
15 16 17 |
# File 'lib/drbqs/task_client.rb', line 15 def calculating? !!@calculating_task end |
#dequeue_task ⇒ Object
37 38 39 |
# File 'lib/drbqs/task_client.rb', line 37 def dequeue_task @task_queue.deq end |
#dump_result_queue ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/drbqs/task_client.rb', line 73 def dump_result_queue results = [] while !result_empty? results << dequeue_result end if results.size > 0 Marshal.dump(results) else nil end end |
#get_task ⇒ Object
41 42 43 44 45 46 47 |
# File 'lib/drbqs/task_client.rb', line 41 def get_task begin @queue.take([Fixnum, nil, Symbol, nil], 0) rescue Rinda::RequestExpiredError nil end end |
#queue_result(result) ⇒ Object
69 70 71 |
# File 'lib/drbqs/task_client.rb', line 69 def queue_result(result) @result_queue.enq(result) end |
#queue_task(task_id, ary) ⇒ Object
32 33 34 35 |
# File 'lib/drbqs/task_client.rb', line 32 def queue_task(task_id, ary) @calculating_task = task_id @task_queue.enq(ary) end |
#result_empty? ⇒ Boolean
23 24 25 |
# File 'lib/drbqs/task_client.rb', line 23 def result_empty? @result_queue.empty? end |
#send_result ⇒ Object
60 61 62 63 64 65 66 67 |
# File 'lib/drbqs/task_client.rb', line 60 def send_result if !result_empty? result = dequeue_result @logger.info("Send result: #{@calculating_task}") { result.inspect } if @logger @result.write([:result, @calculating_task, @node_id, result]) @calculating_task = nil end end |
#task_empty? ⇒ Boolean
19 20 21 |
# File 'lib/drbqs/task_client.rb', line 19 def task_empty? @task_queue.empty? end |