Class: Fluent::Plugin::RabbitMQOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_rabbitmq.rb

Instance Method Summary collapse

Constructor Details

#initializeRabbitMQOutput

Returns a new instance of RabbitMQOutput.



67
68
69
70
# File 'lib/fluent/plugin/out_rabbitmq.rb', line 67

def initialize
  super
  require "bunny"
end

Instance Method Details

#configure(conf) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/fluent/plugin/out_rabbitmq.rb', line 72

def configure(conf)
  compat_parameters_convert(conf, :inject, :formatter, default_chunk_key: "time")

  super

  bunny_options = {}
  bunny_options[:host] = @host if @host
  bunny_options[:hosts] = @hosts if @hosts
  bunny_options[:port] = @port if @port
  bunny_options[:user] = @user if @user
  bunny_options[:pass] = @pass if @pass
  bunny_options[:vhost] = @vhost if @vhost
  bunny_options[:connection_timeout] = @connection_timeout if @connection_timeout
  bunny_options[:continuation_timeout] = @continuation_timeout if @continuation_timeout
  bunny_options[:automatically_recover] = @automatically_recover if @automatically_recover
  bunny_options[:network_recovery_interval] = @network_recovery_interval if @network_recovery_interval
  bunny_options[:recovery_attempts] = @recovery_attempts
  bunny_options[:auth_mechanism] = @auth_mechanism if @auth_mechanism
  bunny_options[:heartbeat] = @heartbeat if @heartbeat
  bunny_options[:frame_max] = @frame_max if @frame_max

  bunny_options[:tls] = @tls
  bunny_options[:tls_cert] = @tls_cert if @tls_cert
  bunny_options[:tls_key] = @tls_key if @tls_key
  bunny_options[:tls_ca_certificates] = @tls_ca_certificates if @tls_ca_certificates
  bunny_options[:verify_peer] = @verify_peer

  @bunny = Bunny.new(bunny_options)

  @publish_options = {}
  @publish_options[:content_type] = @content_type if @content_type
  @publish_options[:content_encoding] = @content_encoding if @content_encoding
  @publish_options[:persistent] = @persistent if @persistent
  @publish_options[:mandatory] = @mandatory if @mandatory
  @publish_options[:expiration] = @expiration if @expiration
  @publish_options[:type] = @message_type if @message_type
  @publish_options[:priority] = @priority if @priority
  @publish_options[:app_id] = @app_id if @app_id

  @formatter = formatter_create(default_type: @type)
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


114
115
116
# File 'lib/fluent/plugin/out_rabbitmq.rb', line 114

def multi_workers_ready?
  true
end

#prefer_buffered_processingObject



118
119
120
# File 'lib/fluent/plugin/out_rabbitmq.rb', line 118

def prefer_buffered_processing
  false
end

#process(tag, es) ⇒ Object



151
152
153
154
155
156
157
158
# File 'lib/fluent/plugin/out_rabbitmq.rb', line 151

def process(tag, es)
  es.each do |time, record|
    set_publish_options(tag, time, record)
    record = inject_values_to_record(tag, time, record)
    buf = @formatter.format(tag, time, record)
    @bunny_exchange.publish(buf, @publish_options)
  end
end

#set_publish_options(tag, time, record) ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/fluent/plugin/out_rabbitmq.rb', line 138

def set_publish_options(tag, time, record)
  @publish_options[:timestamp] = time.to_i if @timestamp

  if @exchange_type != "fanout"
    @publish_options[:routing_key] = @routing_key ? @routing_key : tag
  end

  if @id_key
    id = record[@id_key]
    @publish_options[:message_id] = id if id
  end
end

#shutdownObject



133
134
135
136
# File 'lib/fluent/plugin/out_rabbitmq.rb', line 133

def shutdown
  @bunny.close
  super
end

#startObject



122
123
124
125
126
127
128
129
130
131
# File 'lib/fluent/plugin/out_rabbitmq.rb', line 122

def start
  super
  @bunny.start
  @channel = @bunny.create_channel
  exchange_options = {
    durable: @exchange_durable,
    auto_delete: @exchange_auto_delete
  }
  @bunny_exchange = Bunny::Exchange.new(@channel, @exchange_type, @exchange, exchange_options)
end

#write(chunk) ⇒ Object



160
161
162
163
164
165
166
167
168
169
# File 'lib/fluent/plugin/out_rabbitmq.rb', line 160

def write(chunk)
  tag = chunk..tag

  chunk.each do |time, record|
    set_publish_options(tag, time, record)
    record = inject_values_to_record(tag, time, record)
    buf = @formatter.format(tag, time, record)
    @bunny_exchange.publish(buf, @publish_options)
  end
end