Class: Pakyow::Data::Subscribers Private

Inherits:
Object
Extended by:
Support::DeepFreeze
Defined in:
lib/pakyow/data/subscribers.rb,
lib/pakyow/data/subscribers/adapters/redis.rb,
lib/pakyow/data/subscribers/adapters/memory.rb,
lib/pakyow/data/subscribers/adapters/redis/pipeliner.rb

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Defined Under Namespace

Modules: Adapters

Constructor Details

#initialize(app, adapter = :memory, adapter_config = {}) ⇒ Subscribers

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of Subscribers.



# File 'lib/pakyow/data/subscribers.rb', line 19

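def initialize(app, adapter = :memory, adapter_config = {})
  @app = app

  # Load the adapter by name and resolve its class, e.g. :memory -> Adapters::Memory.
  require "pakyow/data/subscribers/adapters/#{adapter}"
  @adapter = Pakyow::Data::Subscribers::Adapters.const_get(
    adapter.to_s.capitalize
  ).new(
    # App-level adapter settings override keys passed in adapter_config.
    adapter_config.to_h.merge(
      app.config.data.subscriptions.adapter_settings.to_h
    )
  )

  # Thread pool that runs did_mutate work off the calling thread.
  @executor = Concurrent::ThreadPoolExecutor.new(
    auto_terminate: false,
    min_threads: 1,
    max_threads: 10,
    max_queue: 0 # zero means the queue is unbounded
  )
rescue LoadError, NameError => error
  # A missing adapter file or constant is reported as an unknown adapter.
  raise UnknownSubscriberAdapter.build(error, adapter: adapter)
end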
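
The adapter lookup in the constructor can be illustrated in isolation. The following is a minimal sketch, not Pakyow's implementation: `DemoAdapters`, `UnknownDemoAdapter`, and `build_adapter` are hypothetical names introduced here, and the `require` step is left out so the example stays self-contained.

```ruby
# Hypothetical stand-in for the adapter namespace; the real adapters live
# under Pakyow::Data::Subscribers::Adapters and are loaded with `require`.
module DemoAdapters
  class Memory
    attr_reader :config

    def initialize(config)
      @config = config
    end
  end
end

# Hypothetical error class standing in for UnknownSubscriberAdapter.
class UnknownDemoAdapter < StandardError; end

# Resolves an adapter class from its name and builds it with merged settings.
def build_adapter(name, adapter_config = {}, app_settings = {})
  # :memory -> "Memory" -> DemoAdapters::Memory
  adapter_class = DemoAdapters.const_get(name.to_s.capitalize)

  # Hash#merge favors its argument, so app_settings win on key conflicts.
  adapter_class.new(adapter_config.to_h.merge(app_settings.to_h))
rescue NameError => error
  raise UnknownDemoAdapter, "unknown adapter #{name.inspect}: #{error.message}"
end

adapter = build_adapter(:memory, { ttl: 10, host: "a" }, { host: "b" })
```

Because the settings are merged with the app-level hash as the argument, a key present in both hashes takes its value from the app-level settings.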

Instance Attribute Details

#adapter ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/pakyow/data/subscribers.rb', line 12

def adapter
  @adapter
end

#lookup ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/pakyow/data/subscribers.rb', line 12

def lookup
  @lookup
end

Instance Method Details

#did_mutate(source_name, changed_values = nil, result_source = nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/pakyow/data/subscribers.rb', line 56

def did_mutate(source_name, changed_values = nil, result_source = nil)
  @executor.post(source_name, changed_values, result_source, Pakyow.logger.target) do |source_name, changed_values, result_source, logger|
    logger.internal {
      "[Pakyow::Data::Subscribers] did mutate #{source_name}"
    }

    subscriptions = @adapter.subscriptions_for_source(source_name)

    logger.internal {
      "[Pakyow::Data::Subscribers] fetched #{subscriptions.count} subscriptions"
    }

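    # Deduplicate by payload id (falling back to the subscription itself),
    # then keep only the subscriptions that process? accepts for this change.
    subscriptions.uniq { |subscription|
      subscription.dig(:payload, :id) || subscription
    }.select { |subscription|
      process?(subscription, changed_values, result_source)
    }.each do |subscription|
      # Skip subscriptions registered under a different subscriptions version.
      if subscription[:version] == @app.config.data.subscriptions.version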
        begin
          logger.internal {
            "[Pakyow::Data::Subscribers] processing subscription #{subscription[:id]}"
          }

          process(subscription, result_source)

          logger.internal {
            "[Pakyow::Data::Subscribers] finished processing subscription #{subscription[:id]}"
          }
        rescue => error
          logger.error {
            "[Pakyow::Data::Subscribers] did_mutate failed: #{error}"
          }
        end
      end
    end

    logger.internal {
      "[Pakyow::Data::Subscribers] finished mutate for #{source_name}"
    }
  end
end
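
The deduplication and version check inside `did_mutate` can be tried on plain hashes. This is a sketch under stated assumptions: `CURRENT_VERSION` and `processable` are hypothetical names, the sample data is invented for illustration, and the `process?` filter is omitted because its body is not shown on this page.

```ruby
# Hypothetical stand-in for @app.config.data.subscriptions.version.
CURRENT_VERSION = "v2"

# Invented sample data shaped like the hashes did_mutate reads from.
subscriptions = [
  { id: 1, version: "v2", payload: { id: "a" } },
  { id: 2, version: "v2", payload: { id: "a" } }, # same payload id as 1
  { id: 3, version: "v1", payload: { id: "b" } }, # registered under an older version
  { id: 4, version: "v2", payload: {} }           # no payload id
]

# Keeps the first subscription per payload id (or per distinct hash when no
# payload id exists), then drops those whose version does not match.
def processable(subscriptions, version)
  subscriptions
    .uniq { |subscription| subscription.dig(:payload, :id) || subscription }
    .select { |subscription| subscription[:version] == version }
end

ids = processable(subscriptions, CURRENT_VERSION).map { |subscription| subscription[:id] }
```

Here subscription 2 is dropped as a duplicate of 1 and subscription 3 is skipped for its version, so `ids` holds 1 and 4.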

#expire(subscriber, seconds) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/pakyow/data/subscribers.rb', line 102

def expire(subscriber, seconds)
  @adapter.expire(subscriber, seconds)
end

#persist(subscriber) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/pakyow/data/subscribers.rb', line 106

def persist(subscriber)
  @adapter.persist(subscriber)
end

#register_subscriptions(subscriptions, subscriber: nil, &block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/pakyow/data/subscribers.rb', line 46

def register_subscriptions(subscriptions, subscriber: nil, &block)
  subscriptions.each do |subscription|
    subscription[:version] = @app.config.data.subscriptions.version
  end

  @adapter.register_subscriptions(subscriptions, subscriber: subscriber).tap do |ids|
    yield ids if block_given?
  end
end
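
The method above stamps each subscription with a version, then both yields and returns whatever the adapter hands back. A minimal sketch of that flow follows. `DemoAdapter` and `register` are hypothetical, and the id format is invented; what a real adapter returns is not shown on this page.

```ruby
# Hypothetical adapter that returns one invented id per subscription.
class DemoAdapter
  def register_subscriptions(subscriptions, subscriber: nil)
    subscriptions.each_index.map { |index| "#{subscriber}-#{index}" }
  end
end

# Mirrors the shape of register_subscriptions: tag, register, yield, return.
def register(adapter, subscriptions, version:, subscriber: nil)
  subscriptions.each { |subscription| subscription[:version] = version }

  # tap passes the adapter's result to the block, then returns it unchanged,
  # so callers can use either the block or the return value.
  adapter.register_subscriptions(subscriptions, subscriber: subscriber).tap do |ids|
    yield ids if block_given?
  end
end

subscriptions = [{ source: :posts }, { source: :comments }]
yielded = nil
returned = register(DemoAdapter.new, subscriptions, version: "v2", subscriber: "sub") do |ids|
  yielded = ids
end
```

Note that the subscription hashes are mutated in place, so the caller's array carries the `:version` key after registration.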

#shutdown ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/pakyow/data/subscribers.rb', line 41

def shutdown
  @executor.shutdown
  @executor.wait_for_termination(30)
end

#unsubscribe(subscriber) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/pakyow/data/subscribers.rb', line 98

def unsubscribe(subscriber)
  @adapter.unsubscribe(subscriber)
end