Class: Wakame::ActorRequest
- Inherits:
-
Object
- Object
- Wakame::ActorRequest
- Defined in:
- lib/wakame/master.rb
Instance Attribute Summary collapse
-
#master ⇒ Object
readonly
Returns the value of attribute master.
Instance Method Summary collapse
- #cancel ⇒ Object
-
#initialize(master, packet) ⇒ ActorRequest
constructor
A new instance of ActorRequest.
- #progress ⇒ Object
- #request ⇒ Object
- #token ⇒ Object
- #wait_completion(tout = 60*30) ⇒ Object (also: #wait)
Constructor Details
#initialize(master, packet) ⇒ ActorRequest
Returns a new instance of ActorRequest.
304 305 306 307 308 309 310 311 312 |
# File 'lib/wakame/master.rb', line 304
#
# Wraps an actor-request packet so it can be sent and awaited later.
#
# @param master [Object] stored as-is; #request calls +publish_to+ on it
# @param packet [Wakame::Packets::ActorRequest] the packet to be published
# @raise [TypeError] if +packet+ is not a Wakame::Packets::ActorRequest
def initialize(master, packet)
  unless packet.is_a?(Wakame::Packets::ActorRequest)
    # Name both the expected and the actual class so the failure is diagnosable.
    raise TypeError, "packet must be a Wakame::Packets::ActorRequest, got #{packet.class}"
  end

  @master = master
  @packet = packet
  @requested = false        # set to true by #request once the packet is published
  @event_ticket = nil       # holds the value returned by ED.subscribe in #request
  @wait_lock = ::Queue.new  # the completion status is enqueued here; #wait_completion dequeues it
end
Instance Attribute Details
#master ⇒ Object (readonly)
Returns the value of attribute master.
302 303 304 |
# File 'lib/wakame/master.rb', line 302
#
# Read-only accessor for the master object handed to the constructor.
#
# @return [Object] the value of the +@master+ instance variable
def master
  @master
end
Instance Method Details
#cancel ⇒ Object
343 344 345 346 347 348 349 |
# File 'lib/wakame/master.rb', line 343
#
# Placeholder for cancelling an actor request; always raises after the
# +check_requested?+ call returns.
#
# NOTE(review): +check_requested?+ is defined outside this listing; it
# presumably rejects calls made before #request - confirm.
#
# @raise [NotImplementedError] always
def cancel
  check_requested?

  # Intended implementation, still disabled in the original source:
  #   master.publish_to('agent_command', "agent_id.#{@packet.agent_id}", Wakame::Packets::ActorCancel.new(@packet.agent_id, ).marshal)
  #   ED.unsubscribe(@event_ticket)
  raise NotImplementedError
end
#progress ⇒ Object
338 339 340 341 |
# File 'lib/wakame/master.rb', line 338
#
# Placeholder for reporting the progress of an actor request; always raises
# after the +check_requested?+ call returns.
#
# NOTE(review): +check_requested?+ is defined outside this listing; it
# presumably rejects calls made before #request - confirm.
#
# @raise [NotImplementedError] always
def progress
  check_requested?

  raise NotImplementedError
end
#request ⇒ Object
315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 |
# File 'lib/wakame/master.rb', line 315
#
# Subscribes to ActorComplete events for this packet's token and then
# publishes the packet to the 'agent_command' exchange, routed by agent id.
#
# When a matching completion event arrives, the handler unsubscribes itself
# and enqueues the event status on +@wait_lock+, which #wait_completion
# dequeues.
#
# If sending fails, the subscription registered here is removed again before
# the error propagates, so a failed attempt leaves no stale handler behind
# and the request can be retried.
#
# @return [self]
# @raise [RuntimeError] if the request has already been sent
def request
  raise "The request has already been sent." if @requested

  @event_ticket = ED.subscribe(Event::ActorComplete) do |event|
    if event.token == @packet.token
      # Any status other than RUNNING means the actor request has finished.
      Wakame.log.debug("#{self.class}: The actor request has been completed: token=#{self.token}, status=#{event.status}")
      ED.unsubscribe(@event_ticket)
      @wait_lock.enq(event.status)
    end
  end

  begin
    Wakame.log.debug("#{self.class}: Send the actor request: #{@packet.path}@#{@packet.agent_id}, token=#{self.token}")
    master.publish_to('agent_command', "agent_id.#{@packet.agent_id}", @packet.marshal)
    @requested = true
  ensure
    # Still false here means the send raised: drop the subscription so it
    # does not leak and a retry does not end up with two handlers.
    unless @requested
      ED.unsubscribe(@event_ticket)
      @event_ticket = nil
    end
  end

  self
end
#token ⇒ Object
334 335 336 |
# File 'lib/wakame/master.rb', line 334
#
# Token of the wrapped packet; #request compares completion events against it.
#
# @return [Object] whatever +@packet.token+ returns
def token
  @packet.token
end
#wait_completion(tout = 60*30) ⇒ Object Also known as: wait
351 352 353 354 355 356 357 358 359 360 361 |
# File 'lib/wakame/master.rb', line 351
#
# Blocks until the completion status of the request is enqueued on
# +@wait_lock+ (done by the handler registered in #request), or until the
# timeout elapses.
#
# Uses Timeout.timeout explicitly instead of the deprecated top-level
# +timeout+ helper, which is not available on current Ruby versions.
#
# NOTE(review): +check_requested?+ is defined outside this listing; it
# presumably rejects calls made before #request - confirm.
#
# @param tout [Numeric] maximum number of seconds to wait (default 30 minutes)
# @return [nil] when the actor finished with a status other than failed
# @raise [Timeout::Error] if no status arrives within +tout+ seconds
# @raise [RuntimeError] if the returned status is Actor::STATUS_FAILED
def wait_completion(tout=60*30)
  check_requested?
  Timeout.timeout(tout) do
    Wakame.log.debug("#{self.class}: Waiting a response from the actor: #{@packet.path}@#{@packet.agent_id}, token=#{@packet.token}")
    ret_status = @wait_lock.deq # blocks until the completion handler enqueues a status
    Wakame.log.debug("#{self.class}: A response (status=#{ret_status}) back from the actor: #{@packet.path}@#{@packet.agent_id}, token=#{@packet.token}")
    if ret_status == Actor::STATUS_FAILED
      raise RuntimeError, "Failed status has been returned: Actor Request #{token}"
    end
  end
end