Class: Downstream::Stateless::Subscriber

Inherits:
Object
  • Object
show all
Includes:
AfterCommitEverywhere
Defined in:
lib/downstream/pubsub_adapters/stateless/subscriber.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(callable, async: false) ⇒ Subscriber

Returns a new instance of Subscriber.



10
11
12
13
# File 'lib/downstream/pubsub_adapters/stateless/subscriber.rb', line 10

# Builds a subscriber around +callable+.
#
# Both arguments are stored as given; nothing is validated here.
#
# @param callable [Object] the handler this subscriber wraps
# @param async [Object] flag stored as the async setting (defaults to +false+)
def initialize(callable, async: false)
  @callable = callable
  @async = async
end

Instance Attribute Details

#async ⇒ Object (readonly)

Returns the value of attribute async.



8
9
10
# File 'lib/downstream/pubsub_adapters/stateless/subscriber.rb', line 8

# Reader for the async flag.
#
# @return [Object] the current value of +@async+, returned unchanged
def async
  @async
end

#callable ⇒ Object (readonly)

Returns the value of attribute callable.



8
9
10
# File 'lib/downstream/pubsub_adapters/stateless/subscriber.rb', line 8

# Reader for the wrapped handler.
#
# @return [Object] the current value of +@callable+, returned unchanged
def callable
  @callable
end

Instance Method Details

#async? ⇒ Boolean

Returns:

  • (Boolean)


15
16
17
# File 'lib/downstream/pubsub_adapters/stateless/subscriber.rb', line 15

# Reports whether the async flag is set.
#
# @return [Boolean] +true+ when #async is truthy, +false+ when it is
#   +nil+ or +false+
def async?
  async ? true : false
end

#call(_name, _start, _finish, _id, event) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/downstream/pubsub_adapters/stateless/subscriber.rb', line 19

# Delivers +event+ to the wrapped callable.
#
# The first four arguments are accepted but never used; only +event+ is
# forwarded.
#
# When the subscriber is not async, the callable is invoked right away via
# +callable.call(event)+. When it is async, the callable is validated and a
# SubscriberJob is enqueued from inside an +after_commit+ block, receiving the
# event and the callable's name (not the callable object itself). If
# +async_queue_name+ returns a truthy value, the job is routed to that queue
# via +set(queue: ...)+.
#
# @param _name [Object] ignored
# @param _start [Object] ignored
# @param _finish [Object] ignored
# @param _id [Object] ignored
# @param event [Object] payload passed to the callable or to the job
# @return [Object] the result of +callable.call(event)+ when not async,
#   otherwise whatever +after_commit+ returns
# @raise [ArgumentError] when async and the callable is a Proc, is not a
#   Module/Class, or is a Module/Class whose +name+ is +nil+
def call(_name, _start, _finish, _id, event)
  return callable.call(event) unless async?

  anonymous_message =
    "Anonymous subscribers (blocks/procs/lambdas or anonymous modules) cannot be asynchronous"

  raise ArgumentError, anonymous_message if callable.is_a?(Proc)

  # Check the type before touching #name: an arbitrary instance usually does
  # not respond to #name, and asking it would raise NoMethodError instead of
  # the intended ArgumentError.
  raise ArgumentError, "Async subscriber must be a module/class, not instance" unless callable.is_a?(Module)
  raise ArgumentError, anonymous_message if callable.name.nil?

  after_commit do
    # Evaluated inside the block so the queue name and callable name are read
    # when the block runs, not when it is registered.
    queue_name = async_queue_name
    job = queue_name ? SubscriberJob.set(queue: queue_name) : SubscriberJob
    job.perform_later(event, callable.name)
  end
end

#subscribe(identifier) ⇒ Object



41
42
43
44
45
46
# File 'lib/downstream/pubsub_adapters/stateless/subscriber.rb', line 41

# Registers this object with ActiveSupport::Notifications under +identifier+
# and keeps the value returned by the registration in
# +@notification_subscriber+.
#
# @param identifier [Object] passed through as the first argument of
#   +ActiveSupport::Notifications.subscribe+
# @return [Object] the value returned by +ActiveSupport::Notifications.subscribe+
def subscribe(identifier)
  @notification_subscriber = ActiveSupport::Notifications.subscribe(identifier, self)
end

#unsubscribe ⇒ Object



48
49
50
# File 'lib/downstream/pubsub_adapters/stateless/subscriber.rb', line 48

# Passes the value of +notification_subscriber+ to
# +ActiveSupport::Notifications.unsubscribe+.
#
# NOTE(review): +notification_subscriber+ is presumably a reader for the
# handle stored by #subscribe — confirm against the class definition.
#
# @return [Object] the value returned by +ActiveSupport::Notifications.unsubscribe+
def unsubscribe
  handle = notification_subscriber
  ActiveSupport::Notifications.unsubscribe(handle)
end