Class: Fluent::SQSOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_sqs.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



27
28
29
# File 'lib/fluent/plugin/out_sqs.rb', line 27

def configure(conf)
    super
end

#format(tag, time, record) ⇒ Object



58
59
60
61
62
63
64
# File 'lib/fluent/plugin/out_sqs.rb', line 58

def format(tag, time, record)
    if @include_tag then
        record[@tag_property_name] = tag
    end

    record.to_msgpack
end

#shutdownObject



54
55
56
# File 'lib/fluent/plugin/out_sqs.rb', line 54

def shutdown
    super
end

#startObject



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/fluent/plugin/out_sqs.rb', line 31

def start
    super

    AWS.config(
        :access_key_id => @aws_key_id,
        :secret_access_key => @aws_sec_key)

    @sqs_endpoint = @sqs_url.gsub(/https:\/\/(.*?)\/.*/, '\1') if @sqs_url and not @sqs_endpoint

    @sqs = AWS::SQS.new(
        :sqs_endpoint => @sqs_endpoint)

    if @create_queue and @queue_name then
        @queue = @sqs.queues.create(@queue_name)
    else
        if @sqs_url # retrive queue by url
          @queue = @sqs.queues[@sqs_url]
        else
          @queue = @sqs.queues.named(@queue_name)
        end
    end
end

#write(chunk) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/out_sqs.rb', line 66

def write(chunk)
    batch_records = []
    batch_size = 0
    send_batches = [batch_records]

    chunk.msgpack_each do |record|
        body = Yajl.dump(record)
        batch_size += body.bytesize
        if batch_size > SQS_BATCH_SEND_MAX_SIZE ||
                batch_records.length >= SQS_BATCH_SEND_MAX_MSGS then
            batch_records = []
            batch_size = body.bytesize
            send_batches << batch_records
        end

        if batch_size > SQS_BATCH_SEND_MAX_SIZE then
            log.warn "Could not push message to SQS, payload exceeds "\
                "#{SQS_BATCH_SEND_MAX_SIZE} bytes.  "\
                "(Truncated message: #{body[0..200]})"
        else
            batch_records << { :message_body => body, :delay_seconds => @delay_seconds }
        end
    end

    until send_batches.length <= 0 do
        records = send_batches.shift
        until records.length <= 0 do
            @queue.batch_send(records.slice!(0..9))
        end
    end
end