Class: Fluent::GcloudPubSub::Publisher
- Inherits:
-
Object
- Object
- Fluent::GcloudPubSub::Publisher
- Defined in:
- lib/fluent/plugin/gcloud_pubsub/client.rb
Instance Method Summary
-
#initialize(project, key, autocreate_topic, metric_prefix) ⇒ Publisher
constructor
A new instance of Publisher.
- #publish(topic_name, messages, compress_batches = false) ⇒ Object
- #topic(topic_name) ⇒ Object
Constructor Details
#initialize(project, key, autocreate_topic, metric_prefix) ⇒ Publisher
Returns a new instance of Publisher.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 35

# Builds a publisher: creates the Pub/Sub client, starts with an empty topic
# cache, and obtains the two compression histograms.
#
# @param project [Object] forwarded to Google::Cloud::Pubsub.new as project_id
# @param key [Object] forwarded to Google::Cloud::Pubsub.new as credentials
# @param autocreate_topic [Boolean] stored for later use by #topic
# @param metric_prefix [#to_s] prefix interpolated into both metric names
def initialize(project, key, autocreate_topic, metric_prefix)
  @pubsub = Google::Cloud::Pubsub.new project_id: project, credentials: key
  @autocreate_topic = autocreate_topic
  @topics = {}

  ratio_metric = :"#{metric_prefix}_messages_compressed_size_per_original_size_ratio"
  @compression_ratio = Fluent::GcloudPubSub::Metrics.register_or_existing(ratio_metric) do
    ::Prometheus::Client.registry.histogram(
      ratio_metric,
      "Compression ratio achieved on a batch of messages",
      {},
      # We expect compression for even a single message to be typically
      # above 2x (0.5/50%), so bias the buckets towards the higher end
      # of the range.
      [0, 0.25, 0.5, 0.75, 0.85, 0.9, 0.95, 0.975, 1],
    )
  end

  duration_metric = :"#{metric_prefix}_messages_compression_duration_seconds"
  @compression_duration = Fluent::GcloudPubSub::Metrics.register_or_existing(duration_metric) do
    ::Prometheus::Client.registry.histogram(
      duration_metric,
      "Time taken to compress a batch of messages",
      {},
      [0, 0.0001, 0.0005, 0.001, 0.01, 0.05, 0.1, 0.25, 0.5, 1],
    )
  end
end
Instance Method Details
#publish(topic_name, messages, compress_batches = false) ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 77

# Publishes +messages+ to the topic named +topic_name+.
#
# With +compress_batches+ set, the whole batch is handed to a compression
# helper and the helper's result is splatted into a single Topic#publish call.
# Otherwise each message is published individually inside one batch block.
#
# @param topic_name [String] name resolved through #topic
# @param messages [Enumerable] items responding to +message+ and +attributes+
# @param compress_batches [Boolean] whether to compress the batch first
# @raise [RetryableError] when the Google API reports an unavailable,
#   deadline-exceeded, or internal error
def publish(topic_name, messages, compress_batches = false)
  if compress_batches
    # NOTE(review): the helper's name is not visible in this listing; it is
    # presumed to be compress_messages_with_metrics -- confirm in client.rb.
    topic(topic_name).publish(*compress_messages_with_metrics(messages, topic_name))
  else
    topic(topic_name).publish do |batch|
      messages.each do |m|
        # NOTE(review): payload accessor presumed to be +message+ -- confirm
        # against the message type defined in client.rb.
        batch.publish m.message, m.attributes
      end
    end
  end
rescue Google::Cloud::UnavailableError, Google::Cloud::DeadlineExceededError, Google::Cloud::InternalError => e
  # Wrap these errors so callers can tell them apart and retry.
  raise RetryableError, "Google api returns error:#{e.class} message:#{e}"
end
#topic(topic_name) ⇒ Object
66 67 68 69 70 71 72 73 74 75 |
# File 'lib/fluent/plugin/gcloud_pubsub/client.rb', line 66

# Returns the client handle for +topic_name+, caching it in @topics.
#
# On a cache miss the topic is looked up through @pubsub; if that lookup
# returns nil and @autocreate_topic is set, the topic is created instead.
#
# @param topic_name [String] name of the topic to resolve
# @return [Object] the cached or freshly resolved topic handle
# @raise [Error] when the topic cannot be found and was not created
def topic(topic_name)
  @topics.fetch(topic_name) do
    handle = @pubsub.topic(topic_name)
    handle = @pubsub.create_topic(topic_name) if handle.nil? && @autocreate_topic
    raise Error, "topic:#{topic_name} does not exist." if handle.nil?

    # The assignment's value is the handle, which becomes the return value.
    @topics[topic_name] = handle
  end
end