Class: Bricolage::RedisTask::Import
- Inherits: Action
- Inheritance chain: Object → Action → Bricolage::RedisTask::Import
- Defined in:
- lib/bricolage/redisdatasource.rb
Instance Method Summary
- #bind(*args) ⇒ Object
- #import ⇒ Object
- #initialize(src, table, query, key_column, prefix, encode) ⇒ Import (constructor): returns a new instance of Import.
- #key(row) ⇒ Object
- #prefix ⇒ Object
- #read_row ⇒ Object
- #run ⇒ Object
- #source ⇒ Object
- #write_row(row) {|r| ... } ⇒ Object
Constructor Details
#initialize(src, table, query, key_column, prefix, encode) ⇒ Import
36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/bricolage/redisdatasource.rb', line 36 def initialize(src, table, query, key_column, prefix, encode) @src = src @table = table @query = query @key_column = key_column puts key_column @prefix = prefix @encode = encode @read_count = 0 @write_count = 0 end |
Instance Method Details
#bind(*args) ⇒ Object
48 49 50 |
# File 'lib/bricolage/redisdatasource.rb', line 48
#
# Forwards every argument unchanged to the query object's +bind+ and
# returns whatever that call returns.
#
# @param args arguments handed straight to +@query.bind+
# @return [Object] the result of +@query.bind+
def bind(*args)
  @query.bind(*args)
end
#import ⇒ Object
60 61 62 63 64 |
# File 'lib/bricolage/redisdatasource.rb', line 60
#
# Pipes every row produced by #read_row into #write_row.
#
# @return [Object] whatever #read_row returns
def import
  read_row { |row| write_row(row) }
end
#key(row) ⇒ Object
95 96 97 98 |
# File 'lib/bricolage/redisdatasource.rb', line 95
#
# Builds the Redis key for a row: the key prefix followed by the values
# of the configured key columns joined with '_'.
#
# @param row an object indexable by column name (+row[name]+)
# @return [String] prefix + "value1_value2_..."
def key(row)
  # @key_column is a comma-separated list; surrounding blanks are ignored.
  column_names = @key_column.split(',').map(&:strip)
  key_values = column_names.map { |name| row[name] }
  prefix + key_values.join('_')
end
#prefix ⇒ Object
56 57 58 |
# File 'lib/bricolage/redisdatasource.rb', line 56
#
# Returns the key prefix. When none was supplied, it is derived once from
# the last element of +@table+ as "<schema>_<name>_" and remembered.
#
# @return [String] the explicit or derived key prefix
def prefix
  return @prefix if @prefix

  @prefix = "#{@table.last.schema}_#{@table.last.name}_"
end
#read_row ⇒ Object
66 67 68 69 70 71 72 73 74 |
# File 'lib/bricolage/redisdatasource.rb', line 66
#
# Runs the source query through +@src+ and yields each result row to the
# caller's block, counting rows as it goes.
#
# @yield [row] each row of the result set, in the order +rs.each+ gives
# @return [Object] whatever +@src.execute_query+ returns
def read_row
  @src.execute_query(source) do |rs|
    rs.each do |row|
      yield row
      @read_count += 1
      # Emit a progress line only on every 100000th row.
      next unless (@read_count % 100_000).zero?

      ds.logger.info "Rows read: #{@read_count}"
    end
  end
end
#run ⇒ Object
100 101 102 103 104 105 106 107 108 109 |
# File 'lib/bricolage/redisdatasource.rb', line 100
#
# Executes the import. Any StandardError raised by #import is logged
# (backtrace) and re-raised as JobFailure carrying the original message;
# on success the number of written rows is logged.
#
# @return [Object] the value of +JobResult.success+
# @raise [JobFailure] when #import raises
def run
  begin
    import
  rescue => ex
    ds.logger.error ex.backtrace.join("\n")
    # Pass the message string (not a dangling `ex.`) to the failure.
    raise JobFailure, ex.message
  end
  ds.logger.info "Rows written: #{@write_count}"
  JobResult.success
end
#source ⇒ Object
52 53 54 |
# File 'lib/bricolage/redisdatasource.rb', line 52
#
# The text handed to +execute_query+ by #read_row: the query object's
# +stripped_source+.
#
# @return [Object] the result of +@query.stripped_source+
def source
  @query.stripped_source
end
#write_row(row) {|r| ... } ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/bricolage/redisdatasource.rb', line 76
#
# Writes one row to Redis under the key computed by #key, using the
# configured encoding:
#
# * 'hash' - one +hset+ per field/value pair of the row
# * 'json' - a single +set+ with the row serialized by JSON.generate
#
# The first written key is logged as a sample, and the write counter is
# incremented after every successful write.
#
# @param row the row to store; iterated as field/value pairs for 'hash'
# @yield [r] the client reply: an array of +hset+ replies for 'hash',
#   the +set+ reply for 'json' (only when a block is given)
# @return [Integer] the updated write count
# @raise [RuntimeError] when the configured encoding is not supported
def write_row(row, &block)
  redis_key = key(row)
  result =
    case @encode
    when 'hash'
      # set a value for each key:field pair
      row.map { |field, value| ds.client.hset(redis_key, field, value) }
    when 'json'
      ds.client.set(redis_key, JSON.generate(row))
    else
      # Report the configured encoding (the undefined `type` used before
      # raised NameError instead of this message).
      raise %Q("encode: #{@encode}" is not supported)
    end
  yield result if block
  ds.logger.info "Key sample: #{redis_key}" if @write_count == 0
  @write_count += 1
end