Class: Fluent::CloudtrailInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_cloudtrail.rb

Constant Summary collapse

USER_AGENT_NAME =
'fluent-plugin-cloudtrail-in'
PLUGIN_VERSION =
'0.0.1'

Instance Method Summary collapse

Constructor Details

#initialize ⇒ CloudtrailInput

Returns a new instance of CloudtrailInput.



42
43
44
# File 'lib/fluent/plugin/in_cloudtrail.rb', line 42

# Creates the plugin instance. No state is set up here; construction is
# delegated entirely to the parent class.
def initialize
  super()
end

Instance Method Details

#configure(conf) ⇒ Object



38
39
40
# File 'lib/fluent/plugin/in_cloudtrail.rb', line 38

# Hands the configuration to the parent class unchanged; this method adds
# no validation or processing of its own.
#
# @param conf the configuration object, forwarded as-is to the parent
# @return whatever the parent implementation returns
def configure(conf)
  super(conf)
end

#load_clients ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/fluent/plugin/in_cloudtrail.rb', line 59

# Builds the AWS SDK clients used by this plugin and stores them in
# @s3_client and @sqs_client.
#
# The options shared by every client (user agent suffix, region, HTTP proxy
# and debug logging) are assembled BEFORE credentials are resolved, so that
# the STS client created for role assumption is configured the same way as
# the S3 and SQS clients (in particular, it goes through @http_proxy).
#
# Credential sources are checked in this order, first match wins:
#   1. static keys   (@aws_key_id and @aws_sec_key)
#   2. shared profile (@profile, optionally @credentials_path)
#   3. assumed role  (@role_arn, optionally @external_id)
# When none is configured, no credentials option is passed to the clients.
#
# @return the SQS client (the value of the final assignment)
def load_clients
  options = {
    user_agent_suffix: "#{USER_AGENT_NAME}/#{PLUGIN_VERSION}"
  }
  options[:region] = @region if @region
  options[:http_proxy] = @http_proxy if @http_proxy

  if @debug
    options.update(
      logger: Logger.new(log.out),
      log_level: :debug
    )
    # XXX: Add the following options, if necessary
    # :http_wire_trace => true
  end

  if @aws_key_id && @aws_sec_key
    options.update(
      access_key_id: @aws_key_id,
      secret_access_key: @aws_sec_key
    )
  elsif @profile
    credentials_opts = { :profile_name => @profile }
    credentials_opts[:path] = @credentials_path if @credentials_path
    options[:credentials] = Aws::SharedCredentials.new(credentials_opts)
  elsif @role_arn
    # The STS client is built from the fully populated shared options; at
    # this point `options` carries no explicit credentials.
    options[:credentials] = Aws::AssumeRoleCredentials.new(
      client: Aws::STS::Client.new(options),
      role_arn: @role_arn,
      role_session_name: "fluent-plugin-cloudtrail",
      external_id: @external_id,
      duration_seconds: 60 * 60
    )
  end

  @s3_client = Aws::S3::Client.new(options)
  @sqs_client = Aws::SQS::Client.new(options)
end

#run_periodic ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/fluent/plugin/in_cloudtrail.rb', line 106

# Polling loop executed until @finished becomes truthy.
#
# Each iteration sleeps for @receive_interval, receives a batch of SQS
# messages from @sqs_url, and for every message:
#   1. parses the body as JSON and then parses its 'Message' field as JSON,
#   2. downloads each object listed under 's3ObjectKey' from 's3Bucket',
#   3. emits every entry of the file's 'Records' array under @tag, stamped
#      with the current time,
#   4. deletes the message from the queue.
#
# A notification with no 's3ObjectKey' has nothing to download, so it is
# simply deleted instead of raising and being redelivered indefinitely.
#
# Any StandardError raised during an iteration is logged and the loop
# continues; a message whose processing failed is not deleted.
#
# @return [nil]
def run_periodic
  until @finished
    begin
      sleep @receive_interval
      sqs_resp = @sqs_client.receive_message(
        queue_url: @sqs_url,
        max_number_of_messages: @max_number_of_messages,
        wait_time_seconds: @wait_time_seconds
      )
      sqs_resp.messages.each do |message|
        notification = JSON.parse(JSON.parse(message.body)['Message'])
        bucket = notification['s3Bucket']

        # Array() accepts a list of keys, a single key, or a missing key.
        Array(notification['s3ObjectKey']).each do |object_key|
          read_cloudtrail_records(bucket, object_key).each do |record|
            router.emit(@tag, Time.now.to_i, record)
          end
        end

        # Only reached when every object of this message was emitted.
        @sqs_client.delete_message(
          queue_url: @sqs_url,
          receipt_handle: message.receipt_handle
        )
      end
    rescue => e
      log.error "failed to emit", :error => e.to_s, :error_class => e.class.to_s
      log.warn_backtrace e.backtrace
    end
  end
end

# Downloads one gzip-compressed JSON object from S3 and returns its
# 'Records' entries (an empty array when the key is absent).
#
# @param bucket [String] S3 bucket name
# @param object_key [String] S3 object key
# @return [Array] the parsed records
def read_cloudtrail_records(bucket, object_key)
  s3_resp = @s3_client.get_object(bucket: bucket, key: object_key)
  gz = Zlib::GzipReader.new(StringIO.new(s3_resp.body.read))
  begin
    Array(JSON.parse(gz.read)['Records'])
  ensure
    # Release the inflater even when reading or parsing fails.
    gz.close
  end
end
private :read_cloudtrail_records

#shutdown ⇒ Object



53
54
55
56
57
# File 'lib/fluent/plugin/in_cloudtrail.rb', line 53

# Stops the plugin: runs the parent shutdown, raises the @finished flag that
# the run_periodic loop checks at the top of each iteration, then waits for
# the worker thread to exit.
#
# The join is skipped when no worker thread exists (for example when start
# raised before the thread was created), so shutdown itself cannot fail
# with NoMethodError on nil.
#
# @return [Thread, nil] the joined worker thread, or nil if there was none
def shutdown
  super
  @finished = true
  @thread.join if @thread
end

#start ⇒ Object



46
47
48
49
50
51
# File 'lib/fluent/plugin/in_cloudtrail.rb', line 46

# Starts the plugin: runs the parent start, builds the AWS clients via
# load_clients, clears the @finished flag, and launches run_periodic on a
# new thread kept in @thread.
#
# @return [Thread] the worker thread
def start
  super
  load_clients
  @finished = false
  @thread = Thread.new { run_periodic }
end