Class: Fluent::CloudPubSubOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_cloud_pubsub.rb

Constant Summary collapse

MAX_REQ_SIZE =

10 MB

10 * 1024 * 1024
MAX_MSGS_PER_REQ =
1000

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


24
25
26
27
28
29
30
# File 'lib/fluent/plugin/out_cloud_pubsub.rb', line 24

# Lets the parent class process the configuration, then checks that the
# required settings ended up populated.
#
# @param conf the configuration object, forwarded unchanged to +super+
# @raise [Fluent::ConfigError] when @project, @topic or @key is nil or false
#   (checked in that order; the first missing one is reported)
def configure(conf)
  super

  unless @project
    raise Fluent::ConfigError, "'project' must be specified."
  end

  unless @topic
    raise Fluent::ConfigError, "'topic' must be specified."
  end

  unless @key
    raise Fluent::ConfigError, "'key' must be specified."
  end
end

#format(tag, time, record) ⇒ Object



39
40
41
# File 'lib/fluent/plugin/out_cloud_pubsub.rb', line 39

# Serializes one event for buffering.
#
# The event is packed as a three-element array in the order
# [tag, time, record] and converted with Array#to_msgpack.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the result of calling to_msgpack on the [tag, time, record] array
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#start ⇒ Object



32
33
34
35
36
37
# File 'lib/fluent/plugin/out_cloud_pubsub.rb', line 32

# Runs the parent class start-up, then resolves the Pub/Sub topic this
# output publishes to and keeps it in @client.
#
# The topic is looked up via Gcloud.new(@project, @key).pubsub.topic(@topic).
#
# @return the topic object that was stored in @client
def start
  super

  gcloud = Gcloud.new(@project, @key)
  @client = gcloud.pubsub.topic(@topic)
end

#write(chunk) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/fluent/plugin/out_cloud_pubsub.rb', line 43

# Publishes every record of a buffer chunk in batches.
#
# Each record is serialized to JSON exactly once, and the byte size of that
# serialized string is what counts toward the batch size, so the accounted
# size matches the payload handed to +publish+. The current batch is flushed
# before adding a record that would push it past +max_req_size+ bytes or
# +max_msgs_per_req+ messages. A single record larger than +max_req_size+
# is still sent, alone in its own batch.
#
# Any error is logged and then re-raised instead of being swallowed, so a
# failed publish is not mistaken for a successful flush.
# NOTE(review): Fluentd's buffered-output retry presumably relies on +write+
# raising; confirm against the Fluentd version in use. Batches published
# before the failure will be sent again if the chunk is retried.
#
# @param chunk [#msgpack_each] buffer chunk yielding (tag, time, record)
# @return the result of the final +publish+ call, or nil for an empty chunk
# @raise [StandardError] whatever serialization or +publish+ raised
def write(chunk)
  batch = []
  batch_bytes = 0

  chunk.msgpack_each do |_tag, _time, record|
    payload = record.to_json
    payload_bytes = payload.bytesize

    over_size = batch_bytes + payload_bytes > max_req_size
    over_count = batch.length >= max_msgs_per_req
    # Never flush an empty batch: an oversized record must still go out.
    if !batch.empty? && (over_size || over_count)
      publish(batch)
      batch = []
      batch_bytes = 0
    end

    batch << payload
    batch_bytes += payload_bytes
  end

  publish(batch) unless batch.empty?
rescue => e
  log.error "unexpected error", :error => e.to_s
  log.error_backtrace
  raise
end