Class: Embulk::Filter::Jsoncolumn

Inherits:
FilterPlugin
  • Object
Defined in:
lib/embulk/filter/jsoncolumn.rb

Class Method Summary

Instance Method Summary

Class Method Details

.transaction(config, in_schema) {|task, out_columns| ... } ⇒ Object

Yields:

  • (task, out_columns)


(source lines 9–28)
# File 'lib/embulk/filter/jsoncolumn.rb', line 9

# Builds the filter task and the output schema from the plugin configuration.
#
# The output schema is derived entirely from the +schema+ config option; the
# input schema is not carried over (add parses each input value as JSON and
# expands it into these columns).
#
# @param config [Object] Embulk config object; must respond to +param+. The
#   +schema+ option is an array of hashes with "name" and "type" keys
#   (optionally "path" and "format").
# @param in_schema [Object] input schema; unused, the output columns replace it
# @yield [task, out_columns] the task hash and the output column list
# @return [Object] the value returned by the block
def self.transaction(config, in_schema, &control)
  # Read the option once. No default is given, so presence is left to
  # config.param to enforce (presumably it raises when the option is missing).
  schema = config.param("schema", :array)

  # name => type symbol lookup, kept in the task for compatibility.
  column_types = schema.each_with_object({}) do |col, types|
    types[col['name']] = col['type'].to_sym
  end

  task = {
    'schema' => schema,
    'columns' => column_types
  }

  # One output column per schema entry, in schema order. This keeps the column
  # list aligned with the rows built by make_record (one value per schema
  # entry); a name-keyed hash would collapse duplicate names and misalign them.
  out_columns = schema.map.with_index do |col, i|
    Column.new(i, col['name'], col['type'].to_sym)
  end

  yield(task, out_columns)
end

Instance Method Details

#add(page) ⇒ Object



(source lines 37–45)
# File 'lib/embulk/filter/jsoncolumn.rb', line 37

# Parses every value of every incoming record as JSON and emits one output
# row per parsed document through the page builder.
#
# @param page [Enumerable] incoming page; each element it yields is itself
#   iterated (presumably a record's column values, each a JSON string)
def add(page)
  page.each do |record|
    record.each do |json_text|
      document = JSON.parse(json_text)
      row = make_record(task['schema'], document)
      page_builder.add(row)
    end
  end
end

#close ⇒ Object



(source lines 34–35)
# File 'lib/embulk/filter/jsoncolumn.rb', line 34

# Lifecycle hook for closing the filter. This filter acquires no resources of
# its own in the visible code, so there is nothing to release.
def close
  nil
end

#find_by_path(e, path) ⇒ Object



(source lines 83–85)
# File 'lib/embulk/filter/jsoncolumn.rb', line 83

# Evaluates the JSONPath expression +path+ against the parsed document +e+
# and returns the first match (nil when the match list is empty).
#
# @param e [Object] parsed JSON document
# @param path [String] JSONPath expression from the schema's "path" key
# @return [Object, nil] the first matching value
def find_by_path(e, path)
  matches = JsonPath.on(e, path)
  matches.first
end

#finish ⇒ Object



(source lines 87–89)
# File 'lib/embulk/filter/jsoncolumn.rb', line 87

# Completes output by finishing the page builder that add writes rows into.
#
# @return [Object] whatever the page builder's finish returns
def finish
  builder = page_builder
  builder.finish
end

#init ⇒ Object



(source lines 30–32)
# File 'lib/embulk/filter/jsoncolumn.rb', line 30

# Per-instance initialization hook; this filter needs no setup of its own.
def init
  nil
end

#make_record(schema, e) ⇒ Object



(source lines 47–81)
# File 'lib/embulk/filter/jsoncolumn.rb', line 47

# Converts one parsed JSON document into a row of values ordered like +schema+.
#
# For each schema entry the raw value is +e[name]+, or, when a "path" is
# given, find_by_path(e, path). Missing (nil) values are replaced by "" before
# conversion. They therefore come out as "" / 0 / 0.0 / false for string /
# long / double / boolean columns, and as nil for timestamp columns.
#
# @param schema [Array<Hash>] column definitions with "name" and "type", plus
#   optional "path" (JSONPath) and "format" (strptime format for timestamps)
# @param e [Object] the parsed JSON document (indexed by column name, so
#   presumably a Hash)
# @return [Array] one converted value per schema entry, in schema order
# @raise [RuntimeError] when a column's type is not supported
def make_record(schema, e)
  schema.map do |c|
    path = c["path"]
    val = path.nil? ? e[c["name"]] : find_by_path(e, path)
    # Existing behavior: missing values are coerced via "" rather than kept as null.
    v = val.nil? ? "" : val

    type = c["type"]
    case type
    when "string"
      v
    when "long"
      v.to_i
    when "double"
      v.to_f
    when "boolean"
      # v is never nil here (see above), so no nil branch is needed.
      if v.kind_of?(String)
        %w[yes true 1].include?(v.downcase)
      elsif v.kind_of?(Numeric)
        !v.zero?
      else
        !!v
      end
    when "timestamp"
      # to_s lets non-string JSON values (e.g. epoch integers with format "%s")
      # be parsed instead of failing on #empty?; strings are unaffected.
      str = v.to_s
      str.empty? ? nil : Time.strptime(str, c["format"])
    else
      raise "Unsupported type #{type}"
    end
  end
end