Class: Bricolage::RedisTask::Import
- Inherits:
-
Action
- Object
- Action
- Bricolage::RedisTask::Import
- Defined in:
- lib/bricolage/redisdatasource.rb
Constant Summary collapse
- BATCH_SIZE =
5000
Instance Method Summary collapse
- #bind(*args) ⇒ Object
- #import(writer) ⇒ Object
-
#initialize(src, table, query, key_column, prefix, encode, expire) ⇒ Import
constructor
A new instance of Import.
- #run ⇒ Object
- #source ⇒ Object
Constructor Details
#initialize(src, table, query, key_column, prefix, encode, expire) ⇒ Import
Returns a new instance of Import.
37 38 39 40 41 42 43 44 45 |
# File 'lib/bricolage/redisdatasource.rb', line 37

# Builds an import action.
#
# src        - source data source queried for rows.
# table      - table reference list; its last element supplies schema/name
#              for the default key prefix.
# query      - query object (#bind / #stripped_source are used).
# key_column - comma-separated key column names; split and stripped.
# prefix     - Redis key prefix; when nil, defaults to "<schema>_<name>_".
# encode     - encoding name used to pick a RedisRowWriter subclass.
# expire     - expiry setting; stored but not referenced in this view —
#              presumably consumed by the writer (TODO confirm).
def initialize(src, table, query, key_column, prefix, encode, expire)
  @src = src
  @table = table
  @query = query
  @encode = encode
  @expire = expire
  @key_columns = key_column.split(',').map {|col| col.strip }
  target = table.last
  @prefix = prefix || "#{target.schema}_#{target.name}_"
end
Instance Method Details
#bind(*args) ⇒ Object
47 48 49 |
# File 'lib/bricolage/redisdatasource.rb', line 47

# Binds variable values into the underlying query.
# Pure delegation to the query object's #bind.
def bind(*args)
  @query.bind(*args)
end
#import(writer) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/bricolage/redisdatasource.rb', line 74

# Streams query results into Redis through the given row writer.
# Rows are fetched from @src in BATCH_SIZE chunks; each batch is written
# inside one writer pipeline to cut round trips.  Progress is logged
# every million rows, and the total is logged once all rows are written.
def import(writer)
  count = 0
  @src.query_batch(source, BATCH_SIZE) do |rs|
    writer.pipelined {
      rs.each do |row|
        writer.write(row)
        count += 1
        # fix: "transfered" typo; 1_000_000 == the original 100_0000, regrouped conventionally
        ds.logger.info "transferred: #{count} rows" if count % 1_000_000 == 0
      end
    }
  end
  ds.logger.info "all rows written: #{count} rows"
end
#run ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/bricolage/redisdatasource.rb', line 55

# Executes the import job: logs the key pattern, encoding, and expire
# settings, opens the Redis data source, builds a row writer for the
# configured encoding, and streams all query rows into it.
#
# Returns JobResult.success on completion.
# Any StandardError is logged and re-raised wrapped in JobFailure.
def run
  logger = ds.logger
  begin
    logger.info "Key Pattern: #{@prefix}<#{@key_columns.join('_')}>"
    logger.info "Encode: #{@encode}"
    logger.info "Expire: #{@expire}"
    ds.open {|client|
      writer = RedisRowWriter.for_encode(@encode).new(client, @prefix, @key_columns)
      import writer
    }
  rescue => ex
    logger.exception ex
    # fix: source read "raise JobFailure, ex." (parses as ex.end — NoMethodError);
    # the intended payload is the exception message
    raise JobFailure, ex.message
  end
  JobResult.success
end
#source ⇒ Object
51 52 53 |
# File 'lib/bricolage/redisdatasource.rb', line 51

# Returns the SQL text used for the batch query.
# Pure delegation to the query object's #stripped_source.
def source
  @query.stripped_source
end