Module: Mongo::ReadPreference

Included in:
Cursor, MongoReplicaSetClient
Defined in:
lib/mongo/util/read_preference.rb

Constant Summary collapse

# Read preference modes accepted by .validate, as symbols.
# Frozen so the list cannot be modified at runtime by accident.
READ_PREFERENCES = [
  :primary,
  :primary_preferred,
  :secondary,
  :secondary_preferred,
  :nearest
].freeze
# Maps each read preference symbol to the camelCase mode string that
# .mongos places under the :mode key.
# The hash itself is frozen (shallowly) so the mapping cannot be altered at
# runtime; the string values are left unfrozen.
MONGOS_MODES = {
  :primary              => 'primary',
  :primary_preferred    => 'primaryPreferred',
  :secondary            => 'secondary',
  :secondary_preferred  => 'secondaryPreferred',
  :nearest              => 'nearest'
}.freeze

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.mongos(mode, tag_sets) ⇒ Object



33
34
35
36
37
38
39
# File 'lib/mongo/util/read_preference.rb', line 33

# Builds the read preference document for the given mode and tag sets
# (presumably the form sent to a mongos router -- confirm against callers).
#
# @param mode [Symbol] a key of MONGOS_MODES.
# @param tag_sets [#empty?] tag sets to attach; must respond to #empty?.
# @return [BSON::OrderedHash, nil] nil when mode is :secondary_preferred and
#   no tag sets are given; otherwise a hash with :mode (looked up in
#   MONGOS_MODES) and, when tag sets are present, :tags.
def self.mongos(mode, tag_sets)
  tagged = !tag_sets.empty?

  # Plain :secondary_preferred with no tags produces no document at all.
  return nil if mode == :secondary_preferred && !tagged

  document = BSON::OrderedHash[:mode => MONGOS_MODES[mode]]
  document[:tags] = tag_sets if tagged
  document
end

.validate(value) ⇒ Object



41
42
43
44
45
46
47
48
# File 'lib/mongo/util/read_preference.rb', line 41

# Checks that +value+ is one of the entries in READ_PREFERENCES.
#
# @param value [Object] candidate read preference; only the symbols listed
#   in READ_PREFERENCES are accepted.
# @return [true] when the value is valid.
# @raise [MongoArgumentError] when the value is not in READ_PREFERENCES.
def self.validate(value)
  return true if READ_PREFERENCES.include?(value)

  raise MongoArgumentError, "#{value} is not a valid read preference. " +
    "Please specify one of the following read preferences as a symbol: #{READ_PREFERENCES}"
end

Instance Method Details

#read_pool(read_preference_override = {}) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/mongo/util/read_preference.rb', line 58

# Returns the pool to read from for this object's read preference, with
# optional per-call overrides.
#
# When mongos? is true the primary pool is returned immediately. Otherwise
# the pinned pool is reused if its recorded read preference equals the
# effective one; if not, the pool is unpinned and a new one is selected.
#
# @param read_preference_override [Hash] entries merged over #read_preference.
# @return the chosen pool.
# @raise [ConnectionFailure] when no pool could be chosen.
def read_pool(read_preference_override={})
  return primary_pool if mongos?

  effective_pref = read_preference.merge(read_preference_override)
  reuse_pinned = pinned_pool && pinned_pool[:read_preference] == effective_pref

  chosen =
    if reuse_pinned
      pinned_pool[:pool]
    else
      # The pin (if any) was made under a different preference; drop it.
      unpin_pool
      select_pool(effective_pref)
    end

  return chosen if chosen

  raise ConnectionFailure, "No replica set member available for query " +
    "with read preference matching mode #{effective_pref[:mode]} and tags " +
    "matching #{effective_pref[:tags]}."
end

#read_preference ⇒ Object



50
51
52
53
54
55
56
# File 'lib/mongo/util/read_preference.rb', line 50

# Assembles this object's read preference settings into a hash.
#
# @return [Hash] with keys :mode (from @read), :tags (from @tag_sets) and
#   :latency (from @acceptable_latency), in that order.
def read_preference
  preference = {}
  preference[:mode] = @read
  preference[:tags] = @tag_sets
  preference[:latency] = @acceptable_latency
  preference
end

#select_near_pool(candidates, read_pref) ⇒ Object



117
118
119
120
121
122
123
124
# File 'lib/mongo/util/read_preference.rb', line 117

# Picks, at random, one of the candidates whose ping time is within the
# allowed latency of the fastest candidate.
#
# @param candidates [Array] objects responding to #ping_time.
# @param read_pref [Hash] read preference; only :latency is read here
#   (same unit as #ping_time -- unit not shown in this file).
# @return one of the qualifying candidates, or nil when +candidates+ is empty.
def select_near_pool(candidates, read_pref)
  window = read_pref[:latency]
  fastest = candidates.min_by(&:ping_time)

  # Keep every candidate no more than +window+ slower than the fastest one.
  eligible = candidates.select { |pool| (pool.ping_time - fastest.ping_time) <= window }

  eligible[rand(eligible.length)]
end

#select_pool(read_pref) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/mongo/util/read_preference.rb', line 79

# Chooses a pool according to the mode in +read_pref+.
#
# @param read_pref [Hash] read preference; :mode and :tags are read here and
#   the whole hash is passed on to the selection helpers.
# @return the selected pool, or nil when no pool qualifies or the mode is
#   not one of the five handled below.
# @raise [MongoArgumentError] when mode :primary is combined with tag sets.
def select_pool(read_pref)
  if read_pref[:mode] == :primary && !read_pref[:tags].empty?
    raise MongoArgumentError, "Read preference :primary cannot be combined with tags"
  end

  case read_pref[:mode]
  when :primary
    primary_pool
  when :primary_preferred
    primary_pool || select_secondary_pool(secondary_pools, read_pref)
  when :secondary
    select_secondary_pool(secondary_pools, read_pref)
  when :secondary_preferred
    select_secondary_pool(secondary_pools, read_pref) || primary_pool
  when :nearest
    # Go through select_secondary_pool so that tag sets are applied to
    # :nearest as well; calling select_near_pool directly ignored them.
    # With no tag sets this still ends in select_near_pool over all pools.
    select_secondary_pool(pools, read_pref)
  end
end

#select_secondary_pool(candidates, read_pref) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/mongo/util/read_preference.rb', line 98

# Narrows +candidates+ by tag sets and hands the survivors to
# select_near_pool.
#
# Tag sets are tried in order; the first one matched by at least one
# candidate decides the result. A candidate matches a tag set when every
# key (compared as a string) maps to the same value in candidate.tags and
# the candidate has a truthy ping_time. With no tag sets, all candidates
# are kept as they are.
#
# @param candidates [Array] objects responding to #tags and #ping_time.
# @param read_pref [Hash] read preference; :tags is read here and the whole
#   hash is forwarded to select_near_pool.
# @return the result of select_near_pool, or nil when nothing is eligible.
def select_secondary_pool(candidates, read_pref)
  tag_sets = read_pref[:tags]
  eligible = candidates

  unless tag_sets.empty?
    eligible = []
    tag_sets.each do |tag_set|
      eligible = candidates.select do |pool|
        tag_set.all? { |key, value| pool.tags[key.to_s] == value } &&
          pool.ping_time
      end
      # Earlier tag sets take priority: stop at the first one with a match.
      break unless eligible.empty?
    end
  end

  return nil if eligible.empty?

  select_near_pool(eligible, read_pref)
end