Class: Fluent::Plugin::SQSOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_sqs.rb

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
"memory"

Instance Method Summary collapse

Instance Method Details

#clientObject



48
49
50
# File 'lib/fluent/plugin/out_sqs.rb', line 48

def client
  @client ||= Aws::SQS::Client.new
end

#configure(conf) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/fluent/plugin/out_sqs.rb', line 33

def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject)
  super

  if (!@queue_name.nil? && @queue_name.end_with?('.fifo')) || (!@sqs_url.nil? && @sqs_url.end_with?('.fifo'))
    raise Fluent::ConfigError, 'message_group_id parameter is required for FIFO queue' if @message_group_id.nil?
  end

  Aws.config = {
    access_key_id: @aws_key_id,
    secret_access_key: @aws_sec_key,
    region: @region
  }
end

#format(tag, time, record) ⇒ Object



72
73
74
75
76
77
# File 'lib/fluent/plugin/out_sqs.rb', line 72

def format(tag, time, record)
  record[@tag_property_name] = tag if @include_tag
  record = inject_values_to_record(tag, time, record)

  record.to_msgpack
end

#formatted_to_msgpack_binaryObject



79
80
81
# File 'lib/fluent/plugin/out_sqs.rb', line 79

def formatted_to_msgpack_binary
  true
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


83
84
85
# File 'lib/fluent/plugin/out_sqs.rb', line 83

def multi_workers_ready?
  true
end

#queueObject



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/fluent/plugin/out_sqs.rb', line 56

def queue
  return @queue if @queue

  @queue = if @create_queue && @queue_name
             resource.create_queue(queue_name: @queue_name)
           else
             @queue = if @sqs_url
                        resource.queue(@sqs_url)
                      else
                        resource.get_queue_by_name(queue_name: @queue_name)
                      end
           end

  @queue
end

#resourceObject



52
53
54
# File 'lib/fluent/plugin/out_sqs.rb', line 52

def resource
  @resource ||= Aws::SQS::Resource.new(client: client)
end

#write(chunk) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/fluent/plugin/out_sqs.rb', line 87

def write(chunk)
  batch_records = []
  batch_size = 0
  send_batches = [batch_records]

  chunk.msgpack_each do |record|
    body = Yajl.dump(record)
    batch_size += body.bytesize

    if batch_size > SQS_BATCH_SEND_MAX_SIZE ||
       batch_records.length >= SQS_BATCH_SEND_MAX_MSGS
      batch_records = []
      batch_size = body.bytesize
      send_batches << batch_records
    end

    if batch_size > SQS_BATCH_SEND_MAX_SIZE
      log.warn 'Could not push message to SQS, payload exceeds ' \
               "#{SQS_BATCH_SEND_MAX_SIZE} bytes.  " \
               "(Truncated message: #{body[0..200]})"
    else
      id = "#{@tag_property_name}#{SecureRandom.hex(16)}"
      batch_record = { id: id, message_body: body, delay_seconds: @delay_seconds }
      batch_record[:message_group_id] = @message_group_id unless @message_group_id.nil?
      batch_records << batch_record
    end
  end

  until send_batches.length <= 0
    records = send_batches.shift
    until records.length <= 0
      queue.send_messages(entries: records.slice!(0..9))
    end
  end
end