Class: PubSubModelSync::ServiceGoogle

Inherits:
ServiceBase show all
Defined in:
lib/pub_sub_model_sync/service_google.rb

Constant Summary collapse

# Frozen option hashes for this service. Where each one is consumed is not
# visible in this section — presumably they are passed to the matching
# Google Pub/Sub calls (listen / publish / topic / subscription); confirm
# against the rest of the file.
LISTEN_SETTINGS = { message_ordering: true }.freeze
PUBLISH_SETTINGS = {}.freeze
TOPIC_SETTINGS = {}.freeze
SUBSCRIPTION_SETTINGS = { message_ordering: true }.freeze

Constants inherited from ServiceBase

PubSubModelSync::ServiceBase::SERVICE_KEY

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Base

config, debug?, log

Constructor Details

#initialize ⇒ ServiceGoogle

Returns a new instance of ServiceGoogle.



19
20
21
22
23
# File 'lib/pub_sub_model_sync/service_google.rb', line 19

# Builds the Google Pub/Sub client from the configured project and
# credentials, then calls +init_topic+ once per configured topic name.
#
# +config.topic_name+ may be a single name or a list (it is wrapped with
# +Array()+); when it is nil/false the literal 'model_sync' is used instead.
def initialize
  @service = Google::Cloud::Pubsub.new(
    project: config.project,
    credentials: config.credentials
  )
  topic_names = Array(config.topic_name || 'model_sync')
  topic_names.each { |topic_name| init_topic(topic_name) }
end

Instance Attribute Details

#publish_topics ⇒ Object

Returns the value of attribute publish_topics.



17
18
19
# File 'lib/pub_sub_model_sync/service_google.rb', line 17

# Reader for the +@publish_topics+ instance variable.
#
# @return [Object] whatever is currently stored in +@publish_topics+
#   (nil if it has not been assigned yet)
def publish_topics
  @publish_topics
end

#service ⇒ Object

Returns the value of attribute service.



17
18
19
# File 'lib/pub_sub_model_sync/service_google.rb', line 17

# Reader for the +@service+ instance variable, which #initialize assigns to
# the result of +Google::Cloud::Pubsub.new+.
#
# @return [Object] the value stored in +@service+
def service
  @service
end

#subscribers ⇒ Object

Returns the value of attribute subscribers.



17
18
19
# File 'lib/pub_sub_model_sync/service_google.rb', line 17

# Reader for the +@subscribers+ instance variable. #listen_messages assigns
# it from +subscribe_to_topics+; before that it is nil, which is why #stop
# guards against a nil value.
#
# @return [Object, nil] the value stored in +@subscribers+
def subscribers
  @subscribers
end

#topics ⇒ Object

Returns the value of attribute topics.



17
18
19
# File 'lib/pub_sub_model_sync/service_google.rb', line 17

# Reader for the +@topics+ instance variable.
# NOTE(review): nothing in this section assigns +@topics+; presumably it is
# populated by +init_topic+ — confirm.
#
# @return [Object] whatever is currently stored in +@topics+
#   (nil if it has not been assigned yet)
def topics
  @topics
end

Instance Method Details

#listen_messages ⇒ Object



25
26
27
28
29
30
31
32
# File 'lib/pub_sub_model_sync/service_google.rb', line 25

# Starts listening and parks the calling thread.
#
# Sequence: logs the start, stores the result of +subscribe_to_topics+ in
# +@subscribers+, logs again, then calls +sleep+ with no argument (so the
# thread stays parked until something wakes it). Once +sleep+ returns, every
# subscriber is told to +stop+, +wait!+ is called on the result, and a final
# message is logged.
def listen_messages
  log('Listener starting...')
  @subscribers = subscribe_to_topics
  log('Listener started')

  sleep

  subscribers.each do |active_subscriber|
    stopping = active_subscriber.stop
    stopping.wait!
  end
  log('Listener stopped')
end

#publish(payload) ⇒ Object

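Parameters:

payload — the message to publish; its headers[:topic_name] (a single name or a list) selects the target topic(s), falling back to config.default_topic_name when absent.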



35
36
37
38
39
# File 'lib/pub_sub_model_sync/service_google.rb', line 35

# Publishes +payload+ to one or more topics.
#
# Topic names come from +payload.headers[:topic_name+] (a single name or a
# list), falling back to +config.default_topic_name+ when that header is
# nil/false. All names are resolved through +find_topic+ first; only after
# every lookup has returned is +publish_to_topic+ called for each topic.
#
# @param payload [Object] must respond to +headers+ (indexed with
#   +:topic_name+); it is handed unchanged to +publish_to_topic+
# @return [Array] the resolved topics (the receiver of the final +each+)
def publish(payload)
  requested_names = payload.headers[:topic_name] || config.default_topic_name
  resolved_topics = Array(requested_names).map { |topic_name| find_topic(topic_name) }
  resolved_topics.each do |resolved_topic|
    publish_to_topic(resolved_topic, payload)
  end
end

#stop ⇒ Object



41
42
43
44
# File 'lib/pub_sub_model_sync/service_google.rb', line 41

# Logs that the listener is stopping and calls +stop!+ on every subscriber.
#
# +subscribers+ may still be nil (it is only assigned by #listen_messages),
# in which case an empty list is iterated and nothing else happens.
#
# @return [Array] the collection that was iterated
def stop
  log('Listener stopping...')
  current = subscribers || []
  current.each { |subscriber| subscriber.stop! }
end