Class: Hoze::PubSubSource
Instance Attribute Summary collapse
-
#max_tries ⇒ Object
readonly
Returns the value of attribute max_tries.
-
#subscription ⇒ Object
readonly
Returns the value of attribute subscription.
-
#topic ⇒ Object
readonly
Returns the value of attribute topic.
Attributes inherited from Source
Instance Method Summary collapse
-
#initialize(configuration) ⇒ PubSubSource
constructor
A new instance of PubSubSource.
- #listen(&block) ⇒ Object
- #push(payload, metadata) ⇒ Object
Constructor Details
#initialize(configuration) ⇒ PubSubSource
Returns a new instance of PubSubSource.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/hoze/pubsub/source.rb', line 11 def initialize configuration @engine = Google::Cloud::Pubsub.new( project: configuration.connector.project, credentials: configuration.connector.path_to_key ) @channel = configuration.channel @key = configuration.key @topic = ensure_topic @subscription = ensure_subscription @max_tries = configuration.max_tries end |
Instance Attribute Details
#max_tries ⇒ Object (readonly)
Returns the value of attribute max_tries.
9 10 11 |
# File 'lib/hoze/pubsub/source.rb', line 9 def max_tries @max_tries end |
#subscription ⇒ Object (readonly)
Returns the value of attribute subscription.
9 10 11 |
# File 'lib/hoze/pubsub/source.rb', line 9 def subscription @subscription end |
#topic ⇒ Object (readonly)
Returns the value of attribute topic.
9 10 11 |
# File 'lib/hoze/pubsub/source.rb', line 9 def topic @topic end |
Instance Method Details
#listen(&block) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/hoze/pubsub/source.rb', line 27 def listen &block raise HozeSourceError.new("Tryng to listen source but no key configured") if @key.nil? subscriber = @subscription.listen do || begin msg = Hoze::PubSubMessage.new(, self) yield msg rescue Exception => e puts "Exception: #{e.message}" raise # always reraise end end subscriber.start begin puts "Starts listening" while true do sleep 10 end rescue Interrupt puts "Interrupted, cleaning ok" ensure subscriber.stop.wait! end end |
#push(payload, metadata) ⇒ Object
51 52 53 |
# File 'lib/hoze/pubsub/source.rb', line 51 def push payload, @topic.publish_async payload, end |