Class: Smith::Commands::Firehose

Inherits:
Smith::CommandBase show all
Defined in:
lib/smith/commands/smithctl/firehose.rb

Instance Attribute Summary

Attributes inherited from Smith::CommandBase

#options, #target

Instance Method Summary collapse

Methods inherited from Smith::CommandBase

#banner, #format_help, #initialize, #parse_options

Methods included from Logger

included

Constructor Details

This class inherits a constructor from Smith::CommandBase

Instance Method Details

#correct_direction?Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/smith/commands/smithctl/firehose.rb', line 39

def correct_direction?
  options[:direction] == 'deliver' || options[:direction] == 'publish'
end

#executeObject



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/smith/commands/smithctl/firehose.rb', line 6

def execute
  queue_name = target.first || '#'
  AMQP::Channel.new(Smith.connection) do |channel,ok|
    channel.topic('amq.rabbitmq.trace', :durable => true) do |exchange|
      channel.queue('smith.firehose', :durable => true) do |queue|
        if correct_direction?
          routing_key = "#{options[:direction]}.#{Smith.config.smith.namespace}.#{queue_name}"

          queue.bind(exchange, :routing_key => routing_key).subscribe do |m, payload|
            acl_type_cache = AclTypeCache.instance
            clazz = acl_type_cache.get_by_hash(m.headers['properties']['type'])
            message = {options[:direction] => clazz.new.parse_from_string(payload)}
            puts (options[:json_given]) ? message.to_json : message.inspect
          end
        else
          responder.succeed("--direction must be either deliver or publish")
        end
      end
    end
  end
end

#options_specObject



28
29
30
31
32
33
34
35
36
37
# File 'lib/smith/commands/smithctl/firehose.rb', line 28

def options_spec
  banner %(Listens on the rabbitmq firehose for the named queue and prints decoded
  message to stdout.

  Be warned it can produce vast amounts of outpu. You _must_ run 'rabbitmqctl
  trace_on' for this to work.), "<queue>"

  opt    :json,       "return the JSON representation of the message", :short => :j
  opt    :direction,  "Show messages that are leaving the broker [deliver|publish]", :short => :d, :type => :string, :default => 'deliver'
end