Module: DistributeReads::GlobalMethods

Defined in:
lib/distribute_reads/global_methods.rb

Instance Method Summary

Instance Method Details

#distribute_reads(**options) ⇒ Object

Raises:

  • (ArgumentError)


3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/distribute_reads/global_methods.rb', line 3

# Runs the given block with read-distribution settings stored in
# Thread.current[:distribute_reads], and puts the previous value back when the
# block finishes (normally or by raising).
#
# Recognized options, merged over DistributeReads.default_options:
#   :failover, :primary, :replica - copied into the thread-local settings hash
#   :max_lag      - when set and :primary is not, replication lag is checked
#                   before the block runs
#   :lag_on       - model class(es) whose connections are checked for lag;
#                   ActiveRecord::Base is used when omitted
#   :lag_failover - when truthy, a failed lag check flips the thread-local
#                   settings to primary: true / replica: false instead of raising
#
# Returns the block's value. An unloaded ActiveRecord::Relation returned from an
# outermost call is loaded first when DistributeReads.eager_load is truthy;
# otherwise a hint is logged and the relation is returned as is.
#
# Raises ArgumentError when no block is given or an unknown keyword is passed.
# Raises DistributeReads::TooMuchLag when the lag check fails and :lag_failover
# is not truthy.
def distribute_reads(**options)
  raise ArgumentError, "Missing block" unless block_given?

  unknown_keywords = options.keys - %i[failover lag_failover lag_on max_lag primary replica]
  if unknown_keywords.any?
    raise ArgumentError, "Unknown keywords: #{unknown_keywords.join(", ")}"
  end

  settings = DistributeReads.default_options.merge(options)
  outer_state = Thread.current[:distribute_reads]

  # Explicit begin/ensure: the thread-local state is only restored once it has
  # been captured above, never on the early ArgumentError exits.
  begin
    Thread.current[:distribute_reads] = {
      failover: settings[:failover],
      primary: settings[:primary],
      replica: settings[:replica]
    }

    # TODO ensure same connection is used to test lag and execute queries
    lag_limit = settings[:max_lag]
    if lag_limit && !settings[:primary]
      lag_models = Array(settings[:lag_on] || [ActiveRecord::Base])
      lag_models.each do |model|
        # nil, false, or a comparable lag value; false marks "no replicas".
        lag =
          begin
            DistributeReads.replication_lag(connection: model.connection)
          rescue DistributeReads::NoReplicasAvailable
            # TODO rescue more exceptions?
            false
          end

        lagging = !lag || lag > lag_limit
        next unless lagging

        reason =
          case lag
          when nil then "Replication stopped"
          when false then "No replicas available for lag check"
          else "Replica lag over #{lag_limit} seconds"
          end
        reason = "#{reason} on #{model.name} connection" if settings[:lag_on]

        raise DistributeReads::TooMuchLag, reason unless settings[:lag_failover]

        # TODO possibly per connection
        Thread.current[:distribute_reads].update(primary: true, replica: false)
        DistributeReads.log "#{reason}. Falling back to master pool."
        break # one failing connection is enough; skip the remaining models
      end
    end

    result = yield
    # Only the outermost call (no previous thread-local state) deals with
    # relations that have not been loaded yet.
    if result.is_a?(ActiveRecord::Relation) && !outer_state && !result.loaded?
      if DistributeReads.eager_load
        result = result.load
      else
        DistributeReads.log "Call `to_a` inside block to execute query on replica"
      end
    end
    result
  ensure
    Thread.current[:distribute_reads] = outer_state
  end
end