Class: Google::Cloud::Firestore::BulkWriter

Inherits:
Object
  • Object
Defined in:
lib/google/cloud/firestore/bulk_writer.rb

Overview

BulkWriter

Accumulates large numbers of document write operations and sends them to the server efficiently.

BulkWriter can handle large data migrations or updates, buffering records in memory and submitting them to the server in batches of 20.

The submission of batches is internally parallelized with a ThreadPoolExecutor.
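The batching described above can be pictured with a small standalone sketch. It does not touch the Firestore client: `submit_in_batches`, `BATCH_SIZE`, and the `workers` argument are hypothetical names chosen for illustration, and plain Ruby threads stand in for the ThreadPoolExecutor.

```ruby
# Illustrative only: models batching in plain Ruby, not BulkWriter's internals.
BATCH_SIZE = 20 # batch size stated in the overview

# Splits buffered writes into batches and "sends" each batch on a worker thread.
# Returns the size of each batch, in batch order.
def submit_in_batches writes, workers: 2
  queue = Queue.new
  writes.each_slice(BATCH_SIZE).with_index { |batch, i| queue << [i, batch] }
  queue.close

  sizes = []
  mutex = Mutex.new
  threads = Array.new(workers) do
    Thread.new do
      # Queue#pop returns nil once the closed queue has been drained.
      while (item = queue.pop)
        index, batch = item
        mutex.synchronize { sizes[index] = batch.size }
      end
    end
  end
  threads.each(&:join)
  sizes
end

writes = (1..45).map { |n| { path: "cities/city#{n}", data: { rank: n } } }
p submit_in_batches(writes) # => [20, 20, 5]
```

In this model the caller only sees the buffered list; how batches are distributed across workers stays an internal detail.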

Examples:

Create a BulkWriter and add a write request:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

bw.create("cities/NYC", { name: "New York City" })

bw.flush
bw.close

Constant Summary

MAX_RETRY_ATTEMPTS = 10

Constructor Details

#initialize(client, service, request_threads: nil, batch_threads: nil, retries: nil) ⇒ BulkWriter

Initializes the attributes and starts the schedule_operations job.



# File 'lib/google/cloud/firestore/bulk_writer.rb', line 56

def initialize client, service,
               request_threads: nil,
               batch_threads: nil,
               retries: nil
  @client = client
  @service = service
  @closed = false
  @flush = false
  @request_threads = (request_threads || 2).to_i
  @write_thread_pool = Concurrent::ThreadPoolExecutor.new max_threads: @request_threads,
                                                          max_queue: 0
  @mutex = Mutex.new
  @scheduler = BulkWriterScheduler.new client, service, batch_threads
  @doc_refs = Set.new
  @retries = [retries || MAX_RETRY_ATTEMPTS, MAX_RETRY_ATTEMPTS].min
  @request_results = []
end
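The defaulting and clamping in the constructor can be tried in isolation. The sketch below copies the two expressions from the listing into hypothetical helper methods (`effective_retries` and `effective_request_threads` are not part of the library) to show that `retries` never exceeds MAX_RETRY_ATTEMPTS and that `request_threads` falls back to 2.

```ruby
MAX_RETRY_ATTEMPTS = 10 # value from the Constant Summary

# Same expression as in #initialize: default to the maximum, never exceed it.
def effective_retries retries
  [retries || MAX_RETRY_ATTEMPTS, MAX_RETRY_ATTEMPTS].min
end

# Same expression as in #initialize: default to 2 and coerce to an Integer.
def effective_request_threads request_threads
  (request_threads || 2).to_i
end

p effective_retries(nil)           # => 10
p effective_retries(3)             # => 3
p effective_retries(25)            # => 10
p effective_request_threads(nil)   # => 2
p effective_request_threads("4")   # => 4
```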

Instance Method Details

#close ⇒ nil

Closes the BulkWriter object to new operations. Existing operations are flushed and the thread pool is shut down.

Returns:

  • (nil)


# File 'lib/google/cloud/firestore/bulk_writer.rb', line 477

def close
  @mutex.synchronize { @closed = true }
  flush
  @mutex.synchronize do
    @write_thread_pool.shutdown
    @scheduler.close
  end
end
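The ordering in the listing (mark as closed, wait for pending work, then shut down) can be modeled with a toy class. `TinyWriter` is hypothetical and uses plain threads. That `add` raises on a closed writer is an assumption made for this sketch, because the listing above does not show how new operations are rejected.

```ruby
# Illustrative model of the close sequence; TinyWriter is not the library class.
class TinyWriter
  def initialize
    @mutex = Mutex.new
    @closed = false
    @pending = []
  end

  # Assumption: adding to a closed writer raises. The real check is not shown above.
  def add &work
    @mutex.synchronize do
      raise "writer is closed" if @closed
      @pending << Thread.new(&work)
    end
    nil
  end

  def close
    @mutex.synchronize { @closed = true } # 1. stop accepting new operations
    @pending.each(&:join)                 # 2. wait for operations already accepted
    nil
  end
end

writer = TinyWriter.new
done = Queue.new
3.times { |i| writer.add { done << i } }
writer.close
p done.size # => 3
```

Setting the flag before waiting means no operation can slip in between the final flush and the shutdown.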

#create(doc, data) ⇒ Google::Cloud::Firestore::Promise::Future

Creates a document with the provided data (fields and values).

The operation will fail if the document already exists.

Examples:

Create a document using a document path:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

bw.create("cities/NYC", { name: "New York City" })

Create a document using a document reference:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

# Get a document reference
nyc_ref = firestore.doc "cities/NYC"

bw.create(nyc_ref, { name: "New York City" })

Create a document and set a field to server_time:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

# Get a document reference
nyc_ref = firestore.doc "cities/NYC"

bw.create(nyc_ref, { name: "New York City",
                       updated_at: firestore.field_server_time })

Get the value of a write operation:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

# Get a document reference
nyc_ref = firestore.doc "cities/NYC"

result = bw.create(nyc_ref, { name: "New York City",
                              updated_at: firestore.field_server_time })

bw.close

puts result.value

Parameters:

  • doc (String, DocumentReference)

    A string representing the path of the document, or a document reference object.

  • data (Hash)

    The document's fields and values.

Returns:

  • (Google::Cloud::Firestore::Promise::Future)

# File 'lib/google/cloud/firestore/bulk_writer.rb', line 133

def create doc, data
  doc_path = coalesce_doc_path_argument doc
  pre_add_operation doc_path

  write = Convert.write_for_create doc_path, data

  create_and_enqueue_operation write
end

#delete(doc, exists: nil, update_time: nil) ⇒ Google::Cloud::Firestore::Promise::Future

Deletes a document from the database.

Examples:

Delete a document using a document path:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

# Delete a document
bw.delete "cities/NYC"

Delete a document using a document reference:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

# Get a document reference
nyc_ref = firestore.doc "cities/NYC"

# Delete a document
bw.delete nyc_ref

Delete a document using exists:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

# Delete a document
bw.delete "cities/NYC", exists: true

Delete a document using the update_time precondition:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

last_updated_at = Time.now - 42 # 42 seconds ago

# Delete a document
bw.delete "cities/NYC", update_time: last_updated_at

Get the value of a write operation:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

last_updated_at = Time.now - 42 # 42 seconds ago

# Delete a document
result = bw.delete "cities/NYC", update_time: last_updated_at

bw.close

puts result.value

Parameters:

  • doc (String, DocumentReference)

    A string representing the path of the document, or a document reference object.

  • exists (Boolean) (defaults to: nil)

    Whether the document must exist. When true, the document must exist or an error is raised. Optional; defaults to nil.

  • update_time (Time) (defaults to: nil)

    When set, the document must have been last updated at that time. Optional.
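As a rough mental model of the two preconditions, the sketch below applies them to an in-memory hash. In Firestore the checks are evaluated by the server, not the client. `delete_with_precondition`, `PreconditionFailed`, and the `:update_time` key are hypothetical names used only for this illustration.

```ruby
# Illustrative model of delete preconditions against an in-memory store.
class PreconditionFailed < StandardError; end

def delete_with_precondition store, path, exists: nil, update_time: nil
  doc = store[path]
  # exists: true requires the document to be present.
  raise PreconditionFailed, "#{path} does not exist" if exists == true && doc.nil?
  # update_time requires the stored timestamp to match exactly.
  if update_time && (doc.nil? || doc[:update_time] != update_time)
    raise PreconditionFailed, "#{path} was not last updated at #{update_time}"
  end
  store.delete path
  nil
end

saved_at = Time.at(1_700_000_000)
store = { "cities/NYC" => { name: "New York City", update_time: saved_at } }

begin
  delete_with_precondition store, "cities/NYC", update_time: saved_at - 42
rescue PreconditionFailed => e
  puts "rejected: #{e.message}"
end

delete_with_precondition store, "cities/NYC", update_time: saved_at
p store.empty? # => true
```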

Returns:

  • (Google::Cloud::Firestore::Promise::Future)

# File 'lib/google/cloud/firestore/bulk_writer.rb', line 444

def delete doc, exists: nil, update_time: nil
  doc_path = coalesce_doc_path_argument doc
  pre_add_operation doc_path

  write = Convert.write_for_delete doc_path, exists: exists, update_time: update_time

  create_and_enqueue_operation write
end

#flush ⇒ nil

Flushes all current operations before enqueuing new operations.

Returns:

  • (nil)


# File 'lib/google/cloud/firestore/bulk_writer.rb', line 457

def flush
  @mutex.synchronize { @flush = true }
  @request_results.each do |result|
    begin
      result.wait!
    rescue StandardError
      # Ignored
    end
  end
  @mutex.synchronize do
    @doc_refs = Set.new
    @flush = false
  end
end
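The wait loop in the listing blocks on every pending result and swallows errors, so a single failed write does not abort the flush. The sketch below reproduces that pattern with a hypothetical `FakeResult` class backed by a thread. It is a stand-in for illustration, not the library's Promise::Future.

```ruby
# Illustrative stand-in for a pending write result.
class FakeResult
  def initialize &work
    @thread = Thread.new do
      Thread.current.report_on_exception = false # stay quiet until wait! is called
      work.call
    end
  end

  # Blocks until the work finishes and re-raises its error, if any.
  def wait!
    @thread.value
  end
end

# Same loop shape as in #flush: wait for everything, ignore individual failures.
def wait_for_all results
  results.each do |result|
    begin
      result.wait!
    rescue StandardError
      # Ignored here; the failure remains observable on the individual result.
    end
  end
  nil
end

results = [
  FakeResult.new { :ok },
  FakeResult.new { raise ArgumentError, "write failed" }
]
p wait_for_all(results) # => nil
```

A caller that cares about a specific write would still inspect that write's own result after the flush.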

#set(doc, data, merge: nil) ⇒ Google::Cloud::Firestore::Promise::Future

Writes the provided data (fields and values) to the provided document. If the document does not exist, it will be created. By default, the provided data overwrites existing data, but the provided data can be merged into the existing document using the merge argument.

If you're not sure whether the document exists, use the merge argument to merge the new data with any existing document data to avoid overwriting entire documents.

Examples:

Set a document using a document path:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

# Set a document
bw.set("cities/NYC", { name: "New York City" })

Set a document using a document reference:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

# Get a document reference
nyc_ref = firestore.doc "cities/NYC"

# Set a document
bw.set(nyc_ref, { name: "New York City" })

Set a document and merge all data:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

bw.set("cities/NYC", { name: "New York City" }, merge: true)

Set a document and merge only name:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

bw.set("cities/NYC", { name: "New York City" }, merge: :name)

Set a document and delete a field using merge:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

# Get a document reference
nyc_ref = firestore.doc "cities/NYC"

nyc_data = { name: "New York City",
             trash: firestore.field_delete }

bw.set(nyc_ref, nyc_data, merge: true)

Set a document and set a field to server_time:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

# Get a document reference
nyc_ref = firestore.doc "cities/NYC"

nyc_data = { name: "New York City",
             updated_at: firestore.field_server_time }

bw.set(nyc_ref, nyc_data, merge: true)

Get the value of a write operation:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

# Get a document reference
nyc_ref = firestore.doc "cities/NYC"

nyc_data = { name: "New York City",
             updated_at: firestore.field_server_time }

result = bw.set(nyc_ref, nyc_data)

bw.close

puts result.value

Parameters:

  • doc (String, DocumentReference)

    A string representing the path of the document, or a document reference object.

  • data (Hash)

    The document's fields and values.

  • merge (Boolean, FieldPath, String, Symbol) (defaults to: nil)

    When true, all provided data is merged with the existing document data. When the argument is one or more field paths, only the data for fields in this argument is merged with the existing document data. The default is to not merge, but to instead overwrite the existing document data.
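The three behaviors of the merge argument can be compared side by side on plain hashes. This is a simplified model that handles top-level fields only. `apply_set` is a hypothetical helper, and FieldPath objects and nested paths are not modeled.

```ruby
# Illustrative model of the merge argument; not the library's implementation.
def apply_set existing, data, merge: nil
  case merge
  when nil, false
    data.dup                               # overwrite the whole document
  when true
    existing.merge(data)                   # merge every provided field
  else
    fields = Array(merge).map(&:to_sym)
    existing.merge(data.slice(*fields))    # merge only the named fields
  end
end

existing = { name: "NYC", population: 8_000_000 }
data     = { name: "New York City", state: "NY" }

p apply_set(existing, data)                # population is gone
p apply_set(existing, data, merge: true)   # population kept, state added
p apply_set(existing, data, merge: :name)  # only name changes, state is not written
```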

Returns:

  • (Google::Cloud::Firestore::Promise::Future)

# File 'lib/google/cloud/firestore/bulk_writer.rb', line 247

def set doc, data, merge: nil
  doc_path = coalesce_doc_path_argument doc
  pre_add_operation doc_path

  write = Convert.write_for_set doc_path, data, merge: merge

  create_and_enqueue_operation write
end

#update(doc, data, update_time: nil) ⇒ Google::Cloud::Firestore::Promise::Future

Updates the document with the provided data (fields and values). The provided data is merged into the existing document data.

The operation will fail if the document does not exist.

Examples:

Update a document using a document path:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

bw.update("cities/NYC", { name: "New York City" })

Directly update a deeply-nested field with a FieldPath:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

nested_field_path = firestore.field_path :favorites, :food

bw.update("users/frank", { nested_field_path => "Pasta" })

Update a document using a document reference:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

# Get a document reference
nyc_ref = firestore.doc "cities/NYC"

bw.update(nyc_ref, { name: "New York City" })

Update a document using the update_time precondition:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

last_updated_at = Time.now - 42 # 42 seconds ago

bw.update("cities/NYC", { name: "New York City" },
           update_time: last_updated_at)

Update a document and delete a field:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

# Get a document reference
nyc_ref = firestore.doc "cities/NYC"

nyc_data = { name: "New York City",
             trash: firestore.field_delete }

bw.update(nyc_ref, nyc_data)

Update a document and set a field to server_time:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

# Get a document reference
nyc_ref = firestore.doc "cities/NYC"

nyc_data = { name: "New York City",
             updated_at: firestore.field_server_time }

bw.update(nyc_ref, nyc_data)

Get the value of a write operation:

require "google/cloud/firestore"

firestore = Google::Cloud::Firestore.new
bw = firestore.bulk_writer

# Get a document reference
nyc_ref = firestore.doc "cities/NYC"

nyc_data = { name: "New York City",
             updated_at: firestore.field_server_time }

result = bw.update(nyc_ref, nyc_data)

bw.close

puts result.value

Parameters:

  • doc (String, DocumentReference)

    A string representing the path of the document, or a document reference object.

  • data (Hash<FieldPath|String|Symbol, Object>)

    The document's fields and values.

    The top-level keys in the data hash are considered field paths, and can be either a FieldPath object or a string representing the nested fields. In other words, the string represents individual fields joined by ".". Fields containing ~, *, /, [, ], and . cannot be in a dotted string, and should be provided using a FieldPath object instead.

  • update_time (Time) (defaults to: nil)

    When set, the document must have been last updated at that time. Optional.
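To illustrate how a dotted string addresses a nested field, the sketch below expands each key on "." and writes the value into a copy of a plain hash. `apply_update` and `deep_copy` are hypothetical helpers. The restricted characters listed above and FieldPath objects are not handled in this simplified model.

```ruby
# Illustrative only: expands dotted field paths into nested updates on a plain hash.
def deep_copy value
  Marshal.load(Marshal.dump(value))
end

def apply_update document, data
  data.each_with_object(deep_copy(document)) do |(path, value), result|
    # "favorites.food" becomes parents [:favorites] and leaf :food.
    *parents, leaf = path.to_s.split(".").map(&:to_sym)
    target = parents.reduce(result) { |node, key| node[key] ||= {} }
    target[leaf] = value
  end
end

frank = { name: "Frank", favorites: { food: "Pizza", color: "Blue" } }
updated = apply_update(frank, { "favorites.food" => "Pasta" })

p updated[:favorites][:food]  # => "Pasta"
p updated[:favorites][:color] # => "Blue"
p frank[:favorites][:food]    # => "Pizza"
```

Only the addressed leaf changes; sibling fields under the same parent are left as they were.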

Returns:

  • (Google::Cloud::Firestore::Promise::Future)

# File 'lib/google/cloud/firestore/bulk_writer.rb', line 365

def update doc, data, update_time: nil
  doc_path = coalesce_doc_path_argument doc
  pre_add_operation doc_path

  write = Convert.write_for_update doc_path, data, update_time: update_time

  create_and_enqueue_operation write
end