Class: LogStash::Inputs::GooglePubSub
- Inherits:
- Base (Object > Base > LogStash::Inputs::GooglePubSub)
- Defined in:
- lib/logstash/inputs/google_pubsub.rb
Overview
This is a github.com/elastic/logstash[Logstash] input plugin for cloud.google.com/pubsub/[Google Pub/Sub]. The plugin can subscribe to a topic and ingest messages.
The main motivation behind the development of this plugin was to ingest cloud.google.com/logging/[Stackdriver Logging] messages via the cloud.google.com/logging/docs/export/using_exported_logs[Exported Logs] feature of Stackdriver Logging.
Prerequisites
You must first create a Google Cloud Platform project and enable the Google Pub/Sub API. If you intend to use the plugin to ingest Stackdriver Logging messages, you must also enable the Stackdriver Logging API and configure log exporting to Pub/Sub. The documentation on cloud.google.com/ covers each step:
-
Google Cloud Platform Projects and cloud.google.com/docs/overview/[Overview]
-
Google Cloud Pub/Sub cloud.google.com/pubsub/[documentation]
-
Stackdriver Logging cloud.google.com/logging/[documentation]
Cloud Pub/Sub
Currently, this module requires you to create a `topic` manually and specify it in the logstash config file. You must also specify a `subscription`, but the plugin will attempt to create the pull-based `subscription` on its own.
All messages received from Pub/Sub will be converted to a logstash `event` and added to the processing pipeline queue. All Pub/Sub messages will be `acknowledged` and removed from the Pub/Sub `topic` (please see more about cloud.google.com/pubsub/overview#concepts[Pub/Sub concepts]).
It is generally assumed that incoming messages will be in JSON and added to the logstash `event` as-is. However, if a plain text message is received, the plugin will return the raw text as `raw_message` in the logstash `event`.
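The JSON-or-raw-text fallback described above can be sketched in plain Ruby. This is only an illustration under assumptions, not the plugin's code: the plugin hands decoding to its configured Logstash codec, and the helper name `decode_payload` is hypothetical.

```ruby
require "json"

# Hypothetical helper: mimics the JSON-or-raw-text behaviour described above.
# Returns a Hash standing in for the fields of a Logstash event.
def decode_payload(data)
  parsed = JSON.parse(data)
  # Only a JSON object maps cleanly onto event fields; anything else is kept as raw text.
  parsed.is_a?(Hash) ? parsed : { "raw_message" => data }
rescue JSON::ParserError
  # Not valid JSON at all: keep the original text under raw_message.
  { "raw_message" => data }
end

decode_payload('{"severity":"INFO"}')  # a Hash with the key "severity"
decode_payload("plain text line")      # a Hash with the key "raw_message"
```

Keeping the unparsed text under a single well-known key means downstream filters can test for `raw_message` to decide whether further parsing is needed.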
Authentication
You have two options for authentication depending on where you run Logstash.
-
If you are running Logstash outside of Google Cloud Platform, then you will need to create a Google Cloud Platform Service Account and specify the full path to the JSON private key file in your config. You must assign sufficient roles to the Service Account to create a subscription and to pull messages from the subscription. Learn more about GCP Service Accounts and IAM roles here:
- Google Cloud Platform IAM https://cloud.google.com/iam/[overview]
- Creating Service Accounts https://cloud.google.com/iam/docs/creating-managing-service-accounts[overview]
- Granting Roles https://cloud.google.com/iam/docs/granting-roles-to-service-accounts[overview]
-
If you are running Logstash on a Google Compute Engine instance, you may opt to use Application Default Credentials. In this case, you will not need to specify a JSON private key file in your config.
Stackdriver Logging (optional)
If you intend to use the logstash plugin for Stackdriver Logging message ingestion, you must first manually set up the Export option to Cloud Pub/Sub and then manually create the `topic`. Please see the more detailed instructions at cloud.google.com/logging/docs/export/using_exported_logs[Exported Logs] and ensure that the cloud.google.com/logging/docs/export/configure_export#manual-access-pubsub[necessary permissions] have also been manually configured.
Logging messages from Stackdriver Logging exported to Pub/Sub are received as JSON and converted to a logstash `event` as-is in cloud.google.com/logging/docs/export/using_exported_logs#log_entries_in_google_pubsub_topics[this format].
Sample Configuration
Below is a copy of the included `example.conf-tmpl` file that shows a basic configuration for this plugin.
- source,ruby
input {
  google_pubsub {
    # Your GCP project id (name)
    project_id => "my-project-1234"

    # The topic name below is currently hard-coded in the plugin. You
    # must first create this topic by hand and ensure you are exporting
    # logging to this pubsub topic.
    topic => "logstash-input-dev"

    # The subscription name is customizable. The plugin will attempt to
    # create the subscription (but use the hard-coded topic name above).
    subscription => "logstash-sub"

    # If you are running logstash within GCE, it will use
    # Application Default Credentials and use GCE's metadata
    # service to fetch tokens. However, if you are running logstash
    # outside of GCE, you will need to specify the service account's
    # JSON key file below.
    #json_key_file => "/home/erjohnso/pkey.json"
  }
}
output { stdout { codec => rubydebug } }
Metadata and Attributes
The original Pub/Sub message is preserved in the special Logstash `[@metadata]` field so you can fetch:
-
Message attributes
-
The original base64 data
-
Pub/Sub message ID for de-duplication
-
Publish time
You MUST copy any fields you want to keep into the event in a filter before the data is sent to an output, because Logstash drops `@metadata` fields otherwise.
See the PubsubMessage cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage[documentation] for a full description of the fields.
Example to get the message ID:
- source,ruby
input {…}
filter {
  mutate { add_field => { "messageId" => "%{[@metadata][pubsub_message][messageId]}" } }
}
output {…}
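Because Pub/Sub delivers messages at least once, the message ID is the natural key for de-duplication. The sketch below shows the idea in plain Ruby, with a Hash standing in for `[@metadata][pubsub_message]`. The `SeenIds` class and its size limit are hypothetical and not part of the plugin; a real pipeline would more likely use the ID as the document ID in its output.

```ruby
require "set"

# Hypothetical in-memory de-duplicator keyed on the Pub/Sub message ID.
class SeenIds
  def initialize(limit = 10_000)
    @limit = limit
    @ids = Set.new
  end

  # Returns true the first time an ID is seen, false for a repeat.
  def first_time?(message_id)
    return false if @ids.include?(message_id)
    @ids.clear if @ids.size >= @limit # crude bound on memory use
    @ids << message_id
    true
  end
end

# Sample values; the keys mirror the Hash returned by extract_metadata.
metadata = { data: "hello", attributes: {}, messageId: "1234", publishTime: "2018-01-01T00:00:00Z" }

seen = SeenIds.new
seen.first_time?(metadata[:messageId]) # true
seen.first_time?(metadata[:messageId]) # false
```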
Defined Under Namespace
Classes: MessageReceiver, SubscriberListener
Instance Method Summary
Instance Method Details
#extract_metadata(java_message) ⇒ Object
# File 'lib/logstash/inputs/google_pubsub.rb', line 280
def extract_metadata(java_message)
  {
    data: java_message.getData().toStringUtf8(),
    attributes: java_message.getAttributesMap(),
    messageId: java_message.getMessageId(),
    publishTime: Timestamps.toString(java_message.getPublishTime())
  }
end
#register ⇒ Object
# File 'lib/logstash/inputs/google_pubsub.rb', line 220
def register
  @logger.debug("Registering Google PubSub Input: project_id=#{@project_id}, topic=#{@topic}, subscription=#{@subscription}")
  @subscription_id = "projects/#{@project_id}/subscriptions/#{@subscription}"
  if @json_key_file
    @credentialsProvider = FixedCredentialsProvider.create(
      ServiceAccountCredentials.fromStream(java.io.FileInputStream.new(@json_key_file))
    )
  end
  @topic_name = ProjectTopicName.of(@project_id, @topic)
  @subscription_name = ProjectSubscriptionName.of(@project_id, @subscription)
end
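`register` builds the fully qualified subscription path by string interpolation. A stand-alone sketch of that format follows; the helper name `subscription_path` is hypothetical, and the sample values are taken from the sample configuration.

```ruby
# Hypothetical helper reproducing the path format used in `register`.
def subscription_path(project_id, subscription)
  "projects/#{project_id}/subscriptions/#{subscription}"
end

subscription_path("my-project-1234", "logstash-sub")
# => "projects/my-project-1234/subscriptions/logstash-sub"
```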
#run(queue) ⇒ Object
# File 'lib/logstash/inputs/google_pubsub.rb', line 237
def run(queue)
  # Attempt to create the subscription
  if @create_subscription
    @logger.debug("Creating subscription #{@subscription_id}")
    subscriptionAdminClient = SubscriptionAdminClient.create
    begin
      subscriptionAdminClient.createSubscription(@subscription_name, @topic_name, PushConfig.getDefaultInstance(), 0)
    rescue
      @logger.info("Subscription already exists")
    end
  end

  @logger.debug("Pulling messages from sub '#{@subscription_id}'")
  handler = MessageReceiver.new do |java_message|
    # handle incoming message, then ack/nack the received message
    data = java_message.getData().toStringUtf8()
    @codec.decode(data) do |event|
      event.set("host", event.get("host") || @host)
      event.set("[@metadata][pubsub_message]", extract_metadata(java_message)) if @include_metadata
      decorate(event)
      queue << event
    end
  end
  listener = SubscriberListener.new do |from, failure|
    @logger.error("#{failure}")
    raise failure
  end
  flowControlSettings = FlowControlSettings.newBuilder().setMaxOutstandingElementCount(@max_messages).build()
  executorProvider = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build()
  subscriberBuilder = Subscriber.newBuilder(@subscription_name, handler)
    .setFlowControlSettings(flowControlSettings)
    .setExecutorProvider(executorProvider)
    .setParallelPullCount(1)

  if @credentialsProvider
    subscriberBuilder.setCredentialsProvider(@credentialsProvider)
  end
  @subscriber = subscriberBuilder.build()
  @subscriber.addListener(listener, MoreExecutors.directExecutor())
  @subscriber.startAsync()
  @subscriber.awaitTerminated()
end
#stop ⇒ Object
# File 'lib/logstash/inputs/google_pubsub.rb', line 233
def stop
  @subscriber.stopAsync().awaitTerminated() if @subscriber != nil
end