Class: SimpleFeed::Providers::Redis::Provider

Inherits:
Base::Provider show all
Includes:
Driver
Defined in:
lib/simplefeed/providers/redis/provider.rb

Overview

Internal data structure:

```YAML
  u.afkj234.data:
    - [ 'John liked Robert', '2016-11-20 23:32:56 -0800' ]
    - [ 'Debbie liked Robert', '2016-11-20 23:35:56 -0800' ]
  u.afkj234.meta: { total: 2, unread: 2, last_read: 2016-11-20 22:00:34 -0800 }
```

Constant Summary collapse

# Names of methods that report on the feed as a whole rather than on given users.
# NOTE(review): +last_disk_save_time+ is not defined in this view — presumably provided elsewhere.
FEED_METHODS = %i[total_memory_bytes total_users last_disk_save_time].freeze

Instance Attribute Summary

Attributes included from Driver

#pool

Instance Method Summary collapse

Methods included from Driver

#debug?, #exec, #initialize, #on_error, #with_multi, #with_pipelined, #with_redis, #with_retries

Instance Method Details

#delete(user_ids:, value:) ⇒ Object



44
45
46
47
48
# File 'lib/simplefeed/providers/redis/provider.rb', line 44

# Issues a ZREM of +value+ against the data key of every given user.
#
# @param user_ids the users whose feeds are affected (passed to #with_response_pipelined)
# @param value    the sorted-set member to remove
# @note Any additional keyword arguments are accepted and ignored.
# @return whatever #with_response_pipelined builds from the per-user ZREM replies
def delete(user_ids:, value:, **)
  with_response_pipelined(user_ids) { |connection, user_key| connection.zrem(user_key.data, value) }
end

#delete_if(user_ids:) ⇒ Object

Raises:

  • (ArgumentError) — if no block is given


50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/simplefeed/providers/redis/provider.rb', line 50

# Deletes, for each user, every event for which the given block returns a truthy value.
#
# Each user's events are loaded via #fetch, and the block is called once per event
# with the event first and the user id (+key.consumer+) second.
#
# @param user_ids the users whose feeds are scanned
# @yield [event, user_id] decides whether +event+ should be removed
# @raise [ArgumentError] if no block is given
# @return whatever #with_response_batched builds from the per-user arrays of events
#   whose ZREM reply was truthy
def delete_if(user_ids:)
  unless block_given?
    # The message states the argument order actually used by the +yield+ below.
    raise ArgumentError, '#delete_if must be called with a block that receives (event, user_id) as arguments.'
  end

  with_response_batched(user_ids) do |key|
    fetch(user_ids: [key.consumer])[key.consumer].map do |event|
      with_redis do |redis|
        if yield(event, key.consumer)
          # Report the event only when the ZREM reply is truthy.
          redis.zrem(key.data, event.value) ? event : nil
        end
      end
    end.compact
  end
end

#fetch(user_ids:, since: nil, reset_last_read: false) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/simplefeed/providers/redis/provider.rb', line 84

# Reads events (with scores, highest score first) from each user's data sorted set.
#
# @param user_ids the users whose feeds are read
# @param since one of:
#   * +nil+/+false+ — the whole set is returned;
#   * +:unread+ — only members scored at or above the user's stored last-read value
#     (0 when none is stored);
#   * anything else — converted with +to_f+ and used as the lowest score to return.
# @param reset_last_read when truthy, it is handed to #reset_last_read_value as +at:+
#   after the events have been read
# @return the response built by #with_response_pipelined for the event query
def fetch(user_ids:, since: nil, reset_last_read: false)
  unread_only = (since == :unread)

  last_read_response = nil
  if unread_only
    last_read_response = with_response_pipelined(user_ids) { |redis, key| get_users_last_read(redis, key) }
  end

  response = with_response_pipelined(user_ids) do |redis, key|
    next redis.zrevrange(key.data, 0, -1, withscores: true) unless since

    # Each user's last-read entry is consumed (deleted) from the lookup as it is used.
    floor = unread_only ? (last_read_response.delete(key.consumer) || 0).to_f : since.to_f
    redis.zrevrangebyscore(key.data, '+inf', floor, withscores: true)
  end

  reset_last_read_value(user_ids: user_ids, at: reset_last_read) if reset_last_read

  response
end

#last_read(user_ids:) ⇒ Object



128
129
130
131
132
# File 'lib/simplefeed/providers/redis/provider.rb', line 128

# Looks up the stored last-read value of every given user via #get_users_last_read.
#
# @param user_ids the users to look up
# @return the response built by #with_response_pipelined from the per-user lookups
def last_read(user_ids:)
  with_response_pipelined(user_ids) { |connection, user_key, *| get_users_last_read(connection, user_key) }
end

#paginate(user_ids:, page:, per_page: feed.per_page, with_total: false, reset_last_read: false) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/simplefeed/providers/redis/provider.rb', line 70

# Returns one page of events per user, optionally together with the feed's total size.
#
# @param user_ids        the users whose feeds are read
# @param page            page selector, forwarded to #paginated_events
# @param per_page        page size, forwarded to #paginated_events (defaults to +feed.per_page+)
# @param with_total      when truthy, each user's result is a Hash with +:events+ and
#   +:total_count+ (the ZCARD of the data key) instead of the bare events
# @param reset_last_read when truthy, it is handed to #reset_last_read_value as +at:+
#   before the page is read
# @return the response built by #with_response_pipelined
def paginate(user_ids:, page:,
             per_page: feed.per_page,
             with_total: false,
             reset_last_read: false)
  reset_last_read_value(user_ids: user_ids, at: reset_last_read) if reset_last_read

  with_response_pipelined(user_ids) do |redis, key|
    page_of_events = paginated_events(page, per_page, redis, key)
    next page_of_events unless with_total

    { events: page_of_events, total_count: redis.zcard(key.data) }
  end
end

#reset_last_read(user_ids:, at: Time.now) ⇒ Object



106
107
108
109
110
# File 'lib/simplefeed/providers/redis/provider.rb', line 106

# Records +at+ (as a float) as the last-read value of every given user.
#
# @param user_ids the users to update
# @param at       the new last-read value; converted with +to_f+ (defaults to +Time.now+)
# @return the response built by #with_response_pipelined from #reset_users_last_read
def reset_last_read(user_ids:, at: Time.now)
  with_response_pipelined(user_ids) do |connection, user_key, *|
    timestamp = at.to_f
    reset_users_last_read(connection, user_key, timestamp)
  end
end

#store(user_ids:, value:, at: Time.now) ⇒ Object



36
37
38
39
40
41
42
# File 'lib/simplefeed/providers/redis/provider.rb', line 36

# Adds +value+ to every given user's data sorted set, scored by +at+ as a float,
# and then trims the set based on +feed.max_size+.
#
# @param user_ids the users whose feeds receive the event
# @param value    the sorted-set member to add
# @param at       the event time; converted with +to_f+ to form the score (defaults to +Time.now+)
# @return the response built by #with_response_pipelined
def store(user_ids:, value:, at: Time.now)
  with_response_pipelined(user_ids) do |redis, key|
    # NOTE(review): +tap+ is called with an argument here, which is not Kernel#tap's
    # signature; presumably a helper defined outside this view that runs the block and
    # returns the ZADD reply — confirm.
    tap redis.zadd(key.data, at.to_f, value) do
      # ZREMRANGEBYRANK counts ranks from the lowest score, so removing ranks
      # 0..-(max_size + 1) keeps at most max_size of the highest-scored members.
      redis.zremrangebyrank(key.data, 0, -feed.max_size - 1)
    end
  end
end

#total_count(user_ids:) ⇒ Object



112
113
114
115
116
# File 'lib/simplefeed/providers/redis/provider.rb', line 112

# Reports the size (ZCARD) of every given user's data sorted set.
#
# @param user_ids the users to count events for
# @return the response built by #with_response_pipelined from the per-user ZCARD replies
def total_count(user_ids:)
  with_response_pipelined(user_ids) { |connection, user_key| connection.zcard(user_key.data) }
end

#total_memory_bytes ⇒ Object



136
137
138
# File 'lib/simplefeed/providers/redis/provider.rb', line 136

# Reports the +used_memory_since_boot+ statistic obtained through #with_stats.
#
# @return whatever #with_stats returns for that statistic
def total_memory_bytes
  stat_name = :used_memory_since_boot
  with_stats(stat_name)
end

#total_users ⇒ Object



140
141
142
# File 'lib/simplefeed/providers/redis/provider.rb', line 140

# Estimates the number of users as half of the database's key count (DBSIZE).
#
# NOTE(review): presumably relies on each user owning exactly two keys (data and
# meta, as in the class overview) and on the database holding nothing else — confirm.
#
# @return the value #with_redis returns for the halved key count
def total_users
  with_redis do |connection|
    key_count = connection.dbsize
    key_count / 2
  end
end

#transform_response(user_id = nil, result) ⇒ Object



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/simplefeed/providers/redis/provider.rb', line 150

# Recursively converts a raw Redis reply into plain Ruby values.
#
# * +Redis::Future+ — unwrapped with +value+ and transformed again.
# * +Hash+ — when any value is transformable, every value is transformed in place
#   (the receiver is mutated); otherwise returned untouched.
# * +Array+ — transformable elements are transformed; a resulting two-element array
#   whose second element is a Float is turned into a +SimpleFeed::Event+
#   (first element as the value, second as the time).
# * +String+ — a string consisting entirely of digits becomes an Integer, and one of
#   the form "digits.digits" becomes a Float; any other string is returned as is.
# * anything else — returned as is.
#
# @param user_id only passed along on recursive calls; not otherwise used here
# @param result  the reply (or part of a reply) to convert
# @return the converted value
def transform_response(user_id = nil, result)
  case result
  when ::Redis::Future
    transform_response(user_id, result.value)

  when ::Hash

    if result.values.any? { |v| transformable_type?(v) }
      result.each { |k, v| result[k] = transform_response(user_id, v) }
    else
      result
    end

  when ::Array

    if result.any? { |v| transformable_type?(v) }
      result = result.map { |v| transform_response(user_id, v) }
    end

    if result.size == 2 && result[1].is_a?(Float)
      SimpleFeed::Event.new(value: result[0], at: Time.at(result[1]))
    else
      result
    end

  when ::String
    # Anchor with \A and \z so the WHOLE string must be numeric. With ^ and $ a
    # multi-line value such as "note\n42" matched on one line and was then
    # mangled by to_i/to_f (yielding 0).
    case result
    when /\A\d+\.\d+\z/ then result.to_f
    when /\A\d+\z/ then result.to_i
    else result
    end
  else
    result
  end
end

#transformable_type?(value) ⇒ Boolean

Returns:

  • (Boolean)


188
189
190
191
192
193
194
195
# File 'lib/simplefeed/providers/redis/provider.rb', line 188

# Tells whether +value+ is of a kind that #transform_response converts.
#
# The comparison is on the exact class, so instances of subclasses of Hash, Array
# or String do not qualify.
#
# @param value the object to classify
# @return [Boolean] true when the class of +value+ is exactly Redis::Future, Hash, Array or String
def transformable_type?(value)
  value_class = value.class
  [::Redis::Future, ::Hash, ::Array, ::String].any? { |type| type == value_class }
end

#unread_count(user_ids:) ⇒ Object



118
119
120
121
122
123
124
125
126
# File 'lib/simplefeed/providers/redis/provider.rb', line 118

# Counts, per user, the events scored at or above that user's stored last-read value.
#
# Runs two pipelined passes: the first collects each user's last-read value, the
# second issues a ZCOUNT from that value (0.0 when none is stored) up to +inf.
#
# @param user_ids the users to count unread events for
# @return the response built by the second #with_response_pipelined call
def unread_count(user_ids:)
  last_reads = with_response_pipelined(user_ids) { |redis, key| get_users_last_read(redis, key) }

  with_response_pipelined(last_reads.user_ids, last_reads) do |redis, key, pending|
    # Each user's entry is consumed (deleted) from the first response as it is used.
    unread_floor = pending.delete(key.consumer).to_f
    redis.zcount(key.data, unread_floor, '+inf')
  end
end

#wipe(user_ids:) ⇒ Object



64
65
66
67
68
# File 'lib/simplefeed/providers/redis/provider.rb', line 64

# Deletes every Redis key listed by +key.keys+ for each given user.
#
# @param user_ids the users whose keys are removed
# @return the response built by #with_response_pipelined; each user's entry is
#   true when every DEL reply was truthy, false otherwise
def wipe(user_ids:)
  with_response_pipelined(user_ids) do |redis, key|
    # Issue every DEL before looking at the replies: `all?` with a block stops at
    # the first falsy reply, which would silently leave the remaining keys in place.
    replies = key.keys.map { |redis_key| redis.del(redis_key) }
    replies.all?
  end
end

#with_stats(operation) ⇒ Object



144
145
146
147
148
# File 'lib/simplefeed/providers/redis/provider.rb', line 144

# Builds a Stats object around a Redis connection and invokes +operation+ on it.
#
# @param operation the name of the Stats method to call (dispatched with +send+)
# @return the value #with_redis returns for that call
def with_stats(operation)
  with_redis do |connection|
    stats = SimpleFeed::Providers::Redis::Stats.new(connection)
    stats.send(operation)
  end
end