Class: Hanami::Events::Adapter::CloudPubsub Private
- Inherits: Object
  - Object
  - Hanami::Events::Adapter::CloudPubsub
- Defined in: lib/hanami/events/adapter/cloud_pubsub.rb
Overview
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Adapter for Google Cloud Pub/Sub
Instance Attribute Summary
- #listeners ⇒ Object readonly private
- #middleware ⇒ Object readonly private
- #subscribers ⇒ Object readonly private
- #topic_registry ⇒ Object readonly private
Instance Method Summary
- #broadcast(name, input_payload, **message_opts) ⇒ Object private
  Broadcasts event to all subscribers.
- #flush_messages ⇒ Object private
- #initialize(params) ⇒ CloudPubsub constructor private
  A new instance of CloudPubsub.
- #subscribe(name, id:, **subscriber_opts, &block) ⇒ Object private
  Subscribes block for selected event.
Constructor Details
#initialize(params) ⇒ CloudPubsub
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of CloudPubsub.
# File 'lib/hanami/events/adapter/cloud_pubsub.rb', line 16

def initialize(params)
  @pubsub = params[:pubsub]
  @logger = params[:logger] || Logger.new($stdout)
  @listen = params[:listen] || false
  @subscribers = Concurrent::Array.new
  @listeners = Concurrent::Array.new
  @serializer_type = params.fetch(:serializer, :json).to_sym
  @topic_registry = {}
  @mutex = Mutex.new
  @middleware = ::Hanami::Events::CloudPubsub.config.client_middleware
end
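The option defaulting in the constructor can be tried in isolation. The following is a minimal sketch, not the adapter itself: `build_settings` is a hypothetical helper that mirrors only the `||` and `fetch` lines, and it uses a plain Hash so that it runs without the gem.

```ruby
require 'logger'

# Hypothetical helper (not part of hanami-events-cloud_pubsub) that mirrors
# the option defaulting performed in CloudPubsub#initialize.
def build_settings(params)
  {
    logger: params[:logger] || Logger.new($stdout),
    listen: params[:listen] || false,
    # fetch falls back to :json only when the key is absent;
    # to_sym lets callers pass either "json" or :json.
    serializer_type: params.fetch(:serializer, :json).to_sym
  }
end

defaults = build_settings({})
custom   = build_settings(listen: true, serializer: 'json')
```

Because `fetch` only uses its default when the key is missing, an explicit `serializer: nil` would not fall back to `:json`; it would raise on `nil.to_sym`.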
Instance Attribute Details
#listeners ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/hanami/events/adapter/cloud_pubsub.rb', line 14

def listeners
  @listeners
end
#middleware ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/hanami/events/adapter/cloud_pubsub.rb', line 14

def middleware
  @middleware
end
#subscribers ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/hanami/events/adapter/cloud_pubsub.rb', line 14

def subscribers
  @subscribers
end
#topic_registry ⇒ Object (readonly)
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/hanami/events/adapter/cloud_pubsub.rb', line 14

def topic_registry
  @topic_registry
end
Instance Method Details
#broadcast(name, input_payload, **message_opts) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Broadcasts event to all subscribers
# File 'lib/hanami/events/adapter/cloud_pubsub.rb', line 32

def broadcast(name, input_payload, **message_opts)
  event_name = namespaced(name)
  topic = topic_for event_name
  serialized_payload = serializer.serialize(input_payload)
  attrs = { id: SecureRandom.uuid, event_name: event_name }

  middleware.invoke(serialized_payload, **attrs, **message_opts) do |payload, **opts|
    topic.publish_async(payload, **opts) do |result|
      msg = result.message.grpc.to_h

      if result.succeeded?
        logger.info "Published #{name.inspect} published", **msg
      else
        logger.warn "Failed to broadcast #{name.inspect} event", error: result.error, **msg # rubocop:disable Layout/LineLength
      end
    end
  end
end
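The call shape `middleware.invoke(payload, **attrs) { |payload, **opts| ... }` used by `#broadcast` can be illustrated with a small stand-in. This is a sketch under assumptions: `TinyStack` and `add_trace` are hypothetical names, and the configured client middleware in the gem may be implemented differently. It shows only how a payload and its attributes could pass through a chain before reaching the publishing block.

```ruby
# Hypothetical stand-in for the configured client middleware; the real
# stack in hanami-events-cloud_pubsub may be implemented differently.
class TinyStack
  def initialize(*middlewares)
    @middlewares = middlewares
  end

  # Passes the payload and attributes through each middleware in order,
  # then hands the final values to the block (the "publish" step).
  def invoke(payload, **attrs, &publish)
    chain = @middlewares.reverse.reduce(publish) do |next_step, middleware|
      ->(body, **opts) { middleware.call(body, **opts, &next_step) }
    end
    chain.call(payload, **attrs)
  end
end

# A middleware receives the payload, the attributes, and the next step.
add_trace = lambda do |body, **opts, &next_step|
  next_step.call(body, **opts, traced: true)
end

stack = TinyStack.new(add_trace)
published = []

# The block plays the role of topic.publish_async in #broadcast.
stack.invoke('{"user_id":1}', id: 'abc', event_name: 'user.created') do |payload, **opts|
  published << [payload, opts]
end
```

In this sketch the block receives the original payload together with the attributes plus the `traced: true` entry added by the middleware.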
#flush_messages ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/hanami/events/adapter/cloud_pubsub.rb', line 70

def flush_messages
  pubs = topic_registry.values.map(&:async_publisher).compact
  pubs.each(&:stop).map(&:wait!)
end
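The `each(&:stop).map(&:wait!)` idiom in `#flush_messages` requests a stop on every publisher before it waits on any of them, because `each` returns its receiver. A minimal sketch of that ordering follows; `FakePublisher` and `FakeTopic` are hypothetical doubles that only record calls and do not talk to Pub/Sub.

```ruby
# Hypothetical publisher double; it only records the order of calls.
class FakePublisher
  def initialize(name, log)
    @name = name
    @log = log
  end

  def stop
    @log << "stop #{@name}"
    self
  end

  def wait!
    @log << "wait #{@name}"
    self
  end
end

# Hypothetical topic double exposing only #async_publisher.
FakeTopic = Struct.new(:async_publisher)

log = []
topic_registry = {
  'a' => FakeTopic.new(FakePublisher.new('a', log)),
  'b' => FakeTopic.new(nil), # no async publisher; dropped by compact
  'c' => FakeTopic.new(FakePublisher.new('c', log))
}

pubs = topic_registry.values.map(&:async_publisher).compact
pubs.each(&:stop).map(&:wait!)

puts log.inspect
```

Stopping everything first means the publishers can drain concurrently; the waits afterwards only block until each one has finished.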
#subscribe(name, id:, **subscriber_opts, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Subscribes block for selected event
# File 'lib/hanami/events/adapter/cloud_pubsub.rb', line 57

def subscribe(name, id:, **subscriber_opts, &block)
  event_name = namespaced(name)
  namespaced_id = namespaced(id)

  logger.debug("Subscribed listener \"#{id}\" for event \"#{event_name}\"")

  sub = Hanami::Events::CloudPubsub::Subscriber.new(event_name, block, logger)
  @subscribers << sub
  topic = topic_for event_name

  register_listener(event_name, topic, namespaced_id, subscriber_opts)
end