Class: LogStash::Inputs::GooglePubSub

Inherits: Base < Object

Defined in: lib/logstash/inputs/google_pubsub.rb

Overview

This is a Logstash (github.com/elastic/logstash) input plugin for Google Pub/Sub (cloud.google.com/pubsub/). The plugin can subscribe to a topic and ingest messages.

The main motivation behind the development of this plugin was to ingest Stackdriver Logging (cloud.google.com/logging/) messages via the Exported Logs (cloud.google.com/logging/docs/export/using_exported_logs) feature of Stackdriver Logging.

Prerequisites

You must first create a Google Cloud Platform project and enable the Google Pub/Sub API. If you intend to use the plugin to ingest Stackdriver Logging messages, you must also enable the Stackdriver Logging API and configure log exporting to Pub/Sub. There is plenty of information on cloud.google.com to help you get started.

Cloud Pub/Sub

Currently, this module requires you to create a `topic` manually and specify it in the Logstash config file. You must also specify a `subscription`; when `create_subscription` is enabled, the plugin will attempt to create the pull-based `subscription` on its own.

All messages received from Pub/Sub will be converted to a Logstash `event` and added to the processing pipeline queue. All Pub/Sub messages will be `acknowledged` and removed from the `subscription` (see Pub/Sub concepts at cloud.google.com/pubsub/overview#concepts).

It is generally assumed that incoming messages will be in JSON and added to the Logstash `event` as-is. However, if a plain text message is received, the plugin will return the raw text as `raw_message` in the Logstash `event`.
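The JSON-or-raw-text rule can be sketched in plain Ruby. This is a minimal illustration, assuming payloads arrive as UTF-8 strings; `decode_pubsub_payload` is a hypothetical helper, and the plugin itself delegates decoding to its configured codec (`@codec.decode` in `#run`).

```ruby
require "json"

# Hypothetical helper illustrating the documented behaviour: a JSON object
# becomes event fields as-is; anything else is wrapped under "raw_message".
def decode_pubsub_payload(data)
  parsed = JSON.parse(data)
  # A JSON scalar or array is not a set of fields, so keep it as raw text.
  parsed.is_a?(Hash) ? parsed : { "raw_message" => data }
rescue JSON::ParserError
  { "raw_message" => data }
end

decode_pubsub_payload('{"severity":"ERROR","message":"disk full"}')
# returns a Hash with "severity" and "message" keys
decode_pubsub_payload("plain text line")
# returns a Hash whose only key is "raw_message"
```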

Authentication

You have two options for authentication depending on where you run Logstash.

  1. If you are running Logstash outside of Google Cloud Platform, then you will need to create a Google Cloud Platform Service Account and specify the full path to the JSON private key file in your config. You must assign sufficient roles to the Service Account to create a subscription and to pull messages from the subscription. Learn more about GCP Service Accounts and IAM roles here:
     - Google Cloud Platform IAM overview: https://cloud.google.com/iam/
     - Creating Service Accounts: https://cloud.google.com/iam/docs/creating-managing-service-accounts
     - Granting Roles: https://cloud.google.com/iam/docs/granting-roles-to-service-accounts
  2. If you are running Logstash on a Google Compute Engine instance, you may opt to use Application Default Credentials. In this case, you will not need to specify a JSON private key file in your config.
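The choice between the two options can be sketched as follows. `credential_source` is a hypothetical helper that mirrors the branch in `#register`: a configured `json_key_file` produces a `FixedCredentialsProvider`, otherwise no provider is set and the client library falls back to Application Default Credentials. The readability check is an addition of this sketch, not something the plugin does.

```ruby
# Hypothetical helper mirroring the credential branch in #register.
def credential_source(json_key_file)
  # No key file configured: rely on Application Default Credentials.
  return :application_default_credentials if json_key_file.nil? || json_key_file.empty?

  # Extra safety check added for this sketch only.
  unless File.readable?(json_key_file)
    raise ArgumentError, "json_key_file #{json_key_file} is not readable"
  end
  :service_account_key
end

credential_source(nil) # returns :application_default_credentials
```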

Stackdriver Logging (optional)

If you intend to use the Logstash plugin for Stackdriver Logging message ingestion, you must first manually set up the Export option to Cloud Pub/Sub and then manually create the `topic`. See the detailed instructions in Exported Logs (cloud.google.com/logging/docs/export/using_exported_logs) and ensure that the necessary permissions (cloud.google.com/logging/docs/export/configure_export#manual-access-pubsub) have also been configured manually.

Logging messages from Stackdriver Logging exported to Pub/Sub are received as JSON and converted to a Logstash `event` as-is, in the format described at cloud.google.com/logging/docs/export/using_exported_logs#log_entries_in_google_pubsub_topics.
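To illustrate what "as-is" means, the sketch below parses a made-up exported entry. The field names (`logName`, `resource`, `timestamp`, `severity`, `textPayload`) follow the Cloud Logging `LogEntry` resource; the values are invented, and the plain `JSON.parse` stands in for whatever decoding the pipeline is configured with.

```ruby
require "json"

# Invented exported log entry; field names follow the Cloud Logging LogEntry.
exported = <<~JSON
  {
    "logName": "projects/my-project-1234/logs/syslog",
    "resource": { "type": "gce_instance", "labels": { "instance_id": "123" } },
    "timestamp": "2018-01-01T00:00:00Z",
    "severity": "WARNING",
    "textPayload": "disk usage at 91%"
  }
JSON

# Top-level keys become event fields; nested objects stay nested, so the
# instance ID would be addressed as [resource][labels][instance_id].
event_fields = JSON.parse(exported)
puts event_fields["severity"]                              # prints: WARNING
puts event_fields.dig("resource", "labels", "instance_id") # prints: 123
```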

Sample Configuration

Below is a copy of the included `example.conf-tmpl` file, which shows a basic configuration for this plugin.

input {
  google_pubsub {
    # Your GCP project id (name)
    project_id => "my-project-1234"

    # The topic name below is currently hard-coded in the plugin. You
    # must first create this topic by hand and ensure you are exporting
    # logging to this pubsub topic.
    topic => "logstash-input-dev"

    # The subscription name is customizable. The plugin will attempt to
    # create the subscription (but use the hard-coded topic name above).
    subscription => "logstash-sub"

    # If you are running logstash within GCE, it will use
    # Application Default Credentials and use GCE's metadata
    # service to fetch tokens.  However, if you are running logstash
    # outside of GCE, you will need to specify the service account's
    # JSON key file below.
    #json_key_file => "/home/erjohnso/pkey.json"
  }
}

output { stdout { codec => rubydebug } }


Metadata and Attributes

The original Pub/Sub message is preserved in the special Logstash `[@metadata]` field so you can fetch:

  • Message attributes

  • The original message data (stored as a UTF-8 string)

  • Pub/Sub message ID for de-duplication

  • Publish time

This metadata is only attached when the `include_metadata` option is enabled (see `#run`). You MUST copy any fields you want into regular event fields with a filter before the data reaches an output, because Logstash does not send `@metadata` fields to outputs.

See the PubsubMessage documentation (cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage) for a full description of the fields.
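The sketch below assembles the same four keys that `#extract_metadata` produces, but starting from the REST/JSON representation of a `PubsubMessage`, where `data` is base64-encoded. `metadata_from_rest` is hypothetical and the sample message is invented; the plugin itself reads these values from the Java client's message object.

```ruby
require "json"

# Hypothetical helper: same keys as #extract_metadata, built from the
# REST/JSON form of a PubsubMessage, in which "data" is base64-encoded.
def metadata_from_rest(message)
  {
    # unpack1("m") decodes base64; the bytes are then treated as UTF-8 text,
    # matching getData().toStringUtf8() in the plugin.
    data: message.fetch("data", "").unpack1("m").force_encoding(Encoding::UTF_8),
    attributes: message.fetch("attributes", {}),
    messageId: message.fetch("messageId"),
    publishTime: message.fetch("publishTime")
  }
end

# Invented sample message for illustration only.
rest_message = JSON.parse(<<~JSON)
  {
    "data": "aGVsbG8gcHVic3Vi",
    "attributes": { "origin": "app-1" },
    "messageId": "1234567890",
    "publishTime": "2018-01-01T00:00:00.123Z"
  }
JSON

meta = metadata_from_rest(rest_message)
puts meta[:data] # prints: hello pubsub
```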

Example to get the message ID:

input {…}

filter {
  mutate {
    add_field => { "messageId" => "%{[@metadata][pubsub_message][messageId]}" }
  }
}

output {…}
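Why the closing brace in `%{...}` matters can be shown with a heavily simplified stand-in for Logstash's sprintf formatting. `expand_field_refs` is hypothetical: it resolves `%{[a][b]...}` references against a nested hash and, like Logstash, leaves references it cannot resolve untouched. It ignores date formats, bare `%{field}` names, and every other sprintf feature.

```ruby
# Hypothetical, simplified stand-in for Logstash's sprintf: each
# %{[a][b]...} reference is replaced by the value at that nested path.
def expand_field_refs(template, event)
  template.gsub(/%\{(?:\[[^\]]+\])+\}/) do |ref|
    path = ref.scan(/\[([^\]]+)\]/).flatten
    value = event.dig(*path)
    # Unresolvable references are left in place, as Logstash does.
    value.nil? ? ref : value.to_s
  end
end

event = { "@metadata" => { "pubsub_message" => { "messageId" => "1234567890" } } }

expand_field_refs("%{[@metadata][pubsub_message][messageId]}", event)
# returns "1234567890"

expand_field_refs("%{[@metadata][pubsub_message][messageId]", event)
# the closing brace is missing, so nothing matches and the text comes back unchanged
```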


Defined Under Namespace

Classes: MessageReceiver, SubscriberListener


Instance Method Details

#extract_metadata(java_message) ⇒ Object



# File 'lib/logstash/inputs/google_pubsub.rb', line 280

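def extract_metadata(java_message)
  # Copy the Java PubsubMessage fields into a plain Ruby hash, which #run
  # stores under [@metadata][pubsub_message] when include_metadata is set.
  {
    data: java_message.getData().toStringUtf8(),    # payload bytes decoded as UTF-8
    attributes: java_message.getAttributesMap(),     # user-defined key/value attributes
    messageId: java_message.getMessageId(),          # server-assigned ID, useful for de-duplication
    publishTime: Timestamps.toString(java_message.getPublishTime())
  }
end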
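`Timestamps.toString` renders the protobuf `publishTime` as an RFC 3339 string in UTC. A rough Ruby equivalent is sketched below, assuming the protobuf-java-util convention of emitting 0, 3, 6 or 9 fractional digits as needed; `timestamp_to_rfc3339` is a hypothetical name, and unlike a library helper it does not validate its inputs.

```ruby
# Hypothetical helper approximating Timestamps.toString: UTC, "Z" suffix,
# and 0, 3, 6 or 9 fractional digits depending on the nanosecond value.
# Inputs are not validated (nanos is assumed to be 0..999_999_999).
def timestamp_to_rfc3339(seconds, nanos)
  base = Time.at(seconds).utc.strftime("%Y-%m-%dT%H:%M:%S")
  fraction =
    if nanos.zero?
      ""
    elsif (nanos % 1_000_000).zero?
      format(".%03d", nanos / 1_000_000)
    elsif (nanos % 1_000).zero?
      format(".%06d", nanos / 1_000)
    else
      format(".%09d", nanos)
    end
  "#{base}#{fraction}Z"
end

timestamp_to_rfc3339(1_514_764_800, 123_000_000) # returns "2018-01-01T00:00:00.123Z"
```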

#register ⇒ Object



# File 'lib/logstash/inputs/google_pubsub.rb', line 220

def register
  @logger.debug("Registering Google PubSub Input: project_id=#{@project_id}, topic=#{@topic}, subscription=#{@subscription}")
  @subscription_id = "projects/#{@project_id}/subscriptions/#{@subscription}"

  if @json_key_file
    @credentialsProvider = FixedCredentialsProvider.create(
      ServiceAccountCredentials.fromStream(java.io.FileInputStream.new(@json_key_file))
    )
  end
  @topic_name = ProjectTopicName.of(@project_id, @topic)
  @subscription_name = ProjectSubscriptionName.of(@project_id, @subscription)
end
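The names built in `#register` follow the standard Pub/Sub resource-name layout. The helpers below are a hypothetical sketch of that formatting: in the plugin, `@subscription_id` is a plain interpolated string, while `ProjectTopicName` and `ProjectSubscriptionName` come from the Java client library and print in the same `projects/.../topics/...` and `projects/.../subscriptions/...` form.

```ruby
# Hypothetical helpers reproducing the resource-name formats used in #register.
def topic_path(project_id, topic)
  "projects/#{project_id}/topics/#{topic}"
end

def subscription_path(project_id, subscription)
  "projects/#{project_id}/subscriptions/#{subscription}"
end

puts topic_path("my-project-1234", "logstash-input-dev")
# prints: projects/my-project-1234/topics/logstash-input-dev
puts subscription_path("my-project-1234", "logstash-sub")
# prints: projects/my-project-1234/subscriptions/logstash-sub
```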

#run(queue) ⇒ Object



# File 'lib/logstash/inputs/google_pubsub.rb', line 237

def run(queue)
  # Attempt to create the subscription
  if @create_subscription
    @logger.debug("Creating subscription #{@subscription_id}")
    subscriptionAdminClient = SubscriptionAdminClient.create
    begin
      subscriptionAdminClient.createSubscription(@subscription_name, @topic_name, PushConfig.getDefaultInstance(), 0)
    rescue
      @logger.info("Subscription already exists")
    end
  end

  @logger.debug("Pulling messages from sub '#{@subscription_id}'")
  handler = MessageReceiver.new do |message|
    # handle incoming message, then ack/nack the received message
    data = message.getData().toStringUtf8()
    @codec.decode(data) do |event|
      event.set("host", event.get("host") || @host)
      event.set("[@metadata][pubsub_message]", extract_metadata(message)) if @include_metadata
      decorate(event)
      queue << event
    end
  end
  listener = SubscriberListener.new do |from, failure|
    @logger.error("#{failure}")
    raise failure
  end
  flowControlSettings = FlowControlSettings.newBuilder().setMaxOutstandingElementCount(@max_messages).build()
  executorProvider = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build()
  subscriberBuilder = Subscriber.newBuilder(@subscription_name, handler)
    .setFlowControlSettings(flowControlSettings)
    .setExecutorProvider(executorProvider)
    .setParallelPullCount(1)

  if @credentialsProvider
    subscriberBuilder.setCredentialsProvider(@credentialsProvider)
  end
  @subscriber = subscriberBuilder.build()
  @subscriber.addListener(listener, MoreExecutors.directExecutor())
  @subscriber.startAsync()
  @subscriber.awaitTerminated()
end
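The message handler in `#run` follows a receive, enqueue, acknowledge pattern. The sketch below models it in plain Ruby: `FakeConsumer` is a hypothetical stand-in for the client's ack/nack consumer, the event is a simple hash rather than a codec-decoded `LogStash::Event`, and acking only after the push succeeds (nacking otherwise) is an assumption, since the `MessageReceiver` body is not shown here.

```ruby
# Hypothetical stand-in for the Java client's ack/nack consumer.
class FakeConsumer
  attr_reader :acked, :nacked

  def initialize
    @acked = false
    @nacked = false
  end

  def ack
    @acked = true
  end

  def nack
    @nacked = true
  end
end

# Sketch of the per-message flow: build an event, fill in "host" if missing,
# push it onto the pipeline queue, and acknowledge only after the push works.
# Nacking on any error (so Pub/Sub can redeliver) is an assumption.
def handle_message(data, consumer, queue, host)
  event = { "message" => data } # stands in for @codec.decode(data)
  event["host"] ||= host
  queue << event
  consumer.ack
rescue StandardError
  consumer.nack
end

queue = Queue.new
delivered = FakeConsumer.new
handle_message("hello", delivered, queue, "logstash-1")

closed_queue = Queue.new
closed_queue.close # pushing onto a closed queue raises ClosedQueueError
rejected = FakeConsumer.new
handle_message("hello", rejected, closed_queue, "logstash-1")
```

Acknowledging after the enqueue means a crash between receipt and enqueue leaves the message unacknowledged, so Pub/Sub can deliver it again once the ack deadline expires.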

#stop ⇒ Object



# File 'lib/logstash/inputs/google_pubsub.rb', line 233

def stop
  @subscriber.stopAsync().awaitTerminated() if @subscriber != nil
end
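`#run` blocks in `awaitTerminated` until the subscriber shuts down, and `#stop`, called from another thread, triggers that shutdown with `stopAsync`. The hypothetical `MiniSubscriber` below sketches that contract with a `Mutex` and `ConditionVariable`; it illustrates the blocking pattern only and is not the Google client's implementation.

```ruby
# Hypothetical miniature of the run/stop contract.
class MiniSubscriber
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @terminated = false
  end

  # Blocks the calling thread until stop_async has been called.
  def await_terminated
    @mutex.synchronize { @cond.wait(@mutex) until @terminated }
  end

  # Marks the subscriber as terminated and wakes every waiting thread.
  def stop_async
    @mutex.synchronize do
      @terminated = true
      @cond.broadcast
    end
    self
  end
end

subscriber = MiniSubscriber.new
runner = Thread.new { subscriber.await_terminated } # plays the role of #run
subscriber.stop_async.await_terminated              # plays the role of #stop
runner.join(1)
```

Because `stop_async` sets the flag under the lock before broadcasting, the waiting thread cannot miss the wake-up, even if `#stop` runs before `#run` has started waiting.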