Class: HonestPubsub::Subscriber

Inherits:
Object
Defined in:
lib/honest_pubsub/subscriber.rb

Constant Summary

@@registered_subscribers = []

Constructor Details

#initialize(delivery_routing_data, delivery_properties, context) ⇒ Subscriber

Returns a new instance of Subscriber.

Parameters:

  • context (Hash)

    Context from invocation

  • delivery_routing_data (Object)

    Contains routing information like originator and routing key

  • delivery_properties (Object)


# File 'lib/honest_pubsub/subscriber.rb', line 47

def initialize(delivery_routing_data, delivery_properties, context)
  @delivery_routing_data = delivery_routing_data
  @delivery_properties   = delivery_properties
  @context               = context
end

Instance Attribute Details

#context ⇒ Object

Returns the value of attribute context.



# File 'lib/honest_pubsub/subscriber.rb', line 11

def context
  @context
end

#delivery_properties ⇒ Object

Returns the value of attribute delivery_properties.



# File 'lib/honest_pubsub/subscriber.rb', line 11

def delivery_properties
  @delivery_properties
end

#delivery_routing_data ⇒ Object

Returns the value of attribute delivery_routing_data.



# File 'lib/honest_pubsub/subscriber.rb', line 11

def delivery_routing_data
  @delivery_routing_data
end

Class Method Details

.handle_errors_with(handler) ⇒ Object

Sets an error handler for the class



# File 'lib/honest_pubsub/subscriber.rb', line 40

def self.handle_errors_with(handler)
  # Call the setter explicitly; a bare `error_handler = handler` only assigns a local variable.
  self.error_handler = handler
end
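
In Ruby, a bare `name = value` inside a method body always creates a local variable, so a class-level setter has to be called with an explicit `self.` receiver for the handler to be stored. The sketch below illustrates the difference with a hypothetical `ErrorHandlerSketch` class. The `error_handler` accessor and the `handle_errors_locally` method are assumptions made for illustration; the real attribute declaration is not shown on this page.

```ruby
# Hypothetical stand-in, not the real HonestPubsub::Subscriber.
class ErrorHandlerSketch
  class << self
    # Assumed class-level accessor for the handler.
    attr_accessor :error_handler
  end

  # Bare assignment: creates a local variable and stores nothing on the class.
  def self.handle_errors_locally(handler)
    error_handler = handler
    error_handler # returned only so the local is not flagged as unused
  end

  # Explicit receiver: calls the error_handler= setter.
  def self.handle_errors_with(handler)
    self.error_handler = handler
  end
end

handler = ->(error) { "handled: #{error.message}" }

ErrorHandlerSketch.handle_errors_locally(handler)
ErrorHandlerSketch.error_handler.nil? # => true, nothing was stored

ErrorHandlerSketch.handle_errors_with(handler)
ErrorHandlerSketch.error_handler.call(StandardError.new("boom")) # => "handled: boom"
```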

.inherited(klass) ⇒ Object

Registers every class that inherits from Subscriber in @@registered_subscribers.



# File 'lib/honest_pubsub/subscriber.rb', line 13

def self.inherited(klass)
  @@registered_subscribers << klass
end
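
Ruby calls the `inherited` hook whenever a class is subclassed, so subscribers are collected without any explicit registration call. A minimal sketch of that mechanism follows. The class names (`RegistrySketch`, `OrderSubscriber`, `InvoiceSubscriber`) are hypothetical, and the `registered_subscribers` reader is added only for illustration.

```ruby
# Hypothetical stand-in, not the real HonestPubsub::Subscriber.
class RegistrySketch
  @@registered_subscribers = []

  # Invoked by Ruby each time a subclass is defined.
  def self.inherited(klass)
    super
    @@registered_subscribers << klass
  end

  # Illustrative reader; not part of the documented API.
  def self.registered_subscribers
    @@registered_subscribers
  end
end

class OrderSubscriber < RegistrySketch; end
class InvoiceSubscriber < RegistrySketch; end

RegistrySketch.registered_subscribers
# => [OrderSubscriber, InvoiceSubscriber]
```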

.subscribe_to(routing_key_name, options = {}) ⇒ Object

Specify the routing key that the subscriber class should listen to.

Parameters:

  • routing_key_name (String)

    The routing key to subscribe to. Must consist only of lowercase characters separated by periods (.); any other name raises ArgumentError.

  • options (Hash) (defaults to: {})

    The only allowed option is :on, which names the queue to use. If it is omitted, the queue name is generated from the routing key.



# File 'lib/honest_pubsub/subscriber.rb', line 20

def self.subscribe_to(routing_key_name, options = {})
  options.assert_valid_keys(:on)
  unless validate_routing_key_name(routing_key_name)
    raise ArgumentError.new("#{routing_key_name} is not supported. Only lower case characters separated by periods are allowed.")
  end
  self.subscribed_key   = routing_key_name
  self.subscribed_queue = generated_queue_name(routing_key_name, options[:on])
end
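
The flow above (check the option keys, validate the routing key, then store the key and queue) can be sketched in plain Ruby as follows. `SubscribeSketch` and `OrderCreatedSubscriber` are hypothetical names. The bodies of `validate_routing_key_name` and `generated_queue_name` are not shown on this page, so the versions below are guesses for illustration only, and the manual key check stands in for ActiveSupport's `assert_valid_keys`.

```ruby
# Hypothetical stand-in, not the real HonestPubsub::Subscriber.
class SubscribeSketch
  class << self
    attr_accessor :subscribed_key, :subscribed_queue

    def subscribe_to(routing_key_name, options = {})
      # Plain-Ruby stand-in for options.assert_valid_keys(:on).
      unknown = options.keys - [:on]
      raise ArgumentError, "Unknown key: #{unknown.first.inspect}" unless unknown.empty?

      unless validate_routing_key_name(routing_key_name)
        raise ArgumentError, "#{routing_key_name} is not supported. " \
                             "Only lower case characters separated by periods are allowed."
      end

      self.subscribed_key   = routing_key_name
      self.subscribed_queue = generated_queue_name(routing_key_name, options[:on])
    end

    private

    # Assumed rule: lowercase letters in period-separated segments.
    def validate_routing_key_name(name)
      name.match?(/\A[a-z]+(\.[a-z]+)*\z/)
    end

    # Assumed rule: use the :on queue if given, otherwise reuse the routing key.
    def generated_queue_name(routing_key_name, queue)
      queue || routing_key_name
    end
  end
end

class OrderCreatedSubscriber < SubscribeSketch
  subscribe_to "orders.created", on: "billing_orders"
end

OrderCreatedSubscriber.subscribed_key   # => "orders.created"
OrderCreatedSubscriber.subscribed_queue # => "billing_orders"
```

Under these assumptions, `subscribe_to "Orders.Created"` raises ArgumentError, as does passing any option other than `:on`.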

.validates_payload_with(*validators) ⇒ Object

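Adds one or more validators for the payload.

Parameters:

  • validators

    The validators to use for validating the payload. Each is either a Proc (or any object that responds to call) that accepts the payload as its argument, or the name of an instance method that accepts the payload. A validator returns false if the payload is not valid.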



# File 'lib/honest_pubsub/subscriber.rb', line 34

def self.validates_payload_with(*validators)
  self.payload_validators ||= []
  self.payload_validators += validators
end

Instance Method Details

#perform(payload) ⇒ Object

Concrete subscribers must implement the perform method. This is where the message is actually processed.

Parameters:

  • payload (Object)

    Payload of the message



# File 'lib/honest_pubsub/subscriber.rb', line 66

def perform(payload)
  raise "Need implementation for your worker."
end

#perform!(payload) ⇒ Object

Validates the payload if validators were registered with validates_payload_with, then calls perform. Raises HonestPubsub::PayloadValidationError if validation fails.

Parameters:

  • payload (Object)

    Payload of the message



# File 'lib/honest_pubsub/subscriber.rb', line 55

def perform!(payload)
  if !valid_payload?(payload)
    HonestPubsub.logger.error("Payload validation failed for #{self.class.name}")
    raise ::HonestPubsub::PayloadValidationError.new("Invalid Payload for #{self.class.name}")
  end

  perform(payload)
end
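
The validate-then-dispatch pattern can be sketched with a base class and one concrete subscriber. All names here (`PerformSketch`, `PayloadValidationErrorSketch`, `GreetingSubscriber`) are hypothetical, logging is omitted, and the validation rule (the payload must be a Hash) is an assumption chosen to keep the example short.

```ruby
# Hypothetical stand-ins for illustration only.
class PayloadValidationErrorSketch < StandardError; end

class PerformSketch
  # Validates first, then hands the payload to the subclass's perform.
  def perform!(payload)
    unless valid_payload?(payload)
      raise PayloadValidationErrorSketch, "Invalid Payload for #{self.class.name}"
    end

    perform(payload)
  end

  # Subclasses are expected to override this method.
  def perform(_payload)
    raise "Need implementation for your worker."
  end

  # Assumed validation rule for this sketch: the payload must be a Hash.
  def valid_payload?(payload)
    payload.is_a?(Hash)
  end
end

class GreetingSubscriber < PerformSketch
  def perform(payload)
    "Hello, #{payload[:name]}"
  end
end

GreetingSubscriber.new.perform!({ name: "Ada" }) # => "Hello, Ada"
```

Calling `perform!` with a non-Hash payload raises the validation error before `perform` runs, and calling it on the base class with a valid payload raises the "Need implementation" RuntimeError.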

#routing_key ⇒ String

Returns the original routing key with which the current message was published.

Returns:

  • (String)

    The original routing key with which the current message was published



# File 'lib/honest_pubsub/subscriber.rb', line 71

def routing_key
  delivery_routing_data[:routing_key]
end
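
A minimal sketch of how the constructor arguments and `routing_key` fit together. `RoutingSketch` is a hypothetical stand-in, and the plain Hashes passed to it are assumptions standing in for whatever delivery objects the messaging client actually supplies.

```ruby
# Hypothetical stand-in, not the real HonestPubsub::Subscriber.
class RoutingSketch
  attr_reader :delivery_routing_data, :delivery_properties, :context

  def initialize(delivery_routing_data, delivery_properties, context)
    @delivery_routing_data = delivery_routing_data
    @delivery_properties   = delivery_properties
    @context               = context
  end

  # Looks up the key under :routing_key in the routing data.
  def routing_key
    delivery_routing_data[:routing_key]
  end
end

subscriber = RoutingSketch.new(
  { routing_key: "orders.created" }, # assumed shape of the routing data
  {},                                # delivery properties, unused here
  { request_id: "abc123" }           # assumed shape of the context
)

subscriber.routing_key # => "orders.created"
subscriber.context     # => {:request_id=>"abc123"} (newer Rubies print {request_id: "abc123"})
```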

#valid_payload?(payload) ⇒ Boolean

Iterates over all the payload validators and returns false if any of them fails.

Parameters:

  • payload (Object)

    The payload/arguments of the message

Returns:

  • (Boolean)

    true if every validator passes or if no validators are specified; false otherwise



# File 'lib/honest_pubsub/subscriber.rb', line 78

def valid_payload?(payload)
  return true unless payload_validators.present?

  payload_validators.inject(true) { |is_valid, validator|
    is_valid && (validator.respond_to?(:call) ? validator.call(payload) : send(validator, payload))
  }
end
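
The sketch below shows how callable validators and method-name validators registered through `validates_payload_with` might be combined. `ValidatorSketch` and `SignupSubscriber` are hypothetical names, and the class-level `payload_validators` accessor is an assumption. The sketch also swaps `inject` for `all?` and replaces ActiveSupport's `present?` with a plain nil/empty check; `all?` stops at the first failing validator, just as the `&&` chain does.

```ruby
# Hypothetical stand-in, not the real HonestPubsub::Subscriber.
class ValidatorSketch
  class << self
    # Assumed class-level storage for the registered validators.
    attr_accessor :payload_validators

    def validates_payload_with(*validators)
      self.payload_validators ||= []
      self.payload_validators += validators
    end
  end

  def valid_payload?(payload)
    validators = self.class.payload_validators
    return true if validators.nil? || validators.empty?

    # Callables are invoked directly; symbols are sent as instance methods.
    validators.all? do |validator|
      validator.respond_to?(:call) ? validator.call(payload) : send(validator, payload)
    end
  end
end

class SignupSubscriber < ValidatorSketch
  validates_payload_with(->(payload) { payload.is_a?(Hash) }, :email_present?)

  def email_present?(payload)
    !payload[:email].to_s.empty?
  end
end

subscriber = SignupSubscriber.new
subscriber.valid_payload?({ email: "ada@example.com" }) # => true
subscriber.valid_payload?({ email: "" })                # => false
subscriber.valid_payload?("not a hash")                 # => false
```

Because evaluation stops at the first failure, `email_present?` is never called for the String payload in the last line.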