Class: Banter::Publisher

Inherits:
Object
  • Object
show all
Defined in:
lib/banter/publisher.rb

Constant Summary collapse

@@publisher =
nil

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(exchange = nil, exchange_type = :topic) ⇒ Publisher

Returns a new instance of Publisher.



20
21
22
23
24
25
26
# File 'lib/banter/publisher.rb', line 20

def initialize(exchange = nil, exchange_type = :topic)
  @exchange = exchange || Banter::Configuration.exchange_name
  @exchange_type = exchange_type
  @disabled = false
  @batch_messages = false
  @stack_depth = 0
end

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



4
5
6
# File 'lib/banter/publisher.rb', line 4

def channel
  @channel
end

#publisherObject (readonly)

Returns the value of attribute publisher.



3
4
5
# File 'lib/banter/publisher.rb', line 3

def publisher
  @publisher
end

Class Method Details

.instanceObject



8
9
10
11
12
13
# File 'lib/banter/publisher.rb', line 8

def self.instance
  if @@publisher.nil?
    @@publisher = ::Banter::Publisher.new
  end
  @@publisher
end

.teardownObject



15
16
17
18
# File 'lib/banter/publisher.rb', line 15

def self.teardown
  @@publisher.teardown if @publisher
  @@publisher = nil
end

Instance Method Details

#delay_messagesObject



33
34
35
36
37
38
# File 'lib/banter/publisher.rb', line 33

def delay_messages
  delay_start
  yield
ensure
  delay_execute
end

#enable(value) ⇒ Object



28
29
30
31
# File 'lib/banter/publisher.rb', line 28

def enable(value)
  @disabled = !value
  @disabled
end

#execute_publish(routing_key, envelope, use_default_exchange = false) ⇒ Object

Calls the ampq server with normalized data

Parameters:

  • routing_key (String)
    • routing_key that amqp server uses

  • envelope (Hash)
    • normalized data to publish



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/banter/publisher.rb', line 89

def execute_publish(routing_key, envelope, use_default_exchange=false)


  if @publisher.nil?
    start
  end

  if @disabled || @publisher.nil?
    Banter::RabbitLogger.failed_publish(routing_key, {}, envelope)
  else
    tries = 2
    begin
      instance = use_default_exchange ? @channel.default_exchange : @publisher
      instance.publish(envelope.to_json, :persistent => true, :mandatory => true, :timestamp => envelope[:ts], :content_type => "application/json", :routing_key => routing_key)
      Banter::RabbitLogger.log_publish(routing_key, envelope)
    # FIX!!! -thl
    # What kind of errors could be fired from a failure to publish?
    # Should we be more specific?
    # Docs only have errors while connecting, and really not during some sort of long running socket.  For now
    # We'll log until we get more info.
    rescue => e
      Banter::RabbitLogger.log(Logger::WARN, "Error occured on publish: #{e.message}: #{e.inspect}, #{routing_key}: #{envelope.inspect}")
      tries -= 1
      teardown
      start
      if tries > 0 && @publisher
        retry
      else
        Banter::RabbitLogger.failed_publish(routing_key, { error: e.message }, envelope)
      end
    end

  end

end

#publish(context, key, payload) ⇒ Object



66
67
68
69
70
71
72
73
74
75
# File 'lib/banter/publisher.rb', line 66

def publish(context, key, payload)
  routing_key = "#{@exchange}.#{key}"
  envelope    = ::Banter::Message.new.serialize(context, key, payload)

  if @batch_messages
    add_message(routing_key, envelope)
  else
    execute_publish(routing_key, envelope)
  end
end

#publish_to_queue(queue_name, envelope) ⇒ Object

Special method to publish to specific queues. This is useful for deadletter queue processing, where the dead letter will want to fire a message to a specific queue, as well at the consumer will want to inform the dead letter queue that it is finished processing a retry

Parameters:

  • queue_name (String)

    name to publish to (can be retrieved from headers[:queue] from dead letter exchange processor)

  • envelope (Hash)

    hash of contents that the amqp server returns



82
83
84
# File 'lib/banter/publisher.rb', line 82

def publish_to_queue(queue_name, envelope)
  execute_publish(queue_name, envelope, true)
end

#startObject



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/banter/publisher.rb', line 40

def start
  unless Configuration.push_enabled
    @disabled = true
    return
  end

  # grab server configuration from initialization file somewhere
  begin
    @connection = Bunny.new(Configuration.connection)
    @connection.start

    @channel   = @connection.create_channel
    @publisher = @channel.send(@exchange_type, @exchange, :durable => true, :auto_delete => false)

  rescue => e
    ::Banter::Notifier.notify(e, parameters: { message: e.message }, environment_name: ENV['RAILS_ENV'])
    return
  end

  @publisher.on_return do |return_info, properties, content|
    # contents are already transformed into message that we want to send
    Banter::RabbitLogger.failed_publish(return_info[:routing_key], properties, Hashie::Mash.new(::JSON.parse(content)))
  end

end