Class: Pione::Agent::TaskWorker

Inherits:
TupleSpaceClient show all
Defined in:
lib/pione/agent/task-worker.rb

Overview

TaskWorker is an agent to process tasks

Constant Summary

Constants included from Log::MessageLog

Log::MessageLog::MESSAGE_QUEUE

Instance Attribute Summary collapse

Attributes inherited from BasicAgent

#chain_threads

Instance Method Summary collapse

Methods inherited from TupleSpaceClient

#bye, #call_transition_method, #hello, #transit_to_terminate

Methods included from Log::MessageLog

#debug_message, #debug_message_begin, #debug_message_end, debug_mode, debug_mode=, debug_mode?, message, quiet_mode, quiet_mode=, quiet_mode?, #show, #user_message, #user_message_begin, #user_message_end

Methods included from TupleSpaceClientOperation

#base_location, #bye, #finalize, #hello, #notify_exception, #read, #take

Methods included from TupleSpace::TupleSpaceInterface

#process_log, #processing_error, #set_tuple_space, tuple_space_operation, #tuple_space_server, #with_process_log

Methods inherited from BasicAgent

agent_type, inherited, set_agent_type, #start, #start!, #states, #terminate, #terminated?, #transit, #wait_until, #wait_until_after, #wait_until_before, #wait_until_terminated

Methods included from StateTransitionSingletonMethod

#chain, #define_exception_handler, #define_transition, #exception_handler, #start, #transition_chain

Constructor Details

#initialize(tuple_space, features, env = nil) ⇒ TaskWorker

Returns a new instance of TaskWorker.

Parameters:



26
27
28
29
30
31
# File 'lib/pione/agent/task-worker.rb', line 26

def initialize(tuple_space, features, env=nil)
  super(tuple_space)
  @tuple_space = tuple_space
  @features = features
  @env = env || get_environment
end

Instance Attribute Details

#execution_threadObject (readonly)

Returns the value of attribute execution_thread.



12
13
14
# File 'lib/pione/agent/task-worker.rb', line 12

def execution_thread
  @execution_thread
end

#onceObject

the agent will be killed at task completion if true



13
14
15
# File 'lib/pione/agent/task-worker.rb', line 13

def once
  @once
end

#tuple_spaceObject (readonly)

instance methods



11
12
13
# File 'lib/pione/agent/task-worker.rb', line 11

def tuple_space
  @tuple_space
end

Instance Method Details

#get_environmentObject

Get a environment object from tuple space.



127
128
129
130
131
132
133
# File 'lib/pione/agent/task-worker.rb', line 127

def get_environment
  if env = read!(TupleSpace::EnvTuple.new)
    env.obj
  else
    raise TupleSpaceError.new("the tuple space is invalid because \"env\" tuple not found.")
  end
end

#make_engine(task) ⇒ Object

Make an engine from the task.



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/pione/agent/task-worker.rb', line 136

def make_engine(task)
  param = {
    :tuple_space  => @tuple_space,
    :env          => @env,
    :package_id   => task.package_id,
    :rule_name    => task.rule_name,
    :inputs       => task.inputs,
    :param_set    => task.param_set,
    :domain_id    => task.domain_id,
    :caller_id    => task.caller_id,
    :request_from => @request_from,
    :session_id   => @session_id,
    :client_ui    => @client_ui
  }

  RuleEngine.make(param)
end

#spawn_child_task_worker(task) ⇒ Object

Spawn child task worker. This method repeats to create a child agent while rule execution thread is alive.



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/pione/agent/task-worker.rb', line 156

def spawn_child_task_worker(task)
  child_agent = nil
  foreground = TupleSpace::ForegroundTuple.new(task.domain_id, task.digest)

  # child worker loop
  while @execution_thread.alive? do
    if @execution_thread.status == "sleep"
      if child_agent.nil? or child_agent.terminated?
        # when there isn't active child agent
        child_agent = self.class.new(tuple_space_server, @features, @env)
        child_agent.once = true

        # make log record
        record = Log::CreateChildTaskWorkerProcessRecord.new.tap do |x|
          x.parent = uuid
          x.child = child_agent.uuid
        end

        # spawn child agent with logging
        with_process_log(record) do
          # turn background
          take!(foreground)
          # start child agent
          child_agent.start
        end

        # wait until the child agent completes the task
        child_agent.wait_until_terminated(nil)
      end
    else
      sleep 0.1 # FIXME : rewrite this sleep by more sophisticated way
    end
  end

  # turn foreground
  write(foreground) unless read!(foreground)
end

#transit_to_connection_error(e) ⇒ Object

Report the connection error.



118
119
120
# File 'lib/pione/agent/task-worker.rb', line 118

def transit_to_connection_error(e)
  Log::SystemLog.warn("task worker agent was disconnected from tuple space unexpectedly, goes to termination.")
end

#transit_to_execute_task(task) ⇒ Object

Execute the task.



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/pione/agent/task-worker.rb', line 90

def transit_to_execute_task(task)
  # setup rule engine
  engine = make_engine(task)

  # start the engine
  @execution_thread = Thread.new do
    engine.handle || terminate
  end

  # spawn child task worker if flow
  if engine.rule_definition.rule_type == "flow"
    spawn_child_task_worker(task)
  end

  # wait until the engine ends
  @execution_thread.join

  # go next transition
  return task
end

#transit_to_finalize_task(task) ⇒ Object

Finalize the task. This method will turn working flag off and background.



112
113
114
115
# File 'lib/pione/agent/task-worker.rb', line 112

def transit_to_finalize_task(task)
  take!(TupleSpace::WorkingTuple.new(task.domain_id, task.digest))
  take!(TupleSpace::ForegroundTuple.new(task.domain_id, task.digest))
end

#transit_to_initObject

transitions



57
58
59
60
61
62
# File 'lib/pione/agent/task-worker.rb', line 57

def transit_to_init
  @request_from = @tuple_space.attribute("request_from")
  @session_id = @tuple_space.attribute("session_id")
  @client_ui = @tuple_space.attribute("client_ui")
  super
end

#transit_to_init_task(task) ⇒ Object

Initialize the task.



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/pione/agent/task-worker.rb', line 70

def transit_to_init_task(task)
  # make flag tuples
  working = TupleSpace::WorkingTuple.new(task.domain_id, task.digest)
  foreground = TupleSpace::ForegroundTuple.new(task.domain_id, task.digest)

  if read!(working)
    # the task is working already, so we will dicard the task
    raise Restart.new
  else
    # turn foreground flag on
    write(working)
    write(foreground)
    # go next transition
    return task
  end
rescue Rinda::RedundantTupleError
  raise Restart.new
end

#transit_to_take_taskObject

Take a task and turn it to foreground.



65
66
67
# File 'lib/pione/agent/task-worker.rb', line 65

def transit_to_take_task
  return take(TupleSpace::TaskTuple.new(features: @features))
end