Class: StatelyDB::CoreClient
- Inherits: Object
  - Object
  - StatelyDB::CoreClient
- Defined in: lib/statelydb.rb
Overview
CoreClient is a low-level client for interacting with the Stately Cloud API. In most cases you should not use this client directly. Instead, use the generated client for your schema.
Class Method Summary
- .make_endpoint(endpoint: nil, region: nil) ⇒ String
  Construct the API endpoint from the region and endpoint.
Instance Method Summary
- #begin_list(prefix, limit: 100, sort_direction: :ascending, item_types: [], cel_filters: [], gt: nil, gte: nil, lt: nil, lte: nil) ⇒ Array<StatelyDB::Item>, StatelyDB::Token
  Returns the list of Items and the token.
- #begin_scan(limit: 0, item_types: [], cel_filters: [], total_segments: nil, segment_index: nil) ⇒ Array<StatelyDB::Item>, StatelyDB::Token
  begin_scan initiates a scan request which will scan over the entire store and apply the provided filters.
- #close ⇒ void
  Returns nil.
- #continue_list(token) ⇒ Array<StatelyDB::Item>, StatelyDB::Token
  continue_list takes the token from a begin_list call and returns the next “page” of results based on the original query parameters and pagination options.
- #continue_scan(token) ⇒ Array<StatelyDB::Item>, StatelyDB::Token
  continue_scan takes the token from a begin_scan call and returns the next “page” of results based on the original query parameters and pagination options.
- #delete(*key_paths) ⇒ void
  delete removes one or more items from the Store by their full key paths.
- #get(key_path) ⇒ StatelyDB::Item, NilClass
  get retrieves an item by its full key path.
- #get_batch(*key_paths) ⇒ Array<StatelyDB::Item>
  get_batch retrieves multiple items by their full key paths.
- #initialize(store_id:, schema:, token_provider: nil, endpoint: nil, region: nil, no_auth: false) ⇒ CoreClient (constructor)
  Initialize a new StatelyDB CoreClient.
- #put(item, must_not_exist: false, overwrite_metadata_timestamps: false) ⇒ StatelyDB::Item
  put adds an Item to the Store, or replaces the Item if it already exists at that path.
- #put_batch(*items) ⇒ Array<StatelyDB::Item>
  put_batch adds multiple Items to the Store, or replaces Items if they already exist at that path.
- #sync_list(token) ⇒ StatelyDB::SyncResult
  sync_list returns all changes to Items within the result set of a previous List operation.
- #transaction ⇒ StatelyDB::Transaction::Transaction::Result
  transaction allows you to issue reads and writes in any order, and all writes will either succeed or all will fail when the transaction finishes.
- #with_allow_stale(allow_stale) ⇒ self
  Returns a new client that either allows or disallows stale reads.
Constructor Details
#initialize(store_id:, schema:, token_provider: nil, endpoint: nil, region: nil, no_auth: false) ⇒ CoreClient
Initialize a new StatelyDB CoreClient
# File 'lib/statelydb.rb', line 35
def initialize(store_id:, schema:, token_provider: nil, endpoint: nil, region: nil, no_auth: false)
  if store_id.nil?
    raise StatelyDB::Error.new("store_id is required",
                               code: GRPC::Core::StatusCodes::INVALID_ARGUMENT,
                               stately_code: "InvalidArgument")
  end
  if schema.nil?
    raise StatelyDB::Error.new("schema is required",
                               code: GRPC::Core::StatusCodes::INVALID_ARGUMENT,
                               stately_code: "InvalidArgument")
  end
  endpoint = self.class.make_endpoint(endpoint:, region:)
  @channel = Common::Net.new_channel(endpoint:)
  interceptors = [Common::ErrorInterceptor.new]
  # Make sure to use the correct endpoint for the default token provider
  unless no_auth
    @token_provider = token_provider || Common::Auth::AuthTokenProvider.new
    @token_provider.start(endpoint: endpoint)
    interceptors << Common::Auth::Interceptor.new(token_provider: @token_provider)
  end
  @stub = Stately::Db::DatabaseService::Stub.new(nil, nil, channel_override: @channel, interceptors:)
  @store_id = store_id.to_i
  @schema = schema
  @allow_stale = false
end
Class Method Details
.make_endpoint(endpoint: nil, region: nil) ⇒ String
Construct the API endpoint from the region and endpoint. If the endpoint is provided, it will be returned as-is. If the region is provided and the endpoint is not, then the region-specific endpoint will be returned. If neither the region nor the endpoint is provided, then the default endpoint will be returned.
# File 'lib/statelydb.rb', line 551
def self.make_endpoint(endpoint: nil, region: nil)
  return endpoint unless endpoint.nil?
  return "https://api.stately.cloud" if region.nil?
  region = region.sub("aws-", "") if region.start_with?("aws-")
  "https://#{region}.aws.api.stately.cloud"
end
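The three resolution rules can be checked in isolation. The sketch below copies the method body from the listing above into a standalone top-level method so that it runs without the gem installed. The region name and the localhost URL are illustrative values, not ones taken from the documentation.

```ruby
# Standalone copy of the endpoint-resolution logic from the listing above.
def make_endpoint(endpoint: nil, region: nil)
  # An explicit endpoint always wins.
  return endpoint unless endpoint.nil?
  # No endpoint and no region: fall back to the default endpoint.
  return "https://api.stately.cloud" if region.nil?

  # An optional "aws-" prefix on the region is stripped before use.
  region = region.sub("aws-", "") if region.start_with?("aws-")
  "https://#{region}.aws.api.stately.cloud"
end

puts make_endpoint
puts make_endpoint(region: "aws-us-west-2")
puts make_endpoint(region: "us-west-2")
puts make_endpoint(endpoint: "http://localhost:3000", region: "us-west-2")
```

Both spellings of the region resolve to the same host, and the explicit endpoint is returned unchanged even when a region is also given.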
Instance Method Details
#begin_list(prefix, limit: 100, sort_direction: :ascending, item_types: [], cel_filters: [], gt: nil, gte: nil, lt: nil, lte: nil) ⇒ Array<StatelyDB::Item>, StatelyDB::Token
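begin_list retrieves Items whose key paths start with the given prefix, optionally bounded by the gt, gte, lt, and lte key conditions and filtered by item_types and cel_filters. It returns the list of Items and a token, which can be passed to continue_list or sync_list.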
# File 'lib/statelydb.rb', line 181
def begin_list(prefix, limit: 100, sort_direction: :ascending, item_types: [], cel_filters: [], gt: nil, gte: nil, lt: nil, lte: nil)
  sort_direction = sort_direction == :ascending ? 0 : 1
  key_condition_params = [
    [Stately::Db::Operator::OPERATOR_GREATER_THAN, gt.is_a?(String) ? gt : gt&.to_s],
    [Stately::Db::Operator::OPERATOR_GREATER_THAN_OR_EQUAL, gte.is_a?(String) ? gte : gte&.to_s],
    [Stately::Db::Operator::OPERATOR_LESS_THAN, lt.is_a?(String) ? lt : lt&.to_s],
    [Stately::Db::Operator::OPERATOR_LESS_THAN_OR_EQUAL, lte.is_a?(String) ? lte : lte&.to_s]
  ]
  key_conditions = key_condition_params
                   .reject { |(_, value)| value.nil? }
                   .map { |(operator, key_path)| Stately::Db::KeyCondition.new(operator: operator, key_path: key_path) }
  req = Stately::Db::BeginListRequest.new(
    store_id: @store_id,
    key_path_prefix: String(prefix),
    limit:,
    sort_direction:,
    filter_conditions: build_filters(item_types: item_types, cel_filters: cel_filters),
    key_conditions: key_conditions,
    allow_stale: @allow_stale,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.begin_list(req)
  process_list_response(resp)
end
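The way the gt, gte, lt, and lte arguments turn into key conditions can be illustrated without the protobuf classes. This is a minimal sketch, assuming plain symbols and hashes as stand-ins for Stately::Db::Operator and Stately::Db::KeyCondition. The helper name build_key_conditions and the key paths are hypothetical.

```ruby
# Hypothetical helper: mirrors the reject/map pipeline in begin_list, with
# symbols and hashes standing in for the protobuf operator and message types.
def build_key_conditions(gt: nil, gte: nil, lt: nil, lte: nil)
  [
    [:greater_than, gt&.to_s],
    [:greater_than_or_equal, gte&.to_s],
    [:less_than, lt&.to_s],
    [:less_than_or_equal, lte&.to_s]
  ]
    .reject { |(_, value)| value.nil? } # bounds that were not supplied are dropped
    .map { |(operator, key_path)| { operator: operator, key_path: key_path } }
end

# Only the bounds that were passed produce a condition.
p build_key_conditions(gte: "/movie-1", lt: "/movie-5")
p build_key_conditions
```

Non-string bounds are converted with to_s, so any key path object that stringifies to a path could be passed in the same way.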
#begin_scan(limit: 0, item_types: [], cel_filters: [], total_segments: nil, segment_index: nil) ⇒ Array<StatelyDB::Item>, StatelyDB::Token
begin_scan initiates a scan request which will scan over the entire store and apply the provided filters. This API returns a token that you can pass to continue_scan to paginate through the result set.
begin_scan can return items of many types, and you can check the class of each result to handle different item types.
WARNING: THIS API CAN BE EXPENSIVE FOR STORES WITH A LARGE NUMBER OF ITEMS.
# File 'lib/statelydb.rb', line 287
def begin_scan(limit: 0, item_types: [], cel_filters: [], total_segments: nil, segment_index: nil)
  if total_segments.nil? != segment_index.nil?
    raise StatelyDB::Error.new("total_segments and segment_index must both be set or both be nil",
                               code: GRPC::Core::StatusCodes::INVALID_ARGUMENT,
                               stately_code: "InvalidArgument")
  end
  req = Stately::Db::BeginScanRequest.new(
    store_id: @store_id,
    limit:,
    filter_conditions: build_filters(item_types: item_types, cel_filters: cel_filters),
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.begin_scan(req)
  process_list_response(resp)
end
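The argument check at the top of begin_scan accepts either both segment arguments or neither. The sketch below isolates that check. The helper name validate_segments! is hypothetical, and ArgumentError stands in for StatelyDB::Error so the snippet runs without the gem.

```ruby
# Hypothetical helper that reproduces the pairing rule from begin_scan.
# ArgumentError is used here in place of StatelyDB::Error.
def validate_segments!(total_segments:, segment_index:)
  # Valid when both are nil or both are set.
  return if total_segments.nil? == segment_index.nil?

  raise ArgumentError, "total_segments and segment_index must both be set or both be nil"
end

validate_segments!(total_segments: nil, segment_index: nil) # passes
validate_segments!(total_segments: 4, segment_index: 0)     # passes

begin
  validate_segments!(total_segments: 4, segment_index: nil)
rescue ArgumentError => e
  puts "rejected: #{e.message}"
end
```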
#close ⇒ void
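close shuts down the client's channel and its token provider, if they are present.
This method returns an undefined value, so do not rely on the result.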
# File 'lib/statelydb.rb', line 70
def close
  @channel&.close
  @token_provider&.close
end
#continue_list(token) ⇒ Array<StatelyDB::Item>, StatelyDB::Token
continue_list takes the token from a begin_list call and returns the next “page” of results based on the original query parameters and pagination options. It doesn’t have options because it is a continuation of a previous list operation. It will return a new token which can be used for another continue_list call, and so on. The token is the same one used by sync_list - each time you call either continue_list or sync_list, you should pass the latest version of the token, and the result will include a new version of the token to use in subsequent calls. You may interleave continue_list and sync_list calls however you like, but it does not make sense to make both calls in parallel. Calls to continue_list are tied to the authorization of the original begin_list call, so if the original begin_list call was allowed, continue_list with its token should also be allowed.
You can list items of different types in a single continue_list, and you can check the class of each result to handle different item types.
# File 'lib/statelydb.rb', line 239
def continue_list(token)
  req = Stately::Db::ContinueListRequest.new(
    token_data: token.token_data,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.continue_list(req)
  process_list_response(resp)
end
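The begin_list and continue_list token handoff can be sketched as a loop that always passes the latest token forward. Everything below is a hypothetical stand-in so the loop runs without a store: SketchToken, SketchClient, and list_all are invented for illustration. The can_continue? predicate on the token is an assumption about StatelyDB::Token that this page does not confirm.

```ruby
# Hypothetical token: records which page it came from and whether more remain.
SketchToken = Struct.new(:page, :more) do
  def can_continue?
    more
  end
end

# Hypothetical client returning [items, token] pairs, like the real methods.
class SketchClient
  PAGES = [%w[a b], %w[c d], %w[e]].freeze

  def begin_list(_prefix)
    page(0)
  end

  def continue_list(token)
    page(token.page + 1)
  end

  private

  def page(index)
    [PAGES[index], SketchToken.new(index, index < PAGES.length - 1)]
  end
end

# Collects every page, replacing the token with the newest one on each call.
def list_all(client, prefix)
  items, token = client.begin_list(prefix)
  all = items.dup
  while token.can_continue?
    items, token = client.continue_list(token)
    all.concat(items)
  end
  all
end

p list_all(SketchClient.new, "/example")
```

The same loop shape would apply to begin_scan and continue_scan, since both pairs return items together with a fresh token.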
#continue_scan(token) ⇒ Array<StatelyDB::Item>, StatelyDB::Token
continue_scan takes the token from a begin_scan call and returns the next “page” of results based on the original query parameters and pagination options.
You can scan items of different types in a single continue_scan, and you can check the class of each result to handle different item types.
# File 'lib/statelydb.rb', line 322
def continue_scan(token)
  req = Stately::Db::ContinueScanRequest.new(
    token_data: token.token_data,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.continue_scan(req)
  process_list_response(resp)
end
#delete(*key_paths) ⇒ void
This method returns an undefined value.
delete removes one or more items from the Store by their full key paths. delete succeeds even if there isn’t an item at that key path. Tombstones will be saved for deleted items for some time, so that sync_list can return information about deleted items. Deletes are always applied atomically; all will fail or all will succeed.
Parameters:
- key_paths: The full key paths of the items.
Raises:
- StatelyDB::Error if the parameters are invalid
- StatelyDB::Error if the item is not found
# File 'lib/statelydb.rb', line 478
def delete(*key_paths)
  key_paths = Array(key_paths).flatten
  req = Stately::Db::DeleteRequest.new(
    store_id: @store_id,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID,
    deletes: key_paths.map { |key_path| Stately::Db::DeleteItem.new(key_path: String(key_path)) }
  )
  @stub.delete(req)
  nil
end
#get(key_path) ⇒ StatelyDB::Item, NilClass
get retrieves an item by its full key path.
# File 'lib/statelydb.rb', line 97
def get(key_path)
  resp = get_batch(key_path)
  # Always return a single Item.
  resp.first
end
#get_batch(*key_paths) ⇒ Array<StatelyDB::Item>
get_batch retrieves multiple items by their full key paths. This will return the corresponding items that exist. Use begin_list instead if you want to retrieve multiple items but don’t already know the full key paths of the items you want to get. You can get items of different types in a single get_batch - you will need to check the class of each result to determine what item type it is.
# File 'lib/statelydb.rb', line 117
def get_batch(*key_paths)
  key_paths = Array(key_paths).flatten
  req = Stately::Db::GetRequest.new(
    store_id: @store_id,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID,
    gets: key_paths.map { |key_path| Stately::Db::GetItem.new(key_path: String(key_path)) },
    allow_stale: @allow_stale
  )
  resp = @stub.get(req)
  resp.items.map do |result|
    @schema.unmarshal_item(stately_item: result)
  end
end
#put(item, must_not_exist: false, overwrite_metadata_timestamps: false) ⇒ StatelyDB::Item
put adds an Item to the Store, or replaces the Item if it already exists at that path.
This call will fail if:
- The Item conflicts with an existing Item at the same path and the
must_not_exist option is set, or the item's ID will be chosen with
an `initialValue` and one of its other key paths conflicts with an
existing item.
# File 'lib/statelydb.rb', line 409
def put(item, must_not_exist: false, overwrite_metadata_timestamps: false)
  resp = put_batch({ item:, must_not_exist:, overwrite_metadata_timestamps: })
  # Always return a single Item.
  resp.first
end
#put_batch(*items) ⇒ Array<StatelyDB::Item>
put_batch adds multiple Items to the Store, or replaces Items if they already exist at that path. You can put items of different types in a single put_batch. All puts in the request are applied atomically - there are no partial successes.
This will fail if:
- Any Item conflicts with an existing Item at the same path and its
must_not_exist option is set, or the item's ID will be chosen with an
`initialValue` and one of its other key paths conflicts with an existing
item.
# File 'lib/statelydb.rb', line 437
def put_batch(*items)
  puts = Array(items).flatten.map do |input|
    if input.is_a?(Hash)
      item = input[:item]
      Stately::Db::PutItem.new(
        item: item.send("marshal_stately"),
        overwrite_metadata_timestamps: input[:overwrite_metadata_timestamps],
        must_not_exist: input[:must_not_exist]
      )
    else
      Stately::Db::PutItem.new(
        item: input.send("marshal_stately")
      )
    end
  end
  req = Stately::Db::PutRequest.new(
    store_id: @store_id,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID,
    puts:
  )
  resp = @stub.put(req)
  resp.items.map do |result|
    @schema.unmarshal_item(stately_item: result)
  end
end
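As the listing shows, put_batch accepts bare items, hashes that pair an item with per-item options, and arrays of either. The sketch below reproduces only that input normalization. The helper normalize_puts is hypothetical, symbols stand in for real items, and plain hashes stand in for Stately::Db::PutItem. Unlike the listing, it fills missing options with false rather than leaving them unset.

```ruby
# Hypothetical helper: shows how mixed put_batch arguments are flattened and
# how a Hash argument is read as { item:, ...options } rather than as an item.
def normalize_puts(*items)
  Array(items).flatten.map do |input|
    if input.is_a?(Hash)
      {
        item: input[:item],
        must_not_exist: input.fetch(:must_not_exist, false),
        overwrite_metadata_timestamps: input.fetch(:overwrite_metadata_timestamps, false)
      }
    else
      { item: input, must_not_exist: false, overwrite_metadata_timestamps: false }
    end
  end
end

# A bare item, an item with options, and a nested array can be mixed freely.
p normalize_puts(:movie, { item: :actor, must_not_exist: true }, [:review, :rating])
```

This is also how put delegates to put_batch: it wraps its single item and keyword options in one hash and returns the first element of the result.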
#sync_list(token) ⇒ StatelyDB::SyncResult
sync_list returns all changes to Items within the result set of a previous List operation. For all Items within the result set that were modified, it returns the full Item in its current state. If the result set has already been expanded to the end (in the direction of the original begin_list request), sync_list will return newly created Items as well. It also returns a list of Item key paths that were deleted since the last sync_list, which you should reconcile with your view of items returned from previous begin_list/continue_list calls. Using this API, you can start with an initial set of items from begin_list, and then stay up to date on any changes via repeated sync_list requests over time.
The token is the same one used by continue_list - each time you call either continue_list or sync_list, you should pass the latest version of the token, and then use the new token from the result in subsequent calls. You may interleave continue_list and sync_list calls however you like, but it does not make sense to make both calls in parallel. Calls to sync_list are tied to the authorization of the original begin_list call, so if the original begin_list call was allowed, sync_list with its token should also be allowed.
The result will contain:
- changed_items: Items that were changed or added since the last
sync_list call.
- deleted_item_paths: The key paths of items that were deleted since
the last sync_list call.
- updated_outside_list_window_paths: Items that were updated but
are not within the current result set. You can treat this like
deleted_item_paths, but the item hasn't actually been deleted; it's
just not part of your view of the list anymore.
- is_reset: A reset signal that indicates any previously cached
view of the result set is no longer valid. You should throw away
any locally cached data. Use the changed_items list from this result
as the new view of the result set.
# File 'lib/statelydb.rb', line 372
def sync_list(token)
  req = Stately::Db::SyncListRequest.new(
    token_data: token.token_data,
    schema_id: @schema::SCHEMA_ID,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.sync_list(req)
  process_sync_response(resp)
end
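One way to reconcile a sync result with a locally cached view is sketched below. The four field names come from the list above. The structs SketchSyncResult and SketchItem, the helper apply_sync, and the key_path accessor on items are hypothetical stand-ins, since this page does not show how StatelyDB::SyncResult or StatelyDB::Item expose their data.

```ruby
# Hypothetical stand-ins for StatelyDB::SyncResult and StatelyDB::Item.
SketchSyncResult = Struct.new(:changed_items, :deleted_item_paths,
                              :updated_outside_list_window_paths, :is_reset,
                              keyword_init: true)
SketchItem = Struct.new(:key_path, :value)

# Returns a new view (Hash of key path => item) with the sync result applied.
def apply_sync(cache, result)
  # A reset invalidates everything cached so far.
  view = result.is_reset ? {} : cache.dup
  # Changed or added items replace whatever was cached at that key path.
  result.changed_items.each { |item| view[item.key_path] = item }
  # Deleted items and items that left the list window are both dropped.
  (result.deleted_item_paths + result.updated_outside_list_window_paths).each do |path|
    view.delete(path)
  end
  view
end

cache = {
  "/a" => SketchItem.new("/a", 1),
  "/b" => SketchItem.new("/b", 2),
  "/c" => SketchItem.new("/c", 3)
}
result = SketchSyncResult.new(
  changed_items: [SketchItem.new("/a", 10)],
  deleted_item_paths: ["/b"],
  updated_outside_list_window_paths: ["/c"],
  is_reset: false
)
p apply_sync(cache, result).keys
```

The helper returns a fresh hash instead of mutating the cache, which keeps the previous view intact if the caller needs to compare the two.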
#transaction ⇒ StatelyDB::Transaction::Transaction::Result
transaction allows you to issue reads and writes in any order, and all writes will either succeed or all will fail when the transaction finishes. It takes a block with a single argument that can be used to run commands within the transaction.
Reads are guaranteed to reflect the state as of when the transaction started. A transaction may fail if another transaction commits before this one finishes - in that case, you should retry your transaction.
If any error is raised from the block, the transaction is aborted and none of the changes made in it will be applied. If the block returns without error, the transaction is automatically committed.
If any of the operations in the block fails (e.g. a request is invalid) you may not find out until the next operation, or once the block finishes, due to some technicalities about how requests are handled.
When the transaction is committed, the result property will contain the full version of any items that were put in the transaction, and the committed property will be true. If the transaction was aborted, the committed property will be false.
# File 'lib/statelydb.rb', line 522
def transaction
  txn = StatelyDB::Transaction::Transaction.new(stub: @stub, store_id: @store_id, schema: @schema)
  txn.begin
  yield txn
  txn.commit
rescue StatelyDB::Error
  raise
# Handle any other exceptions and abort the transaction. We're rescuing Exception here
# because we want to catch all exceptions, including those that don't inherit from StandardError.
rescue Exception => e
  txn.abort
  # All gRPC errors inherit from GRPC::BadStatus. We wrap these in a StatelyDB::Error.
  raise StatelyDB::Error.from(e) if e.is_a? GRPC::BadStatus
  # Calling raise with no parameters re-raises the original exception
  raise
end
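The commit-or-abort control flow can be traced with a recording stand-in for the transaction object. This is a simplified sketch: SketchTxn and run_transaction are hypothetical, and it omits the separate StatelyDB::Error branch and the gRPC error wrapping shown in the listing.

```ruby
# Hypothetical transaction that only records which lifecycle calls were made.
class SketchTxn
  attr_reader :log

  def initialize
    @log = []
  end

  def begin
    @log << :begin
  end

  def commit
    @log << :commit
  end

  def abort
    @log << :abort
  end
end

# Simplified version of the flow: commit if the block returns, abort and
# re-raise if it raises. Exception is rescued to mirror the listing.
def run_transaction(txn)
  txn.begin
  yield txn
  txn.commit
rescue Exception # rubocop:disable Lint/RescueException
  txn.abort
  raise
end

ok = SketchTxn.new
run_transaction(ok) { |t| t.log << :work }
p ok.log

failed = SketchTxn.new
begin
  run_transaction(failed) { |_t| raise "boom" }
rescue RuntimeError
  p failed.log
end
```

Because the original error is re-raised after the abort, a caller that wants to retry on conflicts would wrap the whole transaction call in its own rescue and retry loop.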
#with_allow_stale(allow_stale) ⇒ self
Returns a new client that either allows or disallows stale reads, leaving the original client unchanged. This affects get and list operations from the returned client. Use this only if you know you can tolerate stale reads. Allowing stale reads can improve performance and availability and reduce cost.
# File 'lib/statelydb.rb', line 83
def with_allow_stale(allow_stale)
  new_client = clone
  new_client.instance_variable_set(:@allow_stale, allow_stale)
  new_client
end
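The clone-then-override pattern in the listing leaves the receiver untouched, which the sketch below demonstrates on a minimal class. SketchStaleClient and its allow_stale? reader are hypothetical and exist only to make the flag observable.

```ruby
# Hypothetical client holding only the flag that with_allow_stale changes.
class SketchStaleClient
  def initialize
    @allow_stale = false
  end

  def allow_stale?
    @allow_stale
  end

  # Same body as the listing: copy the client, then set the flag on the copy.
  def with_allow_stale(allow_stale)
    new_client = clone
    new_client.instance_variable_set(:@allow_stale, allow_stale)
    new_client
  end
end

strict = SketchStaleClient.new
relaxed = strict.with_allow_stale(true)

puts strict.allow_stale?
puts relaxed.allow_stale?
puts strict.equal?(relaxed)
```

Ruby's clone is a shallow copy, so in the real client the copy would share the same channel and stub objects with the original rather than opening a new connection.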