Class: CloudWatchLogger::Client::AWS_SDK::DeliveryThread

Inherits:
Thread
  • Object
show all
Defined in:
lib/cloudwatchlogger/client/aws_sdk/threaded.rb

Instance Method Summary collapse

Constructor Details

#initialize(credentials, log_group_name, log_stream_name, opts = {}) ⇒ DeliveryThread

Returns a new instance of DeliveryThread.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 37

# Builds the delivery thread and starts it immediately.
#
# The thread body loops: on a pass where @client is still nil it calls
# #connect!, then it blocks on the internal queue and sends each popped
# message to CloudWatch Logs as a single-event put_log_events request.
# Popping the :__delivery_thread_exit_signal__ sentinel ends the loop.
# An at_exit hook pushes that sentinel (via #exit!) and joins the thread,
# so messages queued before the sentinel are processed before the process
# ends.
#
# @param credentials [Hash] stored in @credentials; #connect! reads
#   :access_key_id and :secret_access_key from it
# @param log_group_name [Object] sent as log_group_name with every request
# @param log_stream_name [Object] sent as log_stream_name with every request
# @param opts [Hash] options; :open_timeout and :read_timeout default to 120
#   and are forwarded to #connect!. The caller's hash is not modified.
# @raise [CloudWatchLogger::LogEventRejected] inside the thread, when the
#   response carries rejected_log_events_info
def initialize(credentials, log_group_name, log_stream_name, opts = {})
  # Apply the timeout defaults to a private copy so the caller's hash is left
  # untouched (and a frozen hash is accepted).
  opts = opts.dup
  opts[:open_timeout] ||= 120
  opts[:read_timeout] ||= 120

  @credentials = credentials
  @log_group_name = log_group_name
  @log_stream_name = log_stream_name
  @opts = opts

  @queue = Queue.new
  @exiting = false
  # No token is known until the first response (or rejection) supplies one.
  @sequence_token = nil

  # All state the thread body reads must be assigned before `super`, because
  # the block starts running as soon as the thread is created.
  super do
    loop do
      connect! opts if @client.nil?

      msg = @queue.pop
      break if msg == :__delivery_thread_exit_signal__

      begin
        event = {
          log_group_name: @log_group_name,
          log_stream_name: @log_stream_name,
          log_events: [{
            # Milliseconds since the epoch, rounded once at the end so float
            # error cannot truncate the value one millisecond low.
            timestamp: (Time.now.to_f * 1000).round,
            message: msg
          }]
        }
        event[:sequence_token] = @sequence_token if @sequence_token

        response = @client.put_log_events(event)
        raise CloudWatchLogger::LogEventRejected unless response.rejected_log_events_info.nil?

        @sequence_token = response.next_sequence_token
      rescue Aws::CloudWatchLogs::Errors::InvalidSequenceTokenException => err
        # The last whitespace-separated word of the error message is taken as
        # the token to use next, then the same message is re-sent.
        # NOTE(review): this retry is unbounded; if the parsed token is itself
        # rejected the thread will spin here - consider a retry cap.
        @sequence_token = err.message.split(' ').last
        retry
      end
    end
  end

  # Drain on interpreter shutdown: queue the exit sentinel, then wait for the
  # thread to reach it.
  at_exit do
    exit!
    join
  end
end

Instance Method Details

#connect!(opts = {}) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 97

# Creates the CloudWatch Logs client and ensures the log stream exists.
#
# A new client is stored in @client, then the stream named by
# @log_stream_name is created inside @log_group_name. If that fails with
# ResourceNotFoundException the log group is created and the stream creation
# is attempted again; a ResourceAlreadyExistsException is swallowed.
#
# @param opts [Hash] :open_timeout and :read_timeout are passed to the client
#   as http_open_timeout / http_read_timeout
# @return [Object, nil] the create_log_stream response, or nil when the
#   stream was already present
def connect!(opts = {})
  # The region comes from the options given at construction time, not from
  # the +opts+ argument.
  region = @opts[:region] || 'us-east-1'

  @client = Aws::CloudWatchLogs::Client.new(
    region: region,
    access_key_id: @credentials[:access_key_id],
    secret_access_key: @credentials[:secret_access_key],
    http_open_timeout: opts[:open_timeout],
    http_read_timeout: opts[:read_timeout]
  )

  group_name = @log_group_name
  stream_name = @log_stream_name

  begin
    @client.create_log_stream(log_group_name: group_name, log_stream_name: stream_name)
  rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException
    # Create the group, then go round again for the stream.
    @client.create_log_group(log_group_name: group_name)
    retry
  rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException
    # Stream is already there; nothing to do.
    nil
  end
end

#deliver(message) ⇒ Object

Pushes a message onto the internal queue



93
94
95
# File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 93

# Enqueues a message on the internal queue; the thread body pops it later.
#
# @param message [Object] the item to enqueue
# @return [Object] whatever the queue's push returns
def deliver(message)
  @queue << message
end

#exit! ⇒ Object

Signals the queue that we’re exiting



87
88
89
90
# File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 87

# Marks the thread as exiting and enqueues the exit sentinel.
#
# Sets @exiting to true, then pushes :__delivery_thread_exit_signal__ onto
# the internal queue behind any messages already waiting there.
#
# @return [Object] whatever the queue's push returns
def exit!
  @exiting = true
  @queue << :__delivery_thread_exit_signal__
end