Class: Wrapbox::LogFetcher::Awslogs

Inherits:
Object
  • Object
show all
Defined in:
lib/wrapbox/log_fetcher/awslogs.rb

Constant Summary collapse

STOP_WAIT_TIMELIMIT =
10
COLOR_ESCAPE_SEQUENCES =
[33, 31, 32, 34, 35, 36]

Instance Method Summary collapse

Constructor Details

#initialize(log_group:, log_stream_prefix:, filter_pattern: nil, region: nil, access_key_id: nil, secret_access_key: nil, timestamp_format: "%Y-%m-%d %H:%M:%S.%3N", delay: 2, **options) ⇒ Awslogs

Returns a new instance of Awslogs.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/wrapbox/log_fetcher/awslogs.rb', line 6

def initialize(log_group:, log_stream_prefix:, filter_pattern: nil, region: nil, access_key_id: nil, secret_access_key: nil, timestamp_format: "%Y-%m-%d %H:%M:%S.%3N", delay: 2, **options)
  begin
    require 'aws-sdk-cloudwatchlogs'
  rescue LoadError
    $stderr.puts "Require aws-sdk-cloudwatchlogs gem"
    exit 1
  end

  @log_group = log_group
  @log_stream_prefix = log_stream_prefix
  @filter_pattern = filter_pattern
  @region = region
  @access_key_id = access_key_id
  @secret_access_key = secret_access_key
  @timestamp_format = timestamp_format
  @delay = delay
  @options = options.reject { |_, v| v.nil? }
  @displayed_log_stream_names = {}
  @displayed_log_stream_number = 0
  @displayed_event_ids = {}
end

Instance Method Details

#display_message(ev, output: $stdout) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/wrapbox/log_fetcher/awslogs.rb', line 76

def display_message(ev, output: $stdout)
  num = @displayed_log_stream_names.fetch(ev.log_stream_name) do |key|
    current = @displayed_log_stream_number
    @displayed_log_stream_names[key] = current
    @displayed_log_stream_number += 1
    current
  end

  sequence_number = COLOR_ESCAPE_SEQUENCES[num % COLOR_ESCAPE_SEQUENCES.length]

  time = Time.at(ev.timestamp / 1000.0)
  output.puts("\e[#{sequence_number}m#{time.strftime(@timestamp_format)} #{ev.log_stream_name}\e[0m #{ev.message}")
end

#main_loop(task) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/wrapbox/log_fetcher/awslogs.rb', line 39

def main_loop(task)
  task_id = task.task_arn.split("/").last
  log_stream_names = task.containers.map do |container|
    [@log_stream_prefix, container.name, task_id].join("/")
  end
  filter_log_opts = {
    log_group_name: @log_group,
    log_stream_names: log_stream_names,
    filter_pattern: @filter_pattern,
    interleaved: true,
  }.compact
  @max_timestamp = ((Time.now.to_f - 120) * 1000).round

  until @stop do
    filter_log_opts[:start_time] = @max_timestamp + 1
    resp = client.filter_log_events(filter_log_opts) rescue nil
    resp&.each do |r|
      r.events.each do |ev|
        next if @displayed_event_ids.member?(ev.event_id)
        display_message(ev)
        @displayed_event_ids[ev.event_id] = ev.timestamp
        @max_timestamp = ev.timestamp if @max_timestamp < ev.timestamp
      end
    end
    Thread.start do
      @displayed_event_ids.each do |event_id, ts|
        if ts < (Time.now.to_f - 600) * 1000
          @displayed_event_ids.delete(event_id)
        end
      end
    end.tap do
      sleep @delay 
    end.join
  end
end

#run(task:) ⇒ Object



28
29
30
31
32
# File 'lib/wrapbox/log_fetcher/awslogs.rb', line 28

def run(task:)
  @loop_thread = Thread.start do
    main_loop(task)
  end
end

#stopObject



34
35
36
37
# File 'lib/wrapbox/log_fetcher/awslogs.rb', line 34

def stop
  @stop = true
  @loop_thread&.join(STOP_WAIT_TIMELIMIT)
end