Class: Mongo::MongoShardedClient

Inherits:
MongoReplicaSetClient show all
Includes:
ThreadLocalVariableManager
Defined in:
lib/mongo/mongo_sharded_client.rb

Overview

Instantiates and manages connections to a MongoDB sharded cluster for high availability.

Direct Known Subclasses

ShardedConnection

Constant Summary collapse

# Option keys that valid_opts accepts specifically for sharded-cluster clients.
# Frozen so the shared constant cannot be mutated by accident.
SHARDED_CLUSTER_OPTS =
[:refresh_mode, :refresh_interval, :tag_sets, :read].freeze

Constants inherited from MongoReplicaSetClient

Mongo::MongoReplicaSetClient::REPL_SET_OPTS

Constants included from ReadPreference

ReadPreference::MONGOS_MODES, ReadPreference::READ_PREFERENCES

Constants inherited from MongoClient

Mongo::MongoClient::CLIENT_ONLY_OPTS, Mongo::MongoClient::ConditionVariable, Mongo::MongoClient::DEFAULT_DB_NAME, Mongo::MongoClient::DEFAULT_HOST, Mongo::MongoClient::DEFAULT_PORT, Mongo::MongoClient::GENERIC_OPTS, Mongo::MongoClient::Mutex, Mongo::MongoClient::POOL_OPTS, Mongo::MongoClient::READ_PREFERENCE_OPTS, Mongo::MongoClient::TIMEOUT_OPTS, Mongo::MongoClient::WRITE_CONCERN_OPTS

Constants included from Networking

Networking::RESPONSE_HEADER_SIZE, Networking::STANDARD_HEADER_SIZE

Instance Attribute Summary collapse

Attributes inherited from MongoReplicaSetClient

#replica_set_name

Attributes inherited from MongoClient

#acceptable_latency, #auths, #connect_timeout, #host_to_try, #logger, #op_timeout, #pool_size, #pool_timeout, #primary, #primary_pool, #read, #size, #socket_class, #tag_sets, #write_concern

Attributes included from WriteConcern

#legacy_write_concern

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ThreadLocalVariableManager

#thread_local

Methods inherited from MongoReplicaSetClient

#arbiters, #authenticate_pools, #checkin, #checkout_reader, #checkout_writer, #close, #connecting?, #ensure_manager, #get_socket_from_pool, #host, #hosts, #local_manager, #logout_pools, #max_bson_size, #max_message_size, #nodes, #pin_pool, #pinned_pool, #port, #primary, #primary_pool, #read_primary?, #refresh, #reset_connection, #secondaries, #secondary_pool, #secondary_pools, #tag_map, #unpin_pool

Methods included from ReadPreference

mongos, #read_pool, #read_preference, #select_near_pool, #select_pool, #select_secondary_pool, validate

Methods inherited from MongoClient

#[], #active?, #add_auth, #apply_saved_authentication, #authenticate_pools, #checkin, #checkout_reader, #checkout_writer, #clear_auths, #close, #copy_database, #database_info, #database_names, #db, #drop_database, #host, #host_port, #lock!, #locked?, #logout_pools, #max_bson_size, #max_message_size, #mongos?, multi, #parse_init, #pin_pool, #ping, #pinned_pool, #port, #read_pool, #read_primary?, #refresh, #remove_auth, #server_info, #server_version, #unlock!, #unpin_pool

Methods included from WriteConcern

#get_write_concern, gle?, #write_concern_from_legacy

Methods included from Networking

#receive_message, #send_message, #send_message_with_gle

Methods included from Logging

#instrument, instrumenter, instrumenter=, #log, #write_logging_startup_message

Constructor Details

#initialize(*args) ⇒ MongoShardedClient



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/mongo/mongo_sharded_client.rb', line 12

# Create a client for a sharded cluster from a list of seed nodes.
#
# Seeds are passed as "host:port" strings (individually or nested in arrays).
# A trailing Hash argument is taken as the options hash. When no seeds are
# given and the MONGODB_URI environment variable is set, both seeds and
# options are read from that URI; explicitly passed options win on conflict.
#
# A seed without a port ("host" or "host:") uses DEFAULT_PORT instead of
# silently becoming port 0.
#
# @param args [Array] seed strings, optionally followed by an options Hash.
# @raise [MongoArgumentError] if no seed node can be determined.
def initialize(*args)
  opts = args.last.is_a?(Hash) ? args.pop : {}

  nodes = args.flatten

  if nodes.empty? && ENV.has_key?('MONGODB_URI')
    parser = URIParser.new ENV['MONGODB_URI']
    # Options from the URI are the base; caller-supplied opts override them.
    opts = parser.connection_options.merge! opts
    nodes = parser.node_strings
  end

  unless nodes.length > 0
    raise MongoArgumentError, "A MongoShardedClient requires at least one seed node."
  end

  @seeds = nodes.map do |host_port|
    host, port = host_port.split(":")
    # nil.to_i and "".to_i are both 0, which is never a usable port.
    port_number = (port.nil? || port.empty?) ? DEFAULT_PORT : port.to_i
    [ host, port_number ]
  end

  # TODO: add a method for replacing this list of node.
  @seeds.freeze

  # Refresh
  @last_refresh = Time.now
  @refresh_version = 0

  # No connection manager by default.
  @manager = nil

  # Lock for request ids.
  @id_lock = Mutex.new

  @pool_mutex = Mutex.new
  @connected = false

  @safe_mutex_lock = Mutex.new
  # One mutex per key, created lazily on first access.
  @safe_mutexes = Hash.new {|hash, key| hash[key] = Mutex.new}

  @connect_mutex = Mutex.new
  @refresh_mutex = Mutex.new

  @mongos        = true

  check_opts(opts)
  setup(opts)
end

Instance Attribute Details

#manager ⇒ Object (readonly)

Returns the value of attribute manager.



9
10
11
# File 'lib/mongo/mongo_sharded_client.rb', line 9

# Reader for the +@manager+ instance variable.
#
# @return [Object, nil] whatever is currently stored in +@manager+.
def manager
  @manager
end

#refresh_interval ⇒ Object (readonly)

Returns the value of attribute refresh_interval.



9
10
11
# File 'lib/mongo/mongo_sharded_client.rb', line 9

# Reader for the +@refresh_interval+ instance variable.
#
# @return [Object, nil] whatever is currently stored in +@refresh_interval+.
def refresh_interval
  @refresh_interval
end

#refresh_mode ⇒ Object (readonly)

Returns the value of attribute refresh_mode.



9
10
11
# File 'lib/mongo/mongo_sharded_client.rb', line 9

# Reader for the +@refresh_mode+ instance variable.
#
# @return [Object, nil] whatever is currently stored in +@refresh_mode+.
def refresh_mode
  @refresh_mode
end

#refresh_version ⇒ Object (readonly)

Returns the value of attribute refresh_version.



9
10
11
# File 'lib/mongo/mongo_sharded_client.rb', line 9

# Reader for the +@refresh_version+ instance variable.
#
# @return [Object, nil] whatever is currently stored in +@refresh_version+.
def refresh_version
  @refresh_version
end

#seeds ⇒ Object (readonly)

Returns the value of attribute seeds.



9
10
11
# File 'lib/mongo/mongo_sharded_client.rb', line 9

# Reader for the +@seeds+ instance variable.
#
# @return [Object, nil] whatever is currently stored in +@seeds+.
def seeds
  @seeds
end

Class Method Details

.from_uri(uri = ENV['MONGODB_URI'], options = {}) ⇒ Mongo::MongoShardedClient

Initialize a connection to MongoDB using the MongoDB URI spec.



144
145
146
# File 'lib/mongo/mongo_sharded_client.rb', line 144

# Build a client from a MongoDB connection URI.
#
# @param uri [String] connection URI; defaults to the MONGODB_URI
#   environment variable.
# @param options [Hash] extra options passed to the parser's connection call.
# @return whatever URIParser#connection(options, false, true) returns.
def self.from_uri(uri = ENV['MONGODB_URI'], options = {})
  parser = URIParser.new(uri)
  parser.connection(options, false, true)
end

Instance Method Details

#checkout(&block) ⇒ Object



126
127
128
129
130
131
132
133
134
# File 'lib/mongo/mongo_sharded_client.rb', line 126

# Run the inherited checkout, retrying once if it raises ConnectionFailure.
#
# @yield the block is passed unchanged to the superclass implementation.
# @return whatever the superclass checkout returns.
# @raise [ConnectionFailure] if the second attempt fails as well.
def checkout(&block)
  attempts = 0
  begin
    super(&block)
  rescue ConnectionFailure
    attempts += 1
    # A bare raise re-raises the ConnectionFailure currently being handled.
    raise unless attempts < 2
    retry
  end
end

#connect(force = !@connected) ⇒ Object

Initiate a connection to the sharded cluster.

Raises:

ConnectionFailure — if a connection attempt is already in progress on the calling thread.



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/mongo/mongo_sharded_client.rb', line 70

# Connect to the sharded cluster, or refresh an existing connection.
#
# Does nothing and returns nil when +force+ is falsy. Otherwise it creates a
# ShardingPoolManager from @seeds on first use (or calls refresh! on the
# existing manager), then increments @refresh_version, records @last_refresh
# and marks the client as connected.
#
# @param force [Boolean] whether to (re)connect; by default this is true only
#   while the client is not yet connected.
# @raise [ConnectionFailure] if the calling thread is already inside a
#   connect attempt.
def connect(force = !@connected)
  return unless force
  log(:info, "Connecting...")

  # Prevent recursive connection attempts from the same thread.
  # This is done rather than using a Monitor to prevent potentially recursing
  # infinitely while attempting to connect and continually failing. Instead, fail fast.
  raise ConnectionFailure, "Failed to get node data." if thread_local[:locks][:connecting]

  @connect_mutex.synchronize do
    begin
      # Mark this thread as connecting so the guard above trips on re-entry.
      thread_local[:locks][:connecting] = true
      if @manager
        # Already have a manager: refresh it against the seed list.
        @manager.refresh! @seeds
      else
        # First connection: build the manager and register it for this thread.
        @manager = ShardingPoolManager.new(self, @seeds)
        thread_local[:managers][self] = @manager
        @manager.connect
      end
    ensure
      # Always clear the flag, even when connecting raised.
      thread_local[:locks][:connecting] = false
    end

    # Reached only when the block above did not raise.
    @refresh_version += 1
    @last_refresh = Time.now
    @connected = true
  end
end

#connected? ⇒ Boolean



111
112
113
# File 'lib/mongo/mongo_sharded_client.rb', line 111

# Whether the client has connected and its manager currently has a primary pool.
#
# Always returns a strict true/false rather than leaking the pool object or
# nil, and returns false instead of raising when no manager exists.
#
# @return [Boolean]
def connected?
  !!(@connected && @manager && @manager.primary_pool)
end

#hard_refresh! ⇒ Boolean

Force a hard refresh of this connection’s view of the sharded cluster.



105
106
107
108
109
# File 'lib/mongo/mongo_sharded_client.rb', line 105

# Log the refresh and force a reconnect via connect(true).
#
# @return [true] always true once connect(true) returns without raising.
def hard_refresh!
  log(:info, "Initiating hard refresh...")
  connect(true)
  true
end

#inspect ⇒ Object



64
65
66
67
# File 'lib/mongo/mongo_sharded_client.rb', line 64

# Short description of the client: hexadecimal object id, seed list and
# connection flag.
#
# @return [String]
def inspect
  hex_id = object_id.to_s(16)
  "<Mongo::MongoShardedClient:0x#{hex_id} @seeds=#{@seeds.inspect} @connected=#{@connected}>"
end

#slave_ok? ⇒ Boolean

Returns true if it’s okay to read from a secondary node. Since this is a sharded cluster, this must always be false.

This method exists primarily so that Cursor objects can determine the slaveOkay value for the query messages they generate; for a sharded client that value is always false.



122
123
124
# File 'lib/mongo/mongo_sharded_client.rb', line 122

# Whether secondary reads are allowed through this client.
#
# @return [false] unconditionally false for this class.
def slave_ok?
  false
end

#valid_opts ⇒ Object



60
61
62
# File 'lib/mongo/mongo_sharded_client.rb', line 60

# Option keys accepted by this client: the generic, sharded-cluster,
# read-preference and write-concern option lists joined in that order.
#
# @return [Array] a new array built on every call.
def valid_opts
  [GENERIC_OPTS, SHARDED_CLUSTER_OPTS, READ_PREFERENCE_OPTS, WRITE_CONCERN_OPTS].inject(:+)
end