Class: Fluent::SQSOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_sqs.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



26
27
28
# File 'lib/fluent/plugin/out_sqs.rb', line 26

# Configuration hook for the plugin.
#
# Adds no behavior of its own: the bare +super+ forwards +conf+ unchanged
# to the inherited +configure+ implementation.
#
# @param conf the configuration object supplied by the caller
def configure(conf)
    super
end

#format(tag, time, record) ⇒ Object



50
51
52
53
54
55
56
# File 'lib/fluent/plugin/out_sqs.rb', line 50

# Serializes a single event into the form stored in the buffer.
#
# When @include_tag is truthy, the tag is stored in the serialized record
# under the key named by @tag_property_name. The tag is merged into a copy
# of the record, so the hash passed in by the caller is never modified.
#
# @param tag the event tag
# @param time the event time (not used by this method)
# @param record [Hash] the event record
# @return the result of calling +to_msgpack+ on the (possibly tagged) record
def format(tag, time, record)
    # Hash#merge keeps key order and overwrites an existing key in place,
    # so the serialized bytes match what in-place assignment produced.
    record = record.merge(@tag_property_name => tag) if @include_tag

    record.to_msgpack
end

#shutdown ⇒ Object



46
47
48
# File 'lib/fluent/plugin/out_sqs.rb', line 46

# Shutdown hook for the plugin.
#
# Adds no behavior of its own: it only invokes the inherited +shutdown+
# implementation through +super+.
def shutdown
    super
end

#start ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluent/plugin/out_sqs.rb', line 30

# Start hook: runs the inherited +start+, then prepares the SQS handles.
#
# Passes @aws_key_id / @aws_sec_key to AWS.config, builds an AWS::SQS
# client for @sqs_endpoint (kept in @sqs), and stores the queue identified
# by @queue_name in @queue. The queue is obtained with +create+ when
# @create_queue is truthy and with +named+ otherwise.
def start
    super

    AWS.config(:access_key_id => @aws_key_id, :secret_access_key => @aws_sec_key)
    @sqs = AWS::SQS.new(:sqs_endpoint => @sqs_endpoint)

    queues = @sqs.queues
    @queue = @create_queue ? queues.create(@queue_name) : queues.named(@queue_name)
end

#write(chunk) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/fluent/plugin/out_sqs.rb', line 58

# Sends every record of a buffered chunk to the SQS queue in batches.
#
# Each record is encoded with Yajl.dump and becomes one entry carrying
# :message_body and :delay_seconds (from @delay_seconds). Entries are
# grouped so that a batch never exceeds SQS_BATCH_SEND_MAX_SIZE bytes of
# message bodies nor SQS_BATCH_SEND_MAX_MSGS entries.
#
# A record whose encoded body alone exceeds SQS_BATCH_SEND_MAX_SIZE is
# logged with a warning and skipped. It is rejected before any batch
# bookkeeping happens, so it neither closes the batch being filled nor
# leaves its size behind in the running total.
#
# @param chunk an object responding to +msgpack_each+, yielding one record
#   per iteration
# @return [nil]
def write(chunk)
    batches = []
    current = []
    current_size = 0

    chunk.msgpack_each do |record|
        body = Yajl.dump(record)
        body_size = body.bytesize

        # Reject oversized payloads up front; they can never be sent.
        if body_size > SQS_BATCH_SEND_MAX_SIZE
            log.warn "Could not push message to SQS, payload exceeds "\
                "#{SQS_BATCH_SEND_MAX_SIZE} bytes.  "\
                "(Truncated message: #{body[0..200]})"
            next
        end

        # Close the current batch when this entry would overflow it.
        if current_size + body_size > SQS_BATCH_SEND_MAX_SIZE ||
                current.length >= SQS_BATCH_SEND_MAX_MSGS
            batches << current
            current = []
            current_size = 0
        end

        current << { :message_body => body, :delay_seconds => @delay_seconds }
        current_size += body_size
    end
    batches << current

    batches.each do |batch|
        # Keep the original cap of 10 entries per batch_send call; an empty
        # batch yields no slices and therefore triggers no request.
        batch.each_slice(10) { |entries| @queue.batch_send(entries) }
    end

    nil
end