Class: Banter::Subscriber

Inherits:
Object
Defined in:
lib/banter/subscriber.rb

Constant Summary

@@registered_subscribers =
[]

Constructor Details

#initialize(delivery_routing_data, delivery_properties, context) ⇒ Subscriber

Returns a new instance of Subscriber.

Parameters:

  • context (Hash)

    Context from invocation

  • delivery_routing_data (Object)

    Contains routing information like originator and routing key

  • delivery_properties (Object)


# File 'lib/banter/subscriber.rb', line 55

def initialize(delivery_routing_data, delivery_properties, context)
  @delivery_routing_data = delivery_routing_data
  @delivery_properties   = delivery_properties
  @context               = context
end

Instance Attribute Details

#context ⇒ Object

Returns the value of attribute context.



# File 'lib/banter/subscriber.rb', line 12

def context
  @context
end

#delivery_properties ⇒ Object

Returns the value of attribute delivery_properties.



# File 'lib/banter/subscriber.rb', line 12

def delivery_properties
  @delivery_properties
end

#delivery_routing_data ⇒ Object

Returns the value of attribute delivery_routing_data.



# File 'lib/banter/subscriber.rb', line 12

def delivery_routing_data
  @delivery_routing_data
end

Class Method Details

.handle_errors_with(handler) ⇒ Object

Sets an error handler for the class



# File 'lib/banter/subscriber.rb', line 48

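def self.handle_errors_with(handler)
  # `self.` is required: a bare `error_handler = handler` only assigns a local variable.
  # Assumes a class-level error_handler writer exists, like the other class attributes.
  self.error_handler = handler
end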

.inherited(klass) ⇒ Object



# File 'lib/banter/subscriber.rb', line 14

def self.inherited(klass)
  @@registered_subscribers << klass
end
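
The inherited hook above is what lets the library discover subscriber classes without explicit registration. A minimal standalone sketch of the same pattern follows; BaseSubscriber and the registered_subscribers reader are hypothetical stand-ins for illustration and are not part of Banter.

```ruby
# Standalone sketch: BaseSubscriber stands in for Banter::Subscriber.
class BaseSubscriber
  @@registered_subscribers = []

  # Ruby invokes this hook each time a class inherits from BaseSubscriber,
  # directly or through an intermediate subclass.
  def self.inherited(klass)
    super
    @@registered_subscribers << klass
  end

  # Hypothetical reader, added only so the registry can be inspected here.
  def self.registered_subscribers
    @@registered_subscribers
  end
end

class OrderCreatedSubscriber < BaseSubscriber; end
class UserSignedUpSubscriber < BaseSubscriber; end

registered = BaseSubscriber.registered_subscribers
```

Because the registry is a class variable, it is shared by the base class and every subclass, so classes are recorded in the order they are defined.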

.subscribe_to(routing_key_name, options = {}) ⇒ Object

Specify the routing key that the subscriber class should listen to.

Parameters:

  • routing_key_name (String)

    The routing key to subscribe to. Must consist only of lowercase characters separated by periods (.).

  • options (Hash) (defaults to: {})

    Subscription options.

Options Hash (options):

  • :durable (Boolean)

    Makes the subscriber durable (one that survives reboots). This currently defaults to true.

  • :queue_ttl (Integer)

    Time, in seconds, that a message lives on the queue before it is either consumed by a subscriber or discarded. If not specified, Banter::Configuration.default_queue_ttl is used.


# File 'lib/banter/subscriber.rb', line 25

def self.subscribe_to(routing_key_name, options = {})
  options.assert_valid_keys(:durable, :queue_ttl)
  unless validate_routing_key_name(routing_key_name)
    raise ArgumentError.new("#{routing_key_name} is not supported. Only lower case characters separated by periods are allowed.")
  end
  self.subscribed_key   = routing_key_name
  # Note: :on is not accepted by assert_valid_keys above, so options[:on] is always nil here.
  self.application_name = generated_application_name(options[:on])
  self.subscribed_queue = generated_queue_name(routing_key_name, self.application_name)
  self.queue_ttl        = options[:queue_ttl] || Banter::Configuration.default_queue_ttl
  self.durable          = options.key?(:durable) ? options[:durable] : true
end
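
The routing-key rule and the option defaults used by subscribe_to can be sketched in isolation. The implementation of validate_routing_key_name is not shown on this page, so the pattern below is only an assumption derived from the error message ("lower case characters separated by periods"). The helper names valid_routing_key? and resolve_options are hypothetical.

```ruby
# Assumed pattern: one or more lowercase words joined by single periods.
# The real validate_routing_key_name may be stricter or more lenient.
ROUTING_KEY_PATTERN = /\A[a-z]+(\.[a-z]+)*\z/

def valid_routing_key?(name)
  ROUTING_KEY_PATTERN.match?(name)
end

# Mirrors the option handling in subscribe_to. :durable uses key? rather
# than ||, so an explicit `durable: false` is not overridden by the default.
def resolve_options(options, default_queue_ttl)
  {
    durable:   options.key?(:durable) ? options[:durable] : true,
    queue_ttl: options[:queue_ttl] || default_queue_ttl
  }
end

defaults = resolve_options({}, 60)
explicit = resolve_options({ durable: false, queue_ttl: 5 }, 60)
```

With `options[:durable] || true`, passing `durable: false` would silently turn back into true, which is why the source checks for the key's presence instead.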

.validates_payload_with(*validators) ⇒ Object

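Adds one or more validators for the payload.

Parameters:

  • validators (Array<Proc, Symbol>)

    The validators used to check the payload. Each is either a callable (such as a Proc) that accepts the payload as its argument, or the name of an instance method that does. A validator returns false if the payload is not valid.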



# File 'lib/banter/subscriber.rb', line 42

def self.validates_payload_with(*validators)
  self.payload_validators ||= []
  self.payload_validators += validators
end

Instance Method Details

#perform(payload) ⇒ Object

Concrete subscribers must implement the perform method. This is where the message is actually processed. The base implementation raises an error.

Parameters:

  • payload (Object)

    Payload of the message



# File 'lib/banter/subscriber.rb', line 73

def perform(payload)
  raise "Need implementation for your worker."
end

#perform!(payload) ⇒ Object

Validates the payload against any validators registered through validates_payload_with, then calls the perform method. Raises Banter::PayloadValidationError if the payload is invalid.

Parameters:

  • payload (Object)

    Payload of the message



# File 'lib/banter/subscriber.rb', line 63

def perform!(payload)
  if !valid_payload?(payload)
    raise ::Banter::PayloadValidationError.new("Invalid Payload for #{self.class.name}")
  end

  perform(payload)
end
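
The contract between perform! and perform can be illustrated with a standalone sketch. SketchSubscriber, EmailSubscriber, and the local PayloadValidationError are hypothetical stand-ins, not Banter classes. The validation rule is hard-coded here instead of going through validates_payload_with.

```ruby
# Stand-in for Banter::PayloadValidationError.
class PayloadValidationError < StandardError; end

class SketchSubscriber
  # Entry point: validate first, then delegate to the subclass's perform.
  def perform!(payload)
    unless valid_payload?(payload)
      raise PayloadValidationError, "Invalid Payload for #{self.class.name}"
    end

    perform(payload)
  end

  # Subclasses are expected to override this.
  def perform(payload)
    raise "Need implementation for your worker."
  end

  # Accept everything by default, as when no validators are registered.
  def valid_payload?(_payload)
    true
  end
end

class EmailSubscriber < SketchSubscriber
  def valid_payload?(payload)
    payload.is_a?(Hash) && payload.key?(:email)
  end

  def perform(payload)
    "sent to #{payload[:email]}"
  end
end

result = EmailSubscriber.new.perform!({ email: "a@example.com" })
```

Callers invoke perform! rather than perform, so an invalid payload is rejected before any processing code runs.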

#routing_key ⇒ String

Returns the original routing key with which the current message was published.

Returns:

  • (String)

    The original routing key with which the current message was published



# File 'lib/banter/subscriber.rb', line 78

def routing_key
  delivery_routing_data[:routing_key]
end

#valid_payload?(payload) ⇒ Boolean

Iterates over all the payload validators and returns a false value if any of them rejects the payload.

Parameters:

  • payload (Object)

    The payload/arguments of the message

Returns:

  • (Boolean)

    A true or false value. If no validators are specified, returns true.



# File 'lib/banter/subscriber.rb', line 85

def valid_payload?(payload)
  return true unless payload_validators.present?

  payload_validators.inject(true) { |is_valid, validator|
    is_valid && (validator.respond_to?(:call) ? validator.call(payload) : send(validator, payload))
  }
end