Class: Webhookdb::SyncTarget
- Inherits:
- Object
- Object
- Webhookdb::SyncTarget
- Includes:
- Appydays::Configurable, Dbutil
- Defined in:
- lib/webhookdb/sync_target.rb
Overview
Support exporting WebhookDB data into external services, such as another Postgres instance or data warehouse (Snowflake, etc).
At a high level, sync targets work as follows:
- A user registers a sync target for a specific integration through the CLI, using a database connection string for a supported database (e.g., postgres://).
- They include a period (how often it is synced), and an optional schema and table (if omitted, we use the default schema and the service integration table name).
- Every minute or so, we look for sync targets that are “due” and enqueue a sync for them. Customers can enqueue their own sync request, but it cannot run more often than the minimum allowed sync period.
For the sync logic itself, see run_sync.
Defined Under Namespace
Classes: DatabaseRoutine, Deleted, HttpRoutine, InvalidConnection, Routine, SyncInProgress
Constant Summary
- ADVISORY_LOCK_KEYSPACE =
Advisory locks for sync targets use this as the first int, and the id as the second.
2_000_000_000
- HTTP_VERIFY_TIMEOUT =
3
- DB_VERIFY_TIMEOUT =
2000
- DB_VERIFY_STATEMENT =
"SELECT 1"
- RAND =
Random.new
- MAX_STATS =
200
Constants included from Dbutil
Instance Attribute Summary
Class Method Summary
- .default_valid_period ⇒ Object
-
.transport_error?(e) ⇒ Boolean
Return true if the given error is considered a ‘transport’ error, like a timeout, socket error, dns error, etc.
- .valid_period(beginval) ⇒ Object
- .valid_period_for(org) ⇒ Object
- .validate_db_url(s) ⇒ Object
- .validate_http_url(s) ⇒ Object
- .verify_db_connection(url) ⇒ Object
- .verify_http_connection(url) ⇒ Object
Instance Method Summary
-
#add_sync_stat(start, exception: nil, response_status: nil) ⇒ Object
:section: Stats.
- #advisory_lock(db) ⇒ Sequel::AdvisoryLock
- #associated_id ⇒ String
- #associated_object_display ⇒ Object
- #associated_type ⇒ String
- #before_create ⇒ Object
- #before_validation ⇒ Object
- #db? ⇒ Boolean
- #displaysafe_connection_url ⇒ Object
- #http? ⇒ Boolean
-
#jitter ⇒ Object
Return the jitter used for enqueuing the next sync of the job.
- #latency(now: Time.now) ⇒ ActiveSupport::Duration, Integer
- #log_tags ⇒ Object
- #next_possible_sync(now:) ⇒ Object
- #next_scheduled_sync(now:) ⇒ Object
- #organization ⇒ Webhookdb::Organization
-
#run_sync(now:) ⇒ Object
Running a sync involves some work we always do (export, transform), and then work that varies per-adapter (load).
- #schema_and_table_string ⇒ String
- #sync_stat_summary ⇒ Object
Methods included from Dbutil
borrow_conn, configured_connection_options, conn_opts, displaysafe_url, reduce_expr, take_conn
Instance Attribute Details
#connection_url ⇒ String
# File 'lib/webhookdb/sync_target.rb', line 383
#service_integration ⇒ Webhookdb::ServiceIntegration
# File 'lib/webhookdb/sync_target.rb', line 380
Class Method Details
.default_valid_period ⇒ Object
# File 'lib/webhookdb/sync_target.rb', line 82
def self.default_valid_period
  return self.valid_period(Webhookdb::SyncTarget.default_min_period_seconds)
end
.transport_error?(e) ⇒ Boolean
Return true if the given error is considered a ‘transport’ error, like a timeout, socket error, DNS error, etc. These errors do not share a common class, so each kind is checked individually.
# File 'lib/webhookdb/sync_target.rb', line 193
def self.transport_error?(e)
  return true if e.is_a?(Timeout::Error)
  return true if e.is_a?(SocketError)
  return true if e.is_a?(OpenSSL::SSL::SSLError)
  # SystemCallError are Errno errors, we can get them when the url no longer resolves.
  return true if e.is_a?(SystemCallError)
  # Socket::ResolutionError is an error but I guess it's defined in C and we can't raise it in tests.
  # Anything with an error_code assume is some transport-level issue and treat it as a connection issue,
  # not a coding issue.
  return true if e.respond_to?(:error_code)
  return false
end
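As a rough illustration of how this predicate classifies errors, the sketch below copies its checks into a standalone method and applies them to a few exceptions. FakeDriverError is a hypothetical stand-in for a driver error that exposes error_code; it is not part of WebhookDB.

```ruby
require "timeout"
require "socket"
require "openssl"

# Standalone copy of the checks, for illustration only.
def transport_error?(e)
  return true if e.is_a?(Timeout::Error)
  return true if e.is_a?(SocketError)
  return true if e.is_a?(OpenSSL::SSL::SSLError)
  return true if e.is_a?(SystemCallError)
  return true if e.respond_to?(:error_code)
  false
end

# Hypothetical driver error that exposes an error_code.
class FakeDriverError < StandardError
  def error_code
    57
  end
end

transport_error?(Timeout::Error.new)      # timeout: transport error
transport_error?(Errno::ECONNREFUSED.new) # Errno errors are SystemCallErrors: transport error
transport_error?(FakeDriverError.new)     # responds to error_code: transport error
transport_error?(ArgumentError.new)       # likely a coding issue: not a transport error
```

Callers such as verify_http_connection use this distinction to turn connectivity failures into InvalidConnection while re-raising everything else.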
.valid_period(beginval) ⇒ Object
# File 'lib/webhookdb/sync_target.rb', line 74
def self.valid_period(beginval)
  return beginval..self.max_period_seconds
end
.valid_period_for(org) ⇒ Object
# File 'lib/webhookdb/sync_target.rb', line 78
def self.valid_period_for(org)
  return self.valid_period(org.minimum_sync_seconds)
end
.validate_db_url(s) ⇒ Object
# File 'lib/webhookdb/sync_target.rb', line 118
def self.validate_db_url(s)
  begin
    url = URI(s)
  rescue URI::InvalidURIError
    return "That's not a valid URL."
  end
  protocols = ["postgres", "snowflake"]
  unless protocols.include?(url.scheme)
    protostr = protocols.join(", ")
    # rubocop:disable Layout/LineLength
    msg = "The '#{url.scheme}' protocol is not supported for database sync targets. Supported protocols are: #{protostr}."
    # rubocop:enable Layout/LineLength
    return msg
  end
  return nil
end
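The validator follows a "nil means valid, string means error" convention. The following is a minimal standalone sketch of that convention using only the standard library; the constant name SUPPORTED_DB_PROTOCOLS and the example hostnames are made up for illustration.

```ruby
require "uri"

SUPPORTED_DB_PROTOCOLS = ["postgres", "snowflake"].freeze

# Standalone sketch: returns nil when the URL is acceptable, else an error message.
def validate_db_url(s)
  url = URI(s)
  return nil if SUPPORTED_DB_PROTOCOLS.include?(url.scheme)
  "The '#{url.scheme}' protocol is not supported for database sync targets. " \
    "Supported protocols are: #{SUPPORTED_DB_PROTOCOLS.join(', ')}."
rescue URI::InvalidURIError
  "That's not a valid URL."
end

validate_db_url("postgres://user:pass@db.example.com:5432/warehouse") # valid: nil
validate_db_url("mysql://user:pass@db.example.com/warehouse")         # unsupported protocol message
validate_db_url("not a url")                                          # "That's not a valid URL."
```

Note that this only checks the URL's shape and scheme; whether the database is reachable is a separate concern handled by verify_db_connection.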
.validate_http_url(s) ⇒ Object
# File 'lib/webhookdb/sync_target.rb', line 135
def self.validate_http_url(s)
  begin
    url = URI(s)
  rescue URI::InvalidURIError
    return "That's not a valid URL."
  end
  case url.scheme
    when "https"
      return nil if url.user.present? || url.password.present?
      url.user = "user"
      url.password = "pass"
      return "https urls must include a Basic Auth username and/or password, like '#{url}'"
    when "http"
      # http does not require a username/pass since it's only for internal use.
      return Webhookdb::SyncTarget.allow_http ? nil : "Url must be https, not http."
    else
      return "Must be an https url."
  end
end
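To show what each branch returns, here is a standalone sketch of the same checks in plain Ruby. Two things are assumptions made for the sake of a runnable example: the allow_http keyword stands in for the Webhookdb::SyncTarget.allow_http setting, and the has_auth check replaces ActiveSupport's present?.

```ruby
require "uri"

# Standalone sketch; allow_http stands in for Webhookdb::SyncTarget.allow_http.
def validate_http_url(s, allow_http: false)
  url = URI(s)
  case url.scheme
  when "https"
    # Plain-Ruby replacement for ActiveSupport's present?.
    has_auth = [url.user, url.password].any? { |v| v && !v.strip.empty? }
    return nil if has_auth
    # Fill in placeholder credentials so the message shows the expected shape.
    url.user = "user"
    url.password = "pass"
    "https urls must include a Basic Auth username and/or password, like '#{url}'"
  when "http"
    allow_http ? nil : "Url must be https, not http."
  else
    "Must be an https url."
  end
rescue URI::InvalidURIError
  "That's not a valid URL."
end

validate_http_url("https://me:secret@example.com/hook")       # valid: nil
validate_http_url("https://example.com/hook")                 # asks for Basic Auth credentials
validate_http_url("http://localhost:3000", allow_http: true)  # valid only when http is allowed: nil
validate_http_url("ftp://example.com")                        # "Must be an https url."
```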
.verify_db_connection(url) ⇒ Object
# File 'lib/webhookdb/sync_target.rb', line 155
def self.verify_db_connection(url)
  adapter = Webhookdb::DBAdapter.adapter(url)
  begin
    adapter.verify_connection(url, timeout: DB_VERIFY_TIMEOUT, statement: DB_VERIFY_STATEMENT)
  rescue StandardError => e
    # noinspection RailsParamDefResolve
    msg = e.try(:wrapped_exception).try(:to_s) || e.to_s
    raise InvalidConnection, "Could not SELECT 1: #{msg.strip}"
  end
end
.verify_http_connection(url) ⇒ Object
# File 'lib/webhookdb/sync_target.rb', line 166
def self.verify_http_connection(url)
  cleanurl, authparams = Webhookdb::Http.extract_url_auth(url)
  body = {
    rows: [],
    integration_id: "svi_test",
    integration_service: "httpsync_test",
    table: "test",
  }
  begin
    Webhookdb::Http.post(
      cleanurl,
      body,
      logger: self.logger,
      basic_auth: authparams,
      timeout: HTTP_VERIFY_TIMEOUT,
      follow_redirects: true,
    )
  rescue StandardError => e
    raise InvalidConnection, "POST to #{cleanurl} failed: #{e.message}" if
      e.is_a?(Webhookdb::Http::Error) || self.transport_error?(e)
    raise
  end
end
Instance Method Details
#add_sync_stat(start, exception: nil, response_status: nil) ⇒ Object
:section: Stats
# File 'lib/webhookdb/sync_target.rb', line 335
def add_sync_stat(start, exception: nil, response_status: nil)
  stat = {"t" => s2ms(start), "d" => s2ms(Time.now - start)}
  stat["e"] = exception.class.name if exception
  stat["rs"] = response_status unless response_status.nil?
  stats = self.sync_stats
  stats.prepend(stat)
  stats.pop if stats.size > MAX_STATS
  self.will_change_column(:sync_stats)
end
#advisory_lock(db) ⇒ Sequel::AdvisoryLock
# File 'lib/webhookdb/sync_target.rb', line 292
def advisory_lock(db)
  return Sequel::AdvisoryLock.new(db, ADVISORY_LOCK_KEYSPACE, self.id)
end
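Postgres advisory lock functions accept either one 64-bit key or two 32-bit keys, which is why the keyspace constant (2_000_000_000) is chosen to fit in a signed 32-bit integer. The sketch below builds the kind of two-key statement one could run by hand; it is only an illustration and makes no claim about the exact SQL that Sequel::AdvisoryLock issues. The helper name try_lock_sql is hypothetical.

```ruby
ADVISORY_LOCK_KEYSPACE = 2_000_000_000
INT4_RANGE = (-2_147_483_648..2_147_483_647)

# Hypothetical helper: build a two-key, non-blocking Postgres advisory lock call.
def try_lock_sql(sync_target_id)
  keys = [ADVISORY_LOCK_KEYSPACE, Integer(sync_target_id)]
  unless keys.all? { |k| INT4_RANGE.cover?(k) }
    raise ArgumentError, "both keys must fit in a signed 32-bit integer"
  end
  "SELECT pg_try_advisory_lock(#{keys[0]}, #{keys[1]})"
end

try_lock_sql(42) # "SELECT pg_try_advisory_lock(2000000000, 42)"
```

Using the keyspace as the first key means a sync target's lock cannot collide with two-key advisory locks that other code takes under a different first key.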
#associated_id ⇒ String
# File 'lib/webhookdb/sync_target.rb', line 317
def associated_id
  # Eventually we need to support orgs
  return self.service_integration.opaque_id
end
#associated_object_display ⇒ Object
# File 'lib/webhookdb/sync_target.rb', line 322
def associated_object_display
  return "#{self.service_integration.opaque_id}/#{self.service_integration.table_name}"
end
#associated_type ⇒ String
# File 'lib/webhookdb/sync_target.rb', line 311
def associated_type
  # Eventually we need to support orgs
  return "service_integration"
end
#before_create ⇒ Object
# File 'lib/webhookdb/sync_target.rb', line 376
def before_create
  self[:opaque_id] ||= Webhookdb::Id.new_opaque_id("syt")
end
#before_validation ⇒ Object
# File 'lib/webhookdb/sync_target.rb', line 371
def before_validation
  self.page_size ||= Webhookdb::SyncTarget.default_page_size
  super
end
#db? ⇒ Boolean
# File 'lib/webhookdb/sync_target.rb', line 114
def db?
  return !self.http?
end
#displaysafe_connection_url ⇒ Object
# File 'lib/webhookdb/sync_target.rb', line 296
def displaysafe_connection_url
  return displaysafe_url(self.connection_url)
end
#http? ⇒ Boolean
# File 'lib/webhookdb/sync_target.rb', line 108
def http?
  url = URI(self.connection_url)
  return true if ["http", "https"].include?(url.scheme)
  return false
end
#jitter ⇒ Object
Return the jitter used for enqueuing the next sync of the job. It should never be more than 20 seconds, nor should it be more than 1/4 of the total period, since the sync needs to run at a reasonably predictable time. Jitter is always >= 1, since it is helpful to be able to assert that the next sync will always be in the future.
# File 'lib/webhookdb/sync_target.rb', line 225
def jitter
  max_jitter = [20, self.period_seconds / 4].min
  max_jitter = [1, max_jitter].max
  return RAND.rand(1..max_jitter)
end
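A small standalone sketch can make the bounds concrete. It takes the period as an argument instead of reading period_seconds from the model; the names jitter_for and JITTER_RAND are chosen for this example only.

```ruby
JITTER_RAND = Random.new

# Standalone sketch of the jitter bounds for a given period, in seconds.
def jitter_for(period_seconds)
  max_jitter = [20, period_seconds / 4].min # cap at 20s and at a quarter of the period
  max_jitter = [1, max_jitter].max          # but never below 1s
  JITTER_RAND.rand(1..max_jitter)
end

# Upper bound per period: 2s -> 1, 60s -> 15, 600s -> 20.
[2, 60, 600].each do |period|
  puts "period=#{period}s jitter=#{jitter_for(period)}s"
end
```

With integer periods, period_seconds / 4 is integer division, so very short periods collapse to a jitter of exactly 1 second.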
#latency(now: Time.now) ⇒ ActiveSupport::Duration, Integer
# File 'lib/webhookdb/sync_target.rb', line 232
def latency(now: Time.now)
  return 0 if self.last_synced_at.nil?
  return 0 if self.last_synced_at > now
  return now - self.last_synced_at
end
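The three branches can be seen in a standalone sketch that takes last_synced_at as an argument (in the model it is an attribute). With plain Time objects the subtraction yields a Float number of seconds.

```ruby
# Standalone sketch: seconds since the last sync, clamped at 0.
def latency(last_synced_at, now: Time.now)
  return 0 if last_synced_at.nil?
  return 0 if last_synced_at > now
  now - last_synced_at
end

now = Time.at(1_700_000_000)
latency(nil, now: now)      # never synced: 0
latency(now + 30, now: now) # last sync appears to be in the future: 0
latency(now - 90, now: now) # 90.0 seconds behind
```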
#log_tags ⇒ Object
# File 'lib/webhookdb/sync_target.rb', line 300
def log_tags
  return {
    sync_target_id: self.id,
    sync_target_connection_url: self.displaysafe_connection_url,
    service_integration_id: self.service_integration_id,
    service_integration_service: self.service_integration.service_name,
    service_integration_table: self.service_integration.table_name,
  }
end
#next_possible_sync(now:) ⇒ Object
# File 'lib/webhookdb/sync_target.rb', line 210
def next_possible_sync(now:)
  return self.next_sync(self.organization.minimum_sync_seconds, now)
end
#next_scheduled_sync(now:) ⇒ Object
# File 'lib/webhookdb/sync_target.rb', line 206
def next_scheduled_sync(now:)
  return self.next_sync(self.period_seconds, now)
end
#organization ⇒ Webhookdb::Organization
# File 'lib/webhookdb/sync_target.rb', line 367
def organization
  return self.service_integration.organization
end
#run_sync(now:) ⇒ Object
Running a sync involves some work we always do (export, transform), and then work that varies per-adapter (load).
First, we lock using an advisory lock to make sure we never sync the same sync target concurrently, since concurrent syncs can cause correctness and performance issues. A SyncInProgress error is raised if the target is currently syncing.
If the sync target is against an HTTP URL, see _run_http_sync.
If the sync target is a database connection:
- Ensure the sync target table exists and has the right schema. In general we do NOT create indices for the target table; since this table is for a client’s data warehouse, we assume they will optimize it as needed. The only exception is the unique constraint for the remote key column.
- Select rows created/updated since our last update in our ‘source’ database.
- Write them to disk into a CSV file.
- Pass this CSV file to the proper sync target adapter.
- For example, the PG sync target will:
  - Create a temp table in the target database, using the schema from the sync target table.
  - Load the data into that temp table.
  - Insert into the target table the temp table rows that do not yet appear in the target table.
  - Update rows in the target table from the temp table rows that already appear in the target table.
- The snowflake sync target will:
  - PUT the CSV file into the stage for the table.
  - Otherwise the logic is the same as PG: create a temp table and COPY INTO from the CSV.
  - Purge the staged file.
-
will be synced.
# File 'lib/webhookdb/sync_target.rb', line 268
def run_sync(now:)
  ran = false
  # Take the advisory lock with a separate connection. This seems to be pretty important-
  # it's possible that (for reasons not clear at this time) using the standard connection pool
  # results in the lock being held since the session remains open for a while on the worker.
  # Opening a separate connection ensures that, once this method exits, the lock will be released
  # since the session will be ended.
  Webhookdb::Dbutil.borrow_conn(Webhookdb::Postgres::Model.uri) do |db|
    self.advisory_lock(db).with_lock? do
      self.logger.info "starting_sync"
      routine = if self.connection_url.start_with?("https://", "http://")
        # Note that http links are not secure and should only be used for development purposes
        HttpRoutine.new(now, self)
      else
        DatabaseRoutine.new(now, self)
      end
      routine.run
      ran = true
    end
  end
  raise SyncInProgress, "SyncTarget[#{self.id}] is already being synced" unless ran
end
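The "try to lock, run, and raise if we never ran" shape of this method can be sketched without a database. In the sketch below, TryLock is a hypothetical in-process stand-in built on Mutex#try_lock; it assumes that with_lock? yields only when the lock is acquired and never blocks, which is how the method above appears to use Sequel::AdvisoryLock. The names TryLock and run_exclusively are invented for this example.

```ruby
class SyncInProgress < StandardError; end

# Hypothetical in-process stand-in for an advisory lock:
# with_lock? yields only if the lock was acquired, and never blocks.
class TryLock
  def initialize
    @mutex = Mutex.new
  end

  def with_lock?
    return false unless @mutex.try_lock
    begin
      yield
    ensure
      @mutex.unlock
    end
    true
  end
end

# Run the block under the lock, or raise if someone else holds it.
def run_exclusively(lock)
  ran = false
  lock.with_lock? do
    yield
    ran = true
  end
  raise SyncInProgress, "already being synced" unless ran
end

lock = TryLock.new
run_exclusively(lock) { puts "syncing" }
begin
  # A second attempt while the first holds the lock is rejected instead of waiting.
  run_exclusively(lock) { run_exclusively(lock) { puts "never runs" } }
rescue SyncInProgress => e
  puts "rejected: #{e.message}"
end
```

Tracking a ran flag, rather than relying on the lock helper's return value, keeps the "did the work happen" decision in one obvious place.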
#schema_and_table_string ⇒ String
# File 'lib/webhookdb/sync_target.rb', line 327
def schema_and_table_string
  schema_name = self.schema.present? ? self.schema : self.class.default_schema
  table_name = self.table.present? ? self.table : self.service_integration.table_name
  return "#{schema_name}.#{table_name}"
end
#sync_stat_summary ⇒ Object
# File 'lib/webhookdb/sync_target.rb', line 348
def sync_stat_summary
  return {} if self.sync_stats.empty?
  earliest = self.sync_stats.last
  latest = self.sync_stats.first
  average_latency = (self.sync_stats.sum { |st| ms2s(st["d"]) }) / self.sync_stats.size
  errors = self.sync_stats.count { |st| st["e"] || st["rs"] }
  calls_per_minute = 60 / average_latency
  rpm = self.page_size * calls_per_minute
  rpm *= self.parallelism if self.parallelism.positive?
  return {
    latest: Time.at(ms2s(latest["t"]).to_i),
    earliest: Time.at(ms2s(earliest["t"]).to_i),
    average_latency: average_latency.round(2),
    average_rows_minute: rpm.to_i,
    errors:,
  }
end
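The throughput estimate in the summary is plain arithmetic, worked through in the sketch below. The ms2s helper is not shown in this page, so the version here (milliseconds to fractional seconds) is an assumption, as are the sample stats and the method name rows_per_minute.

```ruby
# Assumed helper: the page does not show ms2s, so this is a guess at
# a milliseconds-to-seconds conversion.
def ms2s(ms)
  ms / 1000.0
end

# Standalone sketch of the throughput arithmetic in the summary.
def rows_per_minute(stats, page_size:, parallelism:)
  average_latency = stats.sum { |st| ms2s(st["d"]) } / stats.size
  calls_per_minute = 60 / average_latency
  rpm = page_size * calls_per_minute
  rpm *= parallelism if parallelism.positive?
  rpm.to_i
end

# Two syncs taking 500ms and 1500ms average out to 1.0s per call.
stats = [
  {"t" => 1_700_000_060_000, "d" => 500},
  {"t" => 1_700_000_000_000, "d" => 1500, "e" => "Timeout::Error"},
]
rows_per_minute(stats, page_size: 100, parallelism: 0) # 60 calls/min * 100 rows = 6000
rows_per_minute(stats, page_size: 100, parallelism: 2) # scaled by parallelism = 12000
```

Because add_sync_stat prepends new entries, the first element is the latest stat and the last element is the earliest, which is what the summary relies on.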