Class: LogStash::Outputs::AzureServiceBus

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/outputs/azure_service_bus.rb

Instance Method Summary collapse

Instance Method Details

#multi_receive(events) ⇒ Object



49
50
51
52
53
# File 'lib/logstash/outputs/azure_service_bus.rb', line 49

# Entry point for a batch of events: forwards the batch to #send_events,
# skipping the call entirely when the batch is empty.
#
# @param events [Array] the batch of events to send
# @return [Object, nil] whatever #send_events returns, or nil for an empty batch
def multi_receive(events)
  send_events(events) unless events.empty?
end

#post_messages(messages) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/logstash/outputs/azure_service_bus.rb', line 67

# POSTs a batch of messages to the 'messages' path of the Service Bus
# connection and logs the outcome.
#
# The batch is serialized as JSON and sent with the current bearer token.
# Failures are only logged, never raised, so a failed batch is dropped.
#
# @param messages [Array<Hash>] message hashes to serialize into the request body
# @return [Object] the return value of the logger call that reported the outcome
def post_messages(messages)
  begin
    result = @service_bus_conn.post('messages') do |request|
      request.body = JSON.generate(messages)
      request.headers = {
        'Authorization' => "Bearer #{@access_token}",
        'Content-Type' => 'application/vnd.microsoft.servicebus.json'
      }
    end
  rescue StandardError => error
    # Hopefully we never get here and "throw away" messages, since the
    # connection is configured with an aggressive retry strategy.
    return @logger.error("Error while sending message(s) to Service Bus: #{error.inspect}")
  end

  # Only HTTP 201 is treated as success; any other status is logged as an error.
  return @logger.debug("Sent #{messages.length} message(s) to Service Bus") if result.status == 201

  @logger.error("Error while sending message(s) to Service Bus: HTTP #{result.status}")
end

#refresh_access_token ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/logstash/outputs/azure_service_bus.rb', line 83

# Requests a new OAuth2 token for the https://servicebus.azure.net/ resource
# from the metadata identity endpoint at 169.254.169.254 and, on success,
# stores it in @access_token.
#
# This method never raises on a failed refresh: network errors, non-200
# responses and unparseable response bodies are logged and the previous
# token is left in place. Callers (#register and the retry callback) rely on
# @service_bus_conn's retry handling to trigger another refresh later.
#
# @return [Object] the return value of the logger call that reported the outcome
def refresh_access_token
  @logger.info('Refreshing Azure access token...')
  begin
    response = Faraday.get('http://169.254.169.254/metadata/identity/oauth2/token', { 'api-version' => '2018-02-01', 'resource' => 'https://servicebus.azure.net/' }) do |req|
      req.headers = { 'Metadata' => 'true' }
      req.options.timeout = 10
    end
  rescue StandardError => e # We just catch everything and move on since @service_bus_conn will handle retries.
    # Must be `inspect`: a misspelled method here raises NoMethodError from
    # inside the handler and defeats the whole point of rescuing.
    return @logger.error("Error while fetching access token: #{e.inspect}")
  end

  return @logger.error("Error while fetching access token: HTTP #{response.status}") unless response.status == 200

  data = JSON.parse(response.body)
  @access_token = data['access_token']
  @logger.info('Successfully refreshed Azure access token')
rescue JSON::ParserError => e
  # A 200 response with a malformed body is treated like any other failed refresh.
  @logger.error("Error while parsing access token response: #{e.inspect}")
end

#register ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/logstash/outputs/azure_service_bus.rb', line 15

# Sets up the plugin: builds the Faraday connection to the configured Service
# Bus entity with the retry middleware attached, then fetches an initial
# access token.
#
# @return [Object] whatever #refresh_access_token returns
def register
  # Handed to the retry middleware as its retry_block: logs why a retry is
  # happening and, on HTTP 401, fetches a new token and rewrites the
  # Authorization header of the request being retried.
  on_retry = lambda do |env, _options, _retries, exception|
    status = env.status
    # No status means the request failed before a response was received.
    next @logger.warn("Problem (#{exception.inspect}) while sending message(s) to Service Bus. Retrying...") if status.nil?

    @logger.warn("Problem (HTTP #{status}) while sending message(s) to Service Bus. Retrying...")
    if status == 401
      refresh_access_token
      env.request_headers['Authorization'] = "Bearer #{@access_token}"
    end
  end

  retry_settings = {
    max: Float::MAX, # Essentially retries indefinitely
    interval: 1,
    interval_randomness: 0.5,
    backoff_factor: 2,
    exceptions: [Faraday::ConnectionFailed, Faraday::TimeoutError, Faraday::RetriableResponse, Faraday::SSLError],
    methods: [], # Empty -> all methods
    # https://docs.microsoft.com/en-us/rest/api/servicebus/send-message-batch#response-codes
    retry_statuses: [401, 403, 404, 410, 429, 500],
    retry_block: on_retry,
    retry_if: ->(_env, _exc) { true } # Always retry
  }

  entity_url = "https://#{@service_bus_namespace}.servicebus.windows.net/#{@service_bus_entity}/"
  @service_bus_conn = Faraday.new(url: entity_url, request: { timeout: 10 }) do |conn|
    conn.request(:retry, retry_settings)
  end

  # Start with an empty token; the refresh below replaces it when it succeeds.
  @access_token = ''
  refresh_access_token
end

#send_events(events) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
# File 'lib/logstash/outputs/azure_service_bus.rb', line 55

# Converts each event into a Service Bus message hash and posts the whole
# batch through #post_messages.
#
# Every message carries the event serialized as JSON in 'Body' and a JSON
# content type in 'BrokerProperties'. When @messageid_field is set, the
# event's value for that field is added as the 'MessageId' broker property.
#
# @param events [Array] events responding to #to_hash and #get
# @return [Object] whatever #post_messages returns
def send_events(events)
  payloads = events.map do |event|
    body = JSON.generate(event.to_hash)
    broker_properties = { 'ContentType' => 'application/json' }
    broker_properties['MessageId'] = event.get(@messageid_field) unless @messageid_field.nil?
    { 'Body' => body, 'BrokerProperties' => broker_properties }
  end
  post_messages(payloads)
end