Class: Redis::Migrator

Inherits:
Object
  • Object
show all
Includes:
Helper
Defined in:
lib/redis_migrator.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Helper

#parse_redis_url, #to_redis_proto

Constructor Details

#initialize(old_hosts, new_hosts) ⇒ Migrator

Returns a new instance of Migrator.



13
14
15
16
17
18
# File 'lib/redis_migrator.rb', line 13

def initialize(old_hosts, new_hosts)
  @old_hosts = old_hosts
  @new_hosts = new_hosts
  @old_cluster = Redis::Distributed.new(old_hosts)
  @new_cluster = Redis::Distributed.new(new_hosts)
end

Instance Attribute Details

#new_clusterObject

Returns the value of attribute new_cluster.



11
12
13
# File 'lib/redis_migrator.rb', line 11

def new_cluster
  @new_cluster
end

#new_hostsObject

Returns the value of attribute new_hosts.



11
12
13
# File 'lib/redis_migrator.rb', line 11

def new_hosts
  @new_hosts
end

#old_clusterObject

Returns the value of attribute old_cluster.



11
12
13
# File 'lib/redis_migrator.rb', line 11

def old_cluster
  @old_cluster
end

#old_hostsObject

Returns the value of attribute old_hosts.



11
12
13
# File 'lib/redis_migrator.rb', line 11

def old_hosts
  @old_hosts
end

Instance Method Details

#changed_keysObject

Finds redis keys for which migration is needed

Examples:

Returned value

{ "redis://host1.com" => ['key1', 'key2', 'key3'],
  "redis://host2.com => ['key4', 'key5', 'key6']" }

Returns:

  • a hash of keys grouped by node they need to be written to



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/redis_migrator.rb', line 29

def changed_keys
  keys = @old_cluster.keys("*")

  keys.inject({}) do |acc, key|
    old_node = @old_cluster.node_for(key).client
    new_node = @new_cluster.node_for(key).client

    if (old_node.host != new_node.host) || (old_node.port != new_node.port)
      hash_key = "redis://#{new_node.host}:#{new_node.port}/#{new_node.db}"
      acc[hash_key] = [] if acc[hash_key].nil?
      acc[hash_key] << key
    end

    acc
  end
end

#migrate_keys(node, keys, options = {}) ⇒ Object

Migrates a given array of keys to a given redis node

Parameters:

  • node (Hash)

    options for redis node keys will be migrated to

  • keys (Array)

    array of keys that need to be migrated

  • options (Hash) (defaults to: {})

    additional options, such as :do_not_remove => true



50
51
52
53
54
# File 'lib/redis_migrator.rb', line 50

def migrate_keys(node, keys, options={})
  return false if keys.empty? || keys.nil?

  migrator(options[:do_not_remove]).new(old_hosts).migrate(node, keys, options)
end

#redisObject



20
21
22
# File 'lib/redis_migrator.rb', line 20

def redis
  Thread.current[:redis]
end

#run(options = {}) ⇒ Object

Runs a migration process for a Redis cluster.

Parameters:

  • additional (Hash)

    options such as :do_not_remove => true



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/redis_migrator.rb', line 58

def run(options={})
  keys_to_migrate = changed_keys
  puts "Migrating #{keys_to_migrate.values.flatten.count} keys"
  threads = []

  keys_to_migrate.keys.each do |node_url|
    node = parse_redis_url(node_url)
    
    #spawn a separate thread for each Redis pipe
    threads << Thread.new(node, keys_to_migrate[node_url]) {|node, keys|
      migrate_keys(node, keys, options)
    }
  end

  threads.each{|t| t.join}
end