Class: WalShipper::Dispatcher
- Includes: WalShipper
- Defined in: lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/pg-1.4.5/sample/wal_shipper.rb
Overview
Class for creating new Destination objects and determining how to ship WAL files to them.
Instance Method Summary
- #clean_spool ⇒ Object
  Remove any WAL segments no longer needed by slaves.
- #dispatch ⇒ Object
  Decide to be synchronous or threaded, and delegate each destination to the proper ship method.
- #initialize(wal, conf) ⇒ Dispatcher (constructor)
  Create a new Shipper object, given a conf hash and a wal file Pathname object.
- #link ⇒ Object
  Create hardlinks for the WAL file into each of the destination directories for separate queueing and recording of what was shipped successfully.
Methods included from WalShipper
Constructor Details
#initialize(wal, conf) ⇒ Dispatcher
Create a new Shipper object, given a conf hash and a wal file Pathname object.
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/pg-1.4.5/sample/wal_shipper.rb', line 125

    def initialize( wal, conf )
      # Make the config keys instance variables.
      conf.each_pair {|key, val| self.instance_variable_set( "@#{key}", val ) }

      # Spool directory check.
      #
      @spool = Pathname.new( @spool )
      @spool.exist? or raise "The configured spool directory (%s) doesn't exist." % [ @spool ]

      # Stop right away if we have disabled shipping.
      #
      unless @enabled
        self.log "WAL shipping is disabled, queuing segment %s" % [ wal.basename ]
        exit 1
      end

      # Instantiate Destination objects, creating new spool directories
      # for each.
      #
      @destinations.
        collect!{|dest| WalShipper::Destination.new( dest, @debug ) }.
        reject {|dest| dest.invalid }.
        collect do |dest|
          dest.spool = @spool + dest.label
          dest.spool.mkdir( 0711 ) unless dest.spool.exist?
          dest
        end

      # Put the WAL file into the spool for processing!
      #
      @waldir = @spool + 'wal_segments'
      @waldir.mkdir( 0711 ) unless @waldir.exist?

      self.log "Copying %s to %s" % [ wal.basename, @waldir ]
      FileUtils::cp wal, @waldir

      # 'wal' now references the copy. The original is managed and auto-expired
      # by PostgreSQL when a new checkpoint segment is reached.
      @wal = @waldir + wal.basename
    end
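The first two steps of the constructor (turning config keys into instance variables, then validating the spool directory) can be tried in isolation. The following is a minimal sketch, not the gem's code: `ConfigDemo` and `build_demo` are hypothetical names, and the `spool` and `enabled` keys are assumed from the `@spool` and `@enabled` variables the constructor reads.

```ruby
require 'pathname'
require 'tmpdir'

# Hypothetical stand-in for the opening steps of Dispatcher#initialize.
class ConfigDemo
  attr_reader :spool, :enabled

  def initialize( conf )
    # Each config key becomes an instance variable of the same name.
    conf.each_pair {|key, val| instance_variable_set( "@#{key}", val ) }

    # Coerce the spool path to a Pathname and fail early if it is missing.
    @spool = Pathname.new( @spool )
    @spool.exist? or raise "The configured spool directory (%s) doesn't exist." % [ @spool ]
  end
end

# Build a demo object against a temporary directory (illustration only).
def build_demo
  Dir.mktmpdir do |dir|
    ConfigDemo.new( 'spool' => dir, 'enabled' => true )
  end
end
```

Because every key is copied blindly, the config hash has to supply each variable the constructor later reads. A missing `spool` key, for instance, would leave `@spool` as nil and make `Pathname.new` raise a TypeError.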
Instance Method Details
#clean_spool ⇒ Object
Remove any WAL segments no longer needed by slaves.
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/pg-1.4.5/sample/wal_shipper.rb', line 234

    def clean_spool
      total = 0
      @waldir.children.each do |wal|
        if wal.stat.nlink == 1
          total += wal.unlink
        end
      end

      self.log "Removed %d WAL segment%s." % [ total, total == 1 ? '' : 's' ]
    end
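The `nlink == 1` test relies on hard-link counting: while any destination directory still holds a link to a segment, its link count stays above one. The sketch below reproduces that check in a temporary directory. It assumes that a destination's link is removed once the segment has shipped, which this page does not show; `sweep_unlinked`, `demo_sweep`, and the `standby1` label are hypothetical.

```ruby
require 'fileutils'
require 'pathname'
require 'tmpdir'

# Remove segments whose only remaining link is the one in waldir.
# Returns the number of segments removed.
def sweep_unlinked( waldir )
  total = 0
  waldir.children.each do |wal|
    # A link count of 1 means no destination queue references the file.
    next unless wal.stat.nlink == 1
    wal.unlink
    total += 1
  end
  total
end

def demo_sweep
  Dir.mktmpdir do |dir|
    spool  = Pathname.new( dir )
    waldir = spool + 'wal_segments'
    dest   = spool + 'standby1'   # hypothetical destination label
    [ waldir, dest ].each( &:mkdir )

    shipped = waldir + '000000010000000000000001'
    pending = waldir + '000000010000000000000002'
    [ shipped, pending ].each {|seg| seg.write( 'segment data' ) }

    # Queue both segments for the destination via hard links.
    FileUtils.ln( shipped, dest )
    FileUtils.ln( pending, dest )

    # Simulate a successful ship of the first segment (assumed behavior):
    # its entry in the destination queue is removed.
    ( dest + shipped.basename ).unlink

    removed = sweep_unlinked( waldir )
    [ removed, waldir.children.map {|c| c.basename.to_s } ]
  end
end
```

In this sketch only the first segment is swept; the second keeps a link count of two because the destination queue still references it.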
#dispatch ⇒ Object
Decide to be synchronous or threaded, and delegate each destination to the proper ship method.
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/pg-1.4.5/sample/wal_shipper.rb', line 181

    def dispatch
      # Synchronous mode.
      #
      unless @async
        self.log "Performing a synchronous dispatch."
        @destinations.each {|dest| self.dispatch_dest( dest ) }
        return
      end

      tg = ThreadGroup.new

      # Async, one thread per destination
      #
      if @async_max.nil? || @async_max.to_i.zero?
        self.log "Performing an asynchronous dispatch: one thread per destination."
        @destinations.each do |dest|
          t = Thread.new do
            Thread.current.abort_on_exception = true
            self.dispatch_dest( dest )
          end
          tg.add( t )
        end
        tg.list.each {|t| t.join }
        return
      end

      # Async, one thread per destination, in groups of async_max size.
      #
      self.log "Performing an asynchronous dispatch: one thread per destination, %d at a time." % [ @async_max ]
      all_dests = @destinations.dup
      dest_chunks = []
      until all_dests.empty? do
        dest_chunks << all_dests.slice!( 0, @async_max )
      end

      dest_chunks.each do |chunk|
        chunk.each do |dest|
          t = Thread.new do
            Thread.current.abort_on_exception = true
            self.dispatch_dest( dest )
          end
          tg.add( t )
        end

        tg.list.each {|t| t.join }
      end

      return
    end
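The batched branch amounts to "start up to async_max threads, wait for all of them, then start the next group". A compact way to express the same idea is sketched below. It swaps the source's `ThreadGroup` and `slice!` loop for `each_slice` and a plain array of threads, so it is an alternative formulation rather than the gem's implementation; `each_in_thread_batches` and `demo_batches` are hypothetical names, and the `sleep` stands in for `dispatch_dest`.

```ruby
# Run the block once per item, with at most `max` threads alive at a time.
# A nil or zero max means one thread per item in a single batch.
def each_in_thread_batches( items, max = nil, &work )
  return if items.empty?

  size = max.to_i
  size = items.length if size.zero?

  items.each_slice( size ) do |batch|
    threads = batch.map do |item|
      Thread.new( item ) do |it|
        # Surface worker failures immediately instead of at join time.
        Thread.current.abort_on_exception = true
        work.call( it )
      end
    end
    # Wait for the whole batch before starting the next one.
    threads.each( &:join )
  end
  nil
end

def demo_batches
  lock    = Mutex.new
  running = 0
  peak    = 0
  shipped = []

  each_in_thread_batches( %w[a b c d e], 2 ) do |dest|
    lock.synchronize { running += 1; peak = [ peak, running ].max }
    sleep 0.01   # stand-in for the real shipping work
    lock.synchronize { running -= 1; shipped << dest }
  end

  [ shipped.sort, peak ]
end
```

Calling `max.to_i` up front also tolerates a limit supplied as a string, whereas `slice!( 0, @async_max )` in the listing above expects a value that already behaves as an integer.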
#link ⇒ Object
Create hardlinks for the WAL file into each of the destination directories for separate queueing and recording of what was shipped successfully.
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/pg-1.4.5/sample/wal_shipper.rb', line 170

    def link
      @destinations.each do |dest|
        self.log "Linking %s into %s" % [ @wal.basename, dest.spool.basename ]
        FileUtils::ln @wal, dest.spool, :force => true
      end
    end
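To see what the hard links give each destination, the following sketch links one file into two directories and confirms that every entry points at the same inode. It is an illustration under assumptions: `demo_link` and the `standby1`/`standby2` labels are hypothetical, and it presumes a filesystem that supports hard links and reports inode numbers.

```ruby
require 'fileutils'
require 'pathname'
require 'tmpdir'

def demo_link
  Dir.mktmpdir do |dir|
    spool  = Pathname.new( dir )
    waldir = spool + 'wal_segments'
    dests  = %w[standby1 standby2].map {|label| spool + label }  # hypothetical labels
    ( [ waldir ] + dests ).each( &:mkdir )

    wal = waldir + '000000010000000000000001'
    wal.write( 'segment data' )

    # Link twice to show that :force replaces an existing link
    # instead of raising Errno::EEXIST on the second pass.
    2.times do
      dests.each {|dest| FileUtils.ln( wal, dest, force: true ) }
    end

    same_inode = dests.map {|dest| ( dest + wal.basename ).stat.ino == wal.stat.ino }
    [ wal.stat.nlink, same_inode ]
  end
end
```

No data is copied: each destination directory gets its own name for the same file, so every queue can be processed and cleared independently of the others.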