Module: OceanDynamo::Queries

Included in:
Table
Defined in:
lib/ocean-dynamo/queries.rb

Instance Method Summary collapse

Instance Method Details

#all(consistent: false, **options) ⇒ Object

Returns all records in the table.



47
48
49
50
51
52
53
54
# File 'lib/ocean-dynamo/queries.rb', line 47

def all(consistent: false, **options)
  _late_connect?
  records = []
  in_batches :scan, { consistent_read: !!consistent } do |attrs|
    records << new._setup_from_dynamo(attrs)
  end
  records
end

#condition_builder(hash_key, hash_value, range_key = nil, comparator = nil, range_value = nil, limit: nil, consistent: false, scan_index_forward: true, select: nil) ⇒ Object

This helper constructs the options hash for a subsequent call to in_batches and friends. It takes care of creating expression attributes names and values for all data and takes parameters to control scan direction and limits, etc.



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/ocean-dynamo/queries.rb', line 121

def condition_builder(hash_key, hash_value,
                      range_key=nil, comparator=nil, range_value=nil,
                      limit: nil, consistent: false, scan_index_forward: true,
                      select: nil)
  if range_key
    options = { 
      expression_attribute_names: { "#H" => hash_key, "#R" => range_key },
      key_condition_expression: "#H = :hashval AND #R #{comparator} :rangeval",
      expression_attribute_values: { ":hashval" => hash_value, ":rangeval" => range_value }
    }
  else
    options = { 
      expression_attribute_names: { "#H" => hash_key },
      key_condition_expression: "#H = :hashval",
      expression_attribute_values: { ":hashval" => hash_value }
    }
  end
  options[:limit] = limit if limit
  options[:consistent_read] = consistent if consistent
  options[:scan_index_forward] = scan_index_forward if !scan_index_forward
  options[:select] = select.to_s.upcase if select
  options
end

#count(**options) ⇒ Object

The number of records in the table. Updated every 6 hours or so; thus isn’t a reliable real-time measure of the number of table items.



38
39
40
41
# File 'lib/ocean-dynamo/queries.rb', line 38

def count(**options)
  _late_connect?
  dynamo_table.item_count
end

#find(hash, range = nil, consistent: false) ⇒ Object


Class methods



10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/ocean-dynamo/queries.rb', line 10

def find(hash, range=nil, consistent: false)
  return hash.collect {|elem| find elem, range, consistent: consistent } if hash.is_a?(Array)
  _late_connect?
  hash = hash.id if hash.kind_of?(Table)    # TODO: We have (innocuous) leakage, fix!
  range = range.to_i if range.is_a?(Time)
  keys = { table_hash_key.to_s => hash }
  keys[table_range_key] = range if table_range_key && range
  options = { key: keys, consistent_read: consistent }
  item = dynamo_table.get_item(options).item
  unless item
    raise RecordNotFound, "can't find a #{self} with primary key ['#{hash}', #{range.inspect}]" 
  end
  new._setup_from_dynamo(item)
end

#find_by_key(*args) ⇒ Object Also known as: find_by_id



26
27
28
29
30
# File 'lib/ocean-dynamo/queries.rb', line 26

def find_by_key(*args)
  find(*args)
rescue RecordNotFound
  nil
end

#find_each(consistent: false, limit: nil, batch_size: nil) ⇒ Object

Looping through a collection of records from the database (using the all method, for example) is very inefficient since it will try to instantiate all the objects at once. Batch processing methods allow you to work with the records in batches, thereby greatly reducing memory consumption.

TODO: Add support for

index_name: "IndexName",
select: "ALL_ATTRIBUTES", # ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES, SPECIFIC_ATTRIBUTES, COUNT


89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/ocean-dynamo/queries.rb', line 89

def find_each(consistent: false, limit: nil, batch_size: nil)
  options = { consistent_read: consistent }
  batch_size = limit if limit && batch_size && limit < batch_size
  options[:limit] = batch_size if batch_size   
  in_batches :scan, options do |attrs|
    if limit
      return true if limit <= 0
      limit = limit - 1
    end
    yield new._setup_from_dynamo(attrs)
  end
end

#find_global(*args) ⇒ Object

This method takes the same args as find_global_each but returns all found items as an array.



197
198
199
200
201
202
203
# File 'lib/ocean-dynamo/queries.rb', line 197

def find_global(*args)
  result = []
  find_global_each(*args) do |item|
    result << item
  end
  result
end

#find_global_each(hash_key, hash_value, range_key = nil, comparator = nil, range_value = nil, limit: nil, scan_index_forward: true, &block) ⇒ Object

This method finds each item of a global secondary index, sequentially yielding each item to the given block (required). The parameters are as follows:

hash_key The name of the hash key to use (required). hash_value The value of the hash key to match (required). range_key The name of the range key to use (optional). comparator The comparator to use. “=”, “<”, “>”, “<=”, “>=”. (optional). range-value The value of the range key to match (optional).

Note that range_key is optional, but if it’s present, then the comparator and the range_value must also be given. They must either all be present or absent.

The following keyword arguments are accepted:

:limit The maximum number of items to read. :scan_index_forward If false, items will be in reverse order.

If the index contains all attributes, no extra read will be performed. If it doesn’t, the entire item will be fetched using an extra read operation.



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/ocean-dynamo/queries.rb', line 167

def find_global_each(hash_key, hash_value,
                     range_key=nil, comparator=nil, range_value=nil,
                     limit: nil, scan_index_forward: true,
                     &block)
  hash_value = hash_value.to_i if hash_value.is_a?(Time)
  range_value = range_value.to_i if range_value.is_a?(Time)
  options = condition_builder(hash_key, hash_value, range_key, comparator, range_value,
                              limit: limit, scan_index_forward: scan_index_forward)
  index_name = (range_key ? "#{hash_key}_#{range_key}" : hash_key.to_s) + "_global"
  options[:index_name] = index_name
  raise "Undefined global index: #{index_name}" unless global_secondary_indexes[index_name]
  all_projected = global_secondary_indexes[index_name]["projection_type"] == "ALL"
  in_batches :query, options do |attrs|
    if limit
      return if limit <= 0
      limit = limit - 1
    end
    if all_projected
      yield new._setup_from_dynamo(attrs)
    else
      yield find(attrs[table_hash_key.to_s], table_range_key && attrs[table_range_key.to_s])
    end
  end
end

#find_local(*args) ⇒ Object

This method takes the same args as find_local_each but returns all found items as an array.



252
253
254
255
256
257
258
# File 'lib/ocean-dynamo/queries.rb', line 252

def find_local(*args)
  result = []
  find_local_each(*args) do |item|
    result << item
  end
  result
end

#find_local_each(hash_key, hash_value, range_key, comparator, range_value, limit: nil, scan_index_forward: true, consistent: false, &block) ⇒ Object

This method finds each item of a local secondary index, sequentially yielding each item to the given block (required). The parameters are as follows:

hash_key The name of the hash key to use (required, must be the table’s hash_key). hash_value The value of the hash key to match (required). range_key The name of the range key to use (required). comparator The comparator to use. “=”, “<”, “>”, “<=”, “>=”. (required). range-value The value of the range key to match (required).

Note that range_key must all be present.

The following keyword arguments are accepted:

:limit The maximum number of items to read. :scan_index_forward If false, items will be in reverse order. :consistent If true, consistent reads will be used. Default false.



224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/ocean-dynamo/queries.rb', line 224

def find_local_each(hash_key, hash_value,
                    range_key, comparator, range_value,
                    limit: nil, scan_index_forward: true, consistent: false,
                    &block)
  raise "The hash_key is #{hash_key.inspect} but must be #{table_hash_key.inspect}" unless hash_key == table_hash_key
  hash_value = hash_value.to_i if hash_value.is_a?(Time)
  range_value = range_value.to_i if range_value.is_a?(Time)
  options = condition_builder(hash_key, hash_value, range_key, comparator, range_value,
                              select: :all_attributes,
                              limit: limit, scan_index_forward: scan_index_forward,
                              consistent: consistent)
  index_name = range_key.to_s
  options[:index_name] = index_name
  raise "Undefined local index: #{index_name}" unless local_secondary_indexes.include?(index_name)
  in_batches :query, options do |attrs|
    if limit
      return if limit <= 0
      limit = limit - 1
    end
    yield new._setup_from_dynamo(attrs)
  end
end

#in_batches(message, options, &block) ⇒ Object

This method takes a block and yields it to every record in a table. message must be either :scan or :query. options is the hash of options to pass to the scan or query operation.

TODO: Add support for

index_name: "IndexName",
select: "ALL_ATTRIBUTES", # ALL_ATTRIBUTES, ALL_PROJECTED_ATTRIBUTES, SPECIFIC_ATTRIBUTES, COUNT


66
67
68
69
70
71
72
73
74
75
76
# File 'lib/ocean-dynamo/queries.rb', line 66

def in_batches(message, options, &block)
  _late_connect?
  loop do
    result = dynamo_table.send message, options
    result.items.each do |hash|
      yield hash
    end
    return true unless result.last_evaluated_key
    options[:exclusive_start_key] = result.last_evaluated_key
  end
end