Module: Mongo::ReadPreference

Included in:
Cursor, MongoReplicaSetClient
Defined in:
lib/mongo/util/read_preference.rb

Constant Summary collapse

READ_PREFERENCES =
[
  :primary,
  :primary_preferred,
  :secondary,
  :secondary_preferred,
  :nearest
]
MONGOS_MODES =
{
  :primary              => :primary,
  :primary_preferred    => :primaryPreferred,
  :secondary            => :secondary,
  :secondary_preferred  => :secondaryPreferred,
  :nearest              => :nearest
}

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.mongos(mode, tag_sets) ⇒ Object



19
20
21
22
23
24
25
# File 'lib/mongo/util/read_preference.rb', line 19

def self.mongos(mode, tag_sets)
  if mode != :secondary_preferred || !tag_sets.empty?
    mongos_read_preference = BSON::OrderedHash[:mode => MONGOS_MODES[mode]]
    mongos_read_preference[:tags] = tag_sets if !tag_sets.empty?
  end
  mongos_read_preference
end

.validate(value) ⇒ Object



27
28
29
30
31
32
33
34
# File 'lib/mongo/util/read_preference.rb', line 27

def self.validate(value)
  if READ_PREFERENCES.include?(value)
    return true
  else
    raise MongoArgumentError, "#{value} is not a valid read preference. " +
      "Please specify one of the following read preferences as a symbol: #{READ_PREFERENCES}"
  end
end

Instance Method Details

#read_pool(read_preference_override = {}) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/mongo/util/read_preference.rb', line 44

def read_pool(read_preference_override={})
  return primary_pool if mongos?

  read_pref = read_preference.merge(read_preference_override)

  if pinned_pool && pinned_pool[:read_preference] == read_pref
    pool = pinned_pool[:pool]
  else
    unpin_pool
    pool = select_pool(read_pref)
  end

  unless pool
    raise ConnectionFailure, "No replica set member available for query " +
      "with read preference matching mode #{read_pref[:mode]} and tags " +
      "matching #{read_pref[:tags]}."
  end

  pool
end

#read_preferenceObject



36
37
38
39
40
41
42
# File 'lib/mongo/util/read_preference.rb', line 36

def read_preference
  {
    :mode => @read,
    :tags => @tag_sets,
    :latency => @acceptable_latency
  }
end

#select_near_pool(candidates, read_pref) ⇒ Object



103
104
105
106
107
108
109
110
# File 'lib/mongo/util/read_preference.rb', line 103

def select_near_pool(candidates, read_pref)
  latency = read_pref[:latency]
  nearest_pool = candidates.min_by { |candidate| candidate.ping_time }
  near_pools = candidates.select do |candidate|
    (candidate.ping_time - nearest_pool.ping_time) <= latency
  end
  near_pools[ rand(near_pools.length) ]
end

#select_pool(read_pref) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/mongo/util/read_preference.rb', line 65

def select_pool(read_pref)
  if read_pref[:mode] == :primary && !read_pref[:tags].empty?
    raise MongoArgumentError, "Read preference :primary cannot be combined with tags"
  end

  case read_pref[:mode]
    when :primary
      primary_pool
    when :primary_preferred
      primary_pool || select_secondary_pool(secondary_pools, read_pref)
    when :secondary
      select_secondary_pool(secondary_pools, read_pref)
    when :secondary_preferred
      select_secondary_pool(secondary_pools, read_pref) || primary_pool
    when :nearest
      select_secondary_pool(pools, read_pref)
  end
end

#select_secondary_pool(candidates, read_pref) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/mongo/util/read_preference.rb', line 84

def select_secondary_pool(candidates, read_pref)
  tag_sets = read_pref[:tags]

  if !tag_sets.empty?
    matches = []
    tag_sets.detect do |tag_set|
      matches = candidates.select do |candidate|
        tag_set.none? { |k,v| candidate.tags[k.to_s] != v } &&
        candidate.ping_time
      end
      !matches.empty?
    end
  else
    matches = candidates
  end

  matches.empty? ? nil : select_near_pool(matches, read_pref)
end