Class: Pione::Agent::BasicAgent

Inherits:
PioneObject
  • Object
show all
Extended by:
StateTransitionSingletonMethod
Includes:
DRbUndumped
Defined in:
lib/pione/agent/basic-agent.rb

Overview

BasicAgent is a super class for all PIONE agents.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from StateTransitionSingletonMethod

chain, define_exception_handler, define_transition, exception_handler, transition_chain

Constructor Details

#initialize ⇒ BasicAgent

Returns a new instance of BasicAgent.



115
116
117
118
119
120
121
122
123
124
125
# File 'lib/pione/agent/basic-agent.rb', line 115

# Build a new agent with an empty chain thread group and the
# synchronization primitives used by the wait_until_* methods.
#
# Each condition-variable table lazily creates one ConditionVariable per
# transition key on first lookup.
def initialize
  @chain_threads = ThreadGroup.new

  # primitives backing wait_until_before
  @__wait_until_before_mutex__ = Mutex.new
  @__wait_until_before_cv__ = Hash.new do |table, transition|
    table[transition] = ConditionVariable.new
  end

  # primitives backing wait_until_after
  @__wait_until_after_mutex__ = Mutex.new
  @__wait_until_after_cv__ = Hash.new do |table, transition|
    table[transition] = ConditionVariable.new
  end
end

Instance Attribute Details

#chain_threads ⇒ Object (readonly)

transition chain thread group



113
114
115
# File 'lib/pione/agent/basic-agent.rb', line 113

# Reader for the transition chain thread group.
#
# @return [ThreadGroup] the group held in +@chain_threads+
def chain_threads
  @chain_threads
end

Class Method Details

.agent_type ⇒ Object

Return the agent type.



104
105
106
# File 'lib/pione/agent/basic-agent.rb', line 104

# Return the agent type.
#
# @return [Object, nil] the value held in the class-level +@agent_type+
#   instance variable, or nil when it has not been assigned
def self.agent_type
  @agent_type
end

.inherited(subclass) ⇒ Object

class methods



91
92
93
94
95
# File 'lib/pione/agent/basic-agent.rb', line 91

# Inheritance hook: give the subclass its own (shallow, via #clone) copies
# of this class's transition table, transition chain and exception handler
# table.
#
# @param subclass [Class] the class that is inheriting from this one
# @return [Object] the copy stored into the subclass's +@exception_handler+
def self.inherited(subclass)
  [:@transitions, :@transition_chain, :@exception_handler].map { |ivar|
    subclass.instance_variable_set(ivar, instance_variable_get(ivar).clone)
  }.last
end

.set_agent_type(agent_type, klass = nil) ⇒ Object

Set the agent type.



98
99
100
101
# File 'lib/pione/agent/basic-agent.rb', line 98

# Set the agent type.
#
# @param agent_type [Object] value to store as this class's agent type
# @param klass [Object, nil] when truthy, it is additionally passed to
#   +Agent.set_agent+
# @return [Object, nil] the result of +Agent.set_agent+, or nil when
#   +klass+ is not given
def self.set_agent_type(agent_type, klass=nil)
  @agent_type = agent_type
  return unless klass

  Agent.set_agent(klass)
end

Instance Method Details

#start ⇒ Object

Start the agent activity.



128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/pione/agent/basic-agent.rb', line 128

# Start the agent activity.
#
# Records the calling thread as the owner thread, launches the first chain
# thread via +start_running+ beginning at the +:init+ transition, and
# encloses the chain thread group.
#
# @return [self]
# @raise [TerminationError] when the chain thread group still lists threads
def start
  raise TerminationError.new(self, states) unless @chain_threads.list.empty?

  # remember which thread started the agent
  @__owner_thread__ = Thread.current

  # launch the first chain thread and close the group
  initial_thread = start_running(:init, [], AgentState.new, true)
  @chain_threads.add(initial_thread)
  @chain_threads.enclose

  self
end

#start! ⇒ Object

Start the agent activity and wait for its termination.



144
145
146
147
# File 'lib/pione/agent/basic-agent.rb', line 144

# Start the agent activity and wait for its termination.
#
# @return [Object] whatever +wait_until_terminated+ returns
def start!
  start
  # nil is passed as the timeout seconds -- presumably meaning "wait without
  # a time limit"; confirm against the timeout implementation in use
  wait_until_terminated(nil)
end

#states ⇒ Object

Return agent states.



193
194
195
# File 'lib/pione/agent/basic-agent.rb', line 193

# Return agent states.
#
# @return [Array] the +:agent_state+ thread-local value of every thread
#   currently listed in the chain thread group (nil for threads that have
#   not set one)
def states
  chain = @chain_threads.list
  chain.map { |chain_thread| chain_thread[:agent_state] }
end

#terminate ⇒ Object

Terminate the agent activity.



234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/pione/agent/basic-agent.rb', line 234

# Terminate the agent activity.
#
# The work runs in a freshly spawned helper thread which the caller joins:
# every listed chain thread other than the helper itself is killed and
# joined, then the +:terminate+ transition is fired from the helper thread.
#
# @return [Thread] the joined helper thread (result of Thread#join)
def terminate
  # last :agent_state seen while iterating the chain threads
  state = nil

  Thread.new {
    # kill all chain threads
    @chain_threads.list.each do |thread|
      state = thread[:agent_state] # save last state
      # never kill/join the helper thread itself, should it appear in the list
      unless thread == Thread.current
        thread.kill
        thread.join
      end
    end

    # fire "terminate" transition
    begin
      # transit reads Thread.current[:agent_state], so seed it with the saved
      # state, or with a fresh AgentState when no chain thread provided one
      Thread.current[:agent_state] = state || AgentState.new
      transit(:terminate, [])
    rescue DRb::DRbConnError, DRbPatch::ReplyReaderError => e
      # connection errors at termination time are logged, not propagated
      Log::Debug.warn("raised a connection error when we terminated", self, e)
    end
  }.join
end

#terminated? ⇒ Boolean

Return true if the agent has been terminated.

Returns:

  • (Boolean)


258
259
260
# File 'lib/pione/agent/basic-agent.rb', line 258

# Return true if the agent has been terminated.
#
# The agent counts as terminated when its chain thread group lists no
# threads and the group has been enclosed (which +start+ does).
#
# @return [Boolean]
def terminated?
  group = @chain_threads
  group.list.empty? && group.enclosed?
end

#transit(transition, transition_inputs) ⇒ Object

Fire the transition with inputs.



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/pione/agent/basic-agent.rb', line 150

# Fire the transition with inputs.
#
# Steps, in order:
# 1. wake threads blocked in +wait_until_before+ for this transition;
# 2. mark the transition as current in the calling thread's +:agent_state+;
# 3. invoke the transition via +call_transition_method+ and normalize its
#    result to an Array (nil becomes [], a non-Array value is wrapped);
# 4. mark the transition as previous and clear the current one;
# 5. wake threads blocked in +wait_until_after+ for this transition.
#
# When a StandardError is raised, the handler transition returned by
# +get_exception_handler+ is fired with the error as its only input. If
# there is no handler, the error is raised in the owner thread saved by
# +start+ when that thread is alive, otherwise in the calling thread.
#
# @param transition [Object] name of the transition to fire
# @param transition_inputs [Array] inputs handed to the transition method
# @return [Array(Object, Array)] the transition name and its result array;
#   nil when an unhandled error was forwarded to the owner thread
# @raise [StandardError] the original error, when no handler exists and no
#   live owner thread is available, or when the handler is not a Symbol
def transit(transition, transition_inputs)
  # wake up threads that wait by wait_until_before method
  if @__wait_until_before_cv__.has_key?(transition)
    @__wait_until_before_mutex__.synchronize do
      @__wait_until_before_cv__[transition].broadcast
    end
  end

  # mark current transition
  Thread.current[:agent_state] =
    AgentState.new(previous: Thread.current[:agent_state].previous, current: transition)

  # call transition
  result = call_transition_method(transition, transition_inputs)
  result = result.nil? ? [] : result
  result = result.is_a?(Array) ? result : [result]

  # unmark current transition and mark previous transition
  Thread.current[:agent_state] = AgentState.new(previous: transition, current: nil)

  # wake up threads that wait by wait_until_after method
  if @__wait_until_after_cv__.has_key?(transition)
    @__wait_until_after_mutex__.synchronize do
      @__wait_until_after_cv__[transition].broadcast
    end
  end

  return transition, result
rescue StandardError => e
  # error handling
  if error_transition = get_exception_handler(e)
    # a non-Symbol handler cannot be fired as a transition: re-raise as is
    raise unless error_transition.is_a?(Symbol)
    return transit(error_transition, [e])
  else
    # The variable must be spelled exactly like the one assigned in +start+
    # (@__owner_thread__); a misspelling here would always read nil and the
    # owner thread would never be notified.
    if @__owner_thread__ and @__owner_thread__.alive?
      @__owner_thread__.raise e
    else
      raise e
    end
  end
end

#wait_until(transition, sec = 10) ⇒ Object



208
209
210
211
212
# File 'lib/pione/agent/basic-agent.rb', line 208

# Wait for the transition unless some chain thread is executing it already.
#
# When a chain thread's +:agent_state+ reports +transition+ as its current
# transition this returns immediately; otherwise it delegates to
# +wait_until_before+.
#
# @param transition [Object] transition name to wait for
# @param sec [Numeric, nil] timeout seconds forwarded to +wait_until_before+
# @return [Object, nil] nil when no waiting was needed, otherwise the
#   result of +wait_until_before+
def wait_until(transition, sec=10)
  already_running = @chain_threads.list.any? do |chain_thread|
    chain_thread[:agent_state] and chain_thread[:agent_state].current == transition
  end

  wait_until_before(transition, sec) unless already_running
end

#wait_until_after(transition, sec = 10) ⇒ Object

Sleep until after the agent fires the transition.



215
216
217
218
219
220
221
222
223
# File 'lib/pione/agent/basic-agent.rb', line 215

# Sleep until after the agent fires the transition.
#
# Blocks the calling thread on the condition variable registered for
# +transition+ in +@__wait_until_after_cv__+ until it is signalled or the
# timeout expires.
#
# @param transition [Object] transition name to wait for
# @param sec [Numeric, nil] timeout in seconds handed to Timeout.timeout
# @return [Object] the result of ConditionVariable#wait once woken up
# @raise [TimeoutError] when the wait is not signalled within +sec+ seconds;
#   built from the agent, the current chain thread states and +sec+
def wait_until_after(transition, sec=10)
  # Call Timeout.timeout explicitly: the bare, Kernel-level +timeout+ helper
  # was deprecated and is not available on current Ruby versions.
  Timeout.timeout(sec) do
    @__wait_until_after_mutex__.synchronize do
      @__wait_until_after_cv__[transition].wait(@__wait_until_after_mutex__)
    end
  end
rescue Timeout::Error
  raise TimeoutError.new(self, @chain_threads.list.map {|th| th[:agent_state]}, sec)
end

#wait_until_before(transition, sec = 10) ⇒ Object

Sleep until before the agent fires the transition.



198
199
200
201
202
203
204
205
206
# File 'lib/pione/agent/basic-agent.rb', line 198

# Sleep until before the agent fires the transition.
#
# Blocks the calling thread on the condition variable registered for
# +transition+ in +@__wait_until_before_cv__+ until it is signalled or the
# timeout expires.
#
# @param transition [Object] transition name to wait for
# @param sec [Numeric, nil] timeout in seconds handed to Timeout.timeout
# @return [Object] the result of ConditionVariable#wait once woken up
# @raise [TimeoutError] when the wait is not signalled within +sec+ seconds;
#   built from the agent, the current chain thread states and +sec+
def wait_until_before(transition, sec=10)
  # Call Timeout.timeout explicitly: the bare, Kernel-level +timeout+ helper
  # was deprecated and is not available on current Ruby versions.
  Timeout.timeout(sec) do
    @__wait_until_before_mutex__.synchronize do
      @__wait_until_before_cv__[transition].wait(@__wait_until_before_mutex__)
    end
  end
rescue Timeout::Error
  raise TimeoutError.new(self, @chain_threads.list.map {|th| th[:agent_state]}, sec)
end

#wait_until_terminated(sec = 10) ⇒ Object

Sleep caller thread until the agent is terminated.



226
227
228
229
230
231
# File 'lib/pione/agent/basic-agent.rb', line 226

# Sleep caller thread until the agent is terminated.
#
# Does nothing when +terminated?+ is already true. Otherwise waits until
# after the +:terminate+ transition has fired, then joins every thread
# still listed in the chain thread group.
#
# @param sec [Numeric, nil] timeout seconds forwarded to +wait_until_after+
# @return [Array<Thread>, nil] the joined thread list, or nil when the
#   agent was already terminated
def wait_until_terminated(sec=10)
  return if terminated?

  wait_until_after(:terminate, sec)
  @chain_threads.list.each(&:join)
end