Class: LogStash::Outputs::Pubsub::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/pubsub/client.rb

Overview

A wrapper around PubSub’s Java API.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(json_key_file, topic_name, batch_settings, logger, client = nil) ⇒ Client

Returns a new instance of Client.



11
12
13
14
15
# File 'lib/logstash/outputs/pubsub/client.rb', line 11

# Stores the logger and decides which publisher object this wrapper drives.
#
# @param json_key_file forwarded to #initialize_google_client when no client is supplied
# @param topic_name forwarded to #initialize_google_client when no client is supplied
# @param batch_settings forwarded to #initialize_google_client when no client is supplied
# @param logger kept in @logger
# @param client optional ready-made publisher; when it is nil (or false) a
#   new one is built via #initialize_google_client
def initialize(json_key_file, topic_name, batch_settings, logger, client = nil)
  # The logger has to be in place first: #initialize_google_client logs through @logger.
  @logger = logger

  @pubsub = if client
              client
            else
              initialize_google_client(json_key_file, topic_name, batch_settings)
            end
end

Class Method Details

.build_batch_settings(byte_threshold, delay_threshold_secs, count_threshold) ⇒ Object

Creates a Java BatchingSettings object from user-defined thresholds.



18
19
20
21
22
23
24
# File 'lib/logstash/outputs/pubsub/client.rb', line 18

# Assembles a Java BatchingSettings instance from the three thresholds.
#
# @param byte_threshold passed to setRequestByteThreshold
# @param delay_threshold_secs converted with Duration.ofSeconds and passed to setDelayThreshold
# @param count_threshold passed to setElementCountThreshold
# @return the object produced by the builder's #build
def self.build_batch_settings(byte_threshold, delay_threshold_secs, count_threshold)
  settings_builder = com.google.api.gax.batching.BatchingSettings.newBuilder

  # Each setter's return value is carried forward, exactly as a fluent chain would.
  settings_builder = settings_builder.setElementCountThreshold(count_threshold)
  settings_builder = settings_builder.setRequestByteThreshold(byte_threshold)

  max_delay = org.threeten.bp.Duration.ofSeconds(delay_threshold_secs)
  settings_builder = settings_builder.setDelayThreshold(max_delay)

  settings_builder.build
end

Instance Method Details

#build_message(message_string, attributes) ⇒ Object

Creates a Java PubsubMessage from the message body (a string) and a string-to-string hash of attributes.



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/logstash/outputs/pubsub/client.rb', line 28

# Builds a Java PubsubMessage whose data is the UTF-8 bytes of the given
# string and whose attributes are copied from the given hash.
#
# @param message_string the message body, handed to ByteString.copyFromUtf8
# @param attributes key/value pairs added one by one with putAttributes;
#   nil (or false) is treated as "no attributes"
# @return the object produced by the message builder's #build
def build_message(message_string, attributes)
  payload = com.google.protobuf.ByteString.copyFromUtf8(message_string)
  message_builder = com.google.pubsub.v1.PubsubMessage.newBuilder.setData(payload)

  (attributes || {}).each do |key, value|
    message_builder.putAttributes(key, value)
  end

  message_builder.build
end

#initialize_google_client(json_key_file, topic_name, batch_settings) ⇒ Object

Sets up the Google Pub/Sub client. It’s unlikely this is needed outside of initialize, but it’s left public for the purposes of mocking.



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/logstash/outputs/pubsub/client.rb', line 51

# Builds the Java Publisher used to send messages to the given topic.
#
# Credentials come from one of two places:
# * when use_default_credential?(json_key_file) is truthy, from the default
#   credentials provider exposed by TopicAdminSettings;
# * otherwise from the service-account JSON file at json_key_file, after
#   raise_key_file_error(json_key_file) has been called (presumably it raises
#   when the file is unusable — defined elsewhere, confirm).
#
# @param json_key_file path to a service-account key file; also written to
#   the info log line
# @param topic_name passed to Publisher.newBuilder
# @param batch_settings passed to setBatchingSettings
# @return the object produced by the Publisher builder's #build
def initialize_google_client(json_key_file, topic_name, batch_settings)
  @logger.info("Initializing Google API client on #{topic_name} key: #{json_key_file}")

  if use_default_credential? json_key_file
    credentials = com.google.cloud.pubsub.v1.TopicAdminSettings.defaultCredentialsProviderBuilder().build()
  else
    raise_key_file_error(json_key_file)

    key_file = java.io.FileInputStream.new(json_key_file)
    begin
      sac = com.google.auth.oauth2.ServiceAccountCredentials.fromStream(key_file)
    ensure
      # We opened the stream, so we release the file handle ourselves, even
      # when parsing the key fails. Closing an already-closed stream is a no-op.
      key_file.close
    end
    credentials = com.google.api.gax.core.FixedCredentialsProvider.create(sac)
  end

  com.google.cloud.pubsub.v1.Publisher.newBuilder(topic_name)
     .setCredentialsProvider(credentials)
     .setHeaderProvider(construct_headers)
     .setBatchingSettings(batch_settings)
     .build
end

#publish_message(message_string, attributes) ⇒ Object

Creates a PubsubMessage from the string and attributes then queues it up to be sent.



42
43
44
45
46
# File 'lib/logstash/outputs/pubsub/client.rb', line 42

# Wraps the string and attributes in a message via #build_message, hands it
# to the publisher, and registers a callback on the object that publish returns.
#
# @param message_string the message body; also passed on to setup_callback
# @param attributes attributes forwarded to #build_message
# @return whatever setup_callback returns (defined elsewhere in the class)
def publish_message(message_string, attributes)
  pending_id = @pubsub.publish(build_message(message_string, attributes))
  setup_callback(message_string, pending_id)
end

#shutdown ⇒ Object

Schedules immediate publishing of any outstanding messages and waits until all are processed.



73
74
75
# File 'lib/logstash/outputs/pubsub/client.rb', line 73

# Asks the underlying publisher to shut down and returns whatever that call returns.
#
# NOTE(review): whether the publisher's shutdown blocks until outstanding
# messages are flushed depends on the client library version in use — confirm.
def shutdown
  publisher = @pubsub
  publisher.shutdown
end