Class: Fluent::CloudPubSubOutput
- Inherits:
- BufferedOutput
- Object
- BufferedOutput
- Fluent::CloudPubSubOutput
- Defined in:
- lib/fluent/plugin/out_cloud_pubsub.rb
Constant Summary collapse
- MAX_REQ_SIZE =
10 * 1024 * 1024 # 10 MB
- MAX_MSGS_PER_REQ =
1000
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
24 25 26 27 28 29 30 |
# File 'lib/fluent/plugin/out_cloud_pubsub.rb', line 24
#
# Runs the parent class's configuration step, then checks that the
# required settings were supplied.
#
# @param conf the configuration object, forwarded unchanged to +super+
# @raise [Fluent::ConfigError] if @project, @topic or @key is nil or false
#   after +super+ returns (checked in that order)
def configure(conf)
  super

  # Each entry pairs a setting's current value with the error reported
  # when it is missing; order matters because the first failure raises.
  required_settings = [
    [@project, "'project' must be specified."],
    [@topic, "'topic' must be specified."],
    [@key, "'key' must be specified."]
  ]

  required_settings.each do |value, message|
    raise Fluent::ConfigError, message unless value
  end
end
#format(tag, time, record) ⇒ Object
39 40 41 |
# File 'lib/fluent/plugin/out_cloud_pubsub.rb', line 39
#
# Packs one event as a three-element array and serializes it with
# +to_msgpack+.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return whatever +Array#to_msgpack+ returns for +[tag, time, record]+
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
#start ⇒ Object
32 33 34 35 36 37 |
# File 'lib/fluent/plugin/out_cloud_pubsub.rb', line 32
#
# Runs the parent class's start step, then builds the Pub/Sub client:
# a Gcloud object is created from @project and @key, and the topic
# named by @topic is looked up and kept in @client.
#
# @return the topic object assigned to @client
def start
  super

  gcloud = Gcloud.new(@project, @key)
  @client = gcloud.pubsub.topic(@topic)
end
#write(chunk) ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/fluent/plugin/out_cloud_pubsub.rb', line 43
#
# Publishes every record of a buffer chunk, grouped into batches.
#
# Records are converted to JSON strings and accumulated; the pending batch
# is handed to +publish+ before adding a record that would push it past
# +max_req_size+ bytes or +max_msgs_per_req+ messages. A record that is
# larger than +max_req_size+ on its own is still published, alone in its
# batch, because the flush only happens when the batch is non-empty.
#
# +publish+, +max_req_size+, +max_msgs_per_req+ and +log+ are defined
# outside this method.
#
# @param chunk an object whose +msgpack_each+ yields (tag, time, record)
# @return the value of the last expression evaluated; callers should not
#   rely on it
#
# NOTE(review): any StandardError raised here is logged and NOT re-raised,
# so the caller never observes a failure for this chunk — confirm that
# dropping the chunk instead of letting the caller retry is intended.
def write(chunk)
  msgs = []
  msgs_size = 0

  chunk.msgpack_each do |_tag, _time, record|
    # Serialize once and measure the exact string that gets published, so
    # the size accounting cannot drift from the real payload.
    msg = record.to_json
    size = msg.bytesize

    batch_full = msgs_size + size > max_req_size || msgs.length + 1 > max_msgs_per_req
    if !msgs.empty? && batch_full
      publish(msgs)
      msgs = []
      msgs_size = 0
    end

    msgs << msg
    msgs_size += size
  end

  # Flush whatever is left after the last record.
  publish(msgs) unless msgs.empty?
rescue StandardError => e
  log.error "unexpected error", :error=>e.to_s
  log.error_backtrace
end