Class: Wakame::MasterManagers::CommandQueue

Inherits:
Object
  • Object
show all
Includes:
Wakame::MasterManager
Defined in:
lib/wakame/master_managers/command_queue.rb

Defined Under Namespace

Classes: Adapter

Instance Attribute Summary

Attributes included from Wakame::MasterManager

#master

Instance Method Summary collapse

Methods included from Wakame::MasterManager

#reload, #start, #stop

Constructor Details

#initializeCommandQueue

Returns a new instance of CommandQueue.



12
13
14
15
16
17
18
# File 'lib/wakame/master_managers/command_queue.rb', line 12

def initialize()
  @queue = Queue.new
  @result_queue = Queue.new
  @statistics = {
    :total_command_count => 0
  }
end

Instance Method Details

#initObject



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/wakame/master_managers/command_queue.rb', line 20

def init
  @command_thread = Thread.new {
    Wakame.log.info("#{self.class}: Started command thread: #{Thread.current}")
    while cmd = @queue.deq
      begin
        unless cmd.kind_of?(Wakame::Command)
          Wakame.log.warn("#{self.class}: Incompatible type of object has been sent to ProcessCommand thread. #{cmd.class}")
          next
        end

        res = nil
        Wakame.log.debug("#{self.class}: Being processed the command: #{cmd.class}")
        res = cmd.run
        res
      rescue => e
        Wakame.log.error(e)
        res = e
      ensure
        @result_queue.enq(res)
      end
    end
  }

  cmdsv_uri = URI.parse(Wakame.config.http_command_server_uri)

  @thin_server = Thin::Server.new(cmdsv_uri.host, cmdsv_uri.port, Adapter.new(self))
  @thin_server.threaded = true
  @thin_server.start
end

#send_cmd(cmd) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/wakame/master_managers/command_queue.rb', line 55

def send_cmd(cmd)
  begin
    @queue.enq(cmd)

    ED.fire_event(Event::CommandReceived.new(cmd))

    return @result_queue.deq()
  rescue => e
    Wakame.log.error("#{self.class}:")
    Wakame.log.error(e)
  end
end

#terminateObject



50
51
52
53
# File 'lib/wakame/master_managers/command_queue.rb', line 50

def terminate
  @thin_server.stop
  @command_thread.kill
end