Class: MoSQL::Schema

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/mosql/schema.rb

Instance Method Summary collapse

Methods included from Logging

#log

Constructor Details

#initialize(map) ⇒ Schema

Returns a new instance of Schema.



48
49
50
51
52
53
54
55
56
# File 'lib/mosql/schema.rb', line 48

def initialize(map)
  @map = {}
  map.each do |dbname, db|
    @map[dbname] ||= {}
    db.each do |cname, spec|
      @map[dbname][cname] = parse_spec("#{dbname}.#{cname}", spec)
    end
  end
end

Instance Method Details

#all_columns(schema) ⇒ Object



144
145
146
147
148
149
150
151
152
153
# File 'lib/mosql/schema.rb', line 144

def all_columns(schema)
  cols = []
  schema[:columns].each do |col|
    cols << col[:name]
  end
  if schema[:meta][:extra_props]
    cols << "_extra_props"
  end
  cols
end

#all_mongo_dbsObject



194
195
196
# File 'lib/mosql/schema.rb', line 194

def all_mongo_dbs
  @map.keys
end

#check_columns!(ns, spec) ⇒ Object



31
32
33
34
35
36
37
38
39
# File 'lib/mosql/schema.rb', line 31

def check_columns!(ns, spec)
  seen = Set.new
  spec[:columns].each do |col|
    if seen.include?(col[:source])
      raise "Duplicate source #{col[:source]} in column definition #{col[:name]} for #{ns}."
    end
    seen.add(col[:source])
  end
end

#collections_for_mongo_db(db) ⇒ Object



198
199
200
# File 'lib/mosql/schema.rb', line 198

def collections_for_mongo_db(db)
  (@map[db]||{}).keys
end

#copy_data(db, ns, objs) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/mosql/schema.rb', line 155

def copy_data(db, ns, objs)
  schema = find_ns!(ns)
  db.synchronize do |pg|
    sql = "COPY \"#{schema[:meta][:table]}\" " +
      "(#{all_columns(schema).map {|c| "\"#{c}\""}.join(",")}) FROM STDIN"
    pg.execute(sql)
    objs.each do |o|
      pg.put_copy_data(transform_to_copy(ns, o, schema) + "\n")
    end
    pg.put_copy_end
    begin
      pg.get_result.check
    rescue PGError => e
      db.send(:raise_error, e)
    end
  end
end

#create_schema(db, clobber = false) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/mosql/schema.rb', line 58

def create_schema(db, clobber=false)
  @map.values.map(&:values).flatten.each do |collection|
    meta = collection[:meta]
    log.info("Creating table '#{meta[:table]}'...")
    db.send(clobber ? :create_table! : :create_table?, meta[:table]) do
      collection[:columns].each do |col|
        column col[:name], col[:type]

        if col[:source].to_sym == :_id
          primary_key [col[:name].to_sym]
        end
      end
      if meta[:extra_props]
        column '_extra_props', 'TEXT'
      end
    end
  end
end

#fetch_and_delete_dotted(obj, dotted) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/mosql/schema.rb', line 93

def fetch_and_delete_dotted(obj, dotted)
  pieces = dotted.split(".")
  breadcrumbs = []
  while pieces.length > 1
    key = pieces.shift
    breadcrumbs << [obj, key]
    obj = obj[key]
    return nil unless obj.is_a?(Hash)
  end

  val = obj.delete(pieces.first)

  breadcrumbs.reverse.each do |obj, key|
    obj.delete(key) if obj[key].empty?
  end

  val
end

#find_ns(ns) ⇒ Object



77
78
79
80
81
82
83
84
85
# File 'lib/mosql/schema.rb', line 77

def find_ns(ns)
  db, collection = ns.split(".")
  schema = (@map[db] || {})[collection]
  if schema.nil?
    log.debug("No mapping for ns: #{ns}")
    return nil
  end
  schema
end

#find_ns!(ns) ⇒ Object

Raises:



87
88
89
90
91
# File 'lib/mosql/schema.rb', line 87

def find_ns!(ns)
  schema = find_ns(ns)
  raise SchemaError.new("No mapping for namespace: #{ns}") if schema.nil?
  schema
end

#parse_spec(ns, spec) ⇒ Object



41
42
43
44
45
46
# File 'lib/mosql/schema.rb', line 41

def parse_spec(ns, spec)
  out = spec.dup
  out[:columns] = to_array(spec[:columns])
  check_columns!(ns, out)
  out
end

#primary_sql_key_for_ns(ns) ⇒ Object



202
203
204
# File 'lib/mosql/schema.rb', line 202

def primary_sql_key_for_ns(ns)
  find_ns!(ns)[:columns].find {|c| c[:source] == '_id'}[:name]
end

#quote_copy(val) ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/mosql/schema.rb', line 173

def quote_copy(val)
  case val
  when nil
    "\\N"
  when true
    't'
  when false
    'f'
  else
    val.to_s.gsub(/([\\\t\n\r])/, '\\\\\\1')
  end
end

#table_for_ns(ns) ⇒ Object



190
191
192
# File 'lib/mosql/schema.rb', line 190

def table_for_ns(ns)
  find_ns!(ns)[:meta][:table]
end

#to_array(lst) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/mosql/schema.rb', line 7

def to_array(lst)
  array = []
  lst.each do |ent|
    if ent.is_a?(Hash) && ent[:source].is_a?(String) && ent[:type].is_a?(String)
      # new configuration format
      array << {
        :source => ent.delete(:source),
        :type   => ent.delete(:type),
        :name   => ent.first.first,
      }
    elsif ent.is_a?(Hash) && ent.keys.length == 1 && ent.values.first.is_a?(String)
      array << {
        :source => ent.first.first,
        :name   => ent.first.first,
        :type   => ent.first.last
      }
    else
      raise "Invalid ordered hash entry #{ent.inspect}"
    end

  end
  array
end

#transform(ns, obj, schema = nil) ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/mosql/schema.rb', line 112

def transform(ns, obj, schema=nil)
  schema ||= find_ns!(ns)

  obj = obj.dup
  row = []
  schema[:columns].each do |col|

    source = col[:source]
    type = col[:type]

    v = fetch_and_delete_dotted(obj, source)
    case v
    when BSON::Binary, BSON::ObjectId
      v = v.to_s
    end
    row << v
  end

  if schema[:meta][:extra_props]
    # Kludgily delete binary blobs from _extra_props -- they may
    # contain invalid UTF-8, which to_json will not properly encode.
    obj.each do |k,v|
      obj.delete(k) if v.is_a?(BSON::Binary)
    end
    row << obj.to_json
  end

  log.debug { "Transformed: #{row.inspect}" }

  row
end

#transform_to_copy(ns, row, schema = nil) ⇒ Object



186
187
188
# File 'lib/mosql/schema.rb', line 186

def transform_to_copy(ns, row, schema=nil)
  row.map { |c| quote_copy(c) }.join("\t")
end