Class: SimpleFeed::Providers::Redis::Provider
Overview
Internal data structure:
```YAML
u.afkj234.data:
- [ 'John liked Robert', '2016-11-20 23:32:56 -0800' ]
- [ 'Debbie liked Robert', '2016-11-20 23:35:56 -0800' ]
u.afkj234.meta: { total: 2, unread: 2, last_read: 2016-11-20 22:00:34 -08:00 GMT }
```
Constant Summary
collapse
- FEED_METHODS =
i(total_memory_bytes total_users last_disk_save_time).freeze
Instance Attribute Summary
Attributes included from Driver
#pool
Instance Method Summary
collapse
-
#delete(user_ids:, value:) ⇒ Object
-
#delete_if(user_ids:) ⇒ Object
-
#fetch(user_ids:, since: nil, reset_last_read: false) ⇒ Object
-
#last_read(user_ids:) ⇒ Object
-
#paginate(user_ids:, page:, per_page: feed.per_page, with_total: false, reset_last_read: false) ⇒ Object
-
#reset_last_read(user_ids:, at: Time.now) ⇒ Object
-
#store(user_ids:, value:, at: Time.now) ⇒ Object
-
#total_count(user_ids:) ⇒ Object
-
#total_memory_bytes ⇒ Object
-
#total_users ⇒ Object
-
#transform_response(user_id = nil, result) ⇒ Object
-
#transformable_type?(value) ⇒ Boolean
-
#unread_count(user_ids:) ⇒ Object
-
#wipe(user_ids:) ⇒ Object
-
#with_stats(operation) ⇒ Object
Methods included from Driver
#debug?, #exec, #initialize, #on_error, #with_multi, #with_pipelined, #with_redis, #with_retries
Instance Method Details
#delete(user_ids:, value:) ⇒ Object
44
45
46
47
48
|
# File 'lib/simplefeed/providers/redis/provider.rb', line 44
def delete(user_ids:, value:, **)
with_response_pipelined(user_ids) do |redis, key|
redis.zrem(key.data, value)
end
end
|
#delete_if(user_ids:) ⇒ Object
50
51
52
53
54
55
56
57
58
59
60
61
62
|
# File 'lib/simplefeed/providers/redis/provider.rb', line 50
def delete_if(user_ids:)
raise ArgumentError, '#delete_if must be called with a block that receives (user_id, event) as arguments.' unless block_given?
with_response_batched(user_ids) do |key|
fetch(user_ids: [key.consumer])[key.consumer].map do |event|
with_redis do |redis|
if yield(event, key.consumer)
redis.zrem(key.data, event.value) ? event : nil
end
end
end.compact
end
end
|
#fetch(user_ids:, since: nil, reset_last_read: false) ⇒ Object
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
# File 'lib/simplefeed/providers/redis/provider.rb', line 84
def fetch(user_ids:, since: nil, reset_last_read: false)
if since == :unread
last_read_response = with_response_pipelined(user_ids) do |redis, key|
get_users_last_read(redis, key)
end
end
response = with_response_pipelined(user_ids) do |redis, key|
if since == :unread
redis.zrevrangebyscore(key.data, '+inf', (last_read_response.delete(key.consumer) || 0).to_f, withscores: true)
elsif since
redis.zrevrangebyscore(key.data, '+inf', since.to_f, withscores: true)
else
redis.zrevrange(key.data, 0, -1, withscores: true)
end
end
reset_last_read_value(user_ids: user_ids, at: reset_last_read) if reset_last_read
response
end
|
#last_read(user_ids:) ⇒ Object
128
129
130
131
132
|
# File 'lib/simplefeed/providers/redis/provider.rb', line 128
def last_read(user_ids:)
with_response_pipelined(user_ids) do |redis, key, *|
get_users_last_read(redis, key)
end
end
|
#paginate(user_ids:, page:, per_page: feed.per_page, with_total: false, reset_last_read: false) ⇒ Object
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/simplefeed/providers/redis/provider.rb', line 70
def paginate(user_ids:, page:,
per_page: feed.per_page,
with_total: false,
reset_last_read: false)
reset_last_read_value(user_ids: user_ids, at: reset_last_read) if reset_last_read
with_response_pipelined(user_ids) do |redis, key|
events = paginated_events(page, per_page, redis, key)
with_total ? { events: events,
total_count: redis.zcard(key.data) } : events
end
end
|
#reset_last_read(user_ids:, at: Time.now) ⇒ Object
106
107
108
109
110
|
# File 'lib/simplefeed/providers/redis/provider.rb', line 106
def reset_last_read(user_ids:, at: Time.now)
with_response_pipelined(user_ids) do |redis, key, *|
reset_users_last_read(redis, key, at.to_f)
end
end
|
#store(user_ids:, value:, at: Time.now) ⇒ Object
36
37
38
39
40
41
42
|
# File 'lib/simplefeed/providers/redis/provider.rb', line 36
def store(user_ids:, value:, at: Time.now)
with_response_pipelined(user_ids) do |redis, key|
tap redis.zadd(key.data, at.to_f, value) do
redis.zremrangebyrank(key.data, 0, -feed.max_size - 1)
end
end
end
|
#total_count(user_ids:) ⇒ Object
112
113
114
115
116
|
# File 'lib/simplefeed/providers/redis/provider.rb', line 112
def total_count(user_ids:)
with_response_pipelined(user_ids) do |redis, key|
redis.zcard(key.data)
end
end
|
#total_memory_bytes ⇒ Object
136
137
138
|
# File 'lib/simplefeed/providers/redis/provider.rb', line 136
def total_memory_bytes
with_stats(:used_memory_since_boot)
end
|
#total_users ⇒ Object
140
141
142
|
# File 'lib/simplefeed/providers/redis/provider.rb', line 140
def total_users
with_redis { |redis| redis.dbsize / 2 }
end
|
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
|
# File 'lib/simplefeed/providers/redis/provider.rb', line 150
def transform_response(user_id = nil, result)
case result
when ::Redis::Future
transform_response(user_id, result.value)
when ::Hash
if result.values.any? { |v| transformable_type?(v) }
result.each { |k, v| result[k] = transform_response(user_id, v) }
else
result
end
when ::Array
if result.any? { |v| transformable_type?(v) }
result = result.map { |v| transform_response(user_id, v) }
end
if result.size == 2 && result[1].is_a?(Float)
SimpleFeed::Event.new(value: result[0], at: Time.at(result[1]))
else
result
end
when ::String
if result =~ /^\d+\.\d+$/
result.to_f
elsif result =~ /^\d+$/
result.to_i
else
result
end
else
result
end
end
|
188
189
190
191
192
193
194
195
|
# File 'lib/simplefeed/providers/redis/provider.rb', line 188
def transformable_type?(value)
[
::Redis::Future,
::Hash,
::Array,
::String
].include?(value.class)
end
|
#unread_count(user_ids:) ⇒ Object
118
119
120
121
122
123
124
125
126
|
# File 'lib/simplefeed/providers/redis/provider.rb', line 118
def unread_count(user_ids:)
response = with_response_pipelined(user_ids) do |redis, key|
get_users_last_read(redis, key)
end
with_response_pipelined(response.user_ids, response) do |redis, key, _response|
last_read = _response.delete(key.consumer).to_f
redis.zcount(key.data, last_read, '+inf')
end
end
|
#wipe(user_ids:) ⇒ Object
64
65
66
67
68
|
# File 'lib/simplefeed/providers/redis/provider.rb', line 64
def wipe(user_ids:)
with_response_pipelined(user_ids) do |redis, key|
key.keys.all? { |redis_key| redis.del(redis_key) }
end
end
|
#with_stats(operation) ⇒ Object
144
145
146
147
148
|
# File 'lib/simplefeed/providers/redis/provider.rb', line 144
def with_stats(operation)
with_redis do |redis|
SimpleFeed::Providers::Redis::Stats.new(redis).send(operation)
end
end
|