Class: Smith::Commands::Firehose
Instance Attribute Summary
#options, #target
Instance Method Summary
collapse
#banner, #format_help, #initialize, #parse_options
Methods included from Logger
included
Instance Method Details
#correct_direction? ⇒ Boolean
39
40
41
|
# File 'lib/smith/commands/smithctl/firehose.rb', line 39
def correct_direction?
options[:direction] == 'deliver' || options[:direction] == 'publish'
end
|
#execute ⇒ Object
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
# File 'lib/smith/commands/smithctl/firehose.rb', line 6
def execute
queue_name = target.first || '#'
AMQP::Channel.new(Smith.connection) do |channel,ok|
channel.topic('amq.rabbitmq.trace', :durable => true) do |exchange|
channel.queue('smith.firehose', :durable => true) do |queue|
if correct_direction?
routing_key = "#{options[:direction]}.#{Smith.config.smith.namespace}.#{queue_name}"
queue.bind(exchange, :routing_key => routing_key).subscribe do |m, payload|
acl_type_cache = AclTypeCache.instance
clazz = acl_type_cache.get_by_hash(m.['properties']['type'])
message = {options[:direction] => clazz.new.parse_from_string(payload)}
puts (options[:json_given]) ? message.to_json : message.inspect
end
else
responder.succeed("--direction must be either deliver or publish")
end
end
end
end
end
|
#options_spec ⇒ Object
28
29
30
31
32
33
34
35
36
37
|
# File 'lib/smith/commands/smithctl/firehose.rb', line 28
def options_spec
banner %(Listens on the rabbitmq firehose for the named queue and prints decoded
message to stdout.
Be warned it can produce vast amounts of outpu. You _must_ run 'rabbitmqctl
trace_on' for this to work.), "<queue>"
opt :json, "return the JSON representation of the message", :short => :j
opt :direction, "Show messages that are leaving the broker [deliver|publish]", :short => :d, :type => :string, :default => 'deliver'
end
|