Class: Webhookdb::Organization

Inherits:
Object
  • Object
show all
Includes:
Admin::Linked
Defined in:
lib/webhookdb/organization.rb

Defined Under Namespace

Classes: Alerting, DatabaseMigration, DbBuilder, ErrorHandler, QueryResult, SchemaMigrationError

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Admin::Linked

#admin_link

Class Method Details

.create_if_unique(params) ⇒ Object



105
106
107
108
109
110
111
# File 'lib/webhookdb/organization.rb', line 105

def self.create_if_unique(params)
  self.db.transaction(savepoint: true) do
    return Webhookdb::Organization.create(name: params[:name])
  end
rescue Sequel::UniqueConstraintViolation
  return nil
end

.enqueue_migrate_all_replication_tablesObject

As part of the release process, we enqueue a job that will migrate the replication schemas for all organizations. However this job must use the NEW code being released; it should not use the CURRENT code the workers may be using when this method is run during the release process.

We can get around this by enqueing the jobs with the ‘target’ release creation date. Only jobs that execute with this release creation date will perform the migration; if the job is running using an older release creation date (ie still running old code), it will re-enqueue the migration to run in the future, using a worker that will eventually be using newer code.

For example:

  • We have Release A, created at 0, currently running.

  • Release B, created at 1, runs this method.

  • The workers, using Release A code (with a release_created_at of 0), run the ReplicationMigration job. They see the target release_created_at of 1 is greater than/after the current release_created_at of 0, so re-enqueue the job.

  • Eventually the workers are using Release B code, which has a release_created_at of 1. This matches the target, so the job is run.

For a more complex example, which involves releases created in quick succession (we need to be careful to avoid jobs that never run):

  • We have Release A, created at 0, currently running.

  • Release B, created at 1, runs this method.

  • Release C, created at 2, runs this method.

  • Workers are backed up, so nothing is processed until all workers are using Release C.

  • Workers using Release C code process two sets of jobs:

    • Jobs with a target release_created_at of 1

    • Jobs with a target release_created_at of 2

  • Jobs with a target of 2 run the actual migration, because the times match.

  • Jobs with a target of 1, see that the target is less than/before current release_created_at of 2. This indicates the migration is stale, and the job is discarded.

NOTE: There will always be a race condition where we may process webhooks using the new code, before we’ve migrated the replication schemas into the new code. This will error during the upsert because the column doesn’t yet exist. However these will be retried automatically, and quickly, so we don’t worry about them yet.



328
329
330
331
332
# File 'lib/webhookdb/organization.rb', line 328

def self.enqueue_migrate_all_replication_tables
  Webhookdb::Organization.each do |org|
    Webhookdb::Jobs::ReplicationMigration.perform_in(2, org.id, Webhookdb::RELEASE_CREATED_AT)
  end
end

Instance Method Details

#active_subscription?Boolean

SUBSCRIPTION PERMISSIONS

Returns:

  • (Boolean)


474
475
476
477
478
479
480
# File 'lib/webhookdb/organization.rb', line 474

def active_subscription?
  subscription = Webhookdb::Subscription[stripe_customer_id: self.stripe_customer_id]
  # return false if no subscription
  return false if subscription.nil?
  # otherwise check stripe subscription string
  return ["trialing", "active", "past due"].include? subscription.status
end

#add_membership(opts = {}) ⇒ Object

:section: Memberships



453
454
455
456
457
458
459
# File 'lib/webhookdb/organization.rb', line 453

def add_membership(opts={})
  if !opts.is_a?(Webhookdb::OrganizationMembership) && !opts.key?(:verified)
    raise ArgumentError, "must pass :verified or a model into add_membership, it is ambiguous otherwise"
  end
  self.associations.delete(opts[:verified] ? :verified_memberships : :invited_memberships)
  return self.add_all_membership(opts)
end

#admin_connection(**kw) ⇒ Object



129
130
131
# File 'lib/webhookdb/organization.rb', line 129

def admin_connection(**kw, &)
  return Webhookdb::ConnectionCache.borrow(self.admin_connection_url_raw, **kw, &)
end

#admin_connection_urlObject

Return the admin connection url, with the host set to public_host if set.



190
191
192
# File 'lib/webhookdb/organization.rb', line 190

def admin_connection_url
  return self._public_host_connection_url(self.admin_connection_url_raw)
end

#admin_customersObject



113
114
115
# File 'lib/webhookdb/organization.rb', line 113

def admin_customers
  return self.verified_memberships.filter(&:admin?).map(&:customer)
end

#admin_userObject



215
216
217
218
# File 'lib/webhookdb/organization.rb', line 215

def admin_user
  ur = URI(self.admin_connection_url)
  return ur.user
end

#alertingObject



117
118
119
# File 'lib/webhookdb/organization.rb', line 117

def alerting
  return @alerting ||= Alerting.new(self)
end

#available_replicatorsObject



490
491
492
493
494
495
496
497
498
499
500
501
# File 'lib/webhookdb/organization.rb', line 490

def available_replicators
  available = Webhookdb::Replicator.registry.values.filter do |desc|
    # The org must have any of the flags required for the service. In other words,
    # the intersection of desc[:feature_roles] & org.feature_roles must
    # not be empty
    no_restrictions = desc.feature_roles.empty?
    next true if no_restrictions
    org_has_access = (self.feature_roles.map(&:name) & desc.feature_roles).present?
    org_has_access
  end
  return available
end

#before_validationObject



98
99
100
101
102
103
# File 'lib/webhookdb/organization.rb', line 98

def before_validation
  self.minimum_sync_seconds ||= Webhookdb::SyncTarget.default_min_period_seconds
  self.key ||= Webhookdb.to_slug(self.name)
  self.replication_schema ||= Webhookdb::Organization::DbBuilder.new(self).default_replication_schema
  super
end

#can_add_new_integration?Boolean

Returns:

  • (Boolean)


482
483
484
485
486
487
488
# File 'lib/webhookdb/organization.rb', line 482

def can_add_new_integration?
  # if the sint's organization has an active subscription, return true
  return true if self.active_subscription?
  # if there is no active subscription, check number of integrations against free tier max
  limit = Webhookdb::Subscription.max_free_integrations
  return Webhookdb::ServiceIntegration.where(organization: self).count < limit
end

#cli_editable_fieldsObject



121
122
123
# File 'lib/webhookdb/organization.rb', line 121

def cli_editable_fields
  return ["name", "billing_email"]
end

#close(confirm:) ⇒ Object



461
462
463
464
465
466
467
468
469
470
# File 'lib/webhookdb/organization.rb', line 461

def close(confirm:)
  raise Webhookdb::InvalidPrecondition, "confirm must be true to close the org" unless confirm
  unless self.service_integrations_dataset.empty?
    msg = "Organization[#{self.key} cannot close with active service integrations"
    raise Webhookdb::InvalidPrecondition, msg
  end
  memberships = self.all_memberships_dataset.all.each(&:destroy)
  self.destroy
  return [self, memberships]
end

#create_public_host_cname(safe: false) ⇒ Object

Create a CNAME in Cloudflare for the currently configured connection urls.

Parameters:

  • safe (*) (defaults to: false)

    If true, noop if the public host is set.



259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/webhookdb/organization.rb', line 259

def create_public_host_cname(safe: false)
  self.db.transaction do
    self.lock!
    # We must have a host to create a CNAME to.
    raise Webhookdb::InvalidPrecondition, "connection urls must be set" if self.readonly_connection_url_raw.blank?
    # Should only be used once when creating the org DBs.
    if self.public_host.present?
      return if safe
      raise Webhookdb::InvalidPrecondition, "public_host must not be set"
    end
    # Use the raw URL, even though we know at this point
    # public_host is empty so raw and public host urls are the same.
    self.db_builder.create_public_host_cname(self.readonly_connection_url_raw)
    self.save_changes
  end
end

#db_builderWebhookdb::Organization::DbBuilder



236
# File 'lib/webhookdb/organization.rb', line 236

def db_builder = Webhookdb::Organization::DbBuilder.new(self)

#dbnameObject



209
210
211
212
213
# File 'lib/webhookdb/organization.rb', line 209

def dbname
  raise Webhookdb::InvalidPrecondition, "no db has been created, call prepare_database_connections first" if
    self.admin_connection_url.blank?
  return URI(self.admin_connection_url).path.tr("/", "")
end

#display_stringObject



231
232
233
# File 'lib/webhookdb/organization.rb', line 231

def display_string
  return "#{self.name} (#{self.key})"
end

#execute_readonly_query(sql) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/webhookdb/organization.rb', line 133

def execute_readonly_query(sql)
  max_rows = self.max_query_rows || self.class.max_query_rows
  return self.readonly_connection do |conn|
    ds = conn.fetch(sql)
    r = QueryResult.new
    r.max_rows_reached = false
    r.columns = ds.columns
    r.rows = []
    # Stream to avoid pulling in all rows of unlimited queries
    ds.stream.each do |row|
      if r.rows.length >= max_rows
        r.max_rows_reached = true
        break
      end
      r.rows << row.values
    end
    return r
  end
end

#execute_readonly_query_with_help(sql) ⇒ Array<Webhookdb::Organization::QueryResult,String,nil>

Run the given SQL inside the org, and use special error handling if it fails.

Returns:

  • (Array<Webhookdb::Organization::QueryResult,String,nil>)

    Tuple of query result, and optional message. On query success, return <QueryResult, nil>. On DatabaseError, return <nil, message>. On other types of errors, raise.



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/webhookdb/organization.rb', line 158

def execute_readonly_query_with_help(sql)
  result = self.execute_readonly_query(sql)
  return result, nil
rescue Sequel::DatabaseError => e
  self.logger.error("db_query_database_error", e)
  # We want to handle InsufficientPrivileges and UndefinedTable explicitly
  # since we can hint the user at what to do.
  # Otherwise, we should just return the Postgres exception.
  msg = ""
  case e.wrapped_exception
    when PG::UndefinedTable
      missing_table = e.wrapped_exception.message.match(/relation (.+) does not/)&.captures&.first
      msg = "The table #{missing_table} does not exist. Run `webhookdb db tables` to see available tables." if
        missing_table
    when PG::InsufficientPrivilege
      msg = "You do not have permission to perform this query. Queries must be read-only."
    else
      msg = e.wrapped_exception.message
  end
  return [nil, msg]
end

#get_stripe_billing_portal_urlObject



418
419
420
421
422
423
424
425
426
427
428
# File 'lib/webhookdb/organization.rb', line 418

def get_stripe_billing_portal_url
  raise Webhookdb::InvalidPrecondition, "organization must be registered in Stripe" if self.stripe_customer_id.blank?
  session = Stripe::BillingPortal::Session.create(
    {
      customer: self.stripe_customer_id,
      return_url: Webhookdb.app_url + "/jump/portal-return",
    },
  )

  return session.url
end

#get_stripe_checkout_url(price_id) ⇒ Object



430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
# File 'lib/webhookdb/organization.rb', line 430

def get_stripe_checkout_url(price_id)
  raise Webhookdb::InvalidPrecondition, "organization must be registered in Stripe" if self.stripe_customer_id.blank?
  session = Stripe::Checkout::Session.create(
    {
      customer: self.stripe_customer_id,
      cancel_url: Webhookdb.app_url + "/jump/checkout-cancel",
      line_items: [{
        price: price_id, quantity: 1,
      }],
      mode: "subscription",
      payment_method_types: ["card"],
      allow_promotion_codes: true,
      success_url: Webhookdb.app_url + "/jump/checkout-success",
    },
  )

  return session.url
end

#log_tagsObject



91
92
93
94
95
96
# File 'lib/webhookdb/organization.rb', line 91

def log_tags
  return {
    organization_id: self.id,
    organization_key: self.key,
  }
end

#migrate_replication_schema(schema) ⇒ Object



390
391
392
393
394
395
396
397
398
399
400
# File 'lib/webhookdb/organization.rb', line 390

def migrate_replication_schema(schema)
  Webhookdb::DBAdapter.validate_identifier!(schema, type: "schema")
  Webhookdb::Organization::DatabaseMigration.guard_ongoing!(self)
  raise SchemaMigrationError, "destination and target schema are the same" if schema == self.replication_schema
  builder = self.db_builder
  sql = builder.migration_replication_schema_sql(self.replication_schema, schema)
  self.admin_connection(transaction: true) do |db|
    db << sql
  end
  self.update(replication_schema: schema)
end

#migrate_replication_tablesObject

Get all the table names and column names for all integrations in the org Find any of those table/column pairs that are not present in information_schema.columns Ensure all columns for those integrations/tables.



337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
# File 'lib/webhookdb/organization.rb', line 337

def migrate_replication_tables
  tables = self.service_integrations.map(&:table_name)
  sequences_in_app_db = self.db[Sequel[:information_schema][:sequences]].
    grep(:sequence_name, "replicator_seq_org_#{self.id}_%").
    select_map(:sequence_name).
    to_set
  cols_in_org_db = {}
  indices_in_org_db = Set.new
  self.admin_connection do |db|
    cols_in_org_db = db[Sequel[:information_schema][:columns]].
      where(table_schema: self.replication_schema, table_name: tables).
      select(
        :table_name,
        Sequel.function(:array_agg, :column_name).cast("text[]").as(:columns),
      ).
      group_by(:table_name).
      all.
      to_h { |c| [c[:table_name], c[:columns]] }
    indices_in_org_db = db[Sequel[:pg_indexes]].
      where(schemaname: self.replication_schema, tablename: tables).
      select_map(:indexname).
      to_set
  end

  self.service_integrations.each do |sint|
    svc = sint.replicator
    existing_columns = cols_in_org_db.fetch(sint.table_name) { [] }
    cols_for_sint = svc.storable_columns.map { |c| c.name.to_s }
    all_sint_cols_exist = (cols_for_sint - existing_columns).empty?

    all_indices_exist = svc.indices(svc.dbadapter_table).all? do |ind|
      indices_in_org_db.include?(ind.name.to_s)
    end

    svc.ensure_all_columns unless all_sint_cols_exist && all_indices_exist
    if svc.requires_sequence? && !sequences_in_app_db.include?(sint.sequence_name)
      sint.ensure_sequence(skip_check: true)
    end
  end
end

#prepare_database_connections(safe: false) ⇒ Object

Build the org-specific users, database, and set our connection URLs to it.

Parameters:

  • safe (*) (defaults to: false)

    If true, noop if connection urls are set.



242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/webhookdb/organization.rb', line 242

def prepare_database_connections(safe: false)
  self.db.transaction do
    self.lock!
    if self.admin_connection_url.present?
      return if safe
      raise Webhookdb::InvalidPrecondition, "connections already set"
    end
    builder = self.db_builder
    builder.prepare_database_connections
    self.admin_connection_url_raw = builder.admin_url
    self.readonly_connection_url_raw = builder.readonly_url
    self.save_changes
  end
end

#prepare_database_connections?Boolean

Returns:

  • (Boolean)


238
# File 'lib/webhookdb/organization.rb', line 238

def prepare_database_connections? = self.prepare_database_connections(safe: true)

#readonly_connection(**kw) ⇒ Object



125
126
127
# File 'lib/webhookdb/organization.rb', line 125

def readonly_connection(**kw, &)
  return Webhookdb::ConnectionCache.borrow(self.readonly_connection_url_raw, **kw, &)
end

#readonly_connection_urlObject

Return the readonly connection url, with the host set to public_host if set.



185
186
187
# File 'lib/webhookdb/organization.rb', line 185

def readonly_connection_url
  return self._public_host_connection_url(self.readonly_connection_url_raw)
end

#readonly_userObject



220
221
222
223
# File 'lib/webhookdb/organization.rb', line 220

def readonly_user
  ur = URI(self.readonly_connection_url)
  return ur.user
end

#register_in_stripeObject



402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
# File 'lib/webhookdb/organization.rb', line 402

def register_in_stripe
  raise Webhookdb::InvalidPrecondition, "org already in Stripe" if self.stripe_customer_id.present?
  stripe_customer = Stripe::Customer.create(
    {
      name: self.name,
      email: self.billing_email,
      metadata: {
        org_id: self.id,
      },
    },
  )
  self.stripe_customer_id = stripe_customer.id
  self.save_changes
  return stripe_customer
end

Delete the org-specific database and remove the org connection strings. Use this when an org is to be deleted (either for real, or in test teardown).



278
279
280
281
282
283
284
285
286
# File 'lib/webhookdb/organization.rb', line 278

def remove_related_database
  self.db.transaction do
    self.lock!
    self.db_builder.remove_related_database
    self.admin_connection_url_raw = ""
    self.readonly_connection_url_raw = ""
    self.save_changes
  end
end

#roll_database_credentialsObject

Modify the admin and readonly users to have new usernames and passwords.



379
380
381
382
383
384
385
386
387
388
# File 'lib/webhookdb/organization.rb', line 379

def roll_database_credentials
  self.db.transaction do
    self.lock!
    builder = self.db_builder
    builder.roll_connection_credentials
    self.admin_connection_url_raw = builder.admin_url
    self.readonly_connection_url_raw = builder.readonly_url
    self.save_changes
  end
end

#single_db_user?Boolean

In cases where the readonly and admin user are the same, we sometimes adapt queries to prevent revoking admin db priviliges.

Returns:

  • (Boolean)


227
228
229
# File 'lib/webhookdb/organization.rb', line 227

def single_db_user?
  return self.admin_user == self.readonly_user
end

#validateObject

:section: Validations



507
508
509
510
511
512
513
# File 'lib/webhookdb/organization.rb', line 507

def validate
  super
  validates_all_or_none(:admin_connection_url_raw, :readonly_connection_url_raw, predicate: :present?)
  validates_format(/^\D/, :name, message: "can't begin with a digit")
  validates_format(/^[a-z][a-z0-9_]*$/, :key, message: "is not valid as a CNAME")
  validates_max_length 63, :key, message: "is not valid as a CNAME"
end