Class: StatelyDB::CoreClient

Inherits:
Object
Defined in:
lib/statelydb.rb

Overview

CoreClient is a low-level client for interacting with the Stately Cloud API. This client shouldn’t be used directly in most cases. Instead, use the generated client for your schema.

Constructor Details

#initialize(store_id:, schema:, token_provider: nil, endpoint: nil, region: nil, no_auth: false) ⇒ CoreClient

Initialize a new StatelyDB CoreClient

Parameters:

  • store_id (Integer)

    the StatelyDB store to use for all operations with this client.

  • schema (Module)

    the generated Schema module to use for mapping StatelyDB Items.

  • token_provider (StatelyDB::Common::Auth::TokenProvider) (defaults to: nil)

    the token provider to use for authentication.

  • endpoint (String) (defaults to: nil)

    the endpoint to connect to.

  • region (String) (defaults to: nil)

    the region to connect to.

  • no_auth (Boolean) (defaults to: false)

    Indicates that the client should not attempt to get an auth token. This is used when talking to the Stately BYOC Data Plane on localhost.



# File 'lib/statelydb.rb', line 35

def initialize(store_id:,
               schema:,
               token_provider: nil,
               endpoint: nil,
               region: nil,
               no_auth: false)
  if store_id.nil?
    raise StatelyDB::Error.new("store_id is required",
                               code: GRPC::Core::StatusCodes::INVALID_ARGUMENT,
                               stately_code: "InvalidArgument")
  end
  if schema.nil?
    raise StatelyDB::Error.new("schema is required",
                               code: GRPC::Core::StatusCodes::INVALID_ARGUMENT,
                               stately_code: "InvalidArgument")
  end

  endpoint = self.class.make_endpoint(endpoint:, region:)
  @channel = Common::Net.new_channel(endpoint:)
  interceptors = [Common::ErrorInterceptor.new]
  # Make sure to use the correct endpoint for the default token provider
  unless no_auth
    @token_provider = token_provider || Common::Auth::AuthTokenProvider.new
    @token_provider.start(endpoint: endpoint)
    interceptors << Common::Auth::Interceptor.new(token_provider: @token_provider)
  end

  @stub = Stately::Db::DatabaseService::Stub.new(nil, nil,
                                                 channel_override: @channel, interceptors:)
  @store_id = store_id.to_i
  @schema = schema
  @allow_stale = false
end

Class Method Details

.make_endpoint(endpoint: nil, region: nil) ⇒ String

Construct the API endpoint from the region and endpoint. If the endpoint is provided, it will be returned as-is. If the region is provided and the endpoint is not, then the region-specific endpoint will be returned. If neither the region nor the endpoint is provided, then the default endpoint will be returned.

Parameters:

  • endpoint (String) (defaults to: nil)

    the endpoint to connect to

  • region (String) (defaults to: nil)

    the region to connect to

Returns:

  • (String)

    the constructed endpoint



# File 'lib/statelydb.rb', line 551

def self.make_endpoint(endpoint: nil, region: nil)
  return endpoint unless endpoint.nil?
  return "https://api.stately.cloud" if region.nil?

  region = region.sub("aws-", "") if region.start_with?("aws-")

  "https://#{region}.aws.api.stately.cloud"
end
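
The resolution order described above can be tried out without the gem. The following is a minimal sketch that copies the logic into a free-standing method; the name `resolve_endpoint` is hypothetical, and the localhost URL and the `us-west-2` region are only illustrative inputs.

```ruby
# Standalone copy of the resolution logic, for illustration only.
# `resolve_endpoint` is a hypothetical name, not part of the gem.
def resolve_endpoint(endpoint: nil, region: nil)
  # An explicit endpoint always wins and is returned as-is.
  return endpoint unless endpoint.nil?
  # Neither endpoint nor region: fall back to the default endpoint.
  return "https://api.stately.cloud" if region.nil?

  # Drop an optional "aws-" prefix before building the regional host.
  region = region.delete_prefix("aws-")
  "https://#{region}.aws.api.stately.cloud"
end

puts resolve_endpoint(endpoint: "http://localhost:3000", region: "us-west-2")
puts resolve_endpoint
puts resolve_endpoint(region: "aws-us-west-2")
```

Because the endpoint check comes first, passing both endpoint and region silently ignores the region.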

Instance Method Details

#begin_list(prefix, limit: 100, sort_direction: :ascending, item_types: [], cel_filters: [], gt: nil, gte: nil, lt: nil, lte: nil) ⇒ Array<StatelyDB::Item>, StatelyDB::Token

Returns the list of Items and the token.

Examples:

client.data.begin_list("/ItemType-identifier", limit: 10, sort_direction: :ascending)

Returns:

  • (Array<StatelyDB::Item>, StatelyDB::Token)

    the list of Items and the token.



# File 'lib/statelydb.rb', line 181

def begin_list(prefix,
               limit: 100,
               sort_direction: :ascending,
               item_types: [],
               cel_filters: [],
               gt: nil,
               gte: nil,
               lt: nil,
               lte: nil)
  sort_direction = sort_direction == :ascending ? 0 : 1
  key_condition_params = [
    [Stately::Db::Operator::OPERATOR_GREATER_THAN, gt.is_a?(String) ? gt : gt&.to_s],
    [Stately::Db::Operator::OPERATOR_GREATER_THAN_OR_EQUAL, gte.is_a?(String) ? gte : gte&.to_s],
    [Stately::Db::Operator::OPERATOR_LESS_THAN, lt.is_a?(String) ? lt : lt&.to_s],
    [Stately::Db::Operator::OPERATOR_LESS_THAN_OR_EQUAL, lte.is_a?(String) ? lte : lte&.to_s]
  ]
  key_conditions = key_condition_params
                   .reject { |(_, value)| value.nil? }
                   .map { |(operator, key_path)| Stately::Db::KeyCondition.new(operator: operator, key_path: key_path) }

  req = Stately::Db::BeginListRequest.new(
    store_id: @store_id,
    key_path_prefix: String(prefix),
    limit:,
    sort_direction:,
    filter_conditions: build_filters(item_types: item_types, cel_filters: cel_filters),
    key_conditions: key_conditions,
    allow_stale: @allow_stale,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.begin_list(req)
  process_list_response(resp)
end
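
The way begin_list turns the gt, gte, lt, and lte keywords into key conditions can be sketched in plain Ruby. This is an illustration only: symbols stand in for the `Stately::Db::Operator` constants, hashes stand in for `Stately::Db::KeyCondition`, and the method name `build_key_conditions` and the example key paths are hypothetical.

```ruby
# Plain-Ruby sketch of the key-condition construction.
# Symbols and hashes replace the protobuf types so this runs without the gem.
def build_key_conditions(gt: nil, gte: nil, lt: nil, lte: nil)
  pairs = [
    [:greater_than, gt],
    [:greater_than_or_equal, gte],
    [:less_than, lt],
    [:less_than_or_equal, lte]
  ]

  pairs
    .reject { |_, value| value.nil? } # unset bounds produce no condition
    .map { |operator, value| { operator: operator, key_path: value.to_s } }
end

# Only the bounds that were passed appear in the result.
p build_key_conditions(gte: "/ItemType-a", lt: "/ItemType-m")
```

Non-string bounds are converted with `to_s`, so any object with a meaningful string form can be passed as a bound.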

#begin_scan(limit: 0, item_types: [], cel_filters: [], total_segments: nil, segment_index: nil) ⇒ Array<StatelyDB::Item>, StatelyDB::Token

begin_scan initiates a scan request which will scan over the entire store and apply the provided filters. This API returns a token that you can pass to continue_scan to paginate through the result set.

begin_scan can return items of many types, and you can check the class of each result to handle different item types.

WARNING: THIS API CAN BE EXPENSIVE FOR STORES WITH A LARGE NUMBER OF ITEMS.

Examples:

client.data.begin_scan(limit: 10, item_types: [MyItem])

Parameters:

  • limit (Integer) (defaults to: 0)

    the max number of items to retrieve. If set to 0, the first page of results is returned, which may be empty because it does not contain items of your selected item types. Be sure to check token.can_continue to see if there are more results to fetch.

  • item_types (Array<StatelyDB::Item, String>) (defaults to: [])

    the item types to filter by. The returned items will be instances of one of these types.

  • cel_filters (Array<Array<Class, String>, String>) (defaults to: [])

    An optional list of (item_type, cel_expression) tuples that represent CEL expressions to filter the result set by. Use the cel_filter helper function to build these expressions. CEL expressions are only evaluated for the item type they are defined for, and do not affect other item types in the result set. This means that if an item type has no CEL filter and there are no item_type filter constraints, it will be included in the result set. In the context of a CEL expression, the keyword `this` refers to the item being evaluated, and properties should be accessed by their names as they appear in the schema, not necessarily as they appear in the generated code for a particular language. For example, if you have a `Movie` item type with the property `rating`, you could write a CEL expression like `this.rating == 'R'` to return only movies that are rated `R`. Find the full CEL language definition here: github.com/google/cel-spec/blob/master/doc/langdef.md

  • total_segments (Integer) (defaults to: nil)

    the total number of segments to divide the scan into. Use this when you want to parallelize your operation.

  • segment_index (Integer) (defaults to: nil)

    the index of the segment to scan. Use this when you want to parallelize your operation.

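Returns:

  • (Array<StatelyDB::Item>, StatelyDB::Token)

    the list of Items and the token to pass to continue_scan.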



# File 'lib/statelydb.rb', line 287

def begin_scan(limit: 0,
               item_types: [],
               cel_filters: [],
               total_segments: nil,
               segment_index: nil)
  if total_segments.nil? != segment_index.nil?
    raise StatelyDB::Error.new("total_segments and segment_index must both be set or both be nil",
                               code: GRPC::Core::StatusCodes::INVALID_ARGUMENT,
                               stately_code: "InvalidArgument")
  end

  req = Stately::Db::BeginScanRequest.new(
    store_id: @store_id,
    limit:,
    filter_conditions: build_filters(item_types: item_types, cel_filters: cel_filters),
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.begin_scan(req)
  process_list_response(resp)
end
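
The guard at the top of begin_scan requires total_segments and segment_index to be given together. A minimal sketch of that rule, plus one way to prepare arguments for a parallel scan, is shown here. The helper name `segment_args_valid?`, the worker count of 4, and the zero-based segment indexes are all assumptions made for the example; the text above does not state how indexes are numbered.

```ruby
# True when the segment arguments are consistent: both nil (an
# unsegmented scan) or both set (one segment of a parallel scan).
# Hypothetical helper mirroring the guard in begin_scan.
def segment_args_valid?(total_segments, segment_index)
  total_segments.nil? == segment_index.nil?
end

# One argument hash per worker for a scan split four ways.
# Zero-based indexes are an assumption, not confirmed by the docs above.
total = 4
segment_args = (0...total).map do |index|
  { total_segments: total, segment_index: index }
end

segment_args.each do |args|
  puts "#{args} valid? #{segment_args_valid?(args[:total_segments], args[:segment_index])}"
end
```

Each hash could then be splatted into its own begin_scan call, one per worker.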

#close ⇒ void

This method returns an undefined value.

Returns nil.



# File 'lib/statelydb.rb', line 70

def close
  @channel&.close
  @token_provider&.close
end

#continue_list(token) ⇒ Array<StatelyDB::Item>, StatelyDB::Token

continue_list takes the token from a begin_list call and returns the next “page” of results based on the original query parameters and pagination options. It doesn’t have options because it is a continuation of a previous list operation. It will return a new token which can be used for another continue_list call, and so on. The token is the same one used by sync_list - each time you call either continue_list or sync_list, you should pass the latest version of the token, and the result will include a new version of the token to use in subsequent calls. You may interleave continue_list and sync_list calls however you like, but it does not make sense to make both calls in parallel. Calls to continue_list are tied to the authorization of the original begin_list call, so if the original begin_list call was allowed, continue_list with its token should also be allowed.

You can list items of different types in a single continue_list, and you can check the class of each result to handle different item types.

Examples:

(items, token) = client.data.begin_list("/ItemType-identifier")
client.data.continue_list(token)

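Parameters:

  • token (StatelyDB::Token)

    the latest token from a previous begin_list, continue_list, or sync_list call.

Returns:

  • (Array<StatelyDB::Item>, StatelyDB::Token)

    the next page of Items and the new token.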



# File 'lib/statelydb.rb', line 239

def continue_list(token)
  req = Stately::Db::ContinueListRequest.new(
    token_data: token.token_data,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.continue_list(req)
  process_list_response(resp)
end
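
Paging through a full result set follows a simple loop: call begin_list once, then call continue_list with the latest token while `token.can_continue` is true. The sketch below runs that loop against stand-ins so it works without a store. `FakeToken`, `FakeClient`, and `list_all` are hypothetical names; only the two-value return shape and `can_continue` come from the documentation above.

```ruby
# Stand-ins so the loop can run without a store (hypothetical types).
FakeToken = Struct.new(:page, :can_continue)

class FakeClient
  PAGES = [%w[a b], %w[c d], %w[e]].freeze

  def begin_list(_prefix)
    page(0)
  end

  def continue_list(token)
    page(token.page + 1)
  end

  private

  # Returns [items, token], mirroring the two-value return shape.
  def page(index)
    [PAGES[index], FakeToken.new(index, index < PAGES.length - 1)]
  end
end

# Collects every page by always passing the most recent token.
def list_all(client, prefix)
  items, token = client.begin_list(prefix)
  all = items.dup
  while token.can_continue
    items, token = client.continue_list(token)
    all.concat(items)
  end
  all
end

p list_all(FakeClient.new, "/ItemType-identifier")
```

The same loop shape applies to begin_scan and continue_scan, since they return the same pair of values.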

#continue_scan(token) ⇒ Array<StatelyDB::Item>, StatelyDB::Token

continue_scan takes the token from a begin_scan call and returns the next “page” of results based on the original query parameters and pagination options.

You can scan items of different types in a single continue_scan, and you can check the class of each result to handle different item types.

Examples:

(items, token) = client.data.begin_scan(limit: 10, item_types: [MyItem])
client.data.continue_scan(token)

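Parameters:

  • token (StatelyDB::Token)

    the token from a previous scan operation.

Returns:

  • (Array<StatelyDB::Item>, StatelyDB::Token)

    the next page of Items and the new token.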



# File 'lib/statelydb.rb', line 322

def continue_scan(token)
  req = Stately::Db::ContinueScanRequest.new(
    token_data: token.token_data,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.continue_scan(req)
  process_list_response(resp)
end

#delete(*key_paths) ⇒ void

This method returns an undefined value.

delete removes one or more items from the Store by their full key paths. delete succeeds even if there isn’t an item at that key path. Tombstones will be saved for deleted items for some time, so that sync_list can return information about deleted items. Deletes are always applied atomically; all will fail or all will succeed.

Examples:

client.data.delete("/ItemType-identifier", "/ItemType-identifier2")

Parameters:

  • key_paths

    the full key paths of the items.

Raises:

  • (StatelyDB::Error)

    if the parameters are invalid



# File 'lib/statelydb.rb', line 478

def delete(*key_paths)
  key_paths = Array(key_paths).flatten
  req = Stately::Db::DeleteRequest.new(
    store_id: @store_id,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID,
    deletes: key_paths.map { |key_path| Stately::Db::DeleteItem.new(key_path: String(key_path)) }
  )
  @stub.delete(req)
  nil
end
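
Because delete flattens its arguments and converts each one with `String(...)`, key paths can be passed either as separate arguments or as a single array. The sketch below isolates that normalization step; `normalize_key_paths` is a hypothetical name used only for illustration, and get_batch applies the same pattern.

```ruby
# Mirrors the argument handling at the top of delete (and get_batch):
# splat, flatten nested arrays, and coerce every entry to a String.
def normalize_key_paths(*key_paths)
  Array(key_paths).flatten.map { |key_path| String(key_path) }
end

# Both call styles produce the same list of key paths.
p normalize_key_paths("/ItemType-identifier", "/ItemType-identifier2")
p normalize_key_paths(["/ItemType-identifier", "/ItemType-identifier2"])
```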

#get(key_path) ⇒ StatelyDB::Item, NilClass

get retrieves an item by its full key path.

Examples:

client.get("/ItemType-identifier")

Parameters:

  • key_path

    the full key path of the item.

Returns:

  • (StatelyDB::Item, NilClass)

    the item, or nil if the item is not found.

Raises:

  • (StatelyDB::Error)

    if the parameters are invalid



# File 'lib/statelydb.rb', line 97

def get(key_path)
  resp = get_batch(key_path)

  # Always return a single Item.
  resp.first
end

#get_batch(*key_paths) ⇒ Array<StatelyDB::Item>

get_batch retrieves multiple items by their full key paths. This will return the corresponding items that exist. Use begin_list instead if you want to retrieve multiple items but don’t already know the full key paths of the items you want to get. You can get items of different types in a single get_batch - you will need to check the class of each result to determine what item type it is.

Examples:

client.data.get_batch("/ItemType-identifier", "/ItemType-identifier2")

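Parameters:

  • key_paths

    the full key paths of the items.

Returns:

  • (Array<StatelyDB::Item>)

    the items that exist at the given key paths.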

Raises:

  • (StatelyDB::Error)

    if the parameters are invalid or if the item is not found



# File 'lib/statelydb.rb', line 117

def get_batch(*key_paths)
  key_paths = Array(key_paths).flatten
  req = Stately::Db::GetRequest.new(
    store_id: @store_id,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID,
    gets:
      key_paths.map { |key_path| Stately::Db::GetItem.new(key_path: String(key_path)) },
    allow_stale: @allow_stale
  )

  resp = @stub.get(req)
  resp.items.map do |result|
    @schema.unmarshal_item(stately_item: result)
  end
end

#put(item, must_not_exist: false, overwrite_metadata_timestamps: false) ⇒ StatelyDB::Item

put adds an Item to the Store, or replaces the Item if it already exists at that path.

This call will fail if:

- The Item conflicts with an existing Item at the same path and the
  must_not_exist option is set, or the item's ID will be chosen with
  an `initialValue` and one of its other key paths conflicts with an
  existing item.

Examples:

client.data.put(my_item)


client.data.put(my_item, must_not_exist: true)


Parameters:

  • item (StatelyDB::Item)

    An Item from your generated schema.

  • must_not_exist (Boolean) (defaults to: false)

    A condition that indicates this item must not already exist at any of its key paths. If there is already an item at one of those paths, the Put operation will fail with a “ConditionalCheckFailed” error. Note that if the item has an `initialValue` field in its key, that initial value will automatically be chosen not to conflict with existing items, so this condition only applies to key paths that do not contain the `initialValue` field.

  • overwrite_metadata_timestamps (Boolean) (defaults to: false)

    If set to true, the server will set the `createdAtTime` and/or `lastModifiedAtTime` fields based on the current values in this item (assuming you’ve mapped them to a field using `fromMetadata`). Without this, those fields are always ignored and the server sets them to the appropriate times. This option can be useful when migrating data from another system.

Returns:

  • (StatelyDB::Item)

    The item that was put, with any server-generated fields filled in.



# File 'lib/statelydb.rb', line 409

def put(item,
        must_not_exist: false,
        overwrite_metadata_timestamps: false)
  resp = put_batch({ item:, must_not_exist:, overwrite_metadata_timestamps: })

  # Always return a single Item.
  resp.first
end

#put_batch(*items) ⇒ Array<StatelyDB::Item>

put_batch adds multiple Items to the Store, or replaces Items if they already exist at that path. You can put items of different types in a single put_batch. All puts in the request are applied atomically - there are no partial successes.

This will fail if:

- Any Item conflicts with an existing Item at the same path and its
  must_not_exist option is set, or the item's ID will be chosen with an
  `initialValue` and one of its other key paths conflicts with an existing
  item.

Examples:

client.data.put_batch(item1, item2)
client.data.put_batch({ item: item1, must_not_exist: true }, item2)

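Parameters:

  • items

    the Items to put. Each entry is either an Item from your generated schema or a Hash with an :item key and optional :must_not_exist and :overwrite_metadata_timestamps keys.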

Returns:

  • (Array<StatelyDB::Item>)

    The items that were put, with any server-generated fields filled in. They are returned in the same order they were provided.



# File 'lib/statelydb.rb', line 437

def put_batch(*items)
  puts = Array(items).flatten.map do |input|
    if input.is_a?(Hash)
      item = input[:item]
      Stately::Db::PutItem.new(
        item: item.send("marshal_stately"),
        overwrite_metadata_timestamps: input[:overwrite_metadata_timestamps],
        must_not_exist: input[:must_not_exist]
      )
    else
      Stately::Db::PutItem.new(
        item: input.send("marshal_stately")
      )
    end
  end
  req = Stately::Db::PutRequest.new(
    store_id: @store_id,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID,
    puts:
  )
  resp = @stub.put(req)

  resp.items.map do |result|
    @schema.unmarshal_item(stately_item: result)
  end
end
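
put_batch accepts a mix of bare items and option hashes. A minimal sketch of how the two input shapes can be brought into one form is shown here. The `Movie` struct and `normalize_puts` are hypothetical stand-ins, plain hashes replace `Stately::Db::PutItem`, and the `false` defaults are a simplification chosen for the example rather than something the code above states.

```ruby
# Hypothetical stand-in for an Item from a generated schema.
Movie = Struct.new(:title)

# Brings bare items and option hashes into one uniform shape.
def normalize_puts(*items)
  Array(items).flatten.map do |input|
    if input.is_a?(Hash)
      {
        item: input.fetch(:item),
        must_not_exist: input.fetch(:must_not_exist, false),
        overwrite_metadata_timestamps: input.fetch(:overwrite_metadata_timestamps, false)
      }
    else
      # A bare item carries no options.
      { item: input, must_not_exist: false, overwrite_metadata_timestamps: false }
    end
  end
end

item1 = Movie.new("First")
item2 = Movie.new("Second")
normalize_puts({ item: item1, must_not_exist: true }, item2).each { |entry| p entry }
```

Options are set per entry, so one batch can mix conditional and unconditional puts.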

#sync_list(token) ⇒ StatelyDB::SyncResult

sync_list returns all changes to Items within the result set of a previous List operation. For all Items within the result set that were modified, it returns the full Item in its current state. If the result set has already been expanded to the end (in the direction of the original begin_list request), sync_list will return newly created Items as well. It also returns a list of Item key paths that were deleted since the last sync_list, which you should reconcile with your view of items returned from previous begin_list/continue_list calls. Using this API, you can start with an initial set of items from begin_list, and then stay up to date on any changes via repeated sync_list requests over time.

The token is the same one used by continue_list - each time you call either continue_list or sync_list, you should pass the latest version of the token, and then use the new token from the result in subsequent calls. You may interleave continue_list and sync_list calls however you like, but it does not make sense to make both calls in parallel. Calls to sync_list are tied to the authorization of the original begin_list call, so if the original begin_list call was allowed, sync_list with its token should also be allowed.

The result will contain:

- changed_items: Items that were changed or added since the last
  sync_list call.
- deleted_item_paths: The key paths of items that were deleted since
  the last sync_list call.
- updated_outside_list_window_paths: Items that were updated but
  are not within the current result set. You can treat this like
  deleted_item_paths, but the item hasn't actually been deleted, it's
  just not part of your view of the list anymore.
- is_reset: A reset signal that indicates any previously cached
  view of the result set is no longer valid. You should throw away
  any locally cached data. Use the changed_items list from this result
  as the new view of the result set.

Examples:

(items, token) = client.data.begin_list("/ItemType-identifier")
client.data.sync_list(token)

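Parameters:

  • token (StatelyDB::Token)

    the latest token from a previous begin_list, continue_list, or sync_list call.

Returns:

  • (StatelyDB::SyncResult)

    the changes to the result set since the token was issued.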



# File 'lib/statelydb.rb', line 372

def sync_list(token)
  req = Stately::Db::SyncListRequest.new(
    token_data: token.token_data,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.sync_list(req)
  process_sync_response(resp)
end
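
The four result fields described for sync_list can be reconciled with a local cache along the following lines. This is a sketch under stated assumptions: the `SyncResult` struct here is a local stand-in whose field names follow the list above, items are plain hashes with a `:key_path` entry, and `apply_sync` is a hypothetical helper, not part of the gem.

```ruby
# Local stand-in for the sync result (field names follow the docs above).
SyncResult = Struct.new(:changed_items, :deleted_item_paths,
                        :updated_outside_list_window_paths, :is_reset,
                        keyword_init: true)

# Returns a new cache (key path => item) with the sync result applied.
def apply_sync(cache, result)
  # A reset invalidates everything cached so far.
  updated = result.is_reset ? {} : cache.dup

  # Changed or newly added items replace whatever was cached.
  result.changed_items.each { |item| updated[item[:key_path]] = item }

  # Deleted items and items that left the list window are both dropped.
  dropped = result.deleted_item_paths + result.updated_outside_list_window_paths
  dropped.each { |path| updated.delete(path) }

  updated
end

cache = {
  "/a" => { key_path: "/a", v: 1 },
  "/b" => { key_path: "/b", v: 1 },
  "/c" => { key_path: "/c", v: 1 }
}
result = SyncResult.new(
  changed_items: [{ key_path: "/b", v: 2 }, { key_path: "/d", v: 1 }],
  deleted_item_paths: ["/a"],
  updated_outside_list_window_paths: ["/c"],
  is_reset: false
)
p apply_sync(cache, result).keys
```

Returning a new hash instead of mutating the cache keeps the previous view intact if the caller needs to compare before and after.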

#transaction ⇒ StatelyDB::Transaction::Transaction::Result

transaction allows you to issue reads and writes in any order, and all writes will either succeed or all will fail when the transaction finishes. It takes a block with a single argument that can be used to run commands within the transaction.

Reads are guaranteed to reflect the state as of when the transaction started. A transaction may fail if another transaction commits before this one finishes - in that case, you should retry your transaction.

If any error is thrown from the block, the transaction is aborted and none of the changes made in it will be applied. If the handler returns without error, the transaction is automatically committed.

If any of the operations in the block fails (e.g. a request is invalid) you may not find out until the next operation, or once the block finishes, due to some technicalities about how requests are handled.

When the transaction is committed, the result property will contain the full version of any items that were put in the transaction, and the committed property will be true. If the transaction was aborted, the committed property will be false.

Examples:

client.data.transaction do |txn|
  txn.put(item: my_item)
  txn.put(item: another_item)
end

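Returns:

  • (StatelyDB::Transaction::Transaction::Result)

    the result of the transaction.

Raises:

  • (StatelyDB::Error)

    if a StatelyDB or gRPC error occurs during the transaction.

  • (Exception)

    any other exception raised from the block, re-raised after the transaction is aborted.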



# File 'lib/statelydb.rb', line 522

def transaction
  txn = StatelyDB::Transaction::Transaction.new(stub: @stub, store_id: @store_id, schema: @schema)
  txn.begin
  yield txn
  txn.commit
rescue StatelyDB::Error
  raise
# Handle any other exceptions and abort the transaction. We're rescuing Exception here
# because we want to catch all exceptions, including those that don't inherit from StandardError.
rescue Exception => e
  txn.abort

  # All gRPC errors inherit from GRPC::BadStatus. We wrap these in a StatelyDB::Error.
  raise StatelyDB::Error.from(e) if e.is_a? GRPC::BadStatus

  # Calling raise with no parameters re-raises the original exception
  raise
end
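
The commit-or-abort behaviour of the block form can be sketched with a stand-in transaction object. `FakeTxn` and `run_transaction` are hypothetical names, and the sketch rescues `StandardError` where the method above rescues `Exception`, a deliberate simplification that keeps the example short.

```ruby
# Stand-in that records which lifecycle calls were made (hypothetical).
class FakeTxn
  attr_reader :log

  def initialize
    @log = []
  end

  def begin
    @log << :begin
  end

  def commit
    @log << :commit
    :committed
  end

  def abort
    @log << :abort
  end
end

# Commits when the block returns normally; aborts and re-raises otherwise.
def run_transaction(txn)
  txn.begin
  yield txn
  txn.commit
rescue StandardError
  txn.abort
  raise
end

ok = FakeTxn.new
run_transaction(ok) { |txn| txn.log << :work }
p ok.log

failed = FakeTxn.new
begin
  run_transaction(failed) { raise ArgumentError, "boom" }
rescue ArgumentError
  p failed.log
end
```

The caller still sees the original exception, so retry logic for conflicting transactions can be layered around the call.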

#with_allow_stale(allow_stale) ⇒ self

Returns a new client that is either OK with or not OK with stale reads. This affects get and list operations from the returned client. Use this only if you know you can tolerate stale reads. This can result in improved performance, availability, and cost.

Examples:

client.with_allow_stale(true).get("/ItemType-identifier")

Parameters:

  • allow_stale (Boolean)

    Whether staleness is allowed or not.

Returns:

  • (self)

    A clone of the existing client with allow_stale set to the new value.



# File 'lib/statelydb.rb', line 83

def with_allow_stale(allow_stale)
  new_client = clone
  new_client.instance_variable_set(:@allow_stale, allow_stale)
  new_client
end
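
The clone-and-override pattern used here leaves the original client untouched. A minimal sketch with a hypothetical `TinyClient` class shows the effect. Note that Ruby's `clone` is a shallow copy, so the copy's other instance variables refer to the same objects as the original's.

```ruby
# Hypothetical miniature client that only tracks the allow_stale flag.
class TinyClient
  attr_reader :allow_stale, :channel

  def initialize
    @allow_stale = false
    @channel = Object.new # placeholder for a shared connection
  end

  # Same pattern as above: shallow-copy, then override one variable.
  def with_allow_stale(allow_stale)
    new_client = clone
    new_client.instance_variable_set(:@allow_stale, allow_stale)
    new_client
  end
end

client = TinyClient.new
stale_client = client.with_allow_stale(true)

puts client.allow_stale
puts stale_client.allow_stale
puts client.channel.equal?(stale_client.channel)
```

Since the copy shares the underlying objects, closing one client would presumably affect the other as well.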