Class: CloudWatchLogger::Client::AWS_SDK::DeliveryThread

Inherits:
Thread
  • Object
show all
Defined in:
lib/cloudwatchlogger/client/aws_sdk/threaded.rb

Instance Method Summary collapse

Constructor Details

#initialize(credentials, log_group_name, log_stream_name, opts = {}) ⇒ DeliveryThread

Returns a new instance of DeliveryThread.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 38

def initialize(credentials, log_group_name, log_stream_name, opts = {})
  opts[:open_timeout] = opts[:open_timeout] || 120
  opts[:read_timeout] = opts[:read_timeout] || 120
  @credentials = credentials
  @log_group_name = log_group_name
  @log_stream_name = log_stream_name
  @opts = opts

  @queue = Queue.new
  @exiting = false

  super do
    loop do
      connect!(opts) if @client.nil?

      message_object = @queue.pop
      break if message_object == :__delivery_thread_exit_signal__

      begin
        event = {
          log_group_name: @log_group_name,
          log_stream_name: @log_stream_name,
          log_events: [log_event(message_object)]
        }
        event[:sequence_token] = @sequence_token if @sequence_token
        response = @client.put_log_events(event)
        unless response.rejected_log_events_info.nil?
          raise CloudWatchLogger::LogEventRejected
        end
        @sequence_token = response.next_sequence_token
      rescue Aws::CloudWatchLogs::Errors::InvalidSequenceTokenException => err
        @sequence_token = err.message.split(' ').last
        retry
      end
    end
  end

  at_exit do
    exit!
    join
  end
end

Instance Method Details

#connect!(opts = {}) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 92

def connect!(opts = {})
  args = { http_open_timeout: opts[:open_timeout], http_read_timeout: opts[:read_timeout] }
  args[:logger] = @opts[:logger] if @opts[:logger]
  args[:region] = @opts[:region] if @opts[:region]
  args.merge!( @credentials.key?(:access_key_id) ? { access_key_id: @credentials[:access_key_id], secret_access_key: @credentials[:secret_access_key] } : {} )

  @client = Aws::CloudWatchLogs::Client.new(args)
  begin
    @client.create_log_stream(
      log_group_name: @log_group_name,
      log_stream_name: @log_stream_name
    )
  rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException
    @client.create_log_group(
      log_group_name: @log_group_name
    )
    retry
  rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException,
    Aws::CloudWatchLogs::Errors::AccessDeniedException
  end
end

#deliver(message) ⇒ Object

Pushes a message onto the internal queue



88
89
90
# File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 88

def deliver(message)
  @queue.push(message)
end

#exit!Object

Signals the queue that we’re exiting



82
83
84
85
# File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 82

def exit!
  @exiting = true
  @queue.push :__delivery_thread_exit_signal__
end

#log_event(message_object) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
# File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 114

def log_event(message_object)
  timestamp = (Time.now.utc.to_f.round(3) * 1000).to_i
  message = message_object

  if message_object.is_a?(Hash) && i[epoch_time message].all?{ |s| message_object.key?(s) }
    timestamp = message_object[:epoch_time]
    message = message_object[:message]
  end

  { timestamp: timestamp, message: message }
end