Class: Fluent::GsvsocPubSubOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_gsvsoc_pubsub.rb

Constant Summary collapse

Pubsub =
Google::Apis::PubsubV1

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


63
64
65
66
67
68
# File 'lib/fluent/plugin/out_gsvsoc_pubsub.rb', line 63

# Validates the plugin settings once the parent class has processed +conf+.
#
# @param conf the configuration object, passed through to the parent's configure
# @raise [Fluent::ConfigError] if buffer_chunk_records_limit is above 999 or
#   below 1, or if 'key' or 'topic' is not set
def configure(conf)
  super
  raise Fluent::ConfigError, "buffer_chunk_records_limit may not exceed 999" if @buffer_chunk_records_limit > 999
  # #write batches records with each_slice(@buffer_chunk_records_limit), and
  # each_slice raises ArgumentError for a size below 1; reject such values
  # here so the problem surfaces at startup instead of on every flush.
  raise Fluent::ConfigError, "buffer_chunk_records_limit must be at least 1" if @buffer_chunk_records_limit < 1
  raise Fluent::ConfigError, "'key' must be specified as /path/to/key.json (e.g. service_account .json file)" unless @key
  raise Fluent::ConfigError, "'topic' must be specified as projects/<project-name>/topics/<topic-name>" unless @topic
end

#format(tag, time, record) ⇒ Object



81
82
83
# File 'lib/fluent/plugin/out_gsvsoc_pubsub.rb', line 81

# Turns one event record into its JSON representation.
#
# @param tag not used by this method
# @param time not used by this method
# @param record the event record; must respond to +to_json+
# @return whatever +record.to_json+ returns
def format(tag, time, record)
  serialized = record.to_json
  serialized
end

#publish(giw = 0, data, attributes) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/out_gsvsoc_pubsub.rb', line 85

# Sends one batch of payloads to @topic in a single publish request and logs
# how many message ids came back.
#
# @param giw label used only in the log line (defaults to 0)
# @param data collection of payloads; each becomes one Pubsub::Message
# @param attributes attributes attached to every message in the batch
# @raise [StandardError] any failure is logged with its backtrace and re-raised
def publish(giw = 0, data, attributes)
  batch = data.map { |payload| Pubsub::Message.new(data: payload, attributes: attributes) }
  response = @client.publish_topic(@topic, Pubsub::PublishRequest.new(messages: batch))
  log.info "messages count acks for group_#{giw}: ", response.message_ids.size
rescue => e
  log.error "error publishing record: ", error: e.to_s
  log.error_backtrace
  # Bare raise re-raises the exception currently being handled.
  raise
end

#start ⇒ Object



70
71
72
73
74
75
76
77
78
79
# File 'lib/fluent/plugin/out_gsvsoc_pubsub.rb', line 70

# Starts the plugin: exports @key through GOOGLE_APPLICATION_CREDENTIALS,
# builds an authorised Pub/Sub service client in @client, and sets its
# retry count and timeouts.
def start
  super
  # Must be set before asking for the application default credentials below.
  ENV['GOOGLE_APPLICATION_CREDENTIALS'] = @key
  @client = Pubsub::PubsubService.new.tap do |service|
    service.authorization = Google::Auth.get_application_default([Pubsub::AUTH_PUBSUB])
  end
  options = @client.request_options
  options.retries = 2
  options.timeout_sec = 30
  options.open_timeout_sec = 30
end

#write(chunk) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/fluent/plugin/out_gsvsoc_pubsub.rb', line 98

# Publishes every record of a buffer chunk. Records are split into batches of
# at most @buffer_chunk_records_limit and the batches are handed to #publish
# through Parallel using @parallel_in_threads threads.
#
# @attrs is read as "key:val,key:val". Only the first ':' of an entry
# separates key from value, so values may themselves contain ':' (for example
# URLs). Blank entries are skipped and a nil @attrs yields no attributes.
#
# @param chunk buffer chunk; must respond to +msgpack_each+ and +hash+
# @raise [StandardError] any failure is logged with its backtrace and re-raised
def write(chunk)
  messages = []
  chunk.msgpack_each { |m| messages << m }
  return if messages.empty?

  attributes = @attrs.to_s.split(",").each_with_object({}) do |pair, parsed|
    next if pair.empty?

    # Limit of 2 keeps any further ':' characters inside the value.
    key, value = pair.split(":", 2)
    parsed[key] = value
  end
  log.info "messages attributes: ", attributes
  log.info "messages count: ", messages.count

  # the messages array is split into groups of up to @buffer_chunk_records_limit
  gofmsgs = messages.each_slice(@buffer_chunk_records_limit).to_a

  # gid only labels the log lines of this chunk
  gid = chunk.hash.abs
  log.info "messages size of group_#{gid}: ", gofmsgs.size

  # groups of messages are published in parallel
  Parallel.each_with_index(gofmsgs, in_threads: @parallel_in_threads) do |data, i|
    log.info "messages count sent for group_#{gid}-#{i}-#{Parallel.worker_number}: ", data.size
    publish("#{gid}-#{i}-#{Parallel.worker_number}", data, attributes)
  end
rescue => e
  log.error "unexpected error", error: e.to_s
  log.error_backtrace
  raise
end