Class: WalShipper::Dispatcher

Inherits:
Object
  • Object
show all
Includes:
WalShipper
Defined in:
lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/pg-1.4.5/sample/wal_shipper.rb

Overview

Class for creating new Destination objects and determining how to ship WAL files to them.

Instance Method Summary collapse

Methods included from WalShipper

#log

Constructor Details

#initialize(wal, conf) ⇒ Dispatcher

Create a new Shipper object, given a conf hash and a wal file Pathname object.



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/pg-1.4.5/sample/wal_shipper.rb', line 125

def initialize( wal, conf )
	# Make the config keys instance variables.
	conf.each_pair {|key, val| self.instance_variable_set( "@#{key}", val ) }

	# Spool directory check.
	#
	@spool = Pathname.new( @spool )
	@spool.exist? or raise "The configured spool directory (%s) doesn't exist." % [ @spool ]

	# Stop right away if we have disabled shipping.
	#
	unless @enabled
		self.log "WAL shipping is disabled, queuing segment %s" % [ wal.basename ]
		exit 1
	end

	# Instantiate Destination objects, creating new spool directories
	# for each.
	#
	@destinations.
		collect!{|dest| WalShipper::Destination.new( dest, @debug ) }.
		reject  {|dest| dest.invalid }.
		collect do |dest|
			dest.spool = @spool + dest.label
			dest.spool.mkdir( 0711 ) unless dest.spool.exist?
			dest
		end

	# Put the WAL file into the spool for processing!
	#
	@waldir = @spool + 'wal_segments'
	@waldir.mkdir( 0711 ) unless @waldir.exist?

	self.log "Copying %s to %s" % [ wal.basename, @waldir ]
	FileUtils::cp wal, @waldir

	# 'wal' now references the copy.  The original is managed and auto-expired
	# by PostgreSQL when a new checkpoint segment it reached.
	@wal = @waldir + wal.basename
end

Instance Method Details

#clean_spoolObject

Remove any WAL segments no longer needed by slaves.



234
235
236
237
238
239
240
241
242
243
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/pg-1.4.5/sample/wal_shipper.rb', line 234

def clean_spool
	total = 0
	@waldir.children.each do |wal|
		if wal.stat.nlink == 1
			total += wal.unlink
		end
	end

	self.log "Removed %d WAL segment%s." % [ total, total == 1 ? '' : 's' ]
end

#dispatchObject

Decide to be synchronous or threaded, and delegate each destination to the proper ship method.



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/pg-1.4.5/sample/wal_shipper.rb', line 181

def dispatch
	# Synchronous mode.
	#
	unless @async
		self.log "Performing a synchronous dispatch."
		@destinations.each {|dest| self.dispatch_dest( dest ) }
		return
	end

	tg = ThreadGroup.new

	# Async, one thread per destination
	#
	if @async_max.nil? || @async_max.to_i.zero?
		self.log "Performing an asynchronous dispatch: one thread per destination."
		@destinations.each do |dest|
			t = Thread.new do
				Thread.current.abort_on_exception = true
				self.dispatch_dest( dest )
			end
			tg.add( t )
		end
		tg.list.each {|t| t.join }
		return
	end

	# Async, one thread per destination, in groups of asynx_max size.
	#
	self.log "Performing an asynchronous dispatch: one thread per destination, %d at a time." % [ @async_max ]
	all_dests = @destinations.dup
	dest_chunks = []
	until all_dests.empty? do
		dest_chunks << all_dests.slice!( 0, @async_max )
	end

	dest_chunks.each do |chunk|
		chunk.each do |dest|
			t = Thread.new do
				Thread.current.abort_on_exception = true
				self.dispatch_dest( dest )
			end
			tg.add( t )
		end

		tg.list.each {|t| t.join }
	end

	return
end

Create hardlinks for the WAL file into each of the destination directories for separate queueing and recording of what was shipped successfully.



170
171
172
173
174
175
# File 'lib/rubypitaya/app-template/vendor/bundle/ruby/3.1.0/gems/pg-1.4.5/sample/wal_shipper.rb', line 170

def link
	@destinations.each do |dest|
		self.log "Linking %s into %s" % [ @wal.basename, dest.spool.basename ]
		FileUtils::ln @wal, dest.spool, :force => true
	end
end