Class: Pakyow::Data::Subscribers::Adapters::Redis Private

Inherits:
Object
  • Object
show all
Extended by:
Support::DeepFreeze
Defined in:
lib/pakyow/data/subscribers/adapters/redis.rb,
lib/pakyow/data/subscribers/adapters/redis/pipeliner.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Manages data subscriptions in redis.

Use this in production.

Defined Under Namespace

Classes: Pipeliner

Constant Summary collapse

SCRIPTS =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

%i(register expire persist).freeze
KEY_PART_SEPARATOR =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

"/"
KEY_PREFIX =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

"data"
INFINITY =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

"+inf"

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Redis

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of Redis.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/pakyow/data/subscribers/adapters/redis.rb', line 40

def initialize(config)
  @redis = ConnectionPool.new(**config[:pool]) {
    ::Redis.new(config[:connection])
  }

  @prefix = [config[:key_prefix], KEY_PREFIX].join(KEY_PART_SEPARATOR)

  @scripts = {}
  load_scripts

  Concurrent::TimerTask.new(execution_interval: 300, timeout_interval: 300) {
    cleanup
  }.execute
end

Class Method Details

.generate_subscription_id(subscription_string) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



27
28
29
# File 'lib/pakyow/data/subscribers/adapters/redis.rb', line 27

def generate_subscription_id(subscription_string)
  Digest::SHA1.hexdigest(subscription_string)
end

.stringify_subscription(subscription) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



23
24
25
# File 'lib/pakyow/data/subscribers/adapters/redis.rb', line 23

def stringify_subscription(subscription)
  Zlib::Deflate.deflate(Marshal.dump(subscription))
end

Instance Method Details

#cleanupObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

FIXME: Refactor this into a lua script. We’ll want to stop using SCAN and instead store known sources in a set. Cleanup should then be based off the set of known sources and return the number of values that were removed.



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/pakyow/data/subscribers/adapters/redis.rb', line 138

def cleanup
  @redis.with do |redis|
    redis.scan_each(match: key_subscription_ids_by_source("*")) do |key|
      Pakyow.logger.internal {
        "[Pakyow::Data::Subscribers::Adapters::Redis] Cleaning up expired subscriptions for #{key}"
      }

      removed_count = redis.zremrangebyscore(key, 0, Time.now.to_i)

      Pakyow.logger.internal {
        "[Pakyow::Data::Subscribers::Adapters::Redis] Removed #{removed_count} members for #{key}"
      }
    end
  end
end

#expire(subscriber, seconds) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



87
88
89
90
91
92
93
94
95
96
# File 'lib/pakyow/data/subscribers/adapters/redis.rb', line 87

def expire(subscriber, seconds)
  @redis.with do |redis|
    redis.evalsha(@scripts[:expire], argv: [
      @prefix,
      KEY_PART_SEPARATOR,
      subscriber.to_s,
      Time.now.to_i + seconds
    ])
  end
end

#expiring?(subscriber) ⇒ Boolean

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns:

  • (Boolean)


108
109
110
111
112
# File 'lib/pakyow/data/subscribers/adapters/redis.rb', line 108

def expiring?(subscriber)
  @redis.with do |redis|
    redis.ttl(key_subscription_ids_by_subscriber(subscriber)) > -1
  end
end

#persist(subscriber) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



98
99
100
101
102
103
104
105
106
# File 'lib/pakyow/data/subscribers/adapters/redis.rb', line 98

def persist(subscriber)
  @redis.with do |redis|
    redis.evalsha(@scripts[:persist], argv: [
      @prefix,
      KEY_PART_SEPARATOR,
      subscriber.to_s
    ])
  end
end

#register_subscriptions(subscriptions, subscriber:) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/pakyow/data/subscribers/adapters/redis.rb', line 55

def register_subscriptions(subscriptions, subscriber:)
  [].tap do |subscription_ids|
    subscriptions.each do |subscription|
      subscription_string = self.class.stringify_subscription(subscription)
      subscription_id = self.class.generate_subscription_id(subscription_string)
      source = subscription[:source]

      @redis.with do |redis|
        redis.evalsha(@scripts[:register], argv: [
          @prefix,
          KEY_PART_SEPARATOR,
          subscriber.to_s,
          subscription_id,
          subscription_string,
          source.to_s,
          Time.now.to_i
        ])
      end

      subscription_ids << subscription_id
    end
  end
end

#subscribers_for_subscription_id(subscription_id) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



114
115
116
117
118
119
120
121
122
# File 'lib/pakyow/data/subscribers/adapters/redis.rb', line 114

def subscribers_for_subscription_id(subscription_id)
  @redis.with do |redis|
    redis.zrangebyscore(
      key_subscribers_by_subscription_id(
        subscription_id
      ), INFINITY, INFINITY
    ).map(&:to_sym)
  end
end

#subscription_ids_for_source(source) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



124
125
126
127
128
129
130
131
132
# File 'lib/pakyow/data/subscribers/adapters/redis.rb', line 124

def subscription_ids_for_source(source)
  @redis.with do |redis|
    redis.zrangebyscore(
      key_subscription_ids_by_source(
        source
      ), INFINITY, INFINITY
    )
  end
end

#subscriptions_for_source(source) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



79
80
81
# File 'lib/pakyow/data/subscribers/adapters/redis.rb', line 79

def subscriptions_for_source(source)
  subscriptions_for_subscription_ids(subscription_ids_for_source(source)).compact
end

#unsubscribe(subscriber) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



83
84
85
# File 'lib/pakyow/data/subscribers/adapters/redis.rb', line 83

def unsubscribe(subscriber)
  expire(subscriber, 0)
end