Module: PubSubTie::Publisher

Extended by:
Publisher
Included in:
Publisher
Defined in:
lib/pubsub_tie/publisher.rb

Instance Method Summary

Instance Method Details

#batch(event_sym, messages, resource) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/pubsub_tie/publisher.rb', line 39

# Publishes every entry of +messages+ asynchronously to the topic resolved
# from +event_sym+, then stops the topic's async publisher and waits on it.
#
# Each entry is passed through +augmented+, +validate_data+ and +payload+
# before being handed to +publish_async+. A failed publish is reported via
# +Rails.logger.error+; it does not raise.
#
# @param event_sym event identifier, resolved to a topic name via Events.full_name
# @param messages  collection responding to +each+ that yields one data item per message
# @param resource  passed through unchanged to +payload+ and into the failure log line
# @return the result of +wait!+ on the stopped publisher, or nil when nothing
#   was published
def batch(event_sym, messages, resource)
  topic = @pubsub.topic(Events.full_name(event_sym))

  messages.each do |data|
    message = augmented(data, event_sym)
    topic.publish_async(payload(validate_data(event_sym, message), resource),
                        publish_time: Time.now.utc) do |result|
      unless result.succeeded?
        Rails.logger.error(
          "Failed to publish #{message} to #{event_sym} on #{resource} due to #{result.error}")
      end
    end
  end

  # The async publisher only exists once publish_async has been called, so it
  # is nil for an empty batch; in that case there is nothing to stop or flush.
  publisher = topic.async_publisher
  publisher.stop.wait! if publisher
end

#configure(config) ⇒ Object



7
8
9
# File 'lib/pubsub_tie/publisher.rb', line 7

# Builds a Pub/Sub client from +config+ via +google_pubsub+ and keeps it in
# +@pubsub+ for the publishing methods to use.
#
# @param config configuration object forwarded unchanged to +google_pubsub+
# @return the client that was stored in +@pubsub+
def configure(config)
  google_pubsub(config).tap { |client| @pubsub = client }
end

#google_pubsub(config) ⇒ Object



11
12
13
14
15
16
17
# File 'lib/pubsub_tie/publisher.rb', line 11

# Creates a Google Cloud Pub/Sub client.
#
# The credentials are loaded from the file named by +config['keyfile']+,
# looked up inside the +config+ directory under +PubSubTie.app_root+.
#
# @param config object indexable by the string keys 'keyfile' and 'project_id'
# @return whatever +Google::Cloud::PubSub.new+ returns for that project and
#   those credentials
def google_pubsub(config)
  key_path = File.join(PubSubTie.app_root, 'config', config['keyfile'])
  credentials = Google::Cloud::PubSub::Credentials.new(key_path)
  project = config['project_id']

  Google::Cloud::PubSub.new(project_id: project, credentials: credentials)
end

#publish(event_sym, data, resource) ⇒ Object

Publishes event data asynchronously to the topic inferred from event_sym. The data is augmented with event_name and event_time and validated against the loaded configuration.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/pubsub_tie/publisher.rb', line 24

# Publishes a single event asynchronously to the topic resolved from
# +event_sym+.
#
# +data+ is passed through +augmented+, then +validate_data+ and +payload+,
# before being handed to +publish_async+ with the current UTC time as
# +publish_time+. A failed publish is reported via +Rails.logger.error+.
#
# @param event_sym event identifier, resolved to a topic name via Events.full_name
# @param data      event data to augment, validate and publish
# @param resource  passed through unchanged to +payload+ and into the failure log line
# @return whatever +publish_async+ returns
def publish(event_sym, data, resource)
  message = augmented(data, event_sym)

  # Resolve the topic before building the payload, as the chained call did.
  topic = @pubsub.topic(Events.full_name(event_sym))
  body = payload(validate_data(event_sym, message), resource)

  topic.publish_async(body, publish_time: Time.now.utc) do |result|
    next if result.succeeded?

    Rails.logger.error(
      "Failed to publish #{message} to #{event_sym} on #{resource} due to #{result.error}"
    )
  end
end