Class: Google::Cloud::PubSub::AsyncPublisher
- Inherits:
-
Object
- Object
- Google::Cloud::PubSub::AsyncPublisher
- Includes:
- MonitorMixin
- Defined in:
- lib/google/cloud/pubsub/async_publisher.rb,
lib/google/cloud/pubsub/async_publisher/batch.rb
Overview
Used to publish multiple messages in batches to a topic. See Topic#async_publisher.
Instance Attribute Summary
-
#callback_threads ⇒ Numeric
readonly
The number of threads to handle the published messages' callbacks.
-
#interval ⇒ Numeric
readonly
The number of seconds to collect messages before the batch is published.
-
#max_bytes ⇒ Integer
readonly
The maximum total size, in bytes, of messages to be collected before the batch is published.
-
#max_messages ⇒ Integer
readonly
The maximum number of messages to be collected before the batch is published.
-
#publish_threads ⇒ Numeric
readonly
The number of threads used to publish messages.
-
#topic_name ⇒ String
readonly
The name of the topic the messages are published to.
Instance Method Summary
-
#enable_message_ordering! ⇒ Object
Enables message ordering for messages with ordering keys.
-
#flush ⇒ AsyncPublisher
Forces all messages in the current batch to be published immediately.
-
#message_ordering? ⇒ Boolean
Whether message ordering for messages with ordering keys has been enabled.
-
#publish(data = nil, attributes = nil, ordering_key: nil, **extra_attrs) {|result| ... } ⇒ Object
Add a message to the async publisher to be published to the topic.
-
#resume_publish(ordering_key) ⇒ boolean
Resume publishing ordered messages for the provided ordering key.
-
#started? ⇒ boolean
Whether the publisher has been started.
-
#stop ⇒ AsyncPublisher
Begins the process of stopping the publisher.
-
#stop!(timeout = nil) ⇒ AsyncPublisher
Stop this publisher and block until the publisher is fully stopped, all pending messages have been published, and all callbacks have completed, or until timeout seconds have passed.
-
#stopped? ⇒ boolean
Whether the publisher has been stopped.
-
#wait!(timeout = nil) ⇒ AsyncPublisher
Blocks until the publisher is fully stopped, all pending messages have been published, and all callbacks have completed, or until timeout seconds have passed.
Instance Attribute Details
#callback_threads ⇒ Numeric (readonly)
The number of threads to handle the published messages' callbacks. Default is 4.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 59
def callback_threads
  @callback_threads
end
#interval ⇒ Numeric (readonly)
The number of seconds to collect messages before the batch is published. Default is 0.01.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 59
def interval
  @interval
end
#max_bytes ⇒ Integer (readonly)
The maximum total size, in bytes, of messages to be collected before the batch is published. Default is 1,000,000 (1MB).
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 59
def max_bytes
  @max_bytes
end
#max_messages ⇒ Integer (readonly)
The maximum number of messages to be collected before the batch is published. Default is 100.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 59
def max_messages
  @max_messages
end
#publish_threads ⇒ Numeric (readonly)
The number of threads used to publish messages. Default is 2.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 59
def publish_threads
  @publish_threads
end
#topic_name ⇒ String (readonly)
The name of the topic the messages are published to. In the form of "/projects/project-identifier/topics/topic-name".
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 59
def topic_name
  @topic_name
end
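The max_messages and max_bytes limits can be pictured with a small stand-alone sketch. SketchBatch and its add and full? methods are hypothetical names invented for illustration, and the rule used here (a batch counts as full once either limit is reached) is an assumption; the library's own Batch class may apply the limits differently.

```ruby
# Hypothetical stand-in for a batch; not the library's Batch class.
class SketchBatch
  attr_reader :messages

  def initialize(max_messages: 100, max_bytes: 1_000_000)
    @max_messages = max_messages
    @max_bytes = max_bytes
    @messages = []
    @total_bytes = 0
  end

  # Adds a message, then reports :full once either limit has been
  # reached (an assumed rule), or :added otherwise.
  def add(data)
    @messages << data
    @total_bytes += data.bytesize
    full? ? :full : :added
  end

  def full?
    @messages.size >= @max_messages || @total_bytes >= @max_bytes
  end
end

sketch_batch = SketchBatch.new(max_messages: 3, max_bytes: 10)
first_action = sketch_batch.add("abc")        # 1 message, 3 bytes
second_action = sketch_batch.add("abcdefgh")  # 2 messages, 11 bytes
```

In this sketch the second call crosses the byte limit before the message-count limit is reached, which is the situation max_bytes exists to catch.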
Instance Method Details
#enable_message_ordering! ⇒ Object
Enables message ordering for messages with ordering keys. When
enabled, messages published with the same ordering_key will be
delivered in the order they were published.
See #message_ordering?. See Topic#publish_async, Subscription#listen, and Message#ordering_key.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 237
def enable_message_ordering!
  synchronize { @ordered = true }
end
#flush ⇒ AsyncPublisher
Forces all messages in the current batch to be published immediately.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 204
def flush
  synchronize do
    publish_batches!
    @cond.signal
  end
  self
end
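The synchronize and @cond.signal calls come from MonitorMixin, which this class includes. A minimal sketch of the same locking pattern follows, using only the standard library. SketchPublisher, its @pending and @published arrays, and the idea of treating a flush as "move everything pending into one batch" are assumptions made for illustration, not the library's implementation.

```ruby
require "monitor"

# Hypothetical miniature publisher; names are illustrative only.
class SketchPublisher
  include MonitorMixin

  attr_reader :published

  def initialize
    super() # sets up the monitor provided by MonitorMixin
    @pending = []
    @published = []
    @cond = new_cond
  end

  # Queues a message under the monitor lock.
  def publish(data)
    synchronize do
      @pending << data
      @cond.signal # wake a waiting background thread, if any
    end
    nil
  end

  # Moves everything pending into one published batch right away.
  def flush
    synchronize do
      @published << @pending unless @pending.empty?
      @pending = []
      @cond.signal
    end
    self
  end
end

sketch_publisher = SketchPublisher.new
sketch_publisher.publish("a")
sketch_publisher.publish("b")
sketch_publisher.flush.publish("c") # flush returns self, so calls can chain
```

Returning self from flush, as the real method does, is what allows the chained call on the last line.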
#message_ordering? ⇒ Boolean
Whether message ordering for messages with ordering keys has been
enabled. When enabled, messages published with the same ordering_key
will be delivered in the order they were published. When disabled,
messages may be delivered in any order.
See #enable_message_ordering!. See Topic#publish_async, Subscription#listen, and Message#ordering_key.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 252
def message_ordering?
  synchronize { @ordered }
end
#publish(data = nil, attributes = nil, ordering_key: nil, **extra_attrs) {|result| ... } ⇒ Object
Add a message to the async publisher to be published to the topic. Messages will be collected in batches and published together. See Topic#publish_async.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 118
def publish data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &callback
  msg = Convert.pubsub_message data, attributes, ordering_key, extra_attrs

  synchronize do
    raise AsyncPublisherStopped if @stopped
    raise OrderedMessagesDisabled if !@ordered && !msg.ordering_key.empty? # default is empty string

    batch = resolve_batch_for_message msg
    raise OrderingKeyError, batch.ordering_key if batch.canceled?
    batch_action = batch.add msg, callback
    if batch_action == :full
      publish_batches!
    elsif @published_at.nil?
      # Set initial time to now to start the background counter
      @published_at = Time.now
    end
    @cond.signal
  end

  nil
end
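The ordering guard in the listing above (reject a message that carries an ordering key while ordering is disabled) can be tried in isolation. The sketch below is a simplified stand-in: SketchOrderedPublisher, SketchOrderingDisabled, and batch_for are hypothetical names, and grouping messages in a plain hash keyed by ordering key is an assumption made to keep the example short.

```ruby
# Hypothetical error standing in for the library's OrderedMessagesDisabled.
class SketchOrderingDisabled < StandardError; end

# Hypothetical publisher that only models the ordering guard.
class SketchOrderedPublisher
  def initialize
    @ordered = false
    @batches = Hash.new { |hash, key| hash[key] = [] }
  end

  def enable_message_ordering!
    @ordered = true
  end

  # An empty ordering key means "unordered", matching the
  # "default is empty string" comment in the source listing.
  def publish(data, ordering_key: "")
    raise SketchOrderingDisabled if !@ordered && !ordering_key.empty?
    @batches[ordering_key] << data
    nil
  end

  # Messages collected so far for one ordering key.
  def batch_for(ordering_key)
    @batches.fetch(ordering_key, [])
  end
end

ordered_publisher = SketchOrderedPublisher.new
rejected = begin
  ordered_publisher.publish("x", ordering_key: "k")
  false
rescue SketchOrderingDisabled
  true
end

ordered_publisher.enable_message_ordering!
ordered_publisher.publish("first", ordering_key: "k")
ordered_publisher.publish("second", ordering_key: "k")
```

The practical consequence is that enable_message_ordering! has to be called before the first message with an ordering key is published.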
#resume_publish(ordering_key) ⇒ boolean
Resume publishing ordered messages for the provided ordering key.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 264
def resume_publish ordering_key
  synchronize do
    batch = resolve_batch_for_ordering_key ordering_key
    return if batch.nil?
    batch.resume!
  end
end
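The listings show two halves of one mechanism: #publish raises OrderingKeyError when the batch for a key is canceled, and #resume_publish calls resume! on that batch. The sketch below models that interaction under the assumption that a batch becomes canceled after a publish failure for its key; the source does not say what triggers cancellation. All class and method names here (SketchKeyedBatch, SketchResumablePublisher, fail_key) are hypothetical.

```ruby
# Hypothetical error standing in for the library's OrderingKeyError.
class SketchKeyError < StandardError; end

# Hypothetical per-key batch that can be canceled and resumed.
class SketchKeyedBatch
  attr_reader :items

  def initialize
    @canceled = false
    @items = []
  end

  def cancel!
    @canceled = true
  end

  def canceled?
    @canceled
  end

  def resume!
    @canceled = false
    true
  end
end

class SketchResumablePublisher
  def initialize
    @batches = {}
  end

  def publish(data, ordering_key:)
    batch = (@batches[ordering_key] ||= SketchKeyedBatch.new)
    raise SketchKeyError, ordering_key if batch.canceled?
    batch.items << data
    nil
  end

  # Simulates a failed publish for one key (assumed trigger).
  def fail_key(ordering_key)
    @batches[ordering_key]&.cancel!
  end

  # Returns nil when no batch exists for the key, like the early
  # return in the source listing.
  def resume_publish(ordering_key)
    batch = @batches[ordering_key]
    return if batch.nil?
    batch.resume!
  end
end

resumable = SketchResumablePublisher.new
resumable.publish("one", ordering_key: "k")
resumable.fail_key("k")
blocked = begin
  resumable.publish("two", ordering_key: "k")
  false
rescue SketchKeyError
  true
end
resumed = resumable.resume_publish("k")
resumable.publish("three", ordering_key: "k")
```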
#started? ⇒ boolean
Whether the publisher has been started.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 217
def started?
  !stopped?
end
#stop ⇒ AsyncPublisher
Begins the process of stopping the publisher. Messages already in the queue will be published, but no new messages can be added. Use #wait! to block until the publisher is fully stopped and all pending messages have been published.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 147
def stop
  synchronize do
    break if @stopped

    @stopped = true
    publish_batches! stop: true
    @cond.signal
    @publish_thread_pool.shutdown
  end

  self
end
#stop!(timeout = nil) ⇒ AsyncPublisher
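Stop this publisher and block until the publisher is fully stopped, all pending messages have been published, and all callbacks have completed, or until timeout seconds have passed.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 194
def stop! timeout = nil
  stop
  wait! timeout
end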
#stopped? ⇒ boolean
Whether the publisher has been stopped.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 225
def stopped?
  synchronize { @stopped }
end
#wait!(timeout = nil) ⇒ AsyncPublisher
Blocks until the publisher is fully stopped, all pending messages have been published, and all callbacks have completed, or until timeout seconds have passed.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 172
def wait! timeout = nil
  synchronize do
    @publish_thread_pool.wait_for_termination timeout

    @callback_thread_pool.shutdown
    @callback_thread_pool.wait_for_termination timeout
  end

  self
end
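The relationship between #stop, #wait!, and #stop! can be sketched with a single worker thread in place of the thread pools. This is an illustration only: SketchWorkerPublisher, SketchStopped, and the use of a closed Queue to drain pending messages are assumptions, whereas the real class relies on thread pools with shutdown and wait_for_termination.

```ruby
# Hypothetical error standing in for the library's AsyncPublisherStopped.
class SketchStopped < StandardError; end

class SketchWorkerPublisher
  attr_reader :delivered

  def initialize
    @queue = Queue.new
    @delivered = []
    @stopped = false
    @lock = Mutex.new
    @worker = Thread.new do
      # Queue#pop returns nil once the queue is closed and drained.
      while (data = @queue.pop)
        @delivered << data
      end
    end
  end

  def publish(data)
    @lock.synchronize do
      raise SketchStopped if @stopped
      @queue << data
    end
    nil
  end

  # Rejects new messages but lets queued ones drain.
  def stop
    @lock.synchronize do
      break if @stopped
      @stopped = true
      @queue.close
    end
    self
  end

  # Blocks until the worker finishes or timeout seconds pass.
  def wait!(timeout = nil)
    @worker.join(timeout)
    self
  end

  # Same composition as the real method: stop, then wait.
  def stop!(timeout = nil)
    stop
    wait!(timeout)
  end

  def stopped?
    @lock.synchronize { @stopped }
  end
end

worker_publisher = SketchWorkerPublisher.new
worker_publisher.publish("one")
worker_publisher.publish("two")
worker_publisher.stop!(5)
```

Splitting stop from wait! lets a caller signal shutdown early, continue with other cleanup, and block only at the point where delivery must be complete.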