Class: LogStash::Outputs::Pubsub::Client
- Inherits: Object
  - Object
  - LogStash::Outputs::Pubsub::Client
- Defined in: lib/logstash/outputs/pubsub/client.rb
Overview
A wrapper around PubSub’s Java API.
Class Method Summary
- .build_batch_settings(byte_threshold, delay_threshold_secs, count_threshold) ⇒ Object
  Creates a Java BatchingSettings object given user-defined thresholds.
Instance Method Summary
- #build_message(message_string, attributes) ⇒ Object
  Creates a Java PubsubMessage given the message body as a string and a string:string hash of attributes.
- #initialize(json_key_file, topic_name, batch_settings, logger, client = nil) ⇒ Client (constructor)
  A new instance of Client.
- #initialize_google_client(json_key_file, topic_name, batch_settings) ⇒ Object
  Sets up the Google pubsub client.
- #publish_message(message_string, attributes) ⇒ Object
  Creates a PubsubMessage from the string and attributes then queues it up to be sent.
- #shutdown ⇒ Object
  Schedules immediate publishing of any outstanding messages and waits until all are processed.
Constructor Details
#initialize(json_key_file, topic_name, batch_settings, logger, client = nil) ⇒ Client
Returns a new instance of Client.
    # File 'lib/logstash/outputs/pubsub/client.rb', line 11
    def initialize(json_key_file, topic_name, batch_settings, logger, client = nil)
      @logger = logger
      # Use the injected client when one is given; otherwise build a Google publisher.
      @pubsub = client || initialize_google_client(json_key_file, topic_name, batch_settings)
    end
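Because the constructor only calls initialize_google_client when client is nil, a test can pass its own publisher and skip credential loading entirely. The sketch below is plain Ruby and does not load the plugin: SketchClient is a hypothetical class that copies only the constructor logic documented here, and FakePublisher is an assumed test double.

```ruby
# Hypothetical test double standing in for the Java Publisher.
class FakePublisher
  attr_reader :published

  def initialize
    @published = []
  end

  def publish(message)
    @published << message
    "id-#{@published.size}"
  end
end

# Hypothetical stand-in that copies only the documented constructor logic.
class SketchClient
  attr_reader :pubsub, :google_client_built

  def initialize(json_key_file, topic_name, batch_settings, logger, client = nil)
    @logger = logger
    @google_client_built = false
    @pubsub = client || initialize_google_client(json_key_file, topic_name, batch_settings)
  end

  # The real method builds a Java Publisher; this one only records that it ran.
  def initialize_google_client(_json_key_file, topic_name, _batch_settings)
    @google_client_built = true
    "publisher for #{topic_name}"
  end
end

fake = FakePublisher.new
injected = SketchClient.new(nil, 'projects/p/topics/t', nil, nil, fake)
default  = SketchClient.new(nil, 'projects/p/topics/t', nil, nil)
```

With the fake injected, the Google setup path is never entered; without it, the fallback runs.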
Class Method Details
.build_batch_settings(byte_threshold, delay_threshold_secs, count_threshold) ⇒ Object
Creates a Java BatchingSettings object given user-defined thresholds.
    # File 'lib/logstash/outputs/pubsub/client.rb', line 18
    def self.build_batch_settings(byte_threshold, delay_threshold_secs, count_threshold)
      com.google.api.gax.batching.BatchingSettings.newBuilder
         .setElementCountThreshold(count_threshold)
         .setRequestByteThreshold(byte_threshold)
         .setDelayThreshold(org.threeten.bp.Duration.ofSeconds(delay_threshold_secs))
         .build
    end
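To make the three thresholds concrete, here is a minimal plain-Ruby sketch of how they are assumed to interact: a batch is sent as soon as any one of the count, byte, or delay limits is reached. SketchBatcher is a hypothetical illustration of that rule, not the Java batching implementation, and the clock is passed in by hand to keep the example deterministic.

```ruby
# Hypothetical in-memory batcher; it mimics the threshold semantics only.
class SketchBatcher
  attr_reader :flushed

  def initialize(byte_threshold:, delay_threshold_secs:, count_threshold:)
    @byte_threshold = byte_threshold
    @delay_threshold_secs = delay_threshold_secs
    @count_threshold = count_threshold
    @pending = []
    @pending_bytes = 0
    @first_added_at = nil
    @flushed = []
  end

  # `now` is a number of seconds supplied by the caller.
  def add(message, now)
    @first_added_at ||= now
    @pending << message
    @pending_bytes += message.bytesize
    flush if threshold_reached?(now)
  end

  # Time-based check for when no new message arrives.
  def tick(now)
    flush if !@pending.empty? && now - @first_added_at >= @delay_threshold_secs
  end

  private

  def threshold_reached?(now)
    @pending.size >= @count_threshold ||
      @pending_bytes >= @byte_threshold ||
      now - @first_added_at >= @delay_threshold_secs
  end

  def flush
    @flushed << @pending
    @pending = []
    @pending_bytes = 0
    @first_added_at = nil
  end
end

batcher = SketchBatcher.new(byte_threshold: 10, delay_threshold_secs: 5, count_threshold: 3)
batcher.add('aa', 0)          # below every threshold, stays pending
batcher.add('bb', 1)
batcher.add('cc', 2)          # third message reaches count_threshold
batcher.add('0123456789', 3)  # 10 bytes reaches byte_threshold on its own
batcher.add('z', 4)
batcher.tick(9)               # 5 seconds after 'z' was added
```

Each of the three flushes above is triggered by a different threshold, which is the trade-off the settings control: larger thresholds mean fewer requests but higher latency.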
Instance Method Details
#build_message(message_string, attributes) ⇒ Object
Creates a Java PubsubMessage given the message body as a string and a string:string hash of attributes.
    # File 'lib/logstash/outputs/pubsub/client.rb', line 28
    def build_message(message_string, attributes)
      # A nil attributes argument is treated as an empty hash.
      attributes ||= {}

      data = com.google.protobuf.ByteString.copyFromUtf8(message_string)
      builder = com.google.pubsub.v1.PubsubMessage.newBuilder
                   .setData(data)

      attributes.each { |k, v| builder.putAttributes(k, v) }

      builder.build
    end
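Since each key and value is handed to putAttributes, which takes strings, callers holding symbol keys or numeric values would likely need to convert them first. The helper below is a hypothetical sketch of that conversion and is not part of the plugin; it also mirrors the nil handling in build_message.

```ruby
# Hypothetical helper, not part of the plugin: coerces keys and values to
# strings and treats nil as an empty hash, as build_message does.
def stringify_attributes(attributes)
  attributes ||= {}
  attributes.each_with_object({}) do |(key, value), out|
    out[key.to_s] = value.to_s
  end
end

attrs = stringify_attributes(host: 'web-1', retries: 3)
empty = stringify_attributes(nil)
```

The resulting hash could then be passed as the attributes argument.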
#initialize_google_client(json_key_file, topic_name, batch_settings) ⇒ Object
Sets up the Google pubsub client. This is unlikely to be needed outside of initialize, but it is left public so that it can be mocked.
    # File 'lib/logstash/outputs/pubsub/client.rb', line 51
    def initialize_google_client(json_key_file, topic_name, batch_settings)
      @logger.info("Initializing Google API client on #{topic_name} key: #{json_key_file}")

      if use_default_credential? json_key_file
        credentials = com.google.cloud.pubsub.v1.TopicAdminSettings.defaultCredentialsProviderBuilder().build()
      else
        raise_key_file_error(json_key_file)

        key_file = java.io.FileInputStream.new(json_key_file)
        sac = com.google.auth.oauth2.ServiceAccountCredentials.fromStream(key_file)
        credentials = com.google.api.gax.core.FixedCredentialsProvider.create(sac)
      end

      com.google.cloud.pubsub.v1.Publisher.newBuilder(topic_name)
         .setCredentialsProvider(credentials)
         .setHeaderProvider(construct_headers)
         .setBatchingSettings(batch_settings)
         .build
    end
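The credential branch can be sketched in plain Ruby. The two helpers called by the method, use_default_credential? and raise_key_file_error, are not shown on this page, so the versions here are hypothetical stand-ins under assumed behaviour (a nil or empty path selects default credentials; an unreadable path raises). The names default_credentials?, check_key_file! and credential_source are invented for this illustration.

```ruby
require 'tempfile'

# Hypothetical stand-in: assumes nil or an empty path means "use default credentials".
def default_credentials?(json_key_file)
  json_key_file.nil? || json_key_file.empty?
end

# Hypothetical stand-in: assumes the check raises when the file cannot be read.
def check_key_file!(json_key_file)
  return if File.file?(json_key_file) && File.readable?(json_key_file)

  raise ArgumentError, "cannot read key file: #{json_key_file}"
end

# Returns a symbol naming the branch that would be taken.
def credential_source(json_key_file)
  return :application_default if default_credentials?(json_key_file)

  check_key_file!(json_key_file)
  :service_account_key
end

key = Tempfile.new('key.json')
from_env  = credential_source(nil)
from_file = credential_source(key.path)
key.close!
```

Validating the path before opening it keeps the failure message about the key file rather than about a Java stream.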
#publish_message(message_string, attributes) ⇒ Object
Creates a PubsubMessage from the string and attributes then queues it up to be sent.
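    # File 'lib/logstash/outputs/pubsub/client.rb', line 42
    # Local variable names and the setup_callback argument order are reconstructed;
    # the extracted listing omitted them.
    def publish_message(message_string, attributes)
      message = build_message(message_string, attributes)
      message_id_future = @pubsub.publish(message)
      setup_callback(message_string, message_id_future)
    end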
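Publishing is asynchronous: the publisher returns a future, and a callback reports the outcome later. setup_callback is not shown on this page, so the plain-Ruby sketch below only assumes that it attaches success and failure handlers. SketchFuture and SketchPublisher are hypothetical doubles, and the future completes immediately to keep the example deterministic.

```ruby
# Hypothetical future that completes immediately; the real publisher
# returns a Java future that completes on a background thread.
class SketchFuture
  def initialize(value: nil, error: nil)
    @value = value
    @error = error
  end

  def on_complete(on_success:, on_failure:)
    @error ? on_failure.call(@error) : on_success.call(@value)
  end
end

# Hypothetical publisher double: fails for an empty payload, otherwise succeeds.
class SketchPublisher
  def publish(message)
    return SketchFuture.new(error: 'empty payload') if message[:data].empty?

    SketchFuture.new(value: 'msg-1')
  end
end

log = []
publisher = SketchPublisher.new

['hello', ''].each do |body|
  future = publisher.publish({ data: body, attributes: {} })
  future.on_complete(
    on_success: ->(id)  { log << "published #{id}" },
    on_failure: ->(err) { log << "failed: #{err}" }
  )
end
```

Handling the result in a callback lets the caller keep queueing messages instead of blocking on each one.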
#shutdown ⇒ Object
Schedules immediate publishing of any outstanding messages and waits until all are processed.
    # File 'lib/logstash/outputs/pubsub/client.rb', line 73
    def shutdown
      @pubsub.shutdown
    end
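Since shutdown is what flushes outstanding messages, calling it from an ensure block is one way to avoid losing queued messages when the caller fails partway through. The sketch below is plain Ruby with a hypothetical BufferingPublisher double that holds messages until shutdown; it illustrates the calling pattern only.

```ruby
# Hypothetical publisher double that buffers until shutdown.
class BufferingPublisher
  attr_reader :sent

  def initialize
    @pending = []
    @sent = []
  end

  def publish(message)
    @pending << message
  end

  # Mirrors the documented behaviour: send everything still outstanding.
  def shutdown
    @sent.concat(@pending)
    @pending.clear
  end
end

publisher = BufferingPublisher.new
begin
  publisher.publish('a')
  publisher.publish('b')
  raise IOError, 'pipeline stopped'
rescue IOError
  # Swallowed here only to keep the example running.
ensure
  publisher.shutdown
end
```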