Class: Hoze::PubSubSource

Inherits:
Source
  • Object
show all
Defined in:
lib/hoze/pubsub/source.rb

Instance Attribute Summary collapse

Attributes inherited from Source

#channel, #engine, #key

Instance Method Summary collapse

Constructor Details

#initialize(configuration) ⇒ PubSubSource

Returns a new instance of PubSubSource.



11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/hoze/pubsub/source.rb', line 11

# Builds a Pub/Sub-backed source from a configuration object.
#
# Reads `connector.project` and `connector.path_to_key` to create the
# Google Cloud Pub/Sub client, then copies `channel`, `key` and `max_tries`
# from the configuration.
#
# @param configuration [Object] must respond to `connector`, `channel`,
#   `key` and `max_tries`
def initialize(configuration)
  connector = configuration.connector
  @engine = Google::Cloud::Pubsub.new(project: connector.project,
                                      credentials: connector.path_to_key)

  @key = configuration.key
  @channel = configuration.channel

  # ensure_topic / ensure_subscription are defined outside this snippet;
  # presumably they look up (or create) the topic and subscription -- TODO confirm.
  # The topic is resolved first, before the subscription.
  @topic = ensure_topic
  @subscription = ensure_subscription

  @max_tries = configuration.max_tries
end

Instance Attribute Details

#max_tries ⇒ Object (readonly)

Returns the value of attribute max_tries.



9
10
11
# File 'lib/hoze/pubsub/source.rb', line 9

# Read-only accessor for @max_tries, which the constructor copies from
# `configuration.max_tries`.
#
# @return [Object] the stored value, unchanged
def max_tries
  @max_tries
end

#subscription ⇒ Object (readonly)

Returns the value of attribute subscription.



9
10
11
# File 'lib/hoze/pubsub/source.rb', line 9

# Read-only accessor for @subscription, which the constructor assigns from
# the result of `ensure_subscription`.
#
# @return [Object] the subscription object that #listen listens on
def subscription
  @subscription
end

#topic ⇒ Object (readonly)

Returns the value of attribute topic.



9
10
11
# File 'lib/hoze/pubsub/source.rb', line 9

# Read-only accessor for @topic, which the constructor assigns from the
# result of `ensure_topic`.
#
# @return [Object] the topic object that #push publishes to
def topic
  @topic
end

Instance Method Details

#listen(&block) ⇒ Object

Raises:

  • (HozeSourceError)


27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/hoze/pubsub/source.rb', line 27

# Listens on the subscription and hands every received message to the block,
# wrapped in a Hoze::PubSubMessage. Blocks the calling thread until an
# exception (typically Interrupt, e.g. Ctrl-C) ends the wait; the subscriber
# is then stopped and waited on before the method exits.
#
# @yieldparam msg [Hoze::PubSubMessage] wrapper around the received message
# @raise [HozeSourceError] if no key is configured
# @raise [ArgumentError] if no block is given
# @return [nil] once an Interrupt has been handled
def listen(&block)
  raise HozeSourceError, "Trying to listen source but no key configured" if @key.nil?
  # Without a handler every incoming message would fail with LocalJumpError
  # only after the subscriber has started, so reject the call up front.
  raise ArgumentError, "listen requires a block to handle messages" if block.nil?

  subscriber = @subscription.listen do |received_message|
    begin
      block.call(Hoze::PubSubMessage.new(received_message, self))
    rescue Exception => e
      # Broad rescue is for logging only: the error is never swallowed.
      puts "Exception: #{e.message}"
      raise # always reraise
    end
  end
  subscriber.start

  begin
    puts "Starts listening"
    # Keep the calling thread parked; message handling is driven by the
    # subscriber started above.
    loop { sleep 10 }
  rescue Interrupt
    puts "Interrupted, cleaning ok"
  ensure
    # Stop the subscriber and wait for it to finish on every exit path.
    subscriber.stop.wait!
  end
end

#push(payload, metadata) ⇒ Object



51
52
53
# File 'lib/hoze/pubsub/source.rb', line 51

# Publishes a message to the topic asynchronously.
#
# Both arguments are forwarded unchanged to `@topic.publish_async`.
#
# @param payload [Object] message data to publish
# @param metadata [Object] passed as the second argument of `publish_async`
#   (presumably the message attributes -- TODO confirm against the
#   google-cloud-pubsub version in use)
# @return [Object] whatever `publish_async` returns
def push(payload, metadata)
  @topic.publish_async(payload, metadata)
end