Module: Cloudenvoy::Subscriber

Defined in:
lib/cloudenvoy/subscriber.rb

Overview

Use this module to define subscribers. Subscribers must implement the message processsing logic in the process method.

E.g.

class UserSubscriber

include Cloudenvoy::Subscriber

# Specify subscription options
cloudenvoy_options topics: ['my-topic']

# Process message objects
def process(message)
  ...do something...
end

end

Defined Under Namespace

Modules: ClassMethods

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.execute_from_descriptor(input_payload) ⇒ Any

Execute a subscriber from a payload object received from Pub/Sub.

the message to process.



68
69
70
71
72
# File 'lib/cloudenvoy/subscriber.rb', line 68

def self.execute_from_descriptor(input_payload)
  message = Message.from_descriptor(input_payload)
  subscriber = message.subscriber || raise(InvalidSubscriberError)
  subscriber.execute
end

.from_sub_uri(sub_uri) ⇒ Class

Return the subscriber class for the provided class name.



39
40
41
42
43
44
45
46
# File 'lib/cloudenvoy/subscriber.rb', line 39

def self.from_sub_uri(sub_uri)
  klass_name = Subscriber.parse_sub_uri(sub_uri)[0]

  # Check that subscriber class is a valid subscriber
  sub_klass = Object.const_get(klass_name.camelize)

  sub_klass.include?(self) ? sub_klass : nil
end

.included(base) ⇒ Object

Add class method to including class



23
24
25
26
27
28
29
# File 'lib/cloudenvoy/subscriber.rb', line 23

def self.included(base)
  base.extend(ClassMethods)
  base.attr_accessor :message, :process_started_at, :process_ended_at

  # Register subscriber
  Cloudenvoy.subscribers.add(base)
end

.parse_sub_uri(sub_uri) ⇒ Array<String,String>

Parse the subscription name and return the subscriber name and topic.



55
56
57
# File 'lib/cloudenvoy/subscriber.rb', line 55

def self.parse_sub_uri(sub_uri)
  sub_uri.split('/').last.split('.', 2).last.split('.', 2)
end

Instance Method Details

#==(other) ⇒ Boolean

Equality operator.



193
194
195
# File 'lib/cloudenvoy/subscriber.rb', line 193

def ==(other)
  other.is_a?(self.class) && other.message == message
end

#executeAny

Execute the subscriber’s logic.



172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/cloudenvoy/subscriber.rb', line 172

def execute
  logger.info('Processing message...')

  # Process message
  resp = execute_middleware_chain

  # Log processing completion and return result
  logger.info("Processing done after #{process_duration}s") { { duration: process_duration } }
  resp
rescue StandardError => e
  logger.info("Processing failed after #{process_duration}s") { { duration: process_duration } }
  raise(e)
end

#initialize(message:) ⇒ Object

Build a new subscriber instance.



142
143
144
# File 'lib/cloudenvoy/subscriber.rb', line 142

def initialize(message:)
  @message = message
end

#loggerLogger, any

Return the Cloudenvoy logger instance.



151
152
153
# File 'lib/cloudenvoy/subscriber.rb', line 151

def logger
  @logger ||= SubscriberLogger.new(self)
end

#process_durationFloat

Return the time taken (in seconds) to process the message. This duration includes the middlewares and the actual process method.



161
162
163
164
165
# File 'lib/cloudenvoy/subscriber.rb', line 161

def process_duration
  return 0.0 unless process_ended_at && process_started_at

  (process_ended_at - process_started_at).ceil(3)
end