Class: Google::Cloud::Spanner::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/google/cloud/spanner/pool.rb

Overview

# Pool

Implements a pool for managing and reusing Session instances.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, min: 10, max: 100, keepalive: 1800, write_ratio: 0.3, fail: true, threads: nil) ⇒ Pool

Returns a new instance of Pool.



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/google/cloud/spanner/pool.rb', line 35

##
# Builds a pool bound to +client+ and immediately populates it via +init+.
#
# @param client the object the pool is created for; stored as-is
# @param min [Integer] lower bound consulted by #keepalive_or_release!
# @param max [Integer] upper bound stored for the pool
#   (NOTE(review): presumably read by can_allocate_more_sessions? — confirm)
# @param keepalive [Numeric] idle threshold handed to +idle_since?+
# @param write_ratio [Numeric] stored after being limited to the range 0..1
# @param fail [Boolean] when truthy, checkouts raise SessionLimitError
#   instead of waiting once nothing is available and no session can be added
# @param threads [Integer, nil] defaults to the larger of 2 and twice
#   +Concurrent.processor_count+ when not given
def initialize client, min: 10, max: 100, keepalive: 1800,
               write_ratio: 0.3, fail: true, threads: nil
  @client, @min, @max = client, min, max
  @keepalive = keepalive

  # Out-of-range ratios are pinned to the nearest bound.
  bounded_ratio = write_ratio
  bounded_ratio = 0 if write_ratio < 0
  bounded_ratio = 1 if write_ratio > 1
  @write_ratio = bounded_ratio

  @fail = fail
  threads ||= [2, Concurrent.processor_count * 2].max
  @threads = threads

  # Lock and condition variable shared by every checkout/checkin.
  @mutex = Mutex.new
  @resource = ConditionVariable.new

  # initialize pool and availability queue
  init
end

Instance Attribute Details

#all_sessions ⇒ Object

Returns the value of attribute all_sessions.



33
34
35
# File 'lib/google/cloud/spanner/pool.rb', line 33

# Reader for @all_sessions, the collection that checkins validate
# membership against.
def all_sessions
  @all_sessions
end

#session_queue ⇒ Object

Returns the value of attribute session_queue.



33
34
35
# File 'lib/google/cloud/spanner/pool.rb', line 33

# Reader for @session_queue, the list of checked-in sessions that
# checkouts shift from.
def session_queue
  @session_queue
end

#transaction_queue ⇒ Object

Returns the value of attribute transaction_queue.



33
34
35
# File 'lib/google/cloud/spanner/pool.rb', line 33

# Reader for @transaction_queue, the list of checked-in transactions that
# checkouts shift from.
def transaction_queue
  @transaction_queue
end

Instance Method Details

#checkin_session(session) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/google/cloud/spanner/pool.rb', line 89

##
# Returns a session to the pool and wakes one waiting checkout.
#
# @param session the session being handed back; must be in #all_sessions
# @raise [ArgumentError] if the session is not tracked by this pool
# @return [nil]
def checkin_session session
  @mutex.synchronize do
    known = all_sessions.include? session
    raise ArgumentError, "Cannot checkin session" unless known

    session_queue << session

    # One resource became available, so one waiter is enough.
    @resource.signal
  end

  nil
end

#checkin_transaction(tx) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/google/cloud/spanner/pool.rb', line 147

##
# Returns a transaction to the pool and wakes one waiting checkout.
#
# @param tx the transaction being handed back; +tx.session+ must be in
#   #all_sessions
# @raise [ArgumentError] if the transaction's session is not tracked by
#   this pool
# @return [nil]
def checkin_transaction tx
  @mutex.synchronize do
    owned = all_sessions.include? tx.session
    raise ArgumentError, "Cannot checkin session" unless owned

    transaction_queue << tx

    # One resource became available, so one waiter is enough.
    @resource.signal
  end

  nil
end

#checkout_session ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/google/cloud/spanner/pool.rb', line 63

##
# Hands out a session, preferring ones that are already checked in.
#
# Lookup order while holding the pool lock: a queued session first, then
# the session behind a queued transaction. If neither exists and
# +can_allocate_more_sessions?+ allows it, a new session is created after
# the lock is dropped. Otherwise the call either raises (when @fail is
# set) or waits on the condition variable and retries.
#
# @raise [ClientClosedError] if the pool has been closed
# @raise [SessionLimitError] if nothing is available, no session may be
#   added, and @fail is truthy
# @return the checked-out session
def checkout_session
  allocate = false
  @mutex.synchronize do
    loop do
      raise ClientClosedError if @closed

      idle_session = session_queue.shift
      return idle_session if idle_session

      idle_tx = transaction_queue.shift
      return idle_tx.session if idle_tx

      if can_allocate_more_sessions?
        # Reserve the slot now; the session itself is built outside the lock.
        @new_sessions_in_process += 1
        allocate = true
        break
      end

      raise SessionLimitError if @fail

      @resource.wait @mutex
    end
  end

  new_session! if allocate
end

#checkout_transaction ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/google/cloud/spanner/pool.rb', line 116

##
# Hands out a transaction, preferring ones that are already checked in.
#
# Lookup order while holding the pool lock: a queued transaction first,
# then a queued session (a transaction is created on it after the lock is
# dropped). If neither exists and +can_allocate_more_sessions?+ allows it,
# +new_transaction!+ is called after the lock is dropped. Otherwise the
# call either raises (when @fail is set) or waits and retries.
#
# @raise [ClientClosedError] if the pool has been closed
# @raise [SessionLimitError] if nothing is available, no session may be
#   added, and @fail is truthy
# @return the checked-out transaction
def checkout_transaction
  pending = nil
  @mutex.synchronize do
    loop do
      raise ClientClosedError if @closed

      ready_tx = transaction_queue.shift
      return ready_tx if ready_tx

      # A plain session can be upgraded once we are outside the lock.
      pending = session_queue.shift
      break if pending

      if can_allocate_more_sessions?
        @new_sessions_in_process += 1
        pending = :new
        break
      end

      raise SessionLimitError if @fail

      @resource.wait @mutex
    end
  end

  case pending
  when Google::Cloud::Spanner::Session then pending.create_transaction
  when :new then new_transaction!
  end
end

#close ⇒ Object



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/google/cloud/spanner/pool.rb', line 168

##
# Shuts the pool down: marks it closed, stops the keepalive task, wakes
# every thread blocked in a checkout, schedules release of all tracked
# sessions, empties the pool's collections, and shuts down the thread pool.
#
# @return [true]
def close
  @mutex.synchronize { @closed = true }

  @keepalive_task.shutdown

  # Waiters re-check @closed when they wake up, so wake all of them.
  @resource.broadcast

  @mutex.synchronize do
    @all_sessions.each do |session|
      future { session.release! }
    end
    # Three distinct arrays; the collections must not share storage.
    @all_sessions, @session_queue, @transaction_queue = [], [], []
  end

  # Shut down only after the release work has been handed to the pool.
  @thread_pool.shutdown

  true
end

#keepalive_or_release! ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/google/cloud/spanner/pool.rb', line 188

##
# Maintains idle pool members: releases surplus ones and pings the rest.
#
# Under the pool lock, every checked-in session or transaction for which
# +idle_since? @keepalive+ is true becomes a candidate. A random sample of
# the candidates, sized to the number of checked-in items above +@min+, is
# removed from the pool; the remaining candidates are kept alive. The
# actual +release!+ / +keepalive!+ calls are dispatched through +future+
# after the lock is dropped.
#
# @return [Array] the candidates that were sent +keepalive!+
def keepalive_or_release!
  to_keepalive = []
  to_release = []

  @mutex.synchronize do
    available = session_queue + transaction_queue

    # Only the surplus above the configured minimum may be released. The
    # subtraction used to be reversed, which released sessions when the
    # pool was already short and never trimmed an oversized pool.
    release_count = [available.count - @min, 0].max

    to_keepalive = available.select { |x| x.idle_since? @keepalive }

    # Remove a random portion of the sessions and transactions
    to_release = to_keepalive.sample release_count
    to_keepalive -= to_release

    # Remove those to be released from circulation
    @all_sessions -= to_release.map(&:session)
    @session_queue -= to_release
    @transaction_queue -= to_release
  end

  to_release.each { |x| future { x.release! } }
  to_keepalive.each { |x| future { x.keepalive! } }
end

#reset ⇒ Object



161
162
163
164
165
166
# File 'lib/google/cloud/spanner/pool.rb', line 161

# Tears the pool down with #close and then rebuilds it by calling +init+
# again (the same call the constructor makes).
#
# @return [true]
def reset
  close
  init

  true
end

#with_session ⇒ Object



54
55
56
57
58
59
60
61
# File 'lib/google/cloud/spanner/pool.rb', line 54

##
# Checks a session out, yields it, and always checks it back in — even if
# the block raises.
#
# @yield [session] the checked-out session
# @return the value of the block
def with_session
  # Checkout happens outside the begin/ensure so a failed checkout does
  # not trigger a checkin.
  checked_out = checkout_session
  begin
    yield checked_out
  ensure
    checkin_session checked_out
  end
end

#with_transaction ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/google/cloud/spanner/pool.rb', line 103

##
# Checks a transaction out and yields it. Afterwards — whether or not the
# block raised — a replacement transaction is created on the same session
# and checked in; that work is dispatched through +future+.
#
# @yield [tx] the checked-out transaction
# @return the value of the block
def with_transaction
  checked_out = checkout_transaction
  begin
    yield checked_out
  ensure
    # The yielded transaction is not put back; a new one on the same
    # session takes its place in the pool.
    future { checkin_transaction checked_out.session.create_transaction }
  end
end