Class: Igor::Master

Inherits:
Object
  • Object
show all
Defined in:
lib/igor/master.rb

Defined Under Namespace

Classes: Options

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Master

Returns a new instance of Master.



9
10
11
12
13
14
# File 'lib/igor/master.rb', line 9

def initialize(options={})
  @options = AngryHash[options]
  @options.env ||= {}

  @logger = @options.logger ||= $stderr
end

Instance Attribute Details

#loggerObject (readonly)

Returns the value of attribute logger.



7
8
9
# File 'lib/igor/master.rb', line 7

def logger
  @logger
end

#optionsObject (readonly)

Returns the value of attribute options.



7
8
9
# File 'lib/igor/master.rb', line 7

def options
  @options
end

Instance Method Details

#igorObject



20
21
22
23
24
25
26
27
28
29
30
# File 'lib/igor/master.rb', line 20

def igor
  @igor ||= begin
             if !::File.exist? options.config_file
               abort "configuration #{options.config_file} not found"
             end

             igor, options = Igor::Builder.parse_file(self.options.config_file, opt_parser)
             self.options.merge! options
             igor
           end
end

#opt_parserObject



38
39
40
# File 'lib/igor/master.rb', line 38

def opt_parser
  @opt_parser ||= Options.new
end

#publish_response(response) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/igor/master.rb', line 77

def publish_response(response)

  if !@response_queue && options.response_queue?
    @response_queue = MQ.queue(options.response_queue)
  end

  if response && @response_queue
    @response_queue.publish(Yajl::Encoder.encode(response))

    # TODO 
    #kind,exchange_id = *response[0..1]
    #response.tapp(:app_response)
    #MQ.send(kind,exchange_id).publish(*response[2..-1])
  end
end

#runObject



43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/igor/master.rb', line 43

def run
  logger.puts "#{self.class} starting up"
  igor # load igor so that its options are parsed if required

  options.tapp(:final_opts)

  cfg = options.amqp!.dup
  cfg.host ||= 'localhost'

  
  AMQP.start(cfg.to_normal_hash(:symbols)) do
    options.queues.each {|queue_name| subscribe_to(queue_name)}
  end
end

#startObject



16
17
18
# File 'lib/igor/master.rb', line 16

def start
  run
end

#subscribe_to(queue_name) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/igor/master.rb', line 58

def subscribe_to(queue_name)
  logger.puts "* getting queue '#{queue_name}'"
  q = MQ.queue(queue_name)

  logger.puts "* listening"

  q.subscribe {|header,body| 
    env = options.env.merge(
      'igor.errors'      => options.logger,
      'igor.amqp.header' => header,
      'igor.payload'     => body
    )

    igor.call(env).tap {|response|
      publish_response(response)
    }
  }
end