Class: Webhookdb::SyncTarget

Inherits:
Object
Includes:
Appydays::Configurable, Dbutil
Defined in:
lib/webhookdb/sync_target.rb

Overview

Support exporting WebhookDB data into external services, such as another Postgres instance or data warehouse (Snowflake, etc).

At a high level, sync targets work as follows:

  • A user uses the CLI to register a sync target for a specific integration, using a database connection string for a supported database (e.g., postgres://).

  • They include a period (how often it is synced), and an optional schema and table (if omitted, we’ll use the default schema and the service integration’s table name).

  • Every minute or so, we look for sync targets that are “due” and enqueue a sync for them. Customers can enqueue their own sync request, but it cannot run more often than the minimum allowed sync period.

For the sync logic itself, see run_sync.
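
The “due” check in the list above can be pictured roughly as follows. This is a minimal sketch, not the scheduler’s actual code: the `Target` struct and the `due?` helper are hypothetical, and it assumes a target is due once a full period has elapsed since its last sync (or if it has never synced).

```ruby
# Hypothetical stand-in for a sync target row; not the real model.
Target = Struct.new(:id, :period_seconds, :last_synced_at)

# Assumption: a target is due when it has never synced,
# or when at least one full period has passed since the last sync.
def due?(target, now:)
  return true if target.last_synced_at.nil?
  return now >= target.last_synced_at + target.period_seconds
end

now = Time.at(1_700_000_000)
targets = [
  Target.new(1, 600, now - 700), # period has elapsed
  Target.new(2, 600, now - 60),  # synced a minute ago
  Target.new(3, 600, nil),       # never synced
]
due_ids = targets.select { |t| due?(t, now: now) }.map(&:id)
puts due_ids.inspect
```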

Defined Under Namespace

Classes: DatabaseRoutine, Deleted, HttpRoutine, InvalidConnection, Routine, SyncInProgress

Constant Summary

ADVISORY_LOCK_KEYSPACE = 2_000_000_000

Advisory locks for sync targets use this as the first int, and the id as the second.

HTTP_VERIFY_TIMEOUT = 3
DB_VERIFY_TIMEOUT = 2000
DB_VERIFY_STATEMENT = "SELECT 1"
RAND = Random.new
MAX_STATS = 200

Constants included from Dbutil

Dbutil::MOCK_CONN

Methods included from Dbutil

borrow_conn, configured_connection_options, conn_opts, displaysafe_url, reduce_expr, take_conn

Instance Attribute Details

#connection_url ⇒ String

Returns:

  • (String)


# File 'lib/webhookdb/sync_target.rb', line 383

#service_integration ⇒ Webhookdb::ServiceIntegration



# File 'lib/webhookdb/sync_target.rb', line 380

Class Method Details

.default_valid_period ⇒ Object

# File 'lib/webhookdb/sync_target.rb', line 82

def self.default_valid_period
  return self.valid_period(Webhookdb::SyncTarget.default_min_period_seconds)
end

.transport_error?(e) ⇒ Boolean

Return true if the given error is considered a ‘transport’ error, like a timeout, socket error, DNS error, etc. These errors do not share a common class, so each type is checked individually.

Returns:

  • (Boolean)


# File 'lib/webhookdb/sync_target.rb', line 193

def self.transport_error?(e)
  return true if e.is_a?(Timeout::Error)
  return true if e.is_a?(SocketError)
  return true if e.is_a?(OpenSSL::SSL::SSLError)
  # SystemCallError are Errno errors, we can get them when the url no longer resolves.
  return true if e.is_a?(SystemCallError)
  # Socket::ResolutionError is an error but I guess it's defined in C and we can't raise it in tests.
  # Anything with an error_code assume is some transport-level issue and treat it as a connection issue,
  # not a coding issue.
  return true if e.respond_to?(:error_code)
  return false
end
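
As an illustration of how these checks classify different errors, here is a standalone copy of the method that runs outside of WebhookDB. `FakeDriverError` is a hypothetical class invented for the example; it stands in for any driver error that exposes an `error_code`.

```ruby
require "socket"
require "openssl"
require "timeout"

# Standalone copy of the checks above, so it can run without WebhookDB loaded.
def transport_error?(e)
  return true if e.is_a?(Timeout::Error)
  return true if e.is_a?(SocketError)
  return true if e.is_a?(OpenSSL::SSL::SSLError)
  return true if e.is_a?(SystemCallError)
  return true if e.respond_to?(:error_code)
  return false
end

# Hypothetical driver-style error that exposes an error_code.
class FakeDriverError < StandardError
  def error_code
    return 1234
  end
end

results = {
  timeout: transport_error?(Timeout::Error.new),      # network timeout
  refused: transport_error?(Errno::ECONNREFUSED.new), # Errno errors are SystemCallError
  driver: transport_error?(FakeDriverError.new),      # anything with an error_code
  coding: transport_error?(NoMethodError.new("oops")) # a plain coding error
}
puts results.inspect
```

Only the last entry is treated as a non-transport error, so it would be re-raised rather than reported as a connection problem.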

.valid_period(beginval) ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 74

def self.valid_period(beginval)
  return beginval..self.max_period_seconds
end

.valid_period_for(org) ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 78

def self.valid_period_for(org)
  return self.valid_period(org.minimum_sync_seconds)
end
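
The valid period is a plain Ruby Range, so a caller can check a requested period with `cover?`. The numbers below are made up for the example; the real bounds come from configuration (`max_period_seconds`) and from the organization (`minimum_sync_seconds`).

```ruby
# Hypothetical values, chosen only for illustration.
max_period_seconds = 86_400
minimum_sync_seconds = 600

# Same shape as valid_period: the minimum up to the configured maximum.
valid = minimum_sync_seconds..max_period_seconds

too_fast = valid.cover?(300)      # below the organization's minimum
ok = valid.cover?(3600)           # inside the range
too_slow = valid.cover?(100_000)  # above the configured maximum
puts [too_fast, ok, too_slow].inspect
```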

.validate_db_url(s) ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 118

def self.validate_db_url(s)
  begin
    url = URI(s)
  rescue URI::InvalidURIError
    return "That's not a valid URL."
  end
  protocols = ["postgres", "snowflake"]
  unless protocols.include?(url.scheme)
    protostr = protocols.join(", ")
    # rubocop:disable Layout/LineLength
    msg = "The '#{url.scheme}' protocol is not supported for database sync targets. Supported protocols are: #{protostr}."
    # rubocop:enable Layout/LineLength
    return msg
  end
  return nil
end
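
A short usage sketch of the three outcomes (valid, unsupported protocol, unparseable). The method is copied in condensed form so the example runs on its own; the hostnames and credentials are placeholders.

```ruby
require "uri"

# Condensed standalone copy of the validation above.
def validate_db_url(s)
  begin
    url = URI(s)
  rescue URI::InvalidURIError
    return "That's not a valid URL."
  end
  protocols = ["postgres", "snowflake"]
  return nil if protocols.include?(url.scheme)
  return "The '#{url.scheme}' protocol is not supported for database sync targets. " \
         "Supported protocols are: #{protocols.join(', ')}."
end

# nil means "no validation error".
ok = validate_db_url("postgres://user:pass@db.example.com:5432/warehouse")
unsupported = validate_db_url("mysql://user:pass@db.example.com/warehouse")
invalid = validate_db_url("not a url")
puts [ok, unsupported, invalid].inspect
```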

.validate_http_url(s) ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 135

def self.validate_http_url(s)
  begin
    url = URI(s)
  rescue URI::InvalidURIError
    return "That's not a valid URL."
  end
  case url.scheme
    when "https"
      return nil if url.user.present? || url.password.present?
      url.user = "user"
      url.password = "pass"
      return "https urls must include a Basic Auth username and/or password, like '#{url}'"
    when "http"
      # http does not require a username/pass since it's only for internal use.
      return Webhookdb::SyncTarget.allow_http ? nil : "Url must be https, not http."
    else
      return "Must be an https url."
  end
end
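
The https branch builds its example URL by setting placeholder credentials on the parsed URI. The sketch below shows just that step using Ruby’s standard `URI` library; it substitutes plain `nil`/`empty?` checks for ActiveSupport’s `present?`, and `example.com` is a placeholder host.

```ruby
require "uri"

url = URI("https://example.com/webhooks")

# Plain-Ruby stand-in for `url.user.present? || url.password.present?`.
has_auth = !url.user.to_s.empty? || !url.password.to_s.empty?

# With no credentials present, the validator fills in placeholders
# so the error message can show what a valid URL looks like.
url.user = "user"
url.password = "pass"
example = url.to_s
puts example
```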

.verify_db_connection(url) ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 155

def self.verify_db_connection(url)
  adapter = Webhookdb::DBAdapter.adapter(url)
  begin
    adapter.verify_connection(url, timeout: DB_VERIFY_TIMEOUT, statement: DB_VERIFY_STATEMENT)
  rescue StandardError => e
    # noinspection RailsParamDefResolve
    msg = e.try(:wrapped_exception).try(:to_s) || e.to_s
    raise InvalidConnection, "Could not SELECT 1: #{msg.strip}"
  end
end

.verify_http_connection(url) ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 166

def self.verify_http_connection(url)
  cleanurl, authparams = Webhookdb::Http.extract_url_auth(url)
  body = {
    rows: [],
    integration_id: "svi_test",
    integration_service: "httpsync_test",
    table: "test",
  }
  begin
    Webhookdb::Http.post(
      cleanurl,
      body,
      logger: self.logger,
      basic_auth: authparams,
      timeout: HTTP_VERIFY_TIMEOUT,
      follow_redirects: true,
    )
  rescue StandardError => e
    raise InvalidConnection, "POST to #{cleanurl} failed: #{e.message}" if
      e.is_a?(Webhookdb::Http::Error) || self.transport_error?(e)
    raise
  end
end

Instance Method Details

#add_sync_stat(start, exception: nil, response_status: nil) ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 335

def add_sync_stat(start, exception: nil, response_status: nil)
  stat = {"t" => s2ms(start), "d" => s2ms(Time.now - start)}
  stat["e"] = exception.class.name if exception
  stat["rs"] = response_status unless response_status.nil?
  stats = self.sync_stats
  stats.prepend(stat)
  stats.pop if stats.size > MAX_STATS
  self.will_change_column(:sync_stats)
end
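
The stats list behaves like a bounded, newest-first buffer. Below is a minimal sketch of that behavior on a plain array. It assumes `s2ms` converts seconds to integer milliseconds (the helper’s real definition is not shown on this page), uses a small `MAX_STATS` for readability, and takes the finish time as an argument instead of calling `Time.now`.

```ruby
MAX_STATS = 3 # The class uses 200; a small value keeps the example readable.

# Assumption: converts seconds (or a Time) to integer milliseconds.
def s2ms(seconds)
  return (seconds.to_f * 1000).to_i
end

# Hypothetical free-function version of add_sync_stat, operating on an array.
def add_stat(stats, start, finish, exception: nil)
  stat = {"t" => s2ms(start), "d" => s2ms(finish - start)}
  stat["e"] = exception.class.name if exception
  stats.prepend(stat)                  # newest first
  stats.pop if stats.size > MAX_STATS  # drop the oldest
  return stats
end

stats = []
t0 = Time.at(1_700_000_000)
4.times do |i|
  start = t0 + (i * 60)
  err = i == 3 ? RuntimeError.new("boom") : nil
  add_stat(stats, start, start + 2, exception: err)
end
puts stats.inspect
```

After four syncs only the three newest entries remain, and the failed sync is at the front with its exception class name recorded under "e".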

#advisory_lock(db) ⇒ Sequel::AdvisoryLock



292
293
294
# File 'lib/webhookdb/sync_target.rb', line 292

def advisory_lock(db)
  return Sequel::AdvisoryLock.new(db, ADVISORY_LOCK_KEYSPACE, self.id)
end

#associated_id ⇒ String

Returns:

  • (String)


# File 'lib/webhookdb/sync_target.rb', line 317

def associated_id
  # Eventually we need to support orgs
  return self.service_integration.opaque_id
end

#associated_object_display ⇒ Object

# File 'lib/webhookdb/sync_target.rb', line 322

def associated_object_display
  return "#{self.service_integration.opaque_id}/#{self.service_integration.table_name}"
end

#associated_type ⇒ String

Returns:

  • (String)


# File 'lib/webhookdb/sync_target.rb', line 311

def associated_type
  # Eventually we need to support orgs
  return "service_integration"
end

#before_create ⇒ Object

# File 'lib/webhookdb/sync_target.rb', line 376

def before_create
  self[:opaque_id] ||= Webhookdb::Id.new_opaque_id("syt")
end

#before_validation ⇒ Object

# File 'lib/webhookdb/sync_target.rb', line 371

def before_validation
  self.page_size ||= Webhookdb::SyncTarget.default_page_size
  super
end

#db? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/webhookdb/sync_target.rb', line 114

def db?
  return !self.http?
end

#displaysafe_connection_url ⇒ Object

# File 'lib/webhookdb/sync_target.rb', line 296

def displaysafe_connection_url
  return displaysafe_url(self.connection_url)
end

#http? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/webhookdb/sync_target.rb', line 108

def http?
  url = URI(self.connection_url)
  return true if ["http", "https"].include?(url.scheme)
  return false
end

#jitter ⇒ Object

Return the jitter used for enqueuing the next sync of the job. It should never be more than 20 seconds, nor should it be more than 1/4 of the total period, since it needs to run at a reasonably predictable time. Jitter is always >= 1, since it is helpful to be able to assert it will always be in the future.

# File 'lib/webhookdb/sync_target.rb', line 225

def jitter
  max_jitter = [20, self.period_seconds / 4].min
  max_jitter = [1, max_jitter].max
  return RAND.rand(1..max_jitter)
end
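
To make the bounds concrete, here is a standalone sketch that takes the period as an argument (the real method reads `period_seconds` from the model) and samples the result for a few periods. The seeded random generator is only there to make the example repeatable.

```ruby
RAND = Random.new(42) # seeded for repeatability; the class uses an unseeded Random.new

# Standalone version of jitter that takes the period as a parameter.
def jitter(period_seconds)
  max_jitter = [20, period_seconds / 4].min # cap at 20s or a quarter of the period
  max_jitter = [1, max_jitter].max          # but never below 1
  return RAND.rand(1..max_jitter)
end

# Period in seconds => largest jitter that can be returned for it.
expected_max = {2 => 1, 40 => 10, 600 => 20}
samples = expected_max.keys.to_h do |period|
  [period, Array.new(100) { jitter(period) }]
end
samples.each { |period, values| puts "#{period}s period: jitter in #{values.min}..#{values.max}" }
```

A 2 second period always yields a jitter of 1, since a quarter of the period rounds down to 0 and is then raised to the minimum.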

#latency(now: Time.now) ⇒ ActiveSupport::Duration, Integer

Returns:

  • (ActiveSupport::Duration, Integer)


# File 'lib/webhookdb/sync_target.rb', line 232

def latency(now: Time.now)
  return 0 if self.last_synced_at.nil?
  return 0 if self.last_synced_at > now
  return now - self.last_synced_at
end

#log_tags ⇒ Object

# File 'lib/webhookdb/sync_target.rb', line 300

def log_tags
  return {
    sync_target_id: self.id,
    sync_target_connection_url: self.displaysafe_connection_url,
    service_integration_id: self.service_integration_id,
    service_integration_service: self.service_integration.service_name,
    service_integration_table: self.service_integration.table_name,
  }
end

#next_possible_sync(now:) ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 210

def next_possible_sync(now:)
  return self.next_sync(self.organization.minimum_sync_seconds, now)
end

#next_scheduled_sync(now:) ⇒ Object



# File 'lib/webhookdb/sync_target.rb', line 206

def next_scheduled_sync(now:)
  return self.next_sync(self.period_seconds, now)
end

#organization ⇒ Webhookdb::Organization

# File 'lib/webhookdb/sync_target.rb', line 367

def organization
  return self.service_integration.organization
end

#run_sync(now:) ⇒ Object

Running a sync involves some work we always do (export, transform), and then work that varies per-adapter (load).

First, we lock using an advisory lock to make sure we never sync the same sync target concurrently, since concurrent syncs can cause correctness and performance issues. Raise a SyncInProgress error if the target is already being synced.

If the sync target is against an HTTP URL, see _run_http_sync.

If the sync target is a database connection:

  • Ensure the sync target table exists and has the right schema. In general we do NOT create indices for the target table; since this table is for a client’s data warehouse, we assume they will optimize it as needed. The only exception is the unique constraint for the remote key column.

  • Select rows created/updated since our last update in our ‘source’ database.

  • Write them to disk into a CSV file.

  • Pass this CSV file to the proper sync target adapter.

  • For example, the PG sync target will:

    • Create a temp table in the target database, using the schema from the sync target table.

    • Load the data into that temp table.

    • Insert into the target table the temp table rows that do not yet appear in the target table.

    • Update rows in the target table from the temp table rows that already appear in the target table.

  • The snowflake sync target will:

    • PUT the CSV file into the stage for the table.

    • Otherwise the logic is the same as PG: create a temp table and COPY INTO from the CSV.

    • Purge the staged file.
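
The insert-then-update step described for the PG target can be pictured with plain hashes. This is only an in-memory illustration, not the adapter’s code: the real work happens in SQL against a temp table, and the `remote_id` key column name is hypothetical.

```ruby
# Rows in the target table, keyed by a hypothetical remote key column.
target = {
  "a" => {"remote_id" => "a", "amount" => 1},
  "b" => {"remote_id" => "b", "amount" => 2},
}

# Rows loaded from the CSV into the temp table.
temp = [
  {"remote_id" => "b", "amount" => 20}, # already in target: update
  {"remote_id" => "c", "amount" => 3},  # not in target: insert
]

# Split temp rows by whether their key already exists in the target.
to_insert, to_update = temp.partition { |row| !target.key?(row["remote_id"]) }
to_insert.each { |row| target[row["remote_id"]] = row }
to_update.each { |row| target[row["remote_id"]].merge!(row) }
puts target.inspect
```

Because rows are matched on the remote key, re-running the same load leaves the target unchanged, which is why the unique constraint on that column matters.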

Parameters:

  • now (Time)

    The current time. Rows updated at or before ‘now’, and at or after the ‘last updated’ timestamp, will be synced.

Raises:

  • (SyncInProgress)


# File 'lib/webhookdb/sync_target.rb', line 268

def run_sync(now:)
  ran = false
  # Take the advisory lock with a separate connection. This seems to be pretty important-
  # it's possible that (for reasons not clear at this time) using the standard connection pool
  # results in the lock being held since the session remains open for a while on the worker.
  # Opening a separate connection ensures that, once this method exits, the lock will be released
  # since the session will be ended.
  Webhookdb::Dbutil.borrow_conn(Webhookdb::Postgres::Model.uri) do |db|
    self.advisory_lock(db).with_lock? do
      self.logger.info "starting_sync"
      routine = if self.connection_url.start_with?("https://", "http://")
                  # Note that http links are not secure and should only be used for development purposes
                  HttpRoutine.new(now, self)
      else
        DatabaseRoutine.new(now, self)
      end
      routine.run
      ran = true
    end
  end
  raise SyncInProgress, "SyncTarget[#{self.id}] is already being synced" unless ran
end
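
The `ran` flag pattern above (try the lock, do the work only if it was acquired, raise otherwise) can be tried without a database. In this sketch a `Mutex` with `try_lock` stands in for the Postgres advisory lock, and `run_sync_sketch` is a hypothetical name; it illustrates the control flow only, not the real locking semantics across processes.

```ruby
class SyncInProgress < StandardError; end

# Hypothetical stand-in for run_sync; the Mutex plays the role of the advisory lock.
def run_sync_sketch(lock)
  ran = false
  if lock.try_lock # non-blocking: returns false if the lock is already held
    begin
      yield
      ran = true
    ensure
      lock.unlock
    end
  end
  raise SyncInProgress, "already being synced" unless ran
end

lock = Mutex.new
events = []
run_sync_sketch(lock) do
  events << :outer_started
  begin
    # A second sync attempt while the first still holds the lock.
    run_sync_sketch(lock) { events << :inner_ran }
  rescue SyncInProgress
    events << :inner_rejected
  end
end
puts events.inspect
```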

#schema_and_table_string ⇒ String

Returns:

  • (String)


# File 'lib/webhookdb/sync_target.rb', line 327

def schema_and_table_string
  schema_name = self.schema.present? ? self.schema : self.class.default_schema
  table_name = self.table.present? ? self.table : self.service_integration.table_name
  return "#{schema_name}.#{table_name}"
end

#sync_stat_summary ⇒ Object

# File 'lib/webhookdb/sync_target.rb', line 348

def sync_stat_summary
  return {} if self.sync_stats.empty?
  earliest = self.sync_stats.last
  latest = self.sync_stats.first
  average_latency = (self.sync_stats.sum { |st| ms2s(st["d"]) }) / self.sync_stats.size
  errors = self.sync_stats.count { |st| st["e"] || st["rs"] }
  calls_per_minute = 60 / average_latency
  rpm = self.page_size * calls_per_minute
  rpm *= self.parallelism if self.parallelism.positive?
  return {
    latest: Time.at(ms2s(latest["t"]).to_i),
    earliest: Time.at(ms2s(earliest["t"]).to_i),
    average_latency: average_latency.round(2),
    average_rows_minute: rpm.to_i,
    errors:,
  }
end
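
A worked example of the throughput arithmetic, using made-up numbers. It assumes `ms2s` converts milliseconds to float seconds (the helper is not shown on this page); the page size and parallelism values are hypothetical.

```ruby
# Assumption: converts milliseconds to (float) seconds.
def ms2s(ms)
  return ms / 1000.0
end

stats = [{"d" => 1500}, {"d" => 2500}] # two syncs taking 1.5s and 2.5s
page_size = 500
parallelism = 2

average_latency = stats.sum { |st| ms2s(st["d"]) } / stats.size # (1.5 + 2.5) / 2
calls_per_minute = 60 / average_latency                         # calls that fit in a minute
rpm = page_size * calls_per_minute                              # rows per minute, one worker
rpm *= parallelism if parallelism.positive?                     # scaled by parallel workers
puts "average_latency=#{average_latency} average_rows_minute=#{rpm.to_i}"
```

With these inputs the average latency is 2 seconds, giving 30 calls per minute and 30,000 rows per minute across two workers.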