Class: CloudWatchLogger::Client::AWS_SDK::DeliveryThread
Inherits: Thread
- Object
- Thread
- CloudWatchLogger::Client::AWS_SDK::DeliveryThread
Defined in: lib/cloudwatchlogger/client/aws_sdk/threaded.rb
Instance Method Summary
- #connect!(opts = {}) ⇒ Object
- #deliver(message) ⇒ Object
  Pushes a message onto the internal queue.
- #exit! ⇒ Object
  Signals the queue that we’re exiting.
- #initialize(credentials, log_group_name, log_stream_name, opts = {}) ⇒ DeliveryThread (constructor)
  A new instance of DeliveryThread.
- #log_event(message_object) ⇒ Object
Constructor Details
#initialize(credentials, log_group_name, log_stream_name, opts = {}) ⇒ DeliveryThread
Returns a new instance of DeliveryThread.
    # File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 38

    def initialize(credentials, log_group_name, log_stream_name, opts = {})
      # Default both HTTP timeouts to 120 seconds.
      opts[:open_timeout] = opts[:open_timeout] || 120
      opts[:read_timeout] = opts[:read_timeout] || 120
      @credentials = credentials
      @log_group_name = log_group_name
      @log_stream_name = log_stream_name
      @opts = opts

      @queue = Queue.new
      @exiting = false

      super do
        loop do
          # Connect lazily, on the worker thread.
          connect!(opts) if @client.nil?

          # Block until a message (or the exit signal) arrives.
          message_object = @queue.pop
          break if message_object == :__delivery_thread_exit_signal__

          begin
            event = {
              log_group_name: @log_group_name,
              log_stream_name: @log_stream_name,
              log_events: [log_event(message_object)]
            }
            event[:sequence_token] = @sequence_token if @sequence_token
            response = @client.put_log_events(event)
            unless response.rejected_log_events_info.nil?
              raise CloudWatchLogger::LogEventRejected
            end
            @sequence_token = response.next_sequence_token
          rescue Aws::CloudWatchLogs::Errors::InvalidSequenceTokenException => err
            # Take the expected token from the last word of the error message.
            @sequence_token = err.message.split(' ').last
            retry
          end
        end
      end

      # On interpreter exit, signal the worker and wait for it to finish.
      at_exit do
        exit!
        join
      end
    end
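The rescue branch in the worker loop recovers from a rejected sequence token by taking the last whitespace-separated word of the error message as the new token and retrying the same event. The sketch below isolates that retry pattern. `FakeClient`, `FakeInvalidToken`, `put_with_retry`, and the wording of the error message are all hypothetical stand-ins for illustration; they are not part of the gem or the AWS SDK.

```ruby
# Hypothetical stand-in for Aws::CloudWatchLogs::Errors::InvalidSequenceTokenException.
class FakeInvalidToken < StandardError; end

# Hypothetical client that accepts an event only when it carries the expected token.
class FakeClient
  Response = Struct.new(:next_sequence_token)

  def initialize(expected_token)
    @expected = expected_token
    @counter = 0
  end

  def put_log_events(event)
    unless event[:sequence_token] == @expected
      # Invented message format; the sketch relies only on the expected
      # token being the last word of the message.
      raise FakeInvalidToken, "expected sequence token is: #{@expected}"
    end
    @counter += 1
    @expected = "token-#{@counter}"
    Response.new(@expected)
  end
end

# Sends one event, recovering the token from the error message on rejection.
def put_with_retry(client, event, sequence_token)
  attempts = 0
  begin
    attempts += 1
    event[:sequence_token] = sequence_token if sequence_token
    response = client.put_log_events(event)
    [response.next_sequence_token, attempts]
  rescue FakeInvalidToken => err
    sequence_token = err.message.split(' ').last
    retry
  end
end

client = FakeClient.new('token-0')
next_token, attempts = put_with_retry(client, { log_events: [] }, nil)
```

Here the first attempt is sent without a token and is rejected; the second attempt uses the token parsed from the error and succeeds. As in the listing above, the retry is unbounded.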
Instance Method Details
#connect!(opts = {}) ⇒ Object
    # File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 92

    def connect!(opts = {})
      args = {
        http_open_timeout: opts[:open_timeout],
        http_read_timeout: opts[:read_timeout]
      }
      args[:logger] = @opts[:logger] if @opts[:logger]
      args[:region] = @opts[:region] if @opts[:region]
      # Pass static credentials only when an access key was supplied.
      args.merge!(
        @credentials.key?(:access_key_id) ? {
          access_key_id: @credentials[:access_key_id],
          secret_access_key: @credentials[:secret_access_key]
        } : {}
      )

      @client = Aws::CloudWatchLogs::Client.new(args)
      begin
        @client.create_log_stream(
          log_group_name: @log_group_name,
          log_stream_name: @log_stream_name
        )
      rescue Aws::CloudWatchLogs::Errors::ResourceNotFoundException
        # The log group does not exist yet: create it, then retry the stream.
        @client.create_log_group(
          log_group_name: @log_group_name
        )
        retry
      rescue Aws::CloudWatchLogs::Errors::ResourceAlreadyExistsException,
             Aws::CloudWatchLogs::Errors::AccessDeniedException
        # The stream already exists, or creation is not permitted: continue.
      end
    end
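The argument assembly in `connect!` can be read on its own: the two timeouts are always passed, `:logger` and `:region` are passed only when set, and static credentials are passed only when the credentials hash has an `:access_key_id` key. A minimal sketch of that logic as a pure function follows. `build_client_args` is a hypothetical helper name, and the key and region values are placeholders; the function returns the hash instead of constructing an AWS client.

```ruby
# Hypothetical helper mirroring the argument assembly in connect!.
# It only builds the options hash; no AWS client is created.
def build_client_args(credentials, opts)
  args = {
    http_open_timeout: opts[:open_timeout],
    http_read_timeout: opts[:read_timeout]
  }
  args[:logger] = opts[:logger] if opts[:logger]
  args[:region] = opts[:region] if opts[:region]
  if credentials.key?(:access_key_id)
    args[:access_key_id] = credentials[:access_key_id]
    args[:secret_access_key] = credentials[:secret_access_key]
  end
  args
end

# Placeholder credentials and region, for illustration only.
with_keys = build_client_args(
  { access_key_id: 'AKIAEXAMPLE', secret_access_key: 'secret' },
  { open_timeout: 120, read_timeout: 120, region: 'us-east-1' }
)
without_keys = build_client_args({}, { open_timeout: 120, read_timeout: 120 })
```

With an empty credentials hash, no credential keys are passed at all, which leaves credential lookup to the SDK's default provider chain.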
#deliver(message) ⇒ Object
Pushes a message onto the internal queue.

    # File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 88

    def deliver(message)
      @queue.push(message)
    end
#exit! ⇒ Object
Signals the queue that we’re exiting.

    # File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 82

    def exit!
      @exiting = true
      @queue.push :__delivery_thread_exit_signal__
    end
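`exit!` stops the worker by pushing a sentinel symbol through the same queue that carries messages. A minimal sketch of that shutdown pattern, using only Ruby's built-in `Queue` and `Thread`, follows; the `delivered` array and the constant name `EXIT_SIGNAL` are assumptions for illustration and stand in for the real delivery call.

```ruby
EXIT_SIGNAL = :__delivery_thread_exit_signal__

queue = Queue.new
delivered = []

worker = Thread.new do
  loop do
    item = queue.pop               # blocks until something is pushed
    break if item == EXIT_SIGNAL   # sentinel: leave the loop
    delivered << item              # stand-in for sending the log event
  end
end

queue.push('first')
queue.push('second')
queue.push(EXIT_SIGNAL)            # the step exit! performs
worker.join                        # the step the at_exit hook performs after exit!
```

Because the queue is first-in, first-out, every message pushed before the sentinel is handled before the thread stops.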
#log_event(message_object) ⇒ Object
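    # File 'lib/cloudwatchlogger/client/aws_sdk/threaded.rb', line 114

    def log_event(message_object)
      # Default: stamp the event with the current time in milliseconds.
      timestamp = (Time.now.utc.to_f.round(3) * 1000).to_i
      message = message_object
      # A hash carrying both keys supplies its own timestamp and message.
      if message_object.is_a?(Hash) && %i[epoch_time message].all?{ |s| message_object.key?(s) }
        timestamp = message_object[:epoch_time]
        message = message_object[:message]
      end
      { timestamp: timestamp, message: message }
    end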
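`log_event` accepts two input shapes: any object, which is stamped with the current time in milliseconds, or a hash with both `:epoch_time` and `:message` keys, which supplies its own timestamp. The sketch below is a standalone copy of that logic for illustration. `log_event_sketch` and its `now_ms` parameter are hypothetical; the parameter is added only so the default timestamp is deterministic here.

```ruby
# Standalone copy of the logic; `now_ms` is a hypothetical parameter
# that replaces the call to Time.now so results are repeatable.
def log_event_sketch(message_object, now_ms = (Time.now.utc.to_f.round(3) * 1000).to_i)
  timestamp = now_ms
  message = message_object
  if message_object.is_a?(Hash) && %i[epoch_time message].all? { |k| message_object.key?(k) }
    timestamp = message_object[:epoch_time]
    message = message_object[:message]
  end
  { timestamp: timestamp, message: message }
end

now = 1_700_000_000_000

# Plain message: stamped with the current time.
plain = log_event_sketch('disk full', now)

# Hash with both keys: its own timestamp and message are used.
explicit = log_event_sketch({ epoch_time: 1_600_000_000_000, message: 'replayed' }, now)

# Hash missing :epoch_time: treated as an ordinary message object.
partial = log_event_sketch({ message: 'no time' }, now)
```

Note the third case: a hash that lacks either key is passed through whole as the message, not unpacked.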