Class: Bricolage::RedisTask::Import

Inherits:
Action
  • Object
show all
Defined in:
lib/bricolage/redisdatasource.rb

Constant Summary collapse

BATCH_SIZE =
5000

Instance Method Summary collapse

Constructor Details

#initialize(src, table, query, key_column, prefix, encode, expire) ⇒ Import

Returns a new instance of Import.



37
38
39
40
41
42
43
44
45
# File 'lib/bricolage/redisdatasource.rb', line 37

# Sets up the import action's state.
#
# @param src object later asked to run the query in batches (see #import)
# @param table collection whose last element supplies +schema+ and +name+
#   for the default key prefix
# @param query query object; receives +bind+ and +stripped_source+
# @param key_column [String] comma-separated key column names; surrounding
#   whitespace around each name is stripped
# @param prefix [String, nil] key prefix; when nil/false it is derived as
#   "<schema>_<name>_" from the last element of +table+
# @param encode value passed to RedisRowWriter.for_encode in #run
# @param expire value reported in the #run log output
def initialize(src, table, query, key_column, prefix, encode, expire)
  @src, @table, @query = src, table, query
  @key_columns = key_column.split(',').map { |column| column.strip }
  @prefix = prefix || begin
    target = @table.last
    "#{target.schema}_#{target.name}_"
  end
  @encode, @expire = encode, expire
end

Instance Method Details

#bind(*args) ⇒ Object



47
48
49
# File 'lib/bricolage/redisdatasource.rb', line 47

# Passes the given arguments straight through to the query's +bind+.
#
# @param args arguments forwarded unchanged to +@query.bind+
# @return [Object] whatever +@query.bind+ returns
def bind(*args)
  query = @query
  query.bind(*args)
end

#import(writer) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/bricolage/redisdatasource.rb', line 74

# Streams the query result into +writer+, batch by batch.
#
# The source is queried via +@src.query_batch+ with BATCH_SIZE; each yielded
# batch is written row by row inside one +writer.pipelined+ block. A progress
# line is logged every 1,000,000 rows and a final total once all batches
# have been processed.
#
# @param writer object responding to +pipelined+ (taking a block) and +write(row)+
# @return [Object] the result of the final +logger.info+ call
def import(writer)
  written = 0
  @src.query_batch(source, BATCH_SIZE) do |batch|
    writer.pipelined do
      batch.each do |record|
        writer.write(record)
        written += 1
        ds.logger.info "transfered: #{written} rows" if (written % 1_000_000).zero?
      end
    end
  end
  ds.logger.info "all rows written: #{written} rows"
end

#run ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/bricolage/redisdatasource.rb', line 55

# Executes the import: logs the key pattern, encoding and expiry, opens the
# data source, builds a row writer for the configured encoding and hands it
# to #import.
#
# Any StandardError raised while logging or importing is reported through
# +logger.exception+ and re-raised as JobFailure carrying the original message.
#
# @return [Object] the value of +JobResult.success+
# @raise [JobFailure] when the import fails
def run
  logger = ds.logger
  begin
    key_pattern = "#{@prefix}<#{@key_columns.join('_')}>"
    logger.info "Key Pattern: #{key_pattern}"
    logger.info "Encode: #{@encode}"
    logger.info "Expire: #{@expire}"
    ds.open do |client|
      writer_class = RedisRowWriter.for_encode(@encode)
      import(writer_class.new(client, @prefix, @key_columns))
    end
  rescue => error
    logger.exception error
    raise JobFailure, error.message
  end
  # Deliberately outside the rescue so a failure here is not wrapped.
  JobResult.success
end

#source ⇒ Object



51
52
53
# File 'lib/bricolage/redisdatasource.rb', line 51

# Query text used for the import, as produced by the query object.
#
# @return [Object] the result of +@query.stripped_source+
def source
  query = @query
  query.stripped_source
end