Class: Fluent::GsvsocPubSubOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::GsvsocPubSubOutput
- Defined in:
- lib/fluent/plugin/out_gsvsoc_pubsub.rb
Constant Summary collapse
- Pubsub =
Google::Apis::PubsubV1
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #publish(giw = 0, data, attributes) ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
63 64 65 66 67 68 |
# File 'lib/fluent/plugin/out_gsvsoc_pubsub.rb', line 63 def configure(conf) super raise Fluent::ConfigError, "buffer_chunk_records_limit may not exceed 999" if @buffer_chunk_records_limit > 999 raise Fluent::ConfigError, "'key' must be specified as /path/to/key.json (e.g. service_account .json file)" unless @key raise Fluent::ConfigError, "'topic' must be specified as projects/<project-name>/topics/<topic-name>" unless @topic end |
#format(tag, time, record) ⇒ Object
81 82 83 |
# File 'lib/fluent/plugin/out_gsvsoc_pubsub.rb', line 81 def format(tag, time, record) record.to_json end |
#publish(giw = 0, data, attributes) ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/fluent/plugin/out_gsvsoc_pubsub.rb', line 85 def publish(giw = 0, data, attributes) request = Pubsub::PublishRequest.new(messages: []) data.each do |d| request. << Pubsub::Message.new(data: d, attributes: attributes) end m = @client.publish_topic(@topic, request) log.info "messages count acks for group_#{giw}: ", m..size rescue => e log.error "error publishing record: ", :error=>$!.to_s log.error_backtrace raise e end |
#start ⇒ Object
70 71 72 73 74 75 76 77 78 79 |
# File 'lib/fluent/plugin/out_gsvsoc_pubsub.rb', line 70 def start super ENV['GOOGLE_APPLICATION_CREDENTIALS']=@key pubsub = Pubsub::PubsubService.new pubsub. = Google::Auth.get_application_default([Pubsub::AUTH_PUBSUB]) @client = pubsub @client..retries = 2 @client..timeout_sec = 30 @client..open_timeout_sec = 30 end |
#write(chunk) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/fluent/plugin/out_gsvsoc_pubsub.rb', line 98 def write(chunk) = [] chunk.msgpack_each { |m| << m } if .length > 0 # attributes arrive as key:val,key:val,key:val attributes = Hash[@attrs.split(",").map {|str| str.split(":")}] log.info "messages attributes: ", attributes log.info "messages count: ", .count # the messages array is split into multiples of @buffer_chunk_records_limit gofmsgs = .each_slice(@buffer_chunk_records_limit).to_a gid = chunk.hash.abs log.info "messages size of group_#{gid}: ", gofmsgs.size # group of messages is published in parallel Parallel.each_with_index(gofmsgs, in_threads: @parallel_in_threads) do |data, i| log.info "messages count sent for group_#{gid}-#{i}-#{Parallel.worker_number}: ", data.size publish("#{gid}-#{i}-#{Parallel.worker_number}", data, attributes) end end rescue => e log.error "unexpected error", :error=>$!.to_s log.error_backtrace raise e end |