Class: LogjamAgent::AMQPForwarder

Inherits:
Object
  • Object
show all
Defined in:
lib/logjam_agent/amqp_forwarder.rb

Constant Summary collapse

RETRY_AFTER =
10.seconds

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ AMQPForwarder

Returns a new instance of AMQPForwarder.



8
9
10
11
12
13
14
15
# File 'lib/logjam_agent/amqp_forwarder.rb', line 8

def initialize(*args)
  opts = args.extract_options!
  @app = args[0] || LogjamAgent.application_name
  @env = args[1] || LogjamAgent.environment_name
  @config = default_options(@app, @env).merge!(opts)
  @exchange = @bunny = nil
  ensure_bunny_gem_is_available
end

Instance Attribute Details

#appObject (readonly)

Returns the value of attribute app.



6
7
8
# File 'lib/logjam_agent/amqp_forwarder.rb', line 6

def app
  @app
end

#envObject (readonly)

Returns the value of attribute env.



6
7
8
# File 'lib/logjam_agent/amqp_forwarder.rb', line 6

def env
  @env
end

Instance Method Details

#default_options(app, env) ⇒ Object



17
18
19
20
21
22
23
24
25
# File 'lib/logjam_agent/amqp_forwarder.rb', line 17

def default_options(app, env)
  {
    :host                 => "localhost",
    :exchange             => "request-stream-#{app}-#{env}",
    :exchange_durable     => true,
    :exchange_auto_delete => false,
    :routing_key          => "logs.#{app}.#{env}"
  }
end

#forward(msg, options = {}) ⇒ Object

TODO: mutex!



28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/logjam_agent/amqp_forwarder.rb', line 28

def forward(msg, options = {})
  return if paused? || LogjamAgent.disabled
  begin
    # $stderr.puts msg
    key = options[:routing_key] || @config[:routing_key]
    if engine = options[:engine]
      key += ".#{engine}"
    end
    exchange.publish(msg, :key => key, :persistent => false)
  rescue Exception => exception
    reraise_expectation_errors!
    pause(exception)
  end
end

#reset(exception = nil) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/logjam_agent/amqp_forwarder.rb', line 43

def reset(exception=nil)
  return unless @bunny
  begin
    if exception
      @bunny.__send__(:close_socket)
    else
      @bunny.stop
    end
  rescue Exception
    # if bunny throws an exception here, its not usable anymore anyway
  ensure
    @exchange = @bunny = nil
  end
end