Class: StatelyDB::CoreClient

Inherits:
Object
Defined in:
lib/statelydb.rb

Overview

CoreClient is a low-level client for interacting with the Stately Cloud API. In most cases you should not use it directly; use the client generated for your schema instead.

Constructor Details

#initialize(store_id:, schema:, token_provider: nil, endpoint: nil, region: nil, no_auth: false) ⇒ CoreClient

Initialize a new StatelyDB CoreClient



# File 'lib/statelydb.rb', line 33

def initialize(store_id:,
               schema:,
               token_provider: nil,
               endpoint: nil,
               region: nil,
               no_auth: false)
  if store_id.nil?
    raise StatelyDB::Error.new("store_id is required",
                               code: GRPC::Core::StatusCodes::INVALID_ARGUMENT,
                               stately_code: "InvalidArgument")
  end
  if schema.nil?
    raise StatelyDB::Error.new("schema is required",
                               code: GRPC::Core::StatusCodes::INVALID_ARGUMENT,
                               stately_code: "InvalidArgument")
  end

  endpoint = self.class.make_endpoint(endpoint:, region:)
  @channel = Common::Net.new_channel(endpoint:)
  # Make sure to use the correct endpoint for the default token provider
  @token_provider = token_provider || Common::Auth::AuthTokenProvider.new(endpoint:)

  interceptors = [Common::ErrorInterceptor.new]
  interceptors << Common::Auth::Interceptor.new(token_provider: @token_provider) unless no_auth

  @stub = Stately::Db::DatabaseService::Stub.new(nil, nil,
                                                 channel_override: @channel, interceptors:)
  @store_id = store_id.to_i
  @schema = schema
  @allow_stale = false
end

Class Method Details

.make_endpoint(endpoint: nil, region: nil) ⇒ String

Construct the API endpoint from the region and endpoint. If the endpoint is provided, it is returned as-is. If only the region is provided, the region-specific endpoint is returned; a leading "aws-" prefix on the region is stripped first. If neither the region nor the endpoint is provided, the default endpoint is returned.



# File 'lib/statelydb.rb', line 374

def self.make_endpoint(endpoint: nil, region: nil)
  return endpoint unless endpoint.nil?
  return "https://api.stately.cloud" if region.nil?

  region = region.sub("aws-", "") if region.start_with?("aws-")

  "https://#{region}.aws.api.stately.cloud"
end
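
The three resolution cases can be checked with a standalone copy of the method body shown above. This is only a sketch: `resolve_endpoint` is a hypothetical name used so the snippet runs without the SDK, and the region string and localhost URL are example inputs, not values taken from the documentation.

```ruby
# Standalone copy of the make_endpoint logic, for illustration only.
# `resolve_endpoint` is a hypothetical name, not part of the SDK.
def resolve_endpoint(endpoint: nil, region: nil)
  # An explicit endpoint always wins.
  return endpoint unless endpoint.nil?
  # No endpoint and no region: fall back to the default endpoint.
  return "https://api.stately.cloud" if region.nil?

  # Strip an optional "aws-" prefix from the region name.
  region = region.sub("aws-", "") if region.start_with?("aws-")

  "https://#{region}.aws.api.stately.cloud"
end

default_url  = resolve_endpoint
regional_url = resolve_endpoint(region: "us-west-2")
prefixed_url = resolve_endpoint(region: "aws-us-west-2")
explicit_url = resolve_endpoint(endpoint: "http://localhost:3000", region: "us-west-2")

puts default_url   # https://api.stately.cloud
puts regional_url  # https://us-west-2.aws.api.stately.cloud
puts prefixed_url  # https://us-west-2.aws.api.stately.cloud
puts explicit_url  # http://localhost:3000
```

Note that an explicit endpoint takes precedence even when a region is also passed.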

Instance Method Details

#begin_list(prefix, limit: 100, sort_property: nil, sort_direction: :ascending) ⇒ Array<StatelyDB::Item>, StatelyDB::Token

Begin listing Items from a StatelyDB Store at the given prefix.

Examples:

client.data.begin_list("/ItemType-identifier", limit: 10, sort_direction: :ascending)


# File 'lib/statelydb.rb', line 132

def begin_list(prefix,
               limit: 100,
               sort_property: nil,
               sort_direction: :ascending)
  sort_direction = sort_direction == :ascending ? 0 : 1

  req = Stately::Db::BeginListRequest.new(
    store_id: @store_id,
    key_path_prefix: String(prefix),
    limit:,
    sort_property:,
    sort_direction:,
    allow_stale: @allow_stale,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.begin_list(req)
  process_list_response(resp)
end

#begin_scan(limit: 100, item_types: [], total_segments: nil, segment_index: nil) ⇒ Array<StatelyDB::Item>, StatelyDB::Token

Initiates a scan request that scans the entire store and applies the provided filters. This API returns a token that you can pass to continue_scan to paginate through the result set. total_segments and segment_index must either both be set or both be nil; otherwise a StatelyDB::Error is raised. The call can also fail if the caller does not have permission to read Items.

WARNING: THIS API CAN BE EXTREMELY EXPENSIVE FOR STORES WITH A LARGE NUMBER OF ITEMS.

Examples:

client.data.begin_scan(limit: 10, item_types: [MyItem])


# File 'lib/statelydb.rb', line 187

def begin_scan(limit: 100,
               item_types: [],
               total_segments: nil,
               segment_index: nil)
  if total_segments.nil? != segment_index.nil?
    raise StatelyDB::Error.new("total_segments and segment_index must both be set or both be nil",
                               code: GRPC::Core::StatusCodes::INVALID_ARGUMENT,
                               stately_code: "InvalidArgument")
  end
  req = Stately::Db::BeginScanRequest.new(
    store_id: @store_id,
    limit:,
    filter_condition: item_types.map do |item_type|
      Stately::Db::FilterCondition.new(item_type: item_type.respond_to?(:name) ? item_type.name.split("::").last : item_type)
    end,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.begin_scan(req)
  process_list_response(resp)
end
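
Two details of the listing above are easy to miss: the guard that requires total_segments and segment_index to be given together, and the way an item type class is reduced to its unqualified name. The sketch below isolates both so they can be run without the SDK. The helper names `segments_consistent?` and `item_type_name` and the `MySchema::Movie` class are hypothetical, introduced only for this illustration.

```ruby
# Hypothetical helper mirroring the guard at the top of begin_scan:
# the arguments are consistent when both are nil or both are set.
def segments_consistent?(total_segments, segment_index)
  total_segments.nil? == segment_index.nil?
end

# Hypothetical helper mirroring how begin_scan derives the item type name:
# classes are reduced to their last namespace component, strings pass through.
def item_type_name(item_type)
  item_type.respond_to?(:name) ? item_type.name.split("::").last : item_type
end

# Stand-in for a generated item class (not a real schema).
module MySchema
  class Movie; end
end

both_nil    = segments_consistent?(nil, nil) # => true
both_set    = segments_consistent?(4, 0)     # => true
only_total  = segments_consistent?(4, nil)   # => false
from_class  = item_type_name(MySchema::Movie) # => "Movie"
from_string = item_type_name("Movie")         # => "Movie"

puts [both_nil, both_set, only_total, from_class, from_string].inspect
```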

#close ⇒ void

Close the underlying channel and token provider, if they are set.



# File 'lib/statelydb.rb', line 66

def close
  @channel&.close
  @token_provider&.close
end

#continue_list(token) ⇒ Array<StatelyDB::Item>, StatelyDB::Token

Continue listing Items from a StatelyDB Store using a token.

Examples:

(items, token) = client.data.begin_list("/ItemType-identifier")
client.data.continue_list(token)


# File 'lib/statelydb.rb', line 159

def continue_list(token)
  req = Stately::Db::ContinueListRequest.new(
    token_data: token.token_data,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.continue_list(req)
  process_list_response(resp)
end
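
begin_list and continue_list are typically combined into a loop that keeps requesting pages until the token says there is nothing more to fetch. The sketch below shows that shape against stand-in objects so it runs without the SDK. `PagedClient`, `FakeToken`, and `list_all` are hypothetical, and the `can_continue?` predicate is an assumption about the token API that does not appear in the listing above.

```ruby
# Stand-in token; only the members used below are modelled.
# `can_continue?` is an assumed predicate, not confirmed by the listing above.
FakeToken = Struct.new(:token_data, :has_more) do
  def can_continue?
    has_more
  end
end

# Stand-in client that serves three fixed pages.
class PagedClient
  PAGES = [%w[a b], %w[c d], %w[e]].freeze

  def begin_list(_prefix)
    page(0)
  end

  def continue_list(token)
    page(token.token_data + 1)
  end

  private

  # Returns [items, token], the same shape as the methods documented above.
  def page(index)
    [PAGES[index], FakeToken.new(index, index < PAGES.length - 1)]
  end
end

# Collect every page into one array.
def list_all(client, prefix)
  items, token = client.begin_list(prefix)
  while token.can_continue?
    more, token = client.continue_list(token)
    items += more
  end
  items
end

all_items = list_all(PagedClient.new, "/ItemType-identifier")
puts all_items.inspect # ["a", "b", "c", "d", "e"]
```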

#continue_scan(token) ⇒ Array<StatelyDB::Item>, StatelyDB::Token

continue_scan takes the token from a begin_scan call and returns more results based on the original request parameters and pagination options.

WARNING: THIS API CAN BE EXTREMELY EXPENSIVE FOR STORES WITH A LARGE NUMBER OF ITEMS.

Examples:

(items, token) = client.data.begin_scan(limit: 10, item_types: [MyItem])
client.data.continue_scan(token)


# File 'lib/statelydb.rb', line 219

def continue_scan(token)
  req = Stately::Db::ContinueScanRequest.new(
    token_data: token.token_data,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.continue_scan(req)
  process_list_response(resp)
end

#delete(*key_paths) ⇒ void

This method returns an undefined value.

Delete up to 50 Items from a StatelyDB Store at the given key_paths.

Examples:

client.data.delete("/ItemType-identifier", "/ItemType-identifier2")

Raises:



# File 'lib/statelydb.rb', line 320

def delete(*key_paths)
  key_paths = Array(key_paths).flatten
  req = Stately::Db::DeleteRequest.new(
    store_id: @store_id,
    schema_version_id: @schema::SCHEMA_VERSION_ID,
    deletes: key_paths.map { |key_path| Stately::Db::DeleteItem.new(key_path: String(key_path)) }
  )
  @stub.delete(req)
  nil
end

#get(key_path) ⇒ StatelyDB::Item, NilClass

Fetch a single Item from a StatelyDB Store at the given key_path.

Examples:

client.get("/ItemType-identifier")

Raises:

  • (StatelyDB::Error)

    if the parameters are invalid or if the item is not found



# File 'lib/statelydb.rb', line 91

def get(key_path)
  resp = get_batch(key_path)

  # Always return a single Item.
  resp.first
end

#get_batch(*key_paths) ⇒ Array<StatelyDB::Item>, NilClass

Fetch a batch of up to 100 Items from a StatelyDB Store at the given key_paths.

Examples:

client.data.get_batch("/ItemType-identifier", "/ItemType-identifier2")

Raises:

  • (StatelyDB::Error)

    if the parameters are invalid or if the item is not found



# File 'lib/statelydb.rb', line 106

def get_batch(*key_paths)
  key_paths = Array(key_paths).flatten
  req = Stately::Db::GetRequest.new(
    store_id: @store_id,
    schema_version_id: @schema::SCHEMA_VERSION_ID,
    gets:
      key_paths.map { |key_path| Stately::Db::GetItem.new(key_path: String(key_path)) },
    allow_stale: @allow_stale
  )

  resp = @stub.get(req)
  resp.items.map do |result|
    @schema.unmarshal_item(stately_item: result)
  end
end
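
Because get_batch (and delete) run `Array(key_paths).flatten` and then `String(key_path)` on each entry, they accept key paths either as separate arguments or as a single array. A minimal sketch of that normalization, using the hypothetical helper name `normalize_key_paths`:

```ruby
# Hypothetical helper reproducing the argument handling used by
# get_batch and delete in the listings above.
def normalize_key_paths(*key_paths)
  Array(key_paths).flatten.map { |key_path| String(key_path) }
end

splat_form = normalize_key_paths("/ItemType-identifier", "/ItemType-identifier2")
array_form = normalize_key_paths(["/ItemType-identifier", "/ItemType-identifier2"])

puts splat_form.inspect # ["/ItemType-identifier", "/ItemType-identifier2"]
puts splat_form == array_form # true
```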

#put(item, must_not_exist: false, overwrite_metadata_timestamps: false) ⇒ StatelyDB::Item

Put a single Item into a StatelyDB Store. This is a thin wrapper around put_batch and returns the first Item from its result.

Examples:

client.data.put(my_item)


client.data.put(my_item, must_not_exist: true)




# File 'lib/statelydb.rb', line 265

def put(item,
        must_not_exist: false,
        overwrite_metadata_timestamps: false)
  resp = put_batch({ item:, must_not_exist:, overwrite_metadata_timestamps: })

  # Always return a single Item.
  resp.first
end

#put_batch(*items) ⇒ Array<StatelyDB::Item>

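Put a batch of up to 50 Items into a StatelyDB Store. Each argument is either an Item or a Hash with an :item key and optional :must_not_exist and :overwrite_metadata_timestamps keys.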

Examples:

client.data.put_batch(item1, item2)
client.data.put_batch({ item: item1, must_not_exist: true }, item2)


# File 'lib/statelydb.rb', line 284

def put_batch(*items)
  puts = Array(items).flatten.map do |input|
    if input.is_a?(Hash)
      item = input[:item]
      Stately::Db::PutItem.new(
        item: item.send("marshal_stately"),
        overwrite_metadata_timestamps: input[:overwrite_metadata_timestamps],
        must_not_exist: input[:must_not_exist]
      )
    else
      Stately::Db::PutItem.new(
        item: input.send("marshal_stately")
      )
    end
  end
  req = Stately::Db::PutRequest.new(
    store_id: @store_id,
    schema_version_id: @schema::SCHEMA_VERSION_ID,
    puts:
  )
  resp = @stub.put(req)

  resp.items.map do |result|
    @schema.unmarshal_item(stately_item: result)
  end
end
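
The branch on `input.is_a?(Hash)` is what lets bare Items and option Hashes be mixed in one call. The sketch below reproduces that normalization with plain Hashes in place of `Stately::Db::PutItem`, so it runs without the SDK. `FakeItem`, `normalize_put`, and `normalize_puts` are hypothetical names for this illustration.

```ruby
# Stand-in for a generated item class.
FakeItem = Struct.new(:id)

# Hypothetical helper: turn one put_batch argument into a uniform Hash.
def normalize_put(input)
  if input.is_a?(Hash)
    # Hash form: the item plus optional per-item flags (nil when omitted).
    { item: input[:item],
      must_not_exist: input[:must_not_exist],
      overwrite_metadata_timestamps: input[:overwrite_metadata_timestamps] }
  else
    # Bare item form: no flags.
    { item: input }
  end
end

# Hypothetical helper mirroring the Array(items).flatten.map in put_batch.
def normalize_puts(*items)
  Array(items).flatten.map { |input| normalize_put(input) }
end

item1 = FakeItem.new(1)
item2 = FakeItem.new(2)
puts_list = normalize_puts({ item: item1, must_not_exist: true }, item2)

puts puts_list.length               # 2
puts puts_list[0][:must_not_exist]  # true
puts puts_list[1].keys.inspect      # [:item]
```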

#sync_list(token) ⇒ StatelyDB::SyncResult

Sync a list of Items from a StatelyDB Store.

Examples:

(items, token) = client.data.begin_list("/ItemType-identifier")
client.data.sync_list(token)


# File 'lib/statelydb.rb', line 236

def sync_list(token)
  req = Stately::Db::SyncListRequest.new(
    token_data: token.token_data,
    schema_version_id: @schema::SCHEMA_VERSION_ID
  )
  resp = @stub.sync_list(req)
  process_sync_response(resp)
end

#transaction ⇒ StatelyDB::Transaction::Transaction::Result

Transaction takes a block and executes the block within a transaction. If the block raises an exception, the transaction is rolled back. If the block completes successfully, the transaction is committed.

Examples:

client.data.transaction do |txn|
  txn.put(item: my_item)
  txn.put(item: another_item)
end

Raises:



# File 'lib/statelydb.rb', line 345

def transaction
  txn = StatelyDB::Transaction::Transaction.new(stub: @stub, store_id: @store_id, schema: @schema)
  txn.begin
  yield txn
  txn.commit
rescue StatelyDB::Error
  raise
# Handle any other exceptions and abort the transaction. We're rescuing Exception here
# because we want to catch all exceptions, including those that don't inherit from StandardError.
rescue Exception => e
  txn.abort

  # All gRPC errors inherit from GRPC::BadStatus. We wrap these in a StatelyDB::Error.
  raise StatelyDB::Error.from(e) if e.is_a? GRPC::BadStatus

  # Calling raise with no parameters re-raises the original exception
  raise
end
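
The commit-or-abort control flow can be seen in isolation with a stand-in transaction object. This is a simplified sketch, not the SDK's implementation: `FakeTxn` and `run_in_txn` are hypothetical, the sketch rescues StandardError rather than Exception, and it omits the StatelyDB::Error pass-through and the GRPC::BadStatus wrapping shown in the listing above.

```ruby
# Stand-in transaction that records which lifecycle methods were called.
class FakeTxn
  attr_reader :events

  def initialize
    @events = []
  end

  def begin
    @events << :begin
  end

  def commit
    @events << :commit
  end

  def abort
    @events << :abort
  end
end

# Simplified version of the begin / yield / commit flow with abort on error.
def run_in_txn(txn)
  txn.begin
  yield txn
  txn.commit
rescue StandardError
  txn.abort
  raise # re-raise the original exception to the caller
end

ok = FakeTxn.new
run_in_txn(ok) { |t| t.events << :work }

failed = FakeTxn.new
begin
  run_in_txn(failed) { raise ArgumentError, "boom" }
rescue ArgumentError
  # The error still reaches the caller after the abort.
end

puts ok.events.inspect     # [:begin, :work, :commit]
puts failed.events.inspect # [:begin, :abort]
```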

#with_allow_stale(allow_stale) ⇒ self

Set whether operations on the returned client may return stale results. The receiver is not modified: the method clones the client and sets the allow_stale flag on the copy.

Examples:

client.with_allow_stale(true).get("/ItemType-identifier")


# File 'lib/statelydb.rb', line 77

def with_allow_stale(allow_stale)
  new_client = clone
  new_client.instance_variable_set(:@allow_stale, allow_stale)
  new_client
end
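
The clone-and-set pattern can be tried on a small stand-in class. The sketch below is not the SDK class: `StaleAwareClient` is hypothetical and its `@channel` is a plain object. It shows that the original keeps its flag and that, because `clone` is a shallow copy, both objects refer to the same channel. From that it seems likely, though the source does not state it, that closing one client also affects clones derived from it.

```ruby
# Stand-in client with the same with_allow_stale body as the listing above.
class StaleAwareClient
  attr_reader :allow_stale, :channel

  def initialize
    @channel = Object.new # placeholder for the gRPC channel
    @allow_stale = false
  end

  def with_allow_stale(allow_stale)
    new_client = clone
    new_client.instance_variable_set(:@allow_stale, allow_stale)
    new_client
  end
end

base = StaleAwareClient.new
stale = base.with_allow_stale(true)

puts base.allow_stale                   # false
puts stale.allow_stale                  # true
puts stale.equal?(base)                 # false
puts stale.channel.equal?(base.channel) # true
```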