Class: MessagePusher

Inherits:
Object
  • Object
show all
Includes:
MessageCommand
Defined in:
lib/pipeline_toolkit/message_pusher.rb

Instance Attribute Summary

Attributes included from MessageCommand

#sys_pipe

Instance Method Summary collapse

Methods included from MessageCommand

#ack_msg, #pass_on_msg, #process_line, #process_system, #shutdown, #start

Constructor Details

#initialize(opts) ⇒ MessagePusher



6
7
8
9
10
11
12
13
14
15
# File 'lib/pipeline_toolkit/message_pusher.rb', line 6

def initialize(opts)
  @key_eval = opts.key_eval
  if opts.key_file
    @key_file = opts.key_file
    load_route(@key_file)
    self.init_route
  end
  @exchange_names = opts.exchanges.map { |str| str.split(":") }
  @msg_server_config = opts.select_keys(:host, :port, :user, :pass, :vhost)
end

Instance Method Details

#classify(str) ⇒ Object

Turn path Class or Module name (i.e. strip directories and turn into camel-case)



36
37
38
# File 'lib/pipeline_toolkit/message_pusher.rb', line 36

def classify(str)
  str.gsub(/^.*\//, '').gsub(".rb","").gsub(/(?:^|_)(.)/) { $1.upcase }
end

#init_loopObject



17
18
19
20
# File 'lib/pipeline_toolkit/message_pusher.rb', line 17

def init_loop
  @msg_server = MQ.new(AMQP.connect(@msg_server_config))
  self.setup_exchanges
end

#load_route(key_file) ⇒ Object



30
31
32
33
# File 'lib/pipeline_toolkit/message_pusher.rb', line 30

def load_route(key_file)
  require key_file
  self.extend eval(classify(key_file.gsub(".rb", "")))
end

#process_message(msg) ⇒ Object



45
46
47
48
49
50
51
52
# File 'lib/pipeline_toolkit/message_pusher.rb', line 45

def process_message(msg)
  @exchanges.each do |exchange|
    key = route_key(msg)
    exchange.publish(msg.to_yaml, :routing_key => key)
    # OPTIMIZE. Using MessageCoder.encode(msg) instead of to_yaml is 2x faster. But won't be easy to debug. Worth it?
  end
  :ack
end

#route_key(msg) ⇒ Object

is overriden by included fork_file if specified



41
42
43
# File 'lib/pipeline_toolkit/message_pusher.rb', line 41

def route_key(msg)
  @key_eval ? eval(@key_eval) : nil
end

#setup_exchangesObject



22
23
24
25
26
27
28
# File 'lib/pipeline_toolkit/message_pusher.rb', line 22

def setup_exchanges
  @exchanges = []
  @exchange_names.each do |name, type|
    type ||= :fanout
    @exchanges << MQ::Exchange.new(@msg_server, type.to_sym, name, :durable => true, :passive => false)
  end
end