Class: ActiveRecord::Bogacs::ShareablePool

Inherits:
ConnectionAdapters::ConnectionPool
  • Object
show all
Includes:
PoolSupport, ThreadSafe::CheapLockable
Defined in:
lib/active_record/bogacs/shareable_pool.rb

Overview

NOTE: maybe do not override?!

Constant Summary collapse

AtomicReference =
ThreadSafe::AtomicReference
DEFAULT_SHARED_POOL =

only allow 25% of the pool size to be shared

0.25
MAX_THREAD_SHARING =

not really a strict limit but should hold

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from PoolSupport

#current_connection_id, #new_connection, #reap

Constructor Details

#initialize(spec) ⇒ ShareablePool

Returns a new instance of ShareablePool.



34
35
36
37
38
39
40
41
42
# File 'lib/active_record/bogacs/shareable_pool.rb', line 34

# Builds the pool through the parent constructor and prepares the state
# used for connection sharing.
#
# The +:shared_pool+ entry of +spec.config+ controls how many connections
# may be shared: a value up to (and including) 1.0 is read as a fraction of
# the pool size, anything larger as an absolute count. When the entry is
# absent DEFAULT_SHARED_POOL is used.
#
# @param spec the connection specification; only +spec.config+ is read here
def initialize(spec)
  super(spec) # ConnectionPool#initialize

  configured = spec.config[:shared_pool]
  share =
    if configured
      configured.to_f
    else
      DEFAULT_SHARED_POOL
    end

  # 0.0 - 1.0 is a ratio of the pool size (@size is presumably set by super)
  if share <= 1.0
    share = (@size * share).round
  end

  @shared_size = share.to_i
  # maps a connection to its share counter
  @shared_connections = ThreadSafe::Map.new
end

Instance Attribute Details

#shared_size ⇒ Object (readonly)

Returns the value of attribute shared_size.



31
32
33
# File 'lib/active_record/bogacs/shareable_pool.rb', line 31

# Number of pool connections that may be shared, as computed by the
# constructor.
#
# @return [Integer] the value stored in +@shared_size+
def shared_size; @shared_size; end

Instance Method Details

#active_connection? ⇒ Boolean

Returns:

  • (Boolean)


52
53
54
55
56
57
# File 'lib/active_record/bogacs/shareable_pool.rb', line 52

# Tells whether the current thread holds a connection that is in use.
#
# A connection stored in the thread-local shared slot takes precedence;
# otherwise the answer comes from #super_active_connection? for the
# current connection id.
#
# @return [Boolean]
def active_connection?
  shared = Thread.current[shared_connection_key]
  return shared.in_use? if shared

  super_active_connection?(current_connection_id)
end

#clear_reloadable_connections! ⇒ Object



82
83
84
# File 'lib/active_record/bogacs/shareable_pool.rb', line 82

# Forgets every tracked shared connection and then delegates to the parent
# implementation, both under the cheap lock.
#
# @return whatever +cheap_synchronize+ hands back for the block
def clear_reloadable_connections!
  cheap_synchronize do
    @shared_connections.clear
    super
  end
end

#connection ⇒ Object



45
46
47
48
49
# File 'lib/active_record/bogacs/shareable_pool.rb', line 45

# Returns the connection for the current thread.
#
# A connection stored in the thread-local shared slot wins; when there is
# none the parent implementation is used, i.e. (simplified)
# @reserved_connections.compute_if_absent(current_connection_id) { checkout }
def connection
  shared = Thread.current[shared_connection_key]
  return shared if shared

  super
end

#disconnect! ⇒ Object



77
78
79
# File 'lib/active_record/bogacs/shareable_pool.rb', line 77

# Drops all shared-connection bookkeeping and then runs the parent
# +disconnect!+, both while holding the cheap lock.
#
# @return whatever +cheap_synchronize+ hands back for the block
def disconnect!
  cheap_synchronize do
    @shared_connections.clear
    super
  end
end

#release_connection(with_id = current_connection_id) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/active_record/bogacs/shareable_pool.rb', line 60

# Releases the connection reserved under +with_id+, if any.
#
# * nothing reserved             -> no-op
# * reserved but not shared      -> checked back in (what super does)
# * reserved and tracked shared  -> released through
#   #release_shared_connection only when its counter reads 0; otherwise it
#   is left alone
#
# @param with_id the id the connection was reserved under
#   (defaults to the current connection id)
def release_connection(with_id = current_connection_id)
  held = @reserved_connections[with_id]
  return unless held

  counter = @shared_connections[held]
  return checkin(held) unless counter

  # lock due #get_shared_connection ... not needed ?!
  # NOTE: the other option is to not care about shared here at all ...
  cheap_synchronize do
    release_shared_connection(held) if counter.get == 0
  end
end

#release_shared_connection(connection) ⇒ Object

Custom API :



108
109
110
111
112
113
114
115
116
# File 'lib/active_record/bogacs/shareable_pool.rb', line 108

# Stops tracking +connection+ as shared and checks it back in.
#
# The thread-local shared slot is cleared only when it currently holds this
# very connection.
#
# @param connection the connection to release
# @return the result of +checkin+
def release_shared_connection(connection)
  slot = shared_connection_key
  thread = Thread.current
  thread[slot] = nil if connection == thread[slot]

  @shared_connections.delete(connection)
  checkin(connection)
end

#remove(conn) ⇒ Object

Note:

Called from #reap, so the pool should work with the reaper.



88
89
90
# File 'lib/active_record/bogacs/shareable_pool.rb', line 88

# Removes +conn+ from the shared-connection bookkeeping and then from the
# pool itself (parent implementation), both under the cheap lock.
#
# @param conn the connection to remove
# @return whatever +cheap_synchronize+ hands back for the block
def remove(conn)
  cheap_synchronize do
    @shared_connections.delete(conn)
    super
  end
end

#super_active_connection?(connection_id = current_connection_id) ⇒ Boolean

Returns:

  • (Boolean)


166
167
168
# File 'lib/active_record/bogacs/shareable_pool.rb', line 166

# Tells whether a connection is reserved under +connection_id+ and is
# currently in use, ignoring the thread-local shared slot.
#
# @param connection_id the id to look up (defaults to the current one)
# @return [Boolean] false when nothing is reserved under the id
def super_active_connection?(connection_id = current_connection_id)
  reserved = @reserved_connections.get(connection_id)
  return false unless reserved

  reserved.in_use?
end

#with_shared_connection ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/active_record/bogacs/shareable_pool.rb', line 118

# Yields a connection to the given block, preferring one already bound to
# the calling thread and otherwise one that is tracked as "shared".
#
# Selection order, as implemented below:
# 1. the connection stored in the thread-local shared slot (a nested call
#    on the same thread),
# 2. the thread's regular connection when #super_active_connection? reports
#    one - this connection is NOT marked as shared,
# 3. whatever +get_shared_connection+ returns,
# 4. a fresh +checkout+ that is registered via +add_shared_connection+.
#
# NOTE(review): +get_shared_connection+, +add_shared_connection+,
# +rem_shared_connection+, +emulated_checkout+, +debug+ and the DEBUG
# constant are defined outside this view; presumably the first three
# maintain the per-connection share counter - confirm.
#
# @yield [connection] the connection selected for the calling thread
# @return the value returned by the block
def with_shared_connection
  shared_conn_key = shared_connection_key
  # with_shared_connection call nested in the same thread :
  # re-use the connection the outer call stored in the thread-local slot
  if connection = Thread.current[shared_conn_key]
    emulated_checkout(connection)
    # early return - the ensure clause below is not involved on this path
    return yield connection
  end

  start = Time.now if DEBUG # only used for the timing debug message
  begin
    connection_id = current_connection_id
    # if there's a 'regular' connection on the thread use it as super
    if super_active_connection?(connection_id) # for current thread
      connection = self.connection # do not mark as shared
      DEBUG && debug("with_shared_conn 10 got active = #{connection.to_s}")
    # otherwise if we have a shared connection - use that one :
    elsif connection = get_shared_connection
      emulated_checkout(connection); shared = true
      DEBUG && debug("with_shared_conn 20 got shared = #{connection.to_s}")
    else
      cheap_synchronize do
        # check shared again as/if threads end up sync-ing up here :
        if connection = get_shared_connection
          emulated_checkout(connection)
          DEBUG && debug("with_shared_conn 21 got shared = #{connection.to_s}")
        end
        # TODO the bottle-neck for concurrency doing sync { checkout } :
        unless connection # still none - acquire a connection from the pool
          connection = self.checkout # might block if pool fully used
          add_shared_connection(connection)
          DEBUG && debug("with_shared_conn 30 acq shared = #{connection.to_s}")
        end
      end
      # only reached when the synchronized block finished without raising
      shared = true
    end

    # publish the shared connection so nested calls on this thread re-use it
    Thread.current[shared_conn_key] = connection if shared

    DEBUG && debug("with_shared_conn obtaining a connection took #{(Time.now - start) * 1000}ms")
    yield connection
  ensure
    # cleared unconditionally (the `if shared` guard is commented out); the
    # slot was empty on entry, otherwise the nested branch would have returned
    Thread.current[shared_conn_key] = nil # if shared
    # `shared` stays nil when the regular connection was used or when
    # acquiring a connection raised, so nothing is un-shared in those cases
    rem_shared_connection(connection) if shared
  end
end