Class: Webhookdb::Organization

Inherits:
Object
Defined in:
lib/webhookdb/organization.rb

Defined Under Namespace

Classes: Alerting, DatabaseMigration, DbBuilder, QueryResult, SchemaMigrationError

Class Method Details

.create_if_unique(params) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 99

def self.create_if_unique(params)
  self.db.transaction(savepoint: true) do
    return Webhookdb::Organization.create(name: params[:name])
  end
rescue Sequel::UniqueConstraintViolation
  return nil
end

.enqueue_migrate_all_replication_tables ⇒ Object

As part of the release process, we enqueue a job that will migrate the replication schemas for all organizations. However, this job must use the NEW code being released; it should not use the CURRENT code the workers may be using when this method is run during the release process.

We can get around this by enqueuing the jobs with the ‘target’ release creation date. Only jobs that execute with this release creation date will perform the migration; if the job is running using an older release creation date (i.e. still running old code), it will re-enqueue the migration to run in the future, using a worker that will eventually be using newer code.

For example:

  • We have Release A, created at 0, currently running.

  • Release B, created at 1, runs this method.

  • The workers, using Release A code (with a release_created_at of 0), run the ReplicationMigration job. They see the target release_created_at of 1 is greater than/after the current release_created_at of 0, so re-enqueue the job.

  • Eventually the workers are using Release B code, which has a release_created_at of 1. This matches the target, so the job is run.

For a more complex example, which involves releases created in quick succession (we need to be careful to avoid jobs that never run):

  • We have Release A, created at 0, currently running.

  • Release B, created at 1, runs this method.

  • Release C, created at 2, runs this method.

  • Workers are backed up, so nothing is processed until all workers are using Release C.

  • Workers using Release C code process two sets of jobs:

    • Jobs with a target release_created_at of 1

    • Jobs with a target release_created_at of 2

  • Jobs with a target of 2 run the actual migration, because the times match.

  • Jobs with a target of 1 see that the target is less than/before the current release_created_at of 2. This indicates the migration is stale, and the job is discarded.

NOTE: There will always be a race condition where we may process webhooks using the new code, before we’ve migrated the replication schemas into the new code. This will error during the upsert because the column doesn’t yet exist. However these will be retried automatically, and quickly, so we don’t worry about them yet.
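
The comparison rule from the two examples above can be sketched as a small decision function. This is only an illustration: the helper name and the returned symbols are assumptions, and the real logic lives in the ReplicationMigration job, which is not shown on this page.

```ruby
# Hypothetical helper (not part of Webhookdb): decide what a migration job
# should do, given the target release time it was enqueued with and the
# release time of the code the worker is currently running.
def replication_migration_action(target_release_created_at, current_release_created_at)
  if target_release_created_at == current_release_created_at
    :migrate   # the worker runs the release that enqueued the job
  elsif target_release_created_at > current_release_created_at
    :reenqueue # the worker still runs older code, so try again later
  else
    :discard   # a newer release has superseded this job, so it is stale
  end
end

replication_migration_action(1, 0) # => :reenqueue
replication_migration_action(1, 1) # => :migrate
replication_migration_action(1, 2) # => :discard
```

The three calls correspond to a Release A worker seeing a Release B job, a Release B worker seeing a Release B job, and a Release C worker seeing a Release B job.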



# File 'lib/webhookdb/organization.rb', line 289

def self.enqueue_migrate_all_replication_tables
  Webhookdb::Organization.each do |org|
    Webhookdb::Jobs::ReplicationMigration.perform_in(2, org.id, Webhookdb::RELEASE_CREATED_AT)
  end
end

Instance Method Details

#active_subscription? ⇒ Boolean

SUBSCRIPTION PERMISSIONS

Returns:

  • (Boolean)


# File 'lib/webhookdb/organization.rb', line 427

def active_subscription?
  subscription = Webhookdb::Subscription[stripe_customer_id: self.stripe_customer_id]
  # return false if no subscription
  return false if subscription.nil?
  # otherwise check stripe subscription string
  return ["trialing", "active", "past due"].include? subscription.status
end

#add_membership(opts = {}) ⇒ Object

:section: Memberships



# File 'lib/webhookdb/organization.rb', line 417

def add_membership(opts={})
  if !opts.is_a?(Webhookdb::OrganizationMembership) && !opts.key?(:verified)
    raise ArgumentError, "must pass :verified or a model into add_membership, it is ambiguous otherwise"
  end
  self.associations.delete(opts[:verified] ? :verified_memberships : :invited_memberships)
  return self.add_all_membership(opts)
end

#admin_connection(**kw) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 123

def admin_connection(**kw, &)
  return Webhookdb::ConnectionCache.borrow(self.admin_connection_url_raw, **kw, &)
end

#admin_connection_url ⇒ Object

Return the admin connection url, with the host set to public_host if set.



# File 'lib/webhookdb/organization.rb', line 157

def admin_connection_url
  return self._public_host_connection_url(self.admin_connection_url_raw)
end

#admin_customers ⇒ Object



# File 'lib/webhookdb/organization.rb', line 107

def admin_customers
  return self.verified_memberships.filter(&:admin?).map(&:customer)
end

#admin_user ⇒ Object



# File 'lib/webhookdb/organization.rb', line 182

def admin_user
  ur = URI(self.admin_connection_url)
  return ur.user
end

#alerting ⇒ Object



# File 'lib/webhookdb/organization.rb', line 111

def alerting
  return @alerting ||= Alerting.new(self)
end

#available_replicator_names ⇒ Object



# File 'lib/webhookdb/organization.rb', line 443

def available_replicator_names
  available = Webhookdb::Replicator.registry.values.filter do |desc|
    # The org must have any of the flags required for the service. In other words,
    # the intersection of desc[:feature_roles] & org.feature_roles must
    # not be empty
    no_restrictions = desc.feature_roles.empty?
    next true if no_restrictions
    org_has_access = (self.feature_roles.map(&:name) & desc.feature_roles).present?
    org_has_access
  end
  return available.map(&:name)
end
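
The intersection rule in the comment above can be tried out in isolation. In this sketch the Descriptor struct and the replicator names are hypothetical stand-ins for registry entries; only the filtering rule is taken from the method.

```ruby
# Hypothetical descriptor type standing in for Webhookdb::Replicator registry entries.
Descriptor = Struct.new(:name, :feature_roles)

# A descriptor is available if it has no role restrictions, or if the org
# holds at least one of the roles it requires (non-empty array intersection).
def available_names(descriptors, org_role_names)
  available = descriptors.select do |desc|
    desc.feature_roles.empty? || !(org_role_names & desc.feature_roles).empty?
  end
  available.map(&:name)
end

descriptors = [
  Descriptor.new("public_v1", []),
  Descriptor.new("beta_v1", ["beta"]),
  Descriptor.new("internal_v1", ["internal"]),
]

available_names(descriptors, ["beta"]) # => ["public_v1", "beta_v1"]
available_names(descriptors, [])       # => ["public_v1"]
```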

#before_validation ⇒ Object



# File 'lib/webhookdb/organization.rb', line 92

def before_validation
  self.minimum_sync_seconds ||= Webhookdb::SyncTarget.default_min_period_seconds
  self.key ||= Webhookdb.to_slug(self.name)
  self.replication_schema ||= Webhookdb::Organization::DbBuilder.new(self).default_replication_schema
  super
end

#can_add_new_integration? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/webhookdb/organization.rb', line 435

def can_add_new_integration?
  # if the sint's organization has an active subscription, return true
  return true if self.active_subscription?
  # if there is no active subscription, check number of integrations against free tier max
  limit = Webhookdb::Subscription.max_free_integrations
  return Webhookdb::ServiceIntegration.where(organization: self).count < limit
end

#cli_editable_fields ⇒ Object



# File 'lib/webhookdb/organization.rb', line 115

def cli_editable_fields
  return ["name", "billing_email"]
end

#create_public_host_cname ⇒ Object

Create a CNAME in Cloudflare for the currently configured connection urls.



# File 'lib/webhookdb/organization.rb', line 223

def create_public_host_cname
  self.db.transaction do
    self.lock!
    # We must have a host to create a CNAME to.
    raise Webhookdb::InvalidPrecondition, "connection urls must be set" if self.readonly_connection_url_raw.blank?
    # Should only be used once when creating the org DBs.
    raise Webhookdb::InvalidPrecondition, "public_host must not be set" if self.public_host.present?
    # Use the raw URL, even though we know at this point
    # public_host is empty so raw and public host urls are the same.
    Webhookdb::Organization::DbBuilder.new(self).create_public_host_cname(self.readonly_connection_url_raw)
    self.save_changes
  end
end

#dbname ⇒ Object



# File 'lib/webhookdb/organization.rb', line 176

def dbname
  raise Webhookdb::InvalidPrecondition, "no db has been created, call prepare_database_connections first" if
    self.admin_connection_url.blank?
  return URI(self.admin_connection_url).path.tr("/", "")
end
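
For reference, this is how the database name falls out of a connection URL using only the standard library. The URL is a made-up example, not a real Webhookdb connection string.

```ruby
require "uri"

# Hypothetical connection URL, used only to illustrate the path handling.
url = "postgres://admin:secret@localhost:5432/adb_org123"

path = URI(url).path     # => "/adb_org123"
dbname = path.tr("/", "") # => "adb_org123"
```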

#display_string ⇒ Object



# File 'lib/webhookdb/organization.rb', line 198

def display_string
  return "#{self.name} (#{self.key})"
end

#execute_readonly_query(sql) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 127

def execute_readonly_query(sql)
  max_rows = self.max_query_rows || self.class.max_query_rows
  return self.readonly_connection do |conn|
    ds = conn.fetch(sql)
    r = QueryResult.new
    r.max_rows_reached = false
    r.columns = ds.columns
    r.rows = []
    # Stream to avoid pulling in all rows of unlimited queries
    ds.stream.each do |row|
      if r.rows.length >= max_rows
        r.max_rows_reached = true
        break
      end
      r.rows << row.values
    end
    return r
  end
end
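
The row cap in the loop above can be exercised without a database. The sketch below applies the same loop to any enumerable; the helper name is an assumption, and a lazy infinite enumerator stands in for a streamed dataset.

```ruby
# Hypothetical helper mirroring the capped loop: consume rows one at a time
# and stop as soon as a row arrives after max_rows have been collected.
def take_capped(rows, max_rows)
  collected = []
  max_rows_reached = false
  rows.each do |row|
    if collected.length >= max_rows
      max_rows_reached = true
      break
    end
    collected << row
  end
  [collected, max_rows_reached]
end

# An endless source is safe because iteration stops at the cap.
endless = (1..Float::INFINITY).lazy.map { |i| [i] }
take_capped(endless, 3)        # => [[[1], [2], [3]], true]
take_capped([[1], [2]], 3)     # => [[[1], [2]], false]
```

Note that the flag only becomes true when at least one row exists beyond the cap; a result with exactly max_rows rows reports false.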

#get_stripe_billing_portal_url ⇒ Object



# File 'lib/webhookdb/organization.rb', line 382

def get_stripe_billing_portal_url
  raise Webhookdb::InvalidPrecondition, "organization must be registered in Stripe" if self.stripe_customer_id.blank?
  session = Stripe::BillingPortal::Session.create(
    {
      customer: self.stripe_customer_id,
      return_url: Webhookdb.app_url + "/jump/portal-return",
    },
  )

  return session.url
end

#get_stripe_checkout_url(price_id) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 394

def get_stripe_checkout_url(price_id)
  raise Webhookdb::InvalidPrecondition, "organization must be registered in Stripe" if self.stripe_customer_id.blank?
  session = Stripe::Checkout::Session.create(
    {
      customer: self.stripe_customer_id,
      cancel_url: Webhookdb.app_url + "/jump/checkout-cancel",
      line_items: [{
        price: price_id, quantity: 1,
      }],
      mode: "subscription",
      payment_method_types: ["card"],
      allow_promotion_codes: true,
      success_url: Webhookdb.app_url + "/jump/checkout-success",
    },
  )

  return session.url
end

#log_tags ⇒ Object



# File 'lib/webhookdb/organization.rb', line 85

def log_tags
  return {
    organization_id: self.id,
    organization_key: self.key,
  }
end

#migrate_replication_schema(schema) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 351

def migrate_replication_schema(schema)
  unless Webhookdb::DBAdapter::VALID_IDENTIFIER.match?(schema)
    msg = "Sorry, this is not a valid schema name. " + Webhookdb::DBAdapter::INVALID_IDENTIFIER_MESSAGE
    raise SchemaMigrationError, msg
  end
  Webhookdb::Organization::DatabaseMigration.guard_ongoing!(self)
  raise SchemaMigrationError, "destination and target schema are the same" if schema == self.replication_schema
  builder = Webhookdb::Organization::DbBuilder.new(self)
  sql = builder.migration_replication_schema_sql(self.replication_schema, schema)
  self.admin_connection(transaction: true) do |db|
    db << sql
  end
  self.update(replication_schema: schema)
end

#migrate_replication_tables ⇒ Object

Get all the table names and column names for all integrations in the org. Find any of those table/column pairs that are not present in information_schema.columns. Ensure all columns for those integrations/tables.



# File 'lib/webhookdb/organization.rb', line 298

def migrate_replication_tables
  tables = self.service_integrations.map(&:table_name)
  sequences_in_app_db = self.db[Sequel[:information_schema][:sequences]].
    grep(:sequence_name, "replicator_seq_org_#{self.id}_%").
    select_map(:sequence_name).
    to_set
  cols_in_org_db = {}
  indices_in_org_db = Set.new
  self.admin_connection do |db|
    cols_in_org_db = db[Sequel[:information_schema][:columns]].
      where(table_schema: self.replication_schema, table_name: tables).
      select(
        :table_name,
        Sequel.function(:array_agg, :column_name).cast("text[]").as(:columns),
      ).
      group_by(:table_name).
      all.
      to_h { |c| [c[:table_name], c[:columns]] }
    indices_in_org_db = db[Sequel[:pg_indexes]].
      where(schemaname: self.replication_schema, tablename: tables).
      select_map(:indexname).
      to_set
  end

  self.service_integrations.each do |sint|
    svc = sint.replicator
    existing_columns = cols_in_org_db.fetch(sint.table_name) { [] }
    cols_for_sint = svc.storable_columns.map { |c| c.name.to_s }
    all_sint_cols_exist = (cols_for_sint - existing_columns).empty?

    all_indices_exist = svc.indices(svc.dbadapter_table).all? do |ind|
      indices_in_org_db.include?(ind.name.to_s)
    end

    svc.ensure_all_columns unless all_sint_cols_exist && all_indices_exist
    if svc.requires_sequence? && !sequences_in_app_db.include?(sint.sequence_name)
      sint.ensure_sequence(skip_check: true)
    end
  end
end
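
The per-integration check in the loop above comes down to an array difference for columns and a set lookup for indices. A minimal sketch with plain arrays and sets, where the helper name and sample names are assumptions:

```ruby
require "set"

# Hypothetical helper: true when a table lacks any expected column or index,
# which is the condition under which ensure_all_columns would be called.
def needs_ensure?(expected_columns, existing_columns, expected_indices, existing_indices)
  all_columns_exist = (expected_columns - existing_columns).empty?
  all_indices_exist = expected_indices.all? { |name| existing_indices.include?(name) }
  !(all_columns_exist && all_indices_exist)
end

needs_ensure?(["pk", "data", "at"], ["pk", "data"], [], Set.new)              # => true
needs_ensure?(["pk", "data"], ["pk", "data", "extra"], ["idx"], Set["idx"])   # => false
needs_ensure?(["pk"], ["pk"], ["idx"], Set.new)                               # => true
```

Extra columns already present in the org database do not trigger any work; only missing ones do.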

#prepare_database_connections(safe: false) ⇒ Object

Build the org-specific users, database, and set our connection URLs to it.



# File 'lib/webhookdb/organization.rb', line 207

def prepare_database_connections(safe: false)
  self.db.transaction do
    self.lock!
    if self.admin_connection_url.present?
      return if safe
      raise Webhookdb::InvalidPrecondition, "connections already set"
    end
    builder = Webhookdb::Organization::DbBuilder.new(self)
    builder.prepare_database_connections
    self.admin_connection_url_raw = builder.admin_url
    self.readonly_connection_url_raw = builder.readonly_url
    self.save_changes
  end
end

#prepare_database_connections? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/webhookdb/organization.rb', line 202

def prepare_database_connections?
  return self.prepare_database_connections(safe: true)
end

#readonly_connection(**kw) ⇒ Object



# File 'lib/webhookdb/organization.rb', line 119

def readonly_connection(**kw, &)
  return Webhookdb::ConnectionCache.borrow(self.readonly_connection_url_raw, **kw, &)
end

#readonly_connection_url ⇒ Object

Return the readonly connection url, with the host set to public_host if set.



# File 'lib/webhookdb/organization.rb', line 152

def readonly_connection_url
  return self._public_host_connection_url(self.readonly_connection_url_raw)
end

#readonly_user ⇒ Object



# File 'lib/webhookdb/organization.rb', line 187

def readonly_user
  ur = URI(self.readonly_connection_url)
  return ur.user
end

#register_in_stripe ⇒ Object



# File 'lib/webhookdb/organization.rb', line 366

def register_in_stripe
  raise Webhookdb::InvalidPrecondition, "org already in Stripe" if self.stripe_customer_id.present?
  stripe_customer = Stripe::Customer.create(
    {
      name: self.name,
      email: self.billing_email,
      metadata: {
        org_id: self.id,
      },
    },
  )
  self.stripe_customer_id = stripe_customer.id
  self.save_changes
  return stripe_customer
end

#remove_related_database ⇒ Object

Delete the org-specific database and remove the org connection strings. Use this when an org is to be deleted (either for real, or in test teardown).



# File 'lib/webhookdb/organization.rb', line 239

def remove_related_database
  self.db.transaction do
    self.lock!
    Webhookdb::Organization::DbBuilder.new(self).remove_related_database
    self.admin_connection_url_raw = ""
    self.readonly_connection_url_raw = ""
    self.save_changes
  end
end

#roll_database_credentials ⇒ Object

Modify the admin and readonly users to have new usernames and passwords.



# File 'lib/webhookdb/organization.rb', line 340

def roll_database_credentials
  self.db.transaction do
    self.lock!
    builder = Webhookdb::Organization::DbBuilder.new(self)
    builder.roll_connection_credentials
    self.admin_connection_url_raw = builder.admin_url
    self.readonly_connection_url_raw = builder.readonly_url
    self.save_changes
  end
end

#single_db_user? ⇒ Boolean

In cases where the readonly and admin user are the same, we sometimes adapt queries to prevent revoking admin db privileges.

Returns:

  • (Boolean)


# File 'lib/webhookdb/organization.rb', line 194

def single_db_user?
  return self.admin_user == self.readonly_user
end
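
Since admin_user and readonly_user both read the user component of a connection URL, the comparison can be illustrated with the standard library alone. The helper name and URLs below are invented for the example.

```ruby
require "uri"

# Hypothetical helper: compare only the user component of two connection URLs.
def same_db_user?(admin_url, readonly_url)
  URI(admin_url).user == URI(readonly_url).user
end

same_db_user?("postgres://aduser:pw1@host:5432/db", "postgres://rouser:pw2@host:5432/db") # => false
same_db_user?("postgres://shared:pw@host:5432/db", "postgres://shared:pw@host:5432/db")   # => true
```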

#validate ⇒ Object

:section: Validations



# File 'lib/webhookdb/organization.rb', line 460

def validate
  super
  validates_all_or_none(:admin_connection_url_raw, :readonly_connection_url_raw, predicate: :present?)
  validates_format(/^\D/, :name, message: "can't begin with a digit")
  validates_format(/^[a-z][a-z0-9_]*$/, :key, message: "is not valid as a CNAME")
  validates_max_length 63, :key, message: "is not valid as a CNAME"
end
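
To see which values the format rules above accept, the two patterns can be tried directly. The constants and helper below are illustrative names only; the patterns and the 63-character limit are copied from the validations.

```ruby
# Patterns copied from the validations above; constant names are assumptions.
NAME_FORMAT = /^\D/
KEY_FORMAT = /^[a-z][a-z0-9_]*$/
MAX_KEY_LENGTH = 63

# Hypothetical helper combining the key format and length checks.
def valid_key?(key)
  KEY_FORMAT.match?(key) && key.length <= MAX_KEY_LENGTH
end

valid_key?("acme_corp")           # => true
valid_key?("9lives")              # => false, starts with a digit
valid_key?("Acme")                # => false, uppercase is rejected
valid_key?("a" * 64)              # => false, longer than 63 characters
NAME_FORMAT.match?("Acme 9")      # => true
NAME_FORMAT.match?("9 Lives Inc") # => false
```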