Class: Fluent::Plugin::RabbitMQInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_rabbitmq.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ RabbitMQInput

Returns a new instance of RabbitMQInput.



78
79
80
81
# File 'lib/fluent/plugin/in_rabbitmq.rb', line 78

# Builds a new plugin instance.
#
# Runs the superclass initializer first and then requires the "bunny"
# library, so the library is loaded when an instance is created rather than
# when this file is loaded.
def initialize
  super
  require "bunny"
end

Instance Method Details

#configure(conf) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/in_rabbitmq.rb', line 83

# Applies the plugin configuration and builds the Bunny session object.
#
# Converts compat-style parser parameters, lets the superclass populate the
# configuration instance variables, collects the connection options that were
# set, creates the parser, defaults the routing key to the tag, and
# instantiates the Bunny session. The session is only created here; it is
# started in #start.
#
# @param conf the configuration element handed to the plugin
def configure(conf)
  compat_parameters_convert(conf, :parser)

  super

  bunny_options = {}
  bunny_options[:host] = @host if @host
  bunny_options[:hosts] = @hosts if @hosts
  bunny_options[:port] = @port if @port
  bunny_options[:user] = @user if @user
  bunny_options[:pass] = @pass if @pass
  bunny_options[:vhost] = @vhost if @vhost
  bunny_options[:connection_timeout] = @connection_timeout if @connection_timeout
  bunny_options[:continuation_timeout] = @continuation_timeout if @continuation_timeout
  # Check for nil rather than truthiness: an explicit `false` must be
  # forwarded, otherwise the key is omitted and Bunny applies its own default,
  # making it impossible to switch automatic recovery off.
  bunny_options[:automatically_recover] = @automatically_recover unless @automatically_recover.nil?
  bunny_options[:network_recovery_interval] = @network_recovery_interval if @network_recovery_interval
  # Forwarded unconditionally, so a nil value is passed through as nil.
  bunny_options[:recovery_attempts] = @recovery_attempts
  bunny_options[:auth_mechanism] = @auth_mechanism if @auth_mechanism
  bunny_options[:heartbeat] = @heartbeat if @heartbeat

  # :tls and :verify_peer are always forwarded (including false/nil); the
  # certificate-related options only when they are set.
  bunny_options[:tls] = @tls
  bunny_options[:tls_cert] = @tls_cert if @tls_cert
  bunny_options[:tls_key] = @tls_key if @tls_key
  bunny_options[:tls_ca_certificates] = @tls_ca_certificates if @tls_ca_certificates
  bunny_options[:verify_peer] = @verify_peer

  @parser = parser_create

  # Fall back to the tag when no routing key was configured.
  @routing_key ||= @tag
  @bunny = Bunny.new(bunny_options)
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


163
164
165
# File 'lib/fluent/plugin/in_rabbitmq.rb', line 163

# Always answers true.
#
# NOTE(review): presumably the hook the framework uses to decide whether this
# plugin may run under multiple workers — confirm against the Input base class.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#shutdown ⇒ Object



167
168
169
170
# File 'lib/fluent/plugin/in_rabbitmq.rb', line 167

# Closes the Bunny session and then runs the superclass shutdown.
#
# NOTE(review): the session is closed before `super` is called, so if
# `@bunny.close` raises, the superclass shutdown is never reached — confirm
# that this is acceptable.
#
# @return whatever the superclass shutdown returns
def shutdown
  @bunny.close
  super
end

#start ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/fluent/plugin/in_rabbitmq.rb', line 115

# Starts the Bunny session, declares the configured exchange/queue and begins
# consuming messages.
#
# Steps, in order:
# 1. start the session and open a channel (applying the prefetch count if set);
# 2. when @create_exchange is set, declare the exchange and optionally bind it
#    to @exchange_to_bind;
# 3. declare the queue with any configured "x-*" arguments and, when an
#    exchange name is configured, bind the queue to it with @routing_key;
# 4. subscribe: every payload is run through the parser and each parsed
#    record is emitted under @tag.
#
# Event time is taken from the message's :timestamp property when present,
# otherwise from the parser. Message headers and delivery info are added to
# the record when the corresponding include_* options are enabled. With
# manual acknowledgement enabled, the message is acked after parsing.
def start
  super
  @bunny.start
  channel = @bunny.create_channel(nil, @consumer_pool_size)
  channel.prefetch(@prefetch_count) if @prefetch_count
  if @create_exchange
    exchange_options = {
        durable: @exchange_durable,
        auto_delete: @auto_delete
    }
    @bunny_exchange = Bunny::Exchange.new(channel, @exchange_type, @exchange, exchange_options)
    if @exchange_to_bind
      @bunny_exchange.bind(@exchange_to_bind, routing_key: @exchange_routing_key)
    end
  end
  queue_arguments = {}
  queue_arguments["x-message-ttl"] = @ttl if @ttl
  queue_arguments["x-queue-mode"] = @queue_mode if @queue_mode
  queue_arguments["x-queue-type"] = @queue_type if @queue_type
  queue = channel.queue(
    @queue,
    durable: @durable,
    exclusive: @exclusive,
    auto_delete: @auto_delete,
    arguments: queue_arguments
  )
  queue.bind(@exchange, routing_key: @routing_key) if @exchange
  queue.subscribe(manual_ack: @manual_ack) do |delivery_info, properties, payload|
    @parser.parse(payload) do |time, record|
      # Fluentd parsers report an unparseable payload by yielding a nil
      # record. Skip it instead of raising on `record[...] =` or emitting a
      # nil record; the message is still acked below so it is not left
      # unacknowledged.
      if record.nil?
        log.warn("skipping RabbitMQ message that could not be parsed", payload: payload)
        next
      end

      # Prefer the timestamp carried in the message properties, if any.
      time = Fluent::EventTime.from_time(properties[:timestamp]) if properties[:timestamp]
      record[@headers_key] = properties.headers if @include_headers
      record[@delivery_info_key] = delivery_info if @include_delivery_info
      router.emit(@tag, time, record)
    end
    channel.ack(delivery_info.delivery_tag) if @manual_ack
  end
end