Class: Banter::Subscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/banter/subscriber.rb

Constant Summary collapse

@@registered_subscribers =
[]

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(delivery_routing_data, delivery_properties, context, envelope) ⇒ Subscriber

Returns a new instance of Subscriber.

Parameters:

  • delivery_routing_data (Object)

    Contains routing information like originator and routing key

  • delivery_properties (Object)
  • context (Hash)
  • envelope (Hash)

92
93
94
95
96
97
# File 'lib/banter/subscriber.rb', line 92

def initialize(delivery_routing_data, delivery_properties, context, envelope)
  @delivery_routing_data = delivery_routing_data
  @delivery_properties   = delivery_properties
  @context               = context
  @envelope              = envelope
end

Instance Attribute Details

#contextObject

Returns the value of attribute context


12
13
14
# File 'lib/banter/subscriber.rb', line 12

def context
  @context
end

#delivery_propertiesObject

Returns the value of attribute delivery_properties


12
13
14
# File 'lib/banter/subscriber.rb', line 12

def delivery_properties
  @delivery_properties
end

#delivery_routing_dataObject

Returns the value of attribute delivery_routing_data


12
13
14
# File 'lib/banter/subscriber.rb', line 12

def delivery_routing_data
  @delivery_routing_data
end

#envelopeObject

Returns the value of attribute envelope


12
13
14
# File 'lib/banter/subscriber.rb', line 12

def envelope
  @envelope
end

Class Method Details

.get_config_value(options, options_key, config_key = nil) ⇒ Object


58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/banter/subscriber.rb', line 58

def self.get_config_value(options, options_key, config_key = nil)
  if options.key?(options_key)
    options[options_key]
  else
    config_key = options_key if config_key.nil?
    configure_obj = [Banter::Configuration.configuration, Banter::Configuration].find do |obj|
      obj.respond_to?(config_key)
    end
    configure_obj.nil? ? nil : configure_obj.send(config_key)
  end
rescue => e
  puts "Banter Configuration does not exist #{config_key}:#{e.message}"
  raise e
end

.handle_errors_with(handler) ⇒ Object

Sets an error handler for the class


84
85
86
# File 'lib/banter/subscriber.rb', line 84

def self.handle_errors_with(handler)
  error_handler = handler
end

.inherited(klass) ⇒ Object


14
15
16
# File 'lib/banter/subscriber.rb', line 14

def self.inherited(klass)
  @@registered_subscribers << klass
end

.subscribe_to(routing_key_name, options = {}) ⇒ Object

Specify the routing key that the subscriber class should listen to.

Parameters:

  • routing_key_name (String)

    The routing key to subscribe to. Must be characters only separated by periods (.)

  • options (Hash) (defaults to: {})

    subscription options @option [String] :durable sets the subscriber to be a durable subscriber (one that survives reboots). This currently defaults to true. @option [Integer] :ttl Time, in seconds, that the message lives on the queue before being either consumer by a subscriber or being discarded.

    If not specified, then Banter::Configuration.default_queue_ttl is used
    

    @option [String] :exchange_name sets to exchange to the specified value, or if it is not used, will default to Banter.Configuration.exchange_name @option [String] :dead_letter_exchange_name name of queue to subscribe to. @option [String] :queue_name name of queue to subscribe to.


27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/banter/subscriber.rb', line 27

def self.subscribe_to(routing_key_name, options = {})

  unless validate_routing_key_name(routing_key_name)
    raise ArgumentError.new("#{routing_key_name} is not supported. Only lower case characters separated by periods are allowed.")
  end

  options.assert_valid_keys(
    :topic_prefix,
    :exchange_name,
    :dead_letter_exchange_name,
    :dead_letter_queue_name,
    :queue_name,
    :ttl,
    :durable,
    :pool_size)

  config = Hashie::Mash.new
  config[:topic]          = get_config_value(options, :topic_prefix)
  config[:exchange]       = get_config_value(options, :exchange_name)
  config[:dead_letter_exchange] = get_config_value(options, :dead_letter_exchange_name)
  config[:dead_letter_queue_name] = get_config_value(options, :dead_letter_queue_name)
  config[:routing_key]    = self.subscribed_key = routing_key_name
  config[:queue_name]     = self.subscribed_queue = options[:queue_name] || generated_queue_name(routing_key_name, Banter::Configuration.application_name)
  config[:ttl]            = get_config_value(options, :ttl, :default_queue_ttl)
  config[:durable]        = get_config_value(options, :durable, :durable_queues)
  config[:pool_size]      = get_config_value(options, :pool_size)

  self.config = config

end

.validates_payload_with(*validators) ⇒ Object

Sets the validator for payload

Parameters:

  • validator

    The validator to use for validating the payload. Returns false if the payload is not valid. Proc must accept a payload as an argument.


78
79
80
81
# File 'lib/banter/subscriber.rb', line 78

def self.validates_payload_with(*validators)
  self.payload_validators ||= []
  self.payload_validators += validators
end

Instance Method Details

#perform(payload) ⇒ Object

Actual subscribers need to implement perform method. This is the method where the message is actually processed.

Parameters:

  • payload (Object)

    Payload of the message


111
112
113
# File 'lib/banter/subscriber.rb', line 111

def perform(payload)
  raise "Need implementation for your worker."
end

#perform!(payload) ⇒ Object

Performs validation if validates_payload_with is defined and then calls the perform method

Parameters:

  • payload (Object)

    Payload of the message


101
102
103
104
105
106
107
# File 'lib/banter/subscriber.rb', line 101

def perform!(payload)
  if !valid_payload?(payload)
    raise ::Banter::PayloadValidationError.new("Invalid Payload for #{self.class.name}")
  end

  perform(payload)
end

#routing_keyString

Returns The original routing key with which the current message was published.

Returns:

  • (String)

    The original routing key with which the current message was published


116
117
118
# File 'lib/banter/subscriber.rb', line 116

def routing_key
  delivery_routing_data[:routing_key]
end

#valid_payload?(payload) ⇒ Boolen

Iterates over all the payload validators and returns false if any of them are false

Parameters:

  • payload (Object)

    The payload/arguments of the message

Returns:

  • (Boolen)

    Should return true or false value - If no validators are specified, then returns true


123
124
125
126
127
128
129
# File 'lib/banter/subscriber.rb', line 123

def valid_payload?(payload)
  return true unless payload_validators.present?

  payload_validators.inject(true) { |is_valid, validator|
    is_valid && (validator.respond_to?(:call) ? validator.call(payload) : send(validator, payload))
  }
end