Class: MessagePusher
Instance Attribute Summary
#sys_pipe
Instance Method Summary
collapse
#ack_msg, #pass_on_msg, #process_line, #process_system, #shutdown, #start
Constructor Details
6
7
8
9
10
11
12
13
14
15
|
# File 'lib/pipeline_toolkit/message_pusher.rb', line 6
def initialize(opts)
@key_eval = opts.key_eval
if opts.key_file
@key_file = opts.key_file
load_route(@key_file)
self.init_route
end
@exchange_names = opts.exchanges.map { |str| str.split(":") }
@msg_server_config = opts.select_keys(:host, :port, :user, :pass, :vhost)
end
|
Instance Method Details
#classify(str) ⇒ Object
Turn path Class or Module name (i.e. strip directories and turn into camel-case)
36
37
38
|
# File 'lib/pipeline_toolkit/message_pusher.rb', line 36
def classify(str)
str.gsub(/^.*\//, '').gsub(".rb","").gsub(/(?:^|_)(.)/) { $1.upcase }
end
|
#init_loop ⇒ Object
17
18
19
20
|
# File 'lib/pipeline_toolkit/message_pusher.rb', line 17
def init_loop
@msg_server = MQ.new(AMQP.connect(@msg_server_config))
self.setup_exchanges
end
|
#load_route(key_file) ⇒ Object
30
31
32
33
|
# File 'lib/pipeline_toolkit/message_pusher.rb', line 30
def load_route(key_file)
require key_file
self.extend eval(classify(key_file.gsub(".rb", "")))
end
|
#process_message(msg) ⇒ Object
45
46
47
48
49
50
51
52
|
# File 'lib/pipeline_toolkit/message_pusher.rb', line 45
def process_message(msg)
@exchanges.each do |exchange|
key = route_key(msg)
exchange.publish(msg.to_yaml, :routing_key => key)
end
:ack
end
|
#route_key(msg) ⇒ Object
is overriden by included fork_file if specified
41
42
43
|
# File 'lib/pipeline_toolkit/message_pusher.rb', line 41
def route_key(msg)
@key_eval ? eval(@key_eval) : nil
end
|
#setup_exchanges ⇒ Object
22
23
24
25
26
27
28
|
# File 'lib/pipeline_toolkit/message_pusher.rb', line 22
def setup_exchanges
@exchanges = []
@exchange_names.each do |name, type|
type ||= :fanout
@exchanges << MQ::Exchange.new(@msg_server, type.to_sym, name, :durable => true, :passive => false)
end
end
|