Class: Pione::Model::TaskWorkerBrokerModel

Inherits:
Rootage::Model show all
Defined in:
lib/pione/model/task-worker-broker-model.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Rootage::Model

#[], #[]=, #specified?, #specify, #to_hash

Constructor Details

#initializeTaskWorkerBrokerModel

Returns a new instance of TaskWorkerBrokerModel.



8
9
10
11
12
13
14
15
16
17
18
# File 'lib/pione/model/task-worker-broker-model.rb', line 8

def initialize
  super

  @spawnings = 0                  # number of current spawning task worker
  @task_worker_lock = Monitor.new # lock for task worker table
  @task_workers = Array.new       # known task worker fronts
  @tuple_space_lock = Monitor.new
  @tuple_space = Hash.new         # known tuple space table

  self[:spawn_task_worker] = true
end

Instance Attribute Details

#task_workersObject

Returns the value of attribute task_workers.



4
5
6
# File 'lib/pione/model/task-worker-broker-model.rb', line 4

def task_workers
  @task_workers
end

#tuple_spaceObject

Returns the value of attribute tuple_space.



5
6
7
# File 'lib/pione/model/task-worker-broker-model.rb', line 5

def tuple_space
  @tuple_space
end

#tuple_space_lockObject (readonly)

lock for tuple space table



6
7
8
# File 'lib/pione/model/task-worker-broker-model.rb', line 6

def tuple_space_lock
  @tuple_space_lock
end

Instance Method Details

#add_tuple_space(tuple_space) ⇒ Object

Add the tuple space.



21
22
23
24
25
26
# File 'lib/pione/model/task-worker-broker-model.rb', line 21

def add_tuple_space(tuple_space)
  uuid = tuple_space.uuid

  # update tuple space table with the id
  @tuple_space_lock.synchronize {@tuple_space[uuid] = tuple_space}
end

#create_task_worker(tuple_space) ⇒ Object

Create a task worker for the tuple space. This method returns true if we suceeds to spawn the task worker, or returns false.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/pione/model/task-worker-broker-model.rb', line 30

def create_task_worker(tuple_space)
  res = true

  @task_worker_lock.synchronize do
    @spawnings += 1

    # spawn a new process of pione-task-worker command
    if self[:spawn_task_worker]
      # make task worker's parameters
      param = {
        :features => Global.features,
        :tuple_space_id => tuple_space.uuid
      }

      begin
        spawner = Command::PioneTaskWorker.spawn(self, param)
        @task_workers << spawner.child_front
        spawner.when_terminated {delete_task_worker(spawner.child_front)}
      rescue Command::SpawnError => e
        Log::Debug.system("Task worker broker agent failed to spawn a task worker: %s" % e.message)
        res = false
      end
    else
      @task_workers << Agent::TaskWorker.start(tuple_space, Global.expressional_features, @env)
    end

    @spawnings -= 1
  end

  return res
end

#delete_dead_task_workersObject

Delete all dead task workers.



67
68
69
70
71
72
73
# File 'lib/pione/model/task-worker-broker-model.rb', line 67

def delete_dead_task_workers
  @task_worker_lock.synchronize do
    @task_workers.delete_if do |worker|
      not(Util.ignore_exception {timeout(1) {worker.ping}})
    end
  end
end

#delete_dead_tuple_spacesObject

Delete all dead tuple spaces.



76
77
78
79
80
81
82
# File 'lib/pione/model/task-worker-broker-model.rb', line 76

def delete_dead_tuple_spaces
  @tuple_space_lock.synchronize do
    @tuple_space.delete_if do |_, space|
      not(Util.ignore_exception {timeout(1) {space.ping}})
    end
  end
end

#delete_task_worker(worker) ⇒ Object



62
63
64
# File 'lib/pione/model/task-worker-broker-model.rb', line 62

def delete_task_worker(worker)
  @task_worker_lock.synchronize {@task_workers.delete(worker)}
end

#excess_task_workersObject

Return excess number of task workers belong to the broker.



85
86
87
88
89
# File 'lib/pione/model/task-worker-broker-model.rb', line 85

def excess_task_workers
  @task_worker_lock.synchronize do
    @task_worker_size - @task_workers.size - @spawnings
  end
end

#get_tuple_space(tuple_space_id) ⇒ Object

Get the tuple space.



92
93
94
# File 'lib/pione/model/task-worker-broker-model.rb', line 92

def get_tuple_space(tuple_space_id)
  @tuple_space_lock.synchronize {@tuple_space[tuple_space_id]}
end

#quantityObject

Return number of task workers the broker manages.



97
98
99
# File 'lib/pione/model/task-worker-broker-model.rb', line 97

def quantity
  @task_worker_lock.synchronize {@task_workers.size}
end

#terminate_task_worker_if(&condition) ⇒ Object

Terminate first task worker that satisfies the condition.



102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/pione/model/task-worker-broker-model.rb', line 102

def terminate_task_worker_if(&condition)
  @task_worker_lock.synchronize do
    @task_workers.each do |worker|
      if condition.call(worker)
        worker.terminate
        @task_workers.delete(worker)
        return true
      end
    end
  end
  return false
end

#tuple_spacesObject

Return known tuple spaces.



116
117
118
# File 'lib/pione/model/task-worker-broker-model.rb', line 116

def tuple_spaces
  @tuple_space_lock.synchronize {@tuple_space.values}
end

#update_tuple_spaces(tuple_spaces) ⇒ Object

Update tuple space list.



121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/pione/model/task-worker-broker-model.rb', line 121

def update_tuple_spaces(tuple_spaces)
  @tuple_space_lock.synchronize do
    # clear and update tuple space list
    @tuple_space.clear
    tuple_spaces.each {|tuple_space| add_tuple_space(tuple_space)}

    Log::Debug.system do
      list = @tuple_space.values.map{|space| space.__drburi}
      "Task worker broker has updated tuple space table: %s" % [list]
    end
  end
end