Module: Dynamoid::Adapter

Extended by:
Adapter
Included in:
Adapter
Defined in:
lib/dynamoid/adapter.rb,
lib/dynamoid/adapter/aws_sdk.rb

Overview

Adapter provides a generic, write-through class that abstracts variations in the underlying connections to provide a uniform response to Dynamoid.

Defined Under Namespace

Modules: AwsSdk

Instance Attribute Summary collapse

Instance Method Summary collapse

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method, *args, &block) ⇒ Object

Delegate all methods that aren’t defind here to the underlying adapter.

Since:

  • 0.2.0



218
219
220
221
# File 'lib/dynamoid/adapter.rb', line 218

def method_missing(method, *args, &block)
  return benchmark(method, *args) {adapter.send(method, *args, &block)} if @adapter.respond_to?(method)
  super
end

Instance Attribute Details

#tablesObject

Returns the value of attribute tables.



8
9
10
# File 'lib/dynamoid/adapter.rb', line 8

def tables
  @tables
end

Instance Method Details

#adapterObject

The actual adapter currently in use: presently AwsSdk.

Since:

  • 0.2.0



13
14
15
16
# File 'lib/dynamoid/adapter.rb', line 13

def adapter
  reconnect! unless @adapter
  @adapter
end

#benchmark(method, *args) { ... } ⇒ Object

Shows how long it takes a method to run on the adapter. Useful for generating logged output.

Parameters:

  • method (Symbol)

    the name of the method to appear in the log

  • args (Array)

    the arguments to the method to appear in the log

Yields:

  • the actual code to benchmark

Returns:

  • the result of the yield

Since:

  • 0.2.0



37
38
39
40
41
42
# File 'lib/dynamoid/adapter.rb', line 37

def benchmark(method, *args)
  start = Time.now
  result = yield
  Dynamoid.logger.info "(#{((Time.now - start) * 1000.0).round(2)} ms) #{method.to_s.split('_').collect(&:upcase).join(' ')}#{ " - #{args.inspect}" unless args.nil? || args.empty? }"
  return result
end

#delete(table, ids, options = {}) ⇒ Object

Delete an item from a table. If partitioning is turned on, deletes all partitioned keys as well.

Parameters:

  • table (String)

    the name of the table to write the object to

  • ids (Array)

    to delete, can also be a string of just one id

  • range_key (Array)

    of the record to delete, can also be a string of just one range_key



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/dynamoid/adapter.rb', line 97

def delete(table, ids, options = {})
  range_key = options[:range_key] #array of range keys that matches the ids passed in
  if ids.respond_to?(:each)
    if range_key.respond_to?(:each)
      #turn ids into array of arrays each element being hash_key, range_key
      ids = ids.each_with_index.map{|id,i| [id,range_key[i]]}
    else
      ids = range_key ? [[ids, range_key]] : ids
    end
    
    if Dynamoid::Config.partitioning?
      batch_delete_item(table => id_with_partitions(ids))
    else
      batch_delete_item(table => ids)
    end
  else
    if Dynamoid::Config.partitioning?
      ids = range_key ? [[ids, range_key]] : ids
      batch_delete_item(table => id_with_partitions(ids))
    else
      delete_item(table, ids, options)
    end
  end
end

#get_original_id_and_partition(id) ⇒ String

Get original id (hash_key) and partiton number from a hash_key

Parameters:

  • id (String)

    the id or hash_key of a record, ex. xxxxx.13

Returns:

  • (String, String)

    original_id and the partition number, ex original_id = xxxxx partition = 13



166
167
168
169
170
171
# File 'lib/dynamoid/adapter.rb', line 166

def get_original_id_and_partition id
  partition = id.split('.').last
  id = id.split(".#{partition}").first

  return id, partition
end

#id_with_partitions(ids) ⇒ Object

Takes a list of ids and returns them with partitioning added. If an array of arrays is passed, we assume the second key is the range key and pass it in unchanged.

Examples:

Partition id 1

Dynamoid::Adapter.id_with_partitions(['1']) # ['1.0', '1.1', '1.2', ..., '1.199']

Partition id 1 and range_key 1.0

Dynamoid::Adapter.id_with_partitions([['1', 1.0]]) # [['1.0', 1.0], ['1.1', 1.0], ['1.2', 1.0], ..., ['1.199', 1.0]]

Parameters:

  • ids (Array)

    array of ids to partition

Since:

  • 0.2.0



157
158
159
# File 'lib/dynamoid/adapter.rb', line 157

def id_with_partitions(ids)
  Array(ids).collect {|id| (0...Dynamoid::Config.partition_size).collect{|n| id.is_a?(Array) ? ["#{id.first}.#{n}", id.last] : "#{id}.#{n}"}}.flatten(1)
end

#query(table_name, opts = {}) ⇒ Array

Query the DynamoDB table. This employs DynamoDB’s indexes so is generally faster than scanning, but is only really useful for range queries, since it can only find by one hash key at once. Only provide one range key to the hash. If paritioning is on, will run a query for every parition and join the results

Parameters:

  • table_name (String)

    the name of the table

  • opts (Hash) (defaults to: {})

    the options to query the table with

Options Hash (opts):

  • :hash_value (String)

    the value of the hash key to find

  • :range_value (Range)

    find the range key within this range

  • :range_greater_than (Number)

    find range keys greater than this

  • :range_less_than (Number)

    find range keys less than this

  • :range_gte (Number)

    find range keys greater than or equal to this

  • :range_lte (Number)

    find range keys less than or equal to this

Returns:

  • (Array)

    an array of all matching items



238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/dynamoid/adapter.rb', line 238

def query(table_name, opts = {})
  
  unless Dynamoid::Config.partitioning?
    #no paritioning? just pass to the standard query method
    Dynamoid::Adapter::AwsSdk.query(table_name, opts)
  else
    #get all the hash_values that could be possible
    ids = id_with_partitions(opts[:hash_value])

    #lets not overwrite with the original options
    modified_options = opts.clone     
    results = []
    
    #loop and query on each of the partition ids
    ids.each do |id|
      modified_options[:hash_value] = id

      query_result = Dynamoid::Adapter::AwsSdk.query(table_name, modified_options)
      query_result = [query_result] if !query_result.is_a?(Array)

      results = results + query_result unless query_result.nil? 
    end 
    
    result_for_partition results, table_name
  end
end

#read(table, ids, options = {}) ⇒ Object

Read one or many keys from the selected table. This method intelligently calls batch_get or get on the underlying adapter depending on whether ids is a range or a single key: additionally, if partitioning is enabled, it batch_gets all keys in the partition space automatically. Finally, if a range key is present, it will also interpolate that into the ids so that the batch get will acquire the correct record.

Parameters:

  • table (String)

    the name of the table to write the object to

  • ids (Array)

    to fetch, can also be a string of just one id

  • range_key (Number)

    the range key of the record

Since:

  • 0.2.0



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/dynamoid/adapter.rb', line 70

def read(table, ids, options = {})
  range_key = options[:range_key]
  if ids.respond_to?(:each)
    ids = ids.collect{|id| range_key ? [id, range_key] : id}
    if Dynamoid::Config.partitioning?
      results = batch_get_item(table => id_with_partitions(ids))
      {table => result_for_partition(results[table],table)}
    else
      batch_get_item(table => ids)
    end
  else
    if Dynamoid::Config.partitioning?
      ids = range_key ? [[ids, range_key]] : ids
      results = batch_get_item(table => id_with_partitions(ids))
      result_for_partition(results[table],table).first
    else
      get_item(table, ids, options)
    end
  end
end

#reconnect!Object

Establishes a connection to the underyling adapter and caches all its tables for speedier future lookups. Issued when the adapter is first called.

Since:

  • 0.2.0



21
22
23
24
25
26
# File 'lib/dynamoid/adapter.rb', line 21

def reconnect!
  require "dynamoid/adapter/#{Dynamoid::Config.adapter}" unless Dynamoid::Adapter.const_defined?(Dynamoid::Config.adapter.camelcase)
  @adapter = Dynamoid::Adapter.const_get(Dynamoid::Config.adapter.camelcase)
  @adapter.connect! if @adapter.respond_to?(:connect!)
  self.tables = benchmark('Cache Tables') {list_tables}
end

#result_for_partition(results, table_name) ⇒ Object

Takes an array of query results that are partitioned, find the most recently updated ones that share an id and range_key, and return only the most recently updated. Compares each result by their id and updated_at attributes; if the updated_at is the greatest, then it must be the correct result.

Parameters:

  • returned (Array)

    partitioned results from a query

  • table_name (String)

    the name of the table

Since:

  • 0.2.0



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/dynamoid/adapter.rb', line 180

def result_for_partition(results, table_name)
  table = Dynamoid::Adapter::AwsSdk.get_table(table_name)
  
  if table.range_key     
    range_key_name = table.range_key.name.to_sym
    
    final_hash = {}
    
    results.each do |record|
      test_record = final_hash[record[range_key_name]]
      
      if test_record.nil? || ((record[range_key_name] == test_record[range_key_name]) && (record[:updated_at] > test_record[:updated_at]))
        #get ride of our partition and put it in the array with the range key
        record[:id], partition = get_original_id_and_partition  record[:id]
        final_hash[record[range_key_name]] = record
      end
    end
  
    return final_hash.values
  else
    {}.tap do |hash|
      Array(results).each do |result|
        next if result.nil?
        #Need to find the value of id with out the . and partition number
        id, partition = get_original_id_and_partition result[:id]
  
        if !hash[id] || (result[:updated_at] > hash[id][:updated_at])
          result[:id] = id
          hash[id] = result
        end
      end
    end.values
  end
end

#scan(table, query, opts = {}) ⇒ Object

Scans a table. Generally quite slow; try to avoid using scan if at all possible.

Parameters:

  • table (String)

    the name of the table to write the object to

  • scan_hash (Hash)

    a hash of attributes: matching records will be returned by the scan

Since:

  • 0.2.0



128
129
130
131
132
133
134
135
# File 'lib/dynamoid/adapter.rb', line 128

def scan(table, query, opts = {})
  if Dynamoid::Config.partitioning?
    results = benchmark('Scan', table, query) {adapter.scan(table, query, opts)}
    result_for_partition(results,table)
  else
    benchmark('Scan', table, query) {adapter.scan(table, query, opts)}
  end
end

#write(table, object) ⇒ Object

Write an object to the adapter. Partition it to a randomly selected key first if necessary.

Parameters:

  • table (String)

    the name of the table to write the object to

  • object (Object)

    the object itself

Returns:

  • (Object)

    the persisted object

Since:

  • 0.2.0



52
53
54
55
56
57
58
# File 'lib/dynamoid/adapter.rb', line 52

def write(table, object)
  if Dynamoid::Config.partitioning? && object[:id]
    object[:id] = "#{object[:id]}.#{Random.rand(Dynamoid::Config.partition_size)}"
    object[:updated_at] = Time.now.to_f
  end
  put_item(table, object)
end