Module: Dynamoid::Adapter

Extended by:
Adapter
Included in:
Adapter
Defined in:
lib/dynamoid/adapter.rb,
lib/dynamoid/adapter/aws_sdk.rb

Overview

Adapter provides a generic, write-through class that abstracts variations in the underlying connections to provide a uniform response to Dynamoid.

Defined Under Namespace

Modules: AwsSdk

Instance Attribute Summary collapse

Instance Method Summary collapse

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method, *args, &block) ⇒ Object

Delegate all methods that aren’t defind here to the underlying adapter.

Since:

  • 0.2.0



222
223
224
225
# File 'lib/dynamoid/adapter.rb', line 222

def method_missing(method, *args, &block)
  return benchmark(method, *args) {adapter.send(method, *args, &block)} if @adapter.respond_to?(method)
  super
end

Instance Attribute Details

#tablesObject

Returns the value of attribute tables.



8
9
10
# File 'lib/dynamoid/adapter.rb', line 8

def tables
  @tables
end

Instance Method Details

#adapterObject

The actual adapter currently in use: presently AwsSdk.

Since:

  • 0.2.0



13
14
15
16
# File 'lib/dynamoid/adapter.rb', line 13

def adapter
  reconnect! unless @adapter
  @adapter
end

#benchmark(method, *args) { ... } ⇒ Object

Shows how long it takes a method to run on the adapter. Useful for generating logged output.

Parameters:

  • method (Symbol)

    the name of the method to appear in the log

  • args (Array)

    the arguments to the method to appear in the log

Yields:

  • the actual code to benchmark

Returns:

  • the result of the yield

Since:

  • 0.2.0



37
38
39
40
41
42
# File 'lib/dynamoid/adapter.rb', line 37

def benchmark(method, *args)
  start = Time.now
  result = yield
  Dynamoid.logger.info "(#{((Time.now - start) * 1000.0).round(2)} ms) #{method.to_s.split('_').collect(&:upcase).join(' ')}#{ " - #{args.inspect}" unless args.nil? || args.empty? }"
  return result
end

#delete(table, ids, options = {}) ⇒ Object

Delete an item from a table. If partitioning is turned on, deletes all partitioned keys as well.

Parameters:

  • table (String)

    the name of the table to write the object to

  • ids (Array)

    to delete, can also be a string of just one id

  • range_key (Array)

    of the record to delete, can also be a string of just one range_key



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/dynamoid/adapter.rb', line 101

def delete(table, ids, options = {})
  range_key = options[:range_key] #array of range keys that matches the ids passed in
  if ids.respond_to?(:each)
    if range_key.respond_to?(:each)
      #turn ids into array of arrays each element being hash_key, range_key
      ids = ids.each_with_index.map{|id,i| [id,range_key[i]]}
    else
      ids = range_key ? [[ids, range_key]] : ids
    end
    
    if Dynamoid::Config.partitioning?
      batch_delete_item(table => id_with_partitions(ids))
    else
      batch_delete_item(table => ids)
    end
  else
    if Dynamoid::Config.partitioning?
      ids = range_key ? [[ids, range_key]] : ids
      batch_delete_item(table => id_with_partitions(ids))
    else
      delete_item(table, ids, options)
    end
  end
end

#get_original_id_and_partition(id) ⇒ String

Get original id (hash_key) and partiton number from a hash_key

Parameters:

  • id (String)

    the id or hash_key of a record, ex. xxxxx.13

Returns:

  • (String, String)

    original_id and the partition number, ex original_id = xxxxx partition = 13



170
171
172
173
174
175
# File 'lib/dynamoid/adapter.rb', line 170

def get_original_id_and_partition id
  partition = id.split('.').last
  id = id.split(".#{partition}").first

  return id, partition
end

#id_with_partitions(ids) ⇒ Object

Takes a list of ids and returns them with partitioning added. If an array of arrays is passed, we assume the second key is the range key and pass it in unchanged.

Examples:

Partition id 1

Dynamoid::Adapter.id_with_partitions(['1']) # ['1.0', '1.1', '1.2', ..., '1.199']

Partition id 1 and range_key 1.0

Dynamoid::Adapter.id_with_partitions([['1', 1.0]]) # [['1.0', 1.0], ['1.1', 1.0], ['1.2', 1.0], ..., ['1.199', 1.0]]

Parameters:

  • ids (Array)

    array of ids to partition

Since:

  • 0.2.0



161
162
163
# File 'lib/dynamoid/adapter.rb', line 161

def id_with_partitions(ids)
  Array(ids).collect {|id| (0...Dynamoid::Config.partition_size).collect{|n| id.is_a?(Array) ? ["#{id.first}.#{n}", id.last] : "#{id}.#{n}"}}.flatten(1)
end

#query(table_name, opts = {}) ⇒ Array

Query the DynamoDB table. This employs DynamoDB’s indexes so is generally faster than scanning, but is only really useful for range queries, since it can only find by one hash key at once. Only provide one range key to the hash. If paritioning is on, will run a query for every parition and join the results

Parameters:

  • table_name (String)

    the name of the table

  • opts (Hash) (defaults to: {})

    the options to query the table with

Options Hash (opts):

  • :hash_value (String)

    the value of the hash key to find

  • :range_value (Range)

    find the range key within this range

  • :range_greater_than (Number)

    find range keys greater than this

  • :range_less_than (Number)

    find range keys less than this

  • :range_gte (Number)

    find range keys greater than or equal to this

  • :range_lte (Number)

    find range keys less than or equal to this

Returns:

  • (Array)

    an array of all matching items



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/dynamoid/adapter.rb', line 242

def query(table_name, opts = {})
  
  unless Dynamoid::Config.partitioning?
    #no paritioning? just pass to the standard query method
    Dynamoid::Adapter::AwsSdk.query(table_name, opts)
  else
    #get all the hash_values that could be possible
    ids = id_with_partitions(opts[:hash_value])

    #lets not overwrite with the original options
    modified_options = opts.clone     
    results = []
    
    #loop and query on each of the partition ids
    ids.each do |id|
      modified_options[:hash_value] = id

      query_result = Dynamoid::Adapter::AwsSdk.query(table_name, modified_options)
      results += query_result.inject([]){|array, result| array += [result]} if query_result.any?
    end 

    result_for_partition results, table_name
  end
end

#read(table, ids, options = {}) ⇒ Object

Read one or many keys from the selected table. This method intelligently calls batch_get or get on the underlying adapter depending on whether ids is a range or a single key: additionally, if partitioning is enabled, it batch_gets all keys in the partition space automatically. Finally, if a range key is present, it will also interpolate that into the ids so that the batch get will acquire the correct record.

Parameters:

  • table (String)

    the name of the table to write the object to

  • ids (Array)

    to fetch, can also be a string of just one id

  • options: (Hash)

    Passed to the underlying query. The :range_key option is required whenever the table has a range key, unless multiple ids are passed in and Dynamoid::Config.partitioning? is turned off.

Since:

  • 0.2.0



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/dynamoid/adapter.rb', line 72

def read(table, ids, options = {})
  range_key = options.delete(:range_key)

  if ids.respond_to?(:each)
    ids = ids.collect{|id| range_key ? [id, range_key] : id}
    if Dynamoid::Config.partitioning?
      results = batch_get_item({table => id_with_partitions(ids)}, options)
      {table => result_for_partition(results[table],table)}
    else
      batch_get_item({table => ids}, options)
    end
  else
    if Dynamoid::Config.partitioning?
      ids = range_key ? [[ids, range_key]] : ids
      results = batch_get_item({table => id_with_partitions(ids)}, options)
      result_for_partition(results[table],table).first
    else
      options[:range_key] = range_key if range_key
      get_item(table, ids, options)
    end
  end
end

#reconnect!Object

Establishes a connection to the underyling adapter and caches all its tables for speedier future lookups. Issued when the adapter is first called.

Since:

  • 0.2.0



21
22
23
24
25
26
# File 'lib/dynamoid/adapter.rb', line 21

def reconnect!
  require "dynamoid/adapter/#{Dynamoid::Config.adapter}" unless Dynamoid::Adapter.const_defined?(Dynamoid::Config.adapter.camelcase)
  @adapter = Dynamoid::Adapter.const_get(Dynamoid::Config.adapter.camelcase)
  @adapter.connect! if @adapter.respond_to?(:connect!)
  self.tables = benchmark('Cache Tables') {list_tables}
end

#result_for_partition(results, table_name) ⇒ Object

Takes an array of query results that are partitioned, find the most recently updated ones that share an id and range_key, and return only the most recently updated. Compares each result by their id and updated_at attributes; if the updated_at is the greatest, then it must be the correct result.

Parameters:

  • returned (Array)

    partitioned results from a query

  • table_name (String)

    the name of the table

Since:

  • 0.2.0



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/dynamoid/adapter.rb', line 184

def result_for_partition(results, table_name)
  table = Dynamoid::Adapter::AwsSdk.get_table(table_name)
  
  if table.range_key     
    range_key_name = table.range_key.name.to_sym
    
    final_hash = {}

    results.each do |record|
      test_record = final_hash[record[range_key_name]]
      
      if test_record.nil? || ((record[range_key_name] == test_record[range_key_name]) && (record[:updated_at] > test_record[:updated_at]))
        #get ride of our partition and put it in the array with the range key
        record[:id], partition = get_original_id_and_partition  record[:id]
        final_hash[record[range_key_name]] = record
      end
    end
  
    return final_hash.values
  else
    {}.tap do |hash|
      Array(results).each do |result|
        next if result.nil?
        #Need to find the value of id with out the . and partition number
        id, partition = get_original_id_and_partition result[:id]
  
        if !hash[id] || (result[:updated_at] > hash[id][:updated_at])
          result[:id] = id
          hash[id] = result
        end
      end
    end.values
  end
end

#scan(table, query, opts = {}) ⇒ Object

Scans a table. Generally quite slow; try to avoid using scan if at all possible.

Parameters:

  • table (String)

    the name of the table to write the object to

  • scan_hash (Hash)

    a hash of attributes: matching records will be returned by the scan

Since:

  • 0.2.0



132
133
134
135
136
137
138
139
# File 'lib/dynamoid/adapter.rb', line 132

def scan(table, query, opts = {})
  if Dynamoid::Config.partitioning?
    results = benchmark('Scan', table, query) {adapter.scan(table, query, opts)}
    result_for_partition(results,table)
  else
    benchmark('Scan', table, query) {adapter.scan(table, query, opts)}
  end
end

#write(table, object, options = nil) ⇒ Object

Write an object to the adapter. Partition it to a randomly selected key first if necessary.

Parameters:

  • table (String)

    the name of the table to write the object to

  • object (Object)

    the object itself

  • options (Hash) (defaults to: nil)

    Options that are passed to the put_item call

Returns:

  • (Object)

    the persisted object

Since:

  • 0.2.0



53
54
55
56
57
58
59
# File 'lib/dynamoid/adapter.rb', line 53

def write(table, object, options = nil)
  if Dynamoid::Config.partitioning? && object[:id]
    object[:id] = "#{object[:id]}.#{Random.rand(Dynamoid::Config.partition_size)}"
    object[:updated_at] = Time.now.to_f
  end
  put_item(table, object, options)
end