Class: Wakame::ActorRequest

Inherits:
Object
  • Object
show all
Defined in:
lib/wakame/master.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(master, packet) ⇒ ActorRequest

Returns a new instance of ActorRequest.

Raises:

  • (TypeError)


304
305
306
307
308
309
310
311
312
# File 'lib/wakame/master.rb', line 304

def initialize(master, packet)
  raise TypeError unless packet.is_a?(Wakame::Packets::ActorRequest)

  @master = master
  @packet = packet
  @requested = false
  @event_ticket = nil
  @wait_lock = ::Queue.new
end

Instance Attribute Details

#masterObject (readonly)

Returns the value of attribute master.



302
303
304
# File 'lib/wakame/master.rb', line 302

def master
  @master
end

Instance Method Details

#cancelObject

Raises:

  • (NotImplementedError)


343
344
345
346
347
348
349
# File 'lib/wakame/master.rb', line 343

def cancel
  check_requested?
  raise NotImplementedError
  
  #master.publish_to('agent_command', "agent_id.#{@packet.agent_id}", Wakame::Packets::ActorCancel.new(@packet.agent_id, ).marshal)
  #ED.unsubscribe(@event_ticket)
end

#progressObject

Raises:

  • (NotImplementedError)


338
339
340
341
# File 'lib/wakame/master.rb', line 338

def progress
  check_requested?
  raise NotImplementedError
end

#requestObject



315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/wakame/master.rb', line 315

def request
  raise "The request has already been sent." if @requested

  @event_ticket = ED.subscribe(Event::ActorComplete) { |event|
   if event.token == @packet.token
     
     # Any of status except RUNNING are accomplishment of the actor request.
     Wakame.log.debug("#{self.class}: The actor request has been completed: token=#{self.token}, status=#{event.status}")
     ED.unsubscribe(@event_ticket)
     @wait_lock.enq(event.status)
   end
  }
  Wakame.log.debug("#{self.class}: Send the actor request: #{@packet.path}@#{@packet.agent_id}, token=#{self.token}")
  master.publish_to('agent_command', "agent_id.#{@packet.agent_id}", @packet.marshal)
  @requested = true
  self
end

#tokenObject



334
335
336
# File 'lib/wakame/master.rb', line 334

def token
  @packet.token
end

#wait_completion(tout = 60*30) ⇒ Object Also known as: wait



351
352
353
354
355
356
357
358
359
360
361
# File 'lib/wakame/master.rb', line 351

def wait_completion(tout=60*30)
  check_requested?
  timeout(tout) {
    Wakame.log.debug("#{self.class}: Waiting a response from the actor: #{@packet.path}@#{@packet.agent_id}, token=#{@packet.token}")
    ret_status = @wait_lock.deq
    Wakame.log.debug("#{self.class}: A response (status=#{ret_status}) back from the actor: #{@packet.path}@#{@packet.agent_id}, token=#{@packet.token}")
    if ret_status == Actor::STATUS_FAILED
      raise RuntimeError, "Failed status has been returned: Actor Request #{token}"
    end
  }
end