3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
# File 'lib/distribute_reads/global_methods.rb', line 3
# Runs the given block with read-distribution settings stored in the
# thread-local `:distribute_reads` slot, then restores whatever value was
# in that slot before the call (even if the block raises).
#
# The supplied keywords are layered on top of DistributeReads.default_options.
# Accepted keywords: :failover, :lag_failover, :lag_on, :max_lag, :primary,
# :replica.
#
# When :max_lag is set and :primary is not, DistributeReads.lag is queried
# with the connection of each model in :lag_on (ActiveRecord::Base when
# :lag_on is not given). At the first model whose lag exceeds :max_lag the
# call either flips the thread-local settings to primary (when :lag_failover
# is truthy) or raises DistributeReads::TooMuchLag.
#
# @raise [ArgumentError] when no block is given or an unknown keyword is passed
# @raise [DistributeReads::TooMuchLag] when lag exceeds :max_lag and
#   :lag_failover is not set
# @return [Object] whatever the block returns
def distribute_reads(**options)
  raise ArgumentError, "Missing block" unless block_given?

  unrecognized = options.keys - %i[failover lag_failover lag_on max_lag primary replica]
  if unrecognized.any?
    raise ArgumentError, "Unknown keywords: #{unrecognized.join(", ")}"
  end

  settings = DistributeReads.default_options.merge(options)
  previous_value = Thread.current[:distribute_reads]

  # The ensure below is deliberately scoped to this begin block: argument
  # validation failures above must not touch the thread-local slot.
  begin
    state = {
      failover: settings[:failover],
      primary: settings[:primary],
      replica: settings[:replica]
    }
    Thread.current[:distribute_reads] = state

    lag_limit = settings[:max_lag]
    if lag_limit && !settings[:primary]
      lag_models = Array(settings[:lag_on] || [ActiveRecord::Base])
      lag_models.each do |model|
        next unless DistributeReads.lag(connection: model.connection) > lag_limit

        unless settings[:lag_failover]
          raise DistributeReads::TooMuchLag, "Replica lag over #{lag_limit} seconds#{settings[:lag_on] ? " on #{model.name} connection" : ""}"
        end

        # One lagging connection is enough to send the whole block to primary.
        Thread.current[:distribute_reads].update(primary: true, replica: false)
        break
      end
    end

    result = yield
    # A relation returned from the outermost call has not been loaded yet, so
    # its query would run after the settings are restored; warn the caller.
    if result.is_a?(ActiveRecord::Relation) && !previous_value
      warn "[distribute_reads] Call `to_a` inside block to execute query on replica"
    end
    result
  ensure
    Thread.current[:distribute_reads] = previous_value
  end
end
|