Class: QueueBus::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/queue_bus/subscription.rb

Overview

A Subscription is the destination of an event.

The subscription can be stored in redis but should only be executed on ruby processes that have the application loaded. In general, this is controlled by having the background workers listen to specific (and discrete) queues.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_name, key, class_name, filters, executor = nil) ⇒ Subscription

Returns a new instance of Subscription.



33
34
35
36
37
38
39
# File 'lib/queue_bus/subscription.rb', line 33

def initialize(queue_name, key, class_name, filters, executor = nil)
  @queue_name = self.class.normalize(queue_name)
  @key        = key.to_s
  @class_name = class_name.to_s
  @matcher    = Matcher.new(filters)
  @executor   = executor
end

Instance Attribute Details

#app_keyObject

dyanmically set on return from subscription_matches



31
32
33
# File 'lib/queue_bus/subscription.rb', line 31

def app_key
  @app_key
end

#class_nameObject (readonly)

Returns the value of attribute class_name.



30
31
32
# File 'lib/queue_bus/subscription.rb', line 30

def class_name
  @class_name
end

#executorObject (readonly)

Returns the value of attribute executor.



30
31
32
# File 'lib/queue_bus/subscription.rb', line 30

def executor
  @executor
end

#keyObject (readonly)

Returns the value of attribute key.



30
31
32
# File 'lib/queue_bus/subscription.rb', line 30

def key
  @key
end

#matcherObject (readonly)

Returns the value of attribute matcher.



30
31
32
# File 'lib/queue_bus/subscription.rb', line 30

def matcher
  @matcher
end

#queue_nameObject (readonly)

Returns the value of attribute queue_name.



30
31
32
# File 'lib/queue_bus/subscription.rb', line 30

def queue_name
  @queue_name
end

Class Method Details

.from_redis(hash) ⇒ Object



15
16
17
18
19
20
21
22
23
# File 'lib/queue_bus/subscription.rb', line 15

def from_redis(hash)
  queue_name = hash['queue_name'].to_s
  key        = hash['key'].to_s
  class_name = hash['class'].to_s
  matcher    = hash['matcher']
  return nil if key.empty? || queue_name.empty?

  Subscription.new(queue_name, key, class_name, matcher, nil)
end

.normalize(val) ⇒ Object



25
26
27
# File 'lib/queue_bus/subscription.rb', line 25

def normalize(val)
  val.to_s.gsub(/\W/, '_').downcase
end

.register(queue, key, class_name, matcher, block) ⇒ Object



11
12
13
# File 'lib/queue_bus/subscription.rb', line 11

def register(queue, key, class_name, matcher, block)
  Subscription.new(queue, key, class_name, matcher, block)
end

Instance Method Details

#execute!(attributes) ⇒ Object

Executes the subscription. If this is run on a server/ruby process that did not subscribe it will error as there will not be a proc.



43
44
45
46
47
48
49
50
# File 'lib/queue_bus/subscription.rb', line 43

def execute!(attributes)
  if attributes.respond_to?(:with_indifferent_access)
    attributes = attributes.with_indifferent_access
  end
  ::QueueBus.with_global_attributes(attributes) do
    executor.call(attributes)
  end
end

#matches?(attributes) ⇒ Boolean

Returns:

  • (Boolean)


52
53
54
# File 'lib/queue_bus/subscription.rb', line 52

def matches?(attributes)
  @matcher.matches?(attributes)
end

#to_redisObject



56
57
58
59
60
61
62
63
# File 'lib/queue_bus/subscription.rb', line 56

def to_redis
  out = {}
  out['queue_name'] = queue_name
  out['key']        = key
  out['class']      = class_name
  out['matcher']    = matcher.to_redis
  out
end