Class: StatelyDB::CoreClient

Inherits:
Object
Defined in:
lib/statelydb.rb

Overview

CoreClient is a low-level client for interacting with the Stately Cloud API. In most cases you should not use it directly; use the client generated for your schema instead.

Constructor Details

#initialize(store_id:, schema:, token_provider: nil, endpoint: nil, region: nil, no_auth: false) ⇒ CoreClient

Initialize a new StatelyDB CoreClient

Parameters:

  • store_id (Integer)

    the StatelyDB to use for all operations with this client.

  • schema (Module)

    the generated Schema module to use for mapping StatelyDB Items.

  • token_provider (StatelyDB::Common::Auth::TokenProvider) (defaults to: nil)

    the token provider to use for authentication.

  • endpoint (String) (defaults to: nil)

    the endpoint to connect to.

  • region (String) (defaults to: nil)

    the region to connect to.

  • no_auth (Boolean) (defaults to: false)

    Indicates that the client should not attempt to get an auth token. This is used when talking to the Stately BYOC Data Plane on localhost.



# File 'lib/statelydb.rb', line 35

def initialize(store_id:,
               schema:,
               token_provider: nil,
               endpoint: nil,
               region: nil,
               no_auth: false)
  if store_id.nil?
    raise StatelyDB::Error.new("store_id is required",
                               code: GRPC::Core::StatusCodes::INVALID_ARGUMENT,
                               stately_code: "InvalidArgument")
  end
  if schema.nil?
    raise StatelyDB::Error.new("schema is required",
                               code: GRPC::Core::StatusCodes::INVALID_ARGUMENT,
                               stately_code: "InvalidArgument")
  end

  endpoint = self.class.make_endpoint(endpoint:, region:)
  @channel = Common::Net.new_channel(endpoint:)
  interceptors = [Common::ErrorInterceptor.new]
  # Make sure to use the correct endpoint for the default token provider
  unless no_auth
    @token_provider = token_provider || Common::Auth::AuthTokenProvider.new
    @token_provider.start(endpoint: endpoint)
    interceptors << Common::Auth::Interceptor.new(token_provider: @token_provider)
  end

  @stub = Stately::Db::DatabaseService::Stub.new(nil, nil,
                                                 channel_override: @channel, interceptors:)
  @store_id = store_id.to_i
  @schema = schema
  @allow_stale = false
end

Class Method Details

.make_endpoint(endpoint: nil, region: nil) ⇒ String

Construct the API endpoint from the region and endpoint. If the endpoint is provided, it will be returned as-is. If the region is provided and the endpoint is not, then the region-specific endpoint will be returned. If neither the region nor the endpoint is provided, then the default endpoint will be returned.

Parameters:

  • endpoint (String) (defaults to: nil)

    the endpoint to connect to

  • region (String) (defaults to: nil)

    the region to connect to

Returns:

  • (String)

    the constructed endpoint



# File 'lib/statelydb.rb', line 449

def self.make_endpoint(endpoint: nil, region: nil)
  return endpoint unless endpoint.nil?
  return "https://api.stately.cloud" if region.nil?

  region = region.sub("aws-", "") if region.start_with?("aws-")

  "https://#{region}.aws.api.stately.cloud"
end
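
The resolution order described above can be sketched as a standalone function. This is an illustrative copy of the logic, not a replacement for the class method; the name `resolve_endpoint`, the region `us-west-2`, and the localhost URL are assumptions chosen for the example.

```ruby
# Standalone copy of the endpoint resolution rules, for illustration only.
def resolve_endpoint(endpoint: nil, region: nil)
  # An explicit endpoint always wins and is returned untouched.
  return endpoint unless endpoint.nil?
  # Neither endpoint nor region: fall back to the default endpoint.
  return "https://api.stately.cloud" if region.nil?

  # A leading "aws-" prefix is dropped before building the regional host.
  region = region.delete_prefix("aws-")
  "https://#{region}.aws.api.stately.cloud"
end

puts resolve_endpoint
puts resolve_endpoint(region: "us-west-2")
puts resolve_endpoint(region: "aws-us-west-2")
puts resolve_endpoint(endpoint: "http://localhost:3000", region: "us-west-2")
```

Both spellings of the region resolve to the same host, and the region is ignored whenever an endpoint is supplied.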

Instance Method Details

#begin_list(prefix, limit: 100, sort_property: nil, sort_direction: :ascending, item_types: [], cel_filters: [], gt: nil, gte: nil, lt: nil, lte: nil) ⇒ Array<StatelyDB::Item>, StatelyDB::Token

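Begin listing Items whose key paths start with the given prefix. Returns the first page of Items together with a token that can be passed to continue_list or sync_list.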

Examples:

client.data.begin_list("/ItemType-identifier", limit: 10, sort_direction: :ascending)

Returns:

  • (Array<StatelyDB::Item>, StatelyDB::Token)

    the list of Items and the token



# File 'lib/statelydb.rb', line 164

def begin_list(prefix,
               limit: 100,
               sort_property: nil,
               sort_direction: :ascending,
               item_types: [],
               cel_filters: [],
               gt: nil,
               gte: nil,
               lt: nil,
               lte: nil)
  sort_direction = sort_direction == :ascending ? 0 : 1
  key_condition_params = [
    [Stately::Db::Operator::OPERATOR_GREATER_THAN, gt.is_a?(String) ? gt : gt&.to_s],
    [Stately::Db::Operator::OPERATOR_GREATER_THAN_OR_EQUAL, gte.is_a?(String) ? gte : gte&.to_s],
    [Stately::Db::Operator::OPERATOR_LESS_THAN, lt.is_a?(String) ? lt : lt&.to_s],
    [Stately::Db::Operator::OPERATOR_LESS_THAN_OR_EQUAL, lte.is_a?(String) ? lte : lte&.to_s]
  ]
  key_conditions = key_condition_params
                   .reject { |(_, value)| value.nil? }
                   .map { |(operator, key_path)| Stately::Db::KeyCondition.new(operator: operator, key_path: key_path) }

  req = Stately::Db::BeginListRequest.new(
    store_id: @store_id,
    key_path_prefix: String(prefix),
    limit:,
    sort_property:,
    sort_direction:,
    filter_conditions: build_filters(item_types: item_types, cel_filters: cel_filters),
    key_conditions: key_conditions,
    allow_stale: @allow_stale,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.begin_list(req)
  process_list_response(resp)
end
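
The way `gt`, `gte`, `lt` and `lte` become key conditions can be sketched without the generated protobuf classes. In this sketch `KeyCondition` is a plain Struct standing in for `Stately::Db::KeyCondition`, the symbols stand in for the `Stately::Db::Operator` constants, and the key paths are made up for the example.

```ruby
# Stand-in for Stately::Db::KeyCondition (the real class is a protobuf message).
KeyCondition = Struct.new(:operator, :key_path, keyword_init: true)

def build_key_conditions(gt: nil, gte: nil, lt: nil, lte: nil)
  # Symbols stand in for the Stately::Db::Operator enum values.
  pairs = [
    [:greater_than, gt&.to_s],
    [:greater_than_or_equal, gte&.to_s],
    [:less_than, lt&.to_s],
    [:less_than_or_equal, lte&.to_s]
  ]
  # Bounds that were not supplied are dropped; the rest become conditions.
  pairs
    .reject { |(_, value)| value.nil? }
    .map { |(operator, key_path)| KeyCondition.new(operator: operator, key_path: key_path) }
end

conditions = build_key_conditions(gte: "/user-1/order-100", lt: "/user-1/order-200")
conditions.each { |c| puts "#{c.operator} #{c.key_path}" }
```

Only the bounds that are actually passed produce a condition, so calling `begin_list` with no bounds sends an empty list of key conditions.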

#begin_scan(limit: 0, item_types: [], cel_filters: [], total_segments: nil, segment_index: nil) ⇒ Array<StatelyDB::Item>, StatelyDB::Token

Initiates a scan request which will scan over the entire store and apply the provided filters. This API returns a token that you can pass to continue_scan to paginate through the result set. This can fail if the caller does not have permission to read Items.

WARNING: THIS API CAN BE EXTREMELY EXPENSIVE FOR STORES WITH A LARGE NUMBER OF ITEMS.

Examples:

client.data.begin_scan(limit: 10, item_types: [MyItem])

Parameters:

  • limit (Integer) (defaults to: 0)

    the max number of items to retrieve. If set to 0, the first page of results is returned, which may be empty because it does not contain items of your selected item types. Be sure to check token.can_continue to see if there are more results to fetch.

  • item_types (Array<StatelyDB::Item, String>) (defaults to: [])

    the item types to filter by. The returned items will be instances of one of these types.

  • cel_filters (Array<Array<Class, String>, String>) (defaults to: [])

    An optional list of [item_type, cel_expression] tuples that represent CEL expressions to filter the result set by. Use the cel_filter helper function to build these expressions. CEL expressions are only evaluated for the item type they are defined for, and do not affect other item types in the result set. This means that if an item type has no CEL filter and there are no item_type filter constraints, it will be included in the result set. In the context of a CEL expression, the keyword `this` refers to the item being evaluated, and properties should be accessed by the names as they appear in the schema, not necessarily as they appear in the generated code for a particular language. For example, if you have a `Movie` item type with the property `rating`, you could write a CEL expression like `this.rating == 'R'` to return only movies that are rated `R`. Find the full CEL language definition here: github.com/google/cel-spec/blob/master/doc/langdef.md

  • total_segments (Integer) (defaults to: nil)

    the total number of segments to divide the scan into. Use this when you want to parallelize your operation.

  • segment_index (Integer) (defaults to: nil)

    the index of the segment to scan. Use this when you want to parallelize your operation.

Returns:

  • (Array<StatelyDB::Item>, StatelyDB::Token)

    the first page of Items and the token



# File 'lib/statelydb.rb', line 256

def begin_scan(limit: 0,
               item_types: [],
               cel_filters: [],
               total_segments: nil,
               segment_index: nil)
  if total_segments.nil? != segment_index.nil?
    raise StatelyDB::Error.new("total_segments and segment_index must both be set or both be nil",
                               code: GRPC::Core::StatusCodes::INVALID_ARGUMENT,
                               stately_code: "InvalidArgument")
  end

  req = Stately::Db::BeginScanRequest.new(
    store_id: @store_id,
    limit:,
    filter_conditions: build_filters(item_types: item_types, cel_filters: cel_filters),
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.begin_scan(req)
  process_list_response(resp)
end
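
The pairing rule for `total_segments` and `segment_index` can be sketched on its own. This is a minimal illustration, assuming zero-based segment indexes (the reference above does not state the base); `validate_segments!` and `segment_args` are hypothetical helper names, and `ArgumentError` is used in place of `StatelyDB::Error` so the sketch runs without the gem.

```ruby
# Mirrors the guard at the top of begin_scan: both values or neither.
def validate_segments!(total_segments:, segment_index:)
  if total_segments.nil? != segment_index.nil?
    raise ArgumentError, "total_segments and segment_index must both be set or both be nil"
  end
  true
end

# Builds one keyword-argument set per worker for a scan split into
# total_segments parts. Zero-based indexes are an assumption here.
def segment_args(total_segments)
  (0...total_segments).map { |i| { total_segments: total_segments, segment_index: i } }
end

segment_args(4).each do |args|
  validate_segments!(**args)
  puts args.inspect
end
```

Each argument set could then be handed to a separate worker that calls `begin_scan` with it.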

#close ⇒ void

Close the client's channel and token provider.

This method returns an undefined value.



# File 'lib/statelydb.rb', line 70

def close
  @channel&.close
  @token_provider&.close
end

#continue_list(token) ⇒ Array<StatelyDB::Item>, StatelyDB::Token

Continue listing Items from a StatelyDB Store using a token.

Examples:

(items, token) = client.data.begin_list("/ItemType-identifier")
client.data.continue_list(token)

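Parameters:

  • token (StatelyDB::Token)

    the token returned by a previous begin_list or continue_list call

Returns:

  • (Array<StatelyDB::Item>, StatelyDB::Token)

    the next page of Items and the updated token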



# File 'lib/statelydb.rb', line 209

def continue_list(token)
  req = Stately::Db::ContinueListRequest.new(
    token_data: token.token_data,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.continue_list(req)
  process_list_response(resp)
end
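
A typical pagination loop over `begin_list` and `continue_list` might look like the sketch below. `FakeListClient` and `FakeToken` are hypothetical stand-ins that serve three hard-coded pages so the loop can run without a real store; only `can_continue` is taken from the reference above.

```ruby
# Hypothetical stand-ins so the loop runs without a StatelyDB store.
FakeToken = Struct.new(:can_continue, :page)

class FakeListClient
  PAGES = [%w[a b], %w[c d], %w[e]].freeze

  def begin_list(_prefix)
    [PAGES[0], FakeToken.new(PAGES.length > 1, 0)]
  end

  def continue_list(token)
    page = token.page + 1
    [PAGES[page], FakeToken.new(page < PAGES.length - 1, page)]
  end
end

# Collects every page by following the token until it cannot continue.
def list_all(client, prefix)
  items, token = client.begin_list(prefix)
  while token.can_continue
    more, token = client.continue_list(token)
    items += more
  end
  items
end

all_items = list_all(FakeListClient.new, "/ItemType-identifier")
puts all_items.inspect
```

The same loop shape applies to `begin_scan` and `continue_scan`, since both pairs return Items together with a token.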

#continue_scan(token) ⇒ Array<StatelyDB::Item>, StatelyDB::Token

continue_scan takes the token from a begin_scan call and returns more results based on the original request parameters and pagination options.

WARNING: THIS API CAN BE EXTREMELY EXPENSIVE FOR STORES WITH A LARGE NUMBER OF ITEMS.

Examples:

(items, token) = client.data.begin_scan(limit: 10, item_types: [MyItem])
client.data.continue_scan(token)

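Parameters:

  • token (StatelyDB::Token)

    the token returned by a previous begin_scan or continue_scan call

Returns:

  • (Array<StatelyDB::Item>, StatelyDB::Token)

    the next page of Items and the updated token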



# File 'lib/statelydb.rb', line 289

def continue_scan(token)
  req = Stately::Db::ContinueScanRequest.new(
    token_data: token.token_data,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.continue_scan(req)
  process_list_response(resp)
end

#delete(*key_paths) ⇒ void

This method returns an undefined value.

Delete up to 50 Items from a StatelyDB Store at the given key_paths.

Examples:

client.data.delete("/ItemType-identifier", "/ItemType-identifier2")

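Parameters:

  • key_paths (Array<String>)

    the key paths of the Items to delete. Each value is converted with String() before it is sent.

Raises:

  • (StatelyDB::Error)

    if the request fails, for example because the parameters are invalid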



# File 'lib/statelydb.rb', line 394

def delete(*key_paths)
  key_paths = Array(key_paths).flatten
  req = Stately::Db::DeleteRequest.new(
    store_id: @store_id,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID,
    deletes: key_paths.map { |key_path| Stately::Db::DeleteItem.new(key_path: String(key_path)) }
  )
  @stub.delete(req)
  nil
end

#get(key_path) ⇒ StatelyDB::Item, NilClass

Fetch a single Item from a StatelyDB Store at the given key_path.

Examples:

client.get("/ItemType-identifier")

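Parameters:

  • key_path (String)

    the key path of the Item to fetch. The value is converted with String() before it is sent.

Returns:

  • (StatelyDB::Item, NilClass)

    the Item at the given key path, or nil if the response contains no Item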

Raises:

  • (StatelyDB::Error)

    if the parameters are invalid or if the item is not found



# File 'lib/statelydb.rb', line 95

def get(key_path)
  resp = get_batch(key_path)

  # Always return a single Item.
  resp.first
end

#get_batch(*key_paths) ⇒ Array<StatelyDB::Item>, NilClass

Fetch a batch of up to 100 Items from a StatelyDB Store at the given key_paths.

Examples:

client.data.get_batch("/ItemType-identifier", "/ItemType-identifier2")

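Parameters:

  • key_paths (Array<String>)

    the key paths of the Items to fetch. Each value is converted with String() before it is sent.

Returns:

  • (Array<StatelyDB::Item>, NilClass)

    the Items returned for the given key paths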

Raises:

  • (StatelyDB::Error)

    if the parameters are invalid or if the item is not found



# File 'lib/statelydb.rb', line 111

def get_batch(*key_paths)
  key_paths = Array(key_paths).flatten
  req = Stately::Db::GetRequest.new(
    store_id: @store_id,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID,
    gets:
      key_paths.map { |key_path| Stately::Db::GetItem.new(key_path: String(key_path)) },
    allow_stale: @allow_stale
  )

  resp = @stub.get(req)
  resp.items.map do |result|
    @schema.unmarshal_item(stately_item: result)
  end
end

#put(item, must_not_exist: false, overwrite_metadata_timestamps: false) ⇒ StatelyDB::Item

Put an Item into a StatelyDB Store at the given key_path.

Examples:

client.data.put(my_item)

client.data.put(my_item, must_not_exist: true)

Parameters:

  • item (StatelyDB::Item)

    a StatelyDB Item

  • must_not_exist (Boolean) (defaults to: false)

    A condition that indicates this item must not already exist at any of its key paths. If there is already an item at one of those paths, the Put operation will fail with a “ConditionalCheckFailed” error. Note that if the item has an `initialValue` field in its key, that initial value will automatically be chosen not to conflict with existing items, so this condition only applies to key paths that do not contain the `initialValue` field.

  • overwrite_metadata_timestamps (Boolean) (defaults to: false)

    If set to true, the server will set the `createdAtTime` and/or `lastModifiedAtTime` fields based on the current values in this item (assuming you’ve mapped them to a field using `fromMetadata`). Without this, those fields are always ignored and the server sets them to the appropriate times. This option can be useful when migrating data from another system.

Returns:

  • (StatelyDB::Item)

    the Item that was stored



# File 'lib/statelydb.rb', line 337

def put(item,
        must_not_exist: false,
        overwrite_metadata_timestamps: false)
  resp = put_batch({ item:, must_not_exist:, overwrite_metadata_timestamps: })

  # Always return a single Item.
  resp.first
end

#put_batch(*items) ⇒ Array<StatelyDB::Item>

Put a batch of up to 50 Items into a StatelyDB Store.

Examples:

client.data.put_batch(item1, item2)
client.data.put_batch({ item: item1, must_not_exist: true }, item2)

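Parameters:

  • items (Array<StatelyDB::Item, Hash>)

    the Items to put. Each entry is either an Item or a Hash with an :item key and optional :must_not_exist and :overwrite_metadata_timestamps keys.

Returns:

  • (Array<StatelyDB::Item>)

    the Items that were stored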



# File 'lib/statelydb.rb', line 356

def put_batch(*items)
  puts = Array(items).flatten.map do |input|
    if input.is_a?(Hash)
      item = input[:item]
      Stately::Db::PutItem.new(
        item: item.send("marshal_stately"),
        overwrite_metadata_timestamps: input[:overwrite_metadata_timestamps],
        must_not_exist: input[:must_not_exist]
      )
    else
      Stately::Db::PutItem.new(
        item: input.send("marshal_stately")
      )
    end
  end
  req = Stately::Db::PutRequest.new(
    store_id: @store_id,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID,
    puts:
  )
  resp = @stub.put(req)

  resp.items.map do |result|
    @schema.unmarshal_item(stately_item: result)
  end
end
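
The two input shapes accepted by `put_batch` (a bare Item, or a Hash carrying per-item options) can be sketched as a small normalization step. `PutItem` here is a plain Struct standing in for `Stately::Db::PutItem`, symbols stand in for real Items, and defaulting the options to `false` is a choice made for this sketch rather than something the listing above does.

```ruby
# Stand-in for Stately::Db::PutItem (the real class is a protobuf message).
PutItem = Struct.new(:item, :must_not_exist, :overwrite_metadata_timestamps,
                     keyword_init: true)

def normalize_puts(*inputs)
  inputs.flatten.map do |input|
    # A bare item is treated as a Hash with only the :item key.
    options = input.is_a?(Hash) ? input : { item: input }
    PutItem.new(
      item: options[:item],
      # Defaulting to false is this sketch's choice for missing options.
      must_not_exist: options.fetch(:must_not_exist, false),
      overwrite_metadata_timestamps: options.fetch(:overwrite_metadata_timestamps, false)
    )
  end
end

puts_list = normalize_puts({ item: :item1, must_not_exist: true }, :item2)
puts_list.each { |p| puts "#{p.item} must_not_exist=#{p.must_not_exist}" }
```

Because `Array#flatten` only unwraps nested arrays, Hash entries survive flattening intact, which is what lets the two shapes be mixed in a single call.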

#sync_list(token) ⇒ StatelyDB::SyncResult

Sync a list of Items from a StatelyDB Store.

Examples:

(items, token) = client.data.begin_list("/ItemType-identifier")
client.data.sync_list(token)

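Parameters:

  • token (StatelyDB::Token)

    the token returned by a previous list call

Returns:

  • (StatelyDB::SyncResult)

    the result of the sync operation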



# File 'lib/statelydb.rb', line 307

def sync_list(token)
  req = Stately::Db::SyncListRequest.new(
    token_data: token.token_data,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.sync_list(req)
  process_sync_response(resp)
end

#transaction ⇒ StatelyDB::Transaction::Transaction::Result

Transaction takes a block and executes the block within a transaction. If the block raises an exception, the transaction is rolled back. If the block completes successfully, the transaction is committed.

Examples:

client.data.transaction do |txn|
  txn.put(item: my_item)
  txn.put(item: another_item)
end

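Returns:

  • (StatelyDB::Transaction::Transaction::Result)

    the result of committing the transaction

Raises:

  • (StatelyDB::Error)

    if a StatelyDB or gRPC error occurs in the block or while committing

  • (Exception)

    any other exception raised by the block, re-raised after the transaction is aborted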



# File 'lib/statelydb.rb', line 420

def transaction
  txn = StatelyDB::Transaction::Transaction.new(stub: @stub, store_id: @store_id, schema: @schema)
  txn.begin
  yield txn
  txn.commit
rescue StatelyDB::Error
  raise
# Handle any other exceptions and abort the transaction. We're rescuing Exception here
# because we want to catch all exceptions, including those that don't inherit from StandardError.
rescue Exception => e
  txn.abort

  # All gRPC errors inherit from GRPC::BadStatus. We wrap these in a StatelyDB::Error.
  raise StatelyDB::Error.from(e) if e.is_a? GRPC::BadStatus

  # Calling raise with no parameters re-raises the original exception
  raise
end
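
The commit-or-abort flow can be sketched with a fake transaction object that records what happens to it. This is a simplified illustration: `FakeTransaction` and `run_in_transaction` are hypothetical names, and the sketch rescues only `StandardError`, whereas the listing above rescues `Exception` and handles `StatelyDB::Error` and gRPC errors separately.

```ruby
# Hypothetical transaction double that records lifecycle events.
class FakeTransaction
  attr_reader :events

  def initialize
    @events = []
  end

  def begin
    @events << :begin
  end

  def commit
    @events << :commit
    :committed
  end

  def abort
    @events << :abort
  end
end

# Simplified version of the flow: commit on success, abort and re-raise on error.
def run_in_transaction(txn)
  txn.begin
  yield txn
  txn.commit
rescue StandardError
  txn.abort
  raise
end

ok = FakeTransaction.new
run_in_transaction(ok) { |t| t.events << :work }
puts ok.events.inspect

failed = FakeTransaction.new
begin
  run_in_transaction(failed) { raise "boom" }
rescue RuntimeError => e
  puts "#{e.message}: #{failed.events.inspect}"
end
```

The block's exception still reaches the caller; the abort only ensures that the work done so far is not committed.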

#with_allow_stale(allow_stale) ⇒ self

Return a copy of this client with the allow_stale flag set to the given value. The flag applies to all operations made with the returned client; the original client is left unchanged.

Examples:

client.with_allow_stale(true).get("/ItemType-identifier")

Parameters:

  • allow_stale (Boolean)

    whether to allow stale results

Returns:

  • (self)

    a new client with the allow_stale flag set



# File 'lib/statelydb.rb', line 81

def with_allow_stale(allow_stale)
  new_client = clone
  new_client.instance_variable_set(:@allow_stale, allow_stale)
  new_client
end
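
The clone-and-set pattern used here can be seen in isolation with a small stand-in class. `SketchClient` is hypothetical and its `channel` is just a placeholder object; the point is that Ruby's `clone` makes a shallow copy, so the flag differs between the two clients while other instance variables still refer to the same objects.

```ruby
# Hypothetical stand-in that reproduces only the with_allow_stale pattern.
class SketchClient
  attr_reader :allow_stale, :channel

  def initialize
    @allow_stale = false
    @channel = Object.new # placeholder for the real gRPC channel
  end

  def with_allow_stale(allow_stale)
    new_client = clone
    new_client.instance_variable_set(:@allow_stale, allow_stale)
    new_client
  end
end

base = SketchClient.new
stale = base.with_allow_stale(true)

puts base.allow_stale
puts stale.allow_stale
puts stale.channel.equal?(base.channel)
```

If the real client behaves like this sketch, a client returned by `with_allow_stale` shares its channel with the original, so closing one would likely affect the other.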