Class: Fluent::SQSOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_sqs.rb

Instance Method Summary collapse

Instance Method Details

#clientObject



37
38
39
# File 'lib/fluent/plugin/out_sqs.rb', line 37

def client
  @client ||= Aws::SQS::Client.new
end

#configure(conf) ⇒ Object



27
28
29
30
31
32
33
34
35
# File 'lib/fluent/plugin/out_sqs.rb', line 27

def configure(conf)
  super

  Aws.config = {
    access_key_id: @aws_key_id,
    secret_access_key: @aws_sec_key,
    region: @region
  }
end

#format(tag, _time, record) ⇒ Object



61
62
63
64
65
# File 'lib/fluent/plugin/out_sqs.rb', line 61

def format(tag, _time, record)
  record[@tag_property_name] = tag if @include_tag

  record.to_msgpack
end

#generate_idObject



100
101
102
# File 'lib/fluent/plugin/out_sqs.rb', line 100

def generate_id
  @tag_property_name + Time.now.to_i.to_s
end

#queueObject



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/fluent/plugin/out_sqs.rb', line 45

def queue
  return @queue if @queue

  @queue = if @create_queue && @queue_name
             resource.create_queue(queue_name: @queue_name)
           else
             @queue = if @sqs_url
                        resource.queue(@sqs_url)
                      else
                        resource.get_queue_by_name(queue_name: @queue_name)
                      end
           end

  @queue
end

#resourceObject



41
42
43
# File 'lib/fluent/plugin/out_sqs.rb', line 41

def resource
  @resource ||= Aws::SQS::Resource.new(client: client)
end

#write(chunk) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/fluent/plugin/out_sqs.rb', line 67

def write(chunk)
  batch_records = []
  batch_size = 0
  send_batches = [batch_records]

  chunk.msgpack_each do |record|
    body = Yajl.dump(record)
    batch_size += body.bytesize

    if batch_size > SQS_BATCH_SEND_MAX_SIZE ||
       batch_records.length >= SQS_BATCH_SEND_MAX_MSGS
      batch_records = []
      batch_size = body.bytesize
      send_batches << batch_records
    end

    if batch_size > SQS_BATCH_SEND_MAX_SIZE
      log.warn 'Could not push message to SQS, payload exceeds ' \
               "#{SQS_BATCH_SEND_MAX_SIZE} bytes.  " \
               "(Truncated message: #{body[0..200]})"
    else
      batch_records << { id: generate_id, message_body: body, delay_seconds: @delay_seconds }
    end
  end

  until send_batches.length <= 0
    records = send_batches.shift
    until records.length <= 0
      queue.send_messages(entries: records.slice!(0..9))
    end
  end
end