Class: Rdsck

Inherits:
Object
  • Object
show all
Defined in:
lib/nchan_tools/rdsck.rb

Defined Under Namespace

Classes: Watch

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opt) ⇒ Rdsck

Returns a new instance of Rdsck.



48
49
50
51
52
53
# File 'lib/nchan_tools/rdsck.rb', line 48

def initialize(opt)
  @url=opt[:url]
  @verbose=opt[:verbose]
  @namespace=opt[:namespace]
  @channel_id=opt[:channel_id]
end

Instance Attribute Details

#mastersObject

Returns the value of attribute masters.



39
40
41
# File 'lib/nchan_tools/rdsck.rb', line 39

def masters
  @masters
end

#namespaceObject

Returns the value of attribute namespace.



38
39
40
# File 'lib/nchan_tools/rdsck.rb', line 38

def namespace
  @namespace
end

#redisObject

Returns the value of attribute redis.



39
40
41
# File 'lib/nchan_tools/rdsck.rb', line 39

def redis
  @redis
end

#urlObject

Returns the value of attribute url.



38
39
40
# File 'lib/nchan_tools/rdsck.rb', line 38

def url
  @url
end

#verboseObject

Returns the value of attribute verbose.



38
39
40
# File 'lib/nchan_tools/rdsck.rb', line 38

def verbose
  @verbose
end

Instance Method Details

#cluster?Boolean

Returns:

  • (Boolean)


55
56
57
# File 'lib/nchan_tools/rdsck.rb', line 55

def cluster?
  @cluster_mode
end

#connectObject



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/nchan_tools/rdsck.rb', line 59

def connect
  begin
    @redis=Redis.new url: @url
    
    mode = redis.info["redis_mode"]
    
  rescue StandardError => e
    STDERR.puts e.message
    return false
  end

  if mode == "cluster"
    @redis.close
    begin
      @redis=Redis.new cluster: [@url]
      @redis.ping
    rescue StandardError => e
      STDERR.puts e.message
      return false
    end
    
    @cluster_mode = true
    @masters = []
    
    redis.connection.each do |c|
      node = Redis.new url: c[:id]
      @masters << node
    end
  else
    @masters = [@redis]
  end
  
  dbg "Connected to Redis #{mode == "cluster" ? "cluster" : "server"}"
  (Array === @redis.connection ? @redis.connection : [@redis.connection]) .each do |v|
    dbg "  #{v[:id]}"
  end
  self
end

#dbg(*args) ⇒ Object



41
42
43
44
45
46
# File 'lib/nchan_tools/rdsck.rb', line 41

def dbg(*args)
  if @verbose
    print("# ")
    puts(*args)
  end
end

#filter_channels(filters = {}) ⇒ Object



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/nchan_tools/rdsck.rb', line 194

def filter_channels(filters={})
  script = <<~EOF
    local prev_cursor = ARGV[1]
    local pattern = ARGV[2]
    local scan_batch_size = ARGV[3]
    
    local min_subscribers = ARGV[4] and #ARGV[4] > 0 and tonumber(ARGV[4])
    
    local cursor, iteration 
    if pattern and #pattern > 0 then
      cursor, iteration = unpack(redis.call("SCAN", prev_cursor, "MATCH", pattern, "COUNT", scan_batch_size))
    else
      cursor, iteration = unpack(redis.call("SCAN", prev_cursor, "COUNT", scan_batch_size))
    end
    
    local matched = {}
    for _, chankey in pairs(iteration) do
      local match = true
      if min_subscribers then
        match = match and (tonumber(redis.call('HGET', chankey, 'fake_subscribers') or 0) >= min_subscribers)
      end
      if match then
        table.insert(matched, chankey)
      end
    end
    
    return {cursor, matched}
  EOF
  
  results = []
  batch_size=500
  @masters.each do |m|
    hash = m.script "load", script
    cursor, pattern = "0", "{channel:*}"
    loop do
      cursor, batch_results = m.evalsha hash, keys: [], argv: [cursor, pattern, batch_size, filters[:min_subscribers]]
      results += batch_results
      pattern = ""
      break if cursor.to_i == 0
    end
  end
  results
  
  results.map! do |key|
    m = key.match(/^\{channel\:(.*)\}$/)
    m[1] || key
  end
end

#infoObject



103
104
105
106
107
108
# File 'lib/nchan_tools/rdsck.rb', line 103

def info
  channel_hash=@redis.hgetall key
  hash_ttl=@redis.ttl key
  channel_subs=@redis.hgetall key("subscribers")
  #...
end

#key(subkey = nil) ⇒ Object



98
99
100
101
# File 'lib/nchan_tools/rdsck.rb', line 98

def key(subkey=nil)
  k = "{channel:#{@namespace}/#{@channel_id}}"
  return subkey ? "#{k}:#{subkey}" : k
end

#watch_channels(filters = {}, set_notify_keyspace_events = nil) ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/nchan_tools/rdsck.rb', line 171

def watch_channels(filters={}, set_notify_keyspace_events=nil)
  watchers = []
  @masters.each do |m|
    watchers << Watch.new(self, m, filters, set_notify_keyspace_events)
  end


  begin
    Async do |task|
      watchers.each do |watcher|
        watcher.watch(task)
      end
    end
  rescue Interrupt => e
    dbg "stopping watch"
  ensure
    watchers.each do |watcher|
      watcher.stop
    end
  end
  
end