Class: Mongo::MongoShardedClient

Inherits:
MongoReplicaSetClient show all
Includes:
ThreadLocalVariableManager
Defined in:
lib/mongo/mongo_sharded_client.rb

Overview

Instantiates and manages connections to a MongoDB sharded cluster for high availability.

Direct Known Subclasses

ShardedConnection

Constant Summary collapse

# Option keys specific to the sharded client; valid_opts appends them to
# GENERIC_OPTS. Frozen so the shared list cannot be mutated by accident.
SHARDED_CLUSTER_OPTS =
[:refresh_mode, :refresh_interval, :tag_sets, :read].freeze

Constants inherited from MongoReplicaSetClient

Mongo::MongoReplicaSetClient::REPL_SET_OPTS

Constants inherited from MongoClient

Mongo::MongoClient::CLIENT_ONLY_OPTS, Mongo::MongoClient::ConditionVariable, Mongo::MongoClient::DEFAULT_DB_NAME, Mongo::MongoClient::DEFAULT_HOST, Mongo::MongoClient::DEFAULT_PORT, Mongo::MongoClient::GENERIC_OPTS, Mongo::MongoClient::Mutex, Mongo::MongoClient::POOL_OPTS, Mongo::MongoClient::READ_PREFERENCE_OPTS, Mongo::MongoClient::TIMEOUT_OPTS, Mongo::MongoClient::WRITE_CONCERN_OPTS

Constants included from Networking

Networking::RESPONSE_HEADER_SIZE, Networking::STANDARD_HEADER_SIZE

Instance Attribute Summary collapse

Attributes inherited from MongoReplicaSetClient

#replica_set_name

Attributes inherited from MongoClient

#acceptable_latency, #auths, #connect_timeout, #host_to_try, #logger, #op_timeout, #pool_size, #pool_timeout, #primary, #primary_pool, #read, #size, #socket_class, #tag_sets, #write_concern

Attributes included from WriteConcern

#legacy_write_concern

Instance Method Summary collapse

Methods included from ThreadLocalVariableManager

#thread_local

Methods inherited from MongoReplicaSetClient

#arbiters, #authenticate_pools, #checkin, #checkout_reader, #checkout_writer, #close, #connecting?, #ensure_manager, #get_socket_from_pool, #host, #hosts, #local_manager, #logout_pools, #max_bson_size, #nodes, #pin_pool, #port, #primary, #primary_pool, #read_pool, #read_primary?, #refresh, #reset_connection, #secondaries, #secondary_pool, #secondary_pools, #tag_map, #unpin_pool

Methods inherited from MongoClient

#[], #active?, #add_auth, #apply_saved_authentication, #authenticate_pools, #checkin, #checkout_reader, #checkout_writer, #clear_auths, #close, #copy_database, #database_info, #database_names, #db, #drop_database, from_uri, #host, #host_port, #lock!, #locked?, #logout_pools, #max_bson_size, #mongos?, multi, #parse_init, #pin_pool, #ping, #port, #primary?, #read_pool, #read_primary?, #reconnect, #refresh, #remove_auth, #server_info, #server_version, #unlock!, #unpin_pool

Methods included from WriteConcern

#get_write_concern, gle?, #write_concern_from_legacy

Methods included from Networking

#receive_message, #send_message, #send_message_with_gle

Methods included from Logging

#instrument, instrumenter, instrumenter=, #log, #write_logging_startup_message

Constructor Details

#initialize(*args) ⇒ MongoShardedClient

Returns a new instance of MongoShardedClient.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/mongo/mongo_sharded_client.rb', line 30

# Build a client for a sharded cluster from a list of seed nodes.
#
# Seeds are passed as "host:port" strings (individually or inside arrays);
# a trailing Hash is taken as the options hash. When no seeds are passed and
# ENV['MONGODB_URI'] is set, the seeds and connection options come from that
# URI, and the explicitly passed options are merged over the URI's options.
#
# @param args [Array] seed nodes, optionally followed by an options Hash
# @raise [MongoArgumentError] if the URI implies a direct connection, or if
#   no seed node is available
def initialize(*args)
  opts = args.last.is_a?(Hash) ? args.pop : {}

  nodes = args.flatten

  if nodes.empty? && ENV.key?('MONGODB_URI')
    parser = URIParser.new ENV['MONGODB_URI']
    if parser.direct?
      raise MongoArgumentError, "Mongo::MongoShardedClient.new called with no arguments, but ENV['MONGODB_URI'] implies a direct connection."
    end
    opts = parser.connection_options.merge! opts
    # Use the parsed node list itself as the seed list. Wrapping it in another
    # array made the mapping below call #split on an Array.
    nodes = Array(parser.nodes)
  end

  unless nodes.length > 0
    raise MongoArgumentError, "A MongoShardedClient requires at least one seed node."
  end

  @seeds = nodes.map do |node|
    # NOTE(review): URIParser#nodes presumably yields [host, port] pairs while
    # callers pass "host:port" strings; both shapes are accepted here.
    host, port = node.is_a?(Array) ? node : node.split(":")
    [ host, port.to_i ]
  end

  # TODO: add a method for replacing this list of nodes.
  @seeds.freeze

  # Refresh
  @last_refresh = Time.now
  @refresh_version = 0

  # No connection manager by default.
  @manager = nil
  @old_managers = []

  # Lock for request ids.
  @id_lock = Mutex.new

  @pool_mutex = Mutex.new
  @connected = false

  @safe_mutex_lock = Mutex.new
  # Lazily creates one Mutex per key on first access.
  @safe_mutexes = Hash.new {|hash, key| hash[key] = Mutex.new}

  @connect_mutex = Mutex.new
  @refresh_mutex = Mutex.new

  @mongos        = true

  check_opts(opts)
  setup(opts)
end

Instance Attribute Details

#managerObject (readonly)

Returns the value of attribute manager.



27
28
29
# File 'lib/mongo/mongo_sharded_client.rb', line 27

# Reader for @manager. initialize sets it to nil, and connect replaces it
# with a newly built ShardingPoolManager.
def manager
  @manager
end

#refresh_intervalObject (readonly)

Returns the value of attribute refresh_interval.



27
28
29
# File 'lib/mongo/mongo_sharded_client.rb', line 27

# Reader for @refresh_interval.
# NOTE(review): the assignment is not visible here; presumably it is set
# from the :refresh_interval option during setup -- confirm.
def refresh_interval
  @refresh_interval
end

#refresh_modeObject (readonly)

Returns the value of attribute refresh_mode.



27
28
29
# File 'lib/mongo/mongo_sharded_client.rb', line 27

# Reader for @refresh_mode.
# NOTE(review): the assignment is not visible here; presumably it is set
# from the :refresh_mode option during setup -- confirm.
def refresh_mode
  @refresh_mode
end

#refresh_versionObject (readonly)

Returns the value of attribute refresh_version.



27
28
29
# File 'lib/mongo/mongo_sharded_client.rb', line 27

# Reader for @refresh_version, a counter that initialize sets to 0 and
# connect increments each time it builds a new manager.
def refresh_version
  @refresh_version
end

#seedsObject (readonly)

Returns the value of attribute seeds.



27
28
29
# File 'lib/mongo/mongo_sharded_client.rb', line 27

# Reader for @seeds, the frozen list of [host, port] pairs that initialize
# builds from the seed nodes.
def seeds
  @seeds
end

Instance Method Details

#checkout(&block) ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/mongo/mongo_sharded_client.rb', line 136

# Check out a socket, connecting or refreshing first as needed.
#
# The block is expected to return a socket, or nil/false when none could be
# obtained. Up to two attempts are made. Before each attempt the client runs
# sync_refresh when it is already connected, and connect otherwise. After a
# failed attempt the client is marked disconnected so that the next attempt
# reconnects. Exceptions raised by the block propagate to the caller.
#
# @yieldreturn [Object, nil] the socket, or nil/false if none was available
# @return [Object] the socket returned by the block
# @raise [ConnectionFailure] if neither attempt produced a socket
def checkout(&block)
  2.times do
    connected? ? sync_refresh : connect

    # No rescue is needed here: if the block raises, no socket has been
    # assigned yet, so there is nothing to check back in.
    socket = block.call
    return socket if socket

    # Force a fresh connect on the next attempt.
    @connected = false
  end

  # Without this the method fell out of the loop and returned 2 (the value
  # of Integer#times), which callers would then treat as a socket.
  raise ConnectionFailure, "Could not checkout a socket."
end

#connect(force = !@connected) ⇒ Object

Initiate a connection to the sharded cluster.



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/mongo/mongo_sharded_client.rb', line 92

# Build a new pool manager and connect it to the sharded cluster.
#
# Does nothing and returns nil unless +force+ is truthy; by default it is
# truthy only while the client is not marked as connected. The previous
# manager, if any, is appended to @old_managers, and its seeds are unioned
# with the configured seeds for the new manager.
#
# @param force [Boolean] whether to connect even if already connected
# @return [true, nil] true after connecting, nil when skipped
def connect(force = !@connected)
  if force
    log(:info, "Connecting...")
    @connect_mutex.synchronize do
      previous = @manager
      known_seeds = previous ? previous.seeds : []
      @old_managers << previous if previous

      @manager = ShardingPoolManager.new(self, known_seeds | @seeds)
      # Also record the manager in the thread-local registry, keyed by client.
      thread_local[:managers][self] = @manager
      @manager.connect

      @refresh_version += 1
      @last_refresh = Time.now
      @connected = true
    end
  end
end

#connected?Boolean

Returns:

  • (Boolean)


121
122
123
# File 'lib/mongo/mongo_sharded_client.rb', line 121

# Whether the client is marked connected and its manager has a primary pool.
#
# @return [Object] the falsy @connected flag when not connected, otherwise
#   whatever @manager.primary_pool returns (a truthy pool or nil)
def connected?
  return @connected unless @connected

  @manager.primary_pool
end

#hard_refresh!Boolean

Force a hard refresh of this connection’s view of the sharded cluster.

Returns:

  • (Boolean)

    true if hard refresh occurred. As implemented here, the method always forces a reconnect and returns true; there is no code path that returns false.



115
116
117
118
119
# File 'lib/mongo/mongo_sharded_client.rb', line 115

# Rebuild this client's view of the sharded cluster by forcing a reconnect.
#
# @return [true] always, once connect(true) has returned
def hard_refresh!
  log(:info, "Initiating hard refresh...")
  connect(true)
  true
end

#inspectObject



86
87
88
89
# File 'lib/mongo/mongo_sharded_client.rb', line 86

def inspect
  "<Mongo::MongoShardedClient:0x#{self.object_id.to_s(16)} @seeds=#{@seeds.inspect} " +
      "@connected=#{@connected}>"
end

#slave_ok?Boolean

Returns true if it’s okay to read from a secondary node. Since this is a sharded cluster, this must always be false.

This method exists primarily so that Cursor objects will generate query messages with a slaveOkay value of true.

Returns:

  • (Boolean)

    false



132
133
134
# File 'lib/mongo/mongo_sharded_client.rb', line 132

# Whether reads may go to a secondary node. This implementation always
# answers false.
#
# @return [Boolean] false
def slave_ok?
  false
end

#valid_optsObject



82
83
84
# File 'lib/mongo/mongo_sharded_client.rb', line 82

# Option keys this client accepts: the generic client options followed by
# the sharded-cluster-specific ones.
#
# @return [Array] a new collection; neither constant is modified
def valid_opts
  [GENERIC_OPTS, SHARDED_CLUSTER_OPTS].reduce(:+)
end