Module: Sequel::Impala::DatasetMethods

Includes:
UnmodifiedIdentifiers::DatasetMethods
Included in:
Dataset, JDBC::Hive2::Dataset, Rbhive::Dataset
Defined in:
lib/sequel/adapters/shared/impala.rb

Constant Summary collapse

BACKTICK =
'`'.freeze
APOS =
"'".freeze
STRING_ESCAPE_RE =
/([\\'])/
STRING_ESCAPE_REPLACE =
'\\\\\1'.freeze
BOOL_TRUE =
'true'.freeze
BOOL_FALSE =
'false'.freeze
CONSTANT_LITERAL_MAP =
{:CURRENT_TIMESTAMP=>'now()'.freeze}.freeze
PAREN_OPEN =
'('.freeze
PAREN_CLOSE =
')'.freeze
SPACE =
' '.freeze
NOT =
'NOT '.freeze
REGEXP =
' REGEXP '.freeze
EXCEPT_SOURCE_COLUMN =
:__source__
EXCEPT_STRATEGIES =
[:not_exists, :not_in, :left_join, :group_by].freeze
SELECT_VALUES =
'VALUES '.freeze

Instance Method Summary collapse

Instance Method Details

#complex_expression_sql_append(sql, op, args) ⇒ Object

Handle string concatenation using the concat string function. Don’t use the ESCAPE syntax when using LIKE/NOT LIKE, as Impala doesn’t support escaping LIKE metacharacters. Support regexps on Impala using the REGEXP operator. For cast insensitive regexps, cast both values to uppercase first.



448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
# File 'lib/sequel/adapters/shared/impala.rb', line 448

def complex_expression_sql_append(sql, op, args)
  case op
  when :'||'
    literal_append(sql, Sequel.function(:concat, *args))
  when :LIKE, :'NOT LIKE'
    sql << PAREN_OPEN
    literal_append(sql, args.at(0))
    sql << SPACE << op.to_s << SPACE
    literal_append(sql, args.at(1))
    sql << PAREN_CLOSE
  when :~, :'!~', :'~*', :'!~*'
    if op == :'~*'  || op == :'!~*'
      args = args.map{|a| Sequel.function(:upper, a)}
    end
    sql << NOT if op == :'!~'  || op == :'!~*'
    sql << PAREN_OPEN
    literal_append(sql, args.at(0))
    sql << REGEXP
    literal_append(sql, args.at(1))
    sql << PAREN_CLOSE
  else
    super
  end
end

#constant_sql_append(sql, constant) ⇒ Object

Use now() for current timestamp, as Impala doesn’t support CURRENT_TIMESTAMP.



475
476
477
# File 'lib/sequel/adapters/shared/impala.rb', line 475

def constant_sql_append(sql, constant)
  sql << CONSTANT_LITERAL_MAP.fetch(constant, constant.to_s)
end

#date_add_sql_append(sql, da) ⇒ Object

Use the addition operator combined with interval types to handle date arithmetic when using the date_arithmetic extension.



482
483
484
485
486
487
488
489
490
491
492
493
494
495
# File 'lib/sequel/adapters/shared/impala.rb', line 482

def date_add_sql_append(sql, da)
  h = da.interval
  expr = da.expr
  intervals = []
  each_valid_interval_unit(h, Sequel::SQL::DateAdd::DatasetMethods::DEF_DURATION_UNITS) do |value, sql_unit|
    intervals << Sequel.lit("INTERVAL #{value} #{sql_unit}")
  end
  if intervals.empty?
    return literal_append(sql, Sequel.cast(expr, Time))
  else
    intervals.unshift(Sequel.cast(expr, Time))
    return complex_expression_sql_append(sql, :+, intervals)
  end
end

#deleteObject

DELETE is emulated on Impala and doesn’t return the number of modified rows.



499
500
501
502
# File 'lib/sequel/adapters/shared/impala.rb', line 499

def delete
  super
  nil
end

#delete_sqlObject

Emulate DELETE using INSERT OVERWRITE selecting all columns from the table, with a reversed condition used for WHERE.



506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
# File 'lib/sequel/adapters/shared/impala.rb', line 506

def delete_sql
  return @opts[:prepared_sql] if @opts[:prepared_sql]
  sql = @opts[:append_sql] || sql_string_origin
  sql << "INSERT OVERWRITE "
  source_list_append(sql, opts[:from])
  sql << " SELECT * FROM "
  source_list_append(sql, opts[:from])
  if where = opts[:where]
    sql << " WHERE NOT ("
    literal_append(sql, where)
    sql << ")"
  else
    sql << " WHERE false"
  end
  sql
end

#empty?Boolean

Don’t remove an order, because that breaks things when offsets are used, as Impala requires an order when using an offset.

Returns:



542
543
544
# File 'lib/sequel/adapters/shared/impala.rb', line 542

def empty?
  get(Sequel::SQL::AliasedExpression.new(1, :one)).nil?
end

#except(other, opts = OPTS) ⇒ Object

Emulate EXCEPT using a chosen strategy and checking for values in only the first table.

Raises:



547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
# File 'lib/sequel/adapters/shared/impala.rb', line 547

def except(other, opts=OPTS)
  raise(InvalidOperation, "EXCEPT ALL not supported") if opts[:all]
  raise(InvalidOperation, "The :from_self=>false option to except is not supported") if opts[:from_self] == false

  strategy, *keys = @opts[:except_strategy]
  ds = from_self(:alias=>:t1)

  ds = case strategy
  when :not_exists
    ds.exclude(other.
        from_self(:alias=>:t2).
        where(keys.map{|key| [Sequel.qualify(:t1, key), Sequel.qualify(:t2, key)]}).
        select(nil).
        exists)
  when :not_in
    raise Sequel::Error, ":not_in EXCEPT strategy only supports a single key" unless keys.length == 1
    key = keys.first
    ds.exclude(Sequel.qualify(:t1, key)=>other.from_self(:alias=>:t2).select(key))
  when :left_join
    ds.left_join(other.from_self(:alias=>:t2).as(:t2), keys.map{|key| [key, key]}).
      where(Sequel.or(keys.map{|key| [Sequel.qualify(:t2, key), nil]})).
      select_all(:t1)
  else
    cols = columns
    rhs = other.from_self.select_group(*other.columns).select_append(Sequel.expr(2).as(EXCEPT_SOURCE_COLUMN))
    ds.select_group(*cols).
      select_append(Sequel.expr(1).as(EXCEPT_SOURCE_COLUMN)).
      union(rhs, all: true).
      select_group(*cols).
      having{{count.function.* => 1, min(EXCEPT_SOURCE_COLUMN) => 1}}
  end

  ds.from_self(opts)
end

#except_strategy(strategy, *keys) ⇒ Object

The strategy to use for EXCEPT emulation. By default, uses a GROUP BY emulation, as that doesn’t require you provide a key column, but you can use this to choose a NOT EXISTS, NOT IN, or LEFT JOIN emulation, providing the unique key column.

Raises:



585
586
587
588
# File 'lib/sequel/adapters/shared/impala.rb', line 585

def except_strategy(strategy, *keys)
  raise Sequel::Error, "invalid EXCEPT strategy: #{strategy.inspect}" unless EXCEPT_STRATEGIES.include?(strategy)
  clone(:except_strategy=>[strategy, *keys])
end

#fromObject

Implicitly qualify tables if using the :search_path database option.



524
525
526
527
# File 'lib/sequel/adapters/shared/impala.rb', line 524

def from(*)
  ds = super
  ds.clone(:from => ds.opts[:from].map{|t| db.implicit_qualify(t)})
end

#insert_overwriteObject

Use INSERT OVERWRITE instead of INSERT INTO when inserting into this dataset:

DB[:table].insert_overwrite.insert(DB[:other])
# INSERT OVERWRITE table SELECT * FROM other


594
595
596
# File 'lib/sequel/adapters/shared/impala.rb', line 594

def insert_overwrite
  clone(:insert_overwrite=>true)
end

#insert_supports_empty_values?Boolean

Impala does not support INSERT DEFAULT VALUES.

Returns:



599
600
601
# File 'lib/sequel/adapters/shared/impala.rb', line 599

def insert_supports_empty_values?
  false
end

#intersect(other, opts = OPTS) ⇒ Object

Emulate INTERSECT using a join and checking for values in both tables.

Raises:



604
605
606
607
608
609
610
611
612
613
614
615
616
# File 'lib/sequel/adapters/shared/impala.rb', line 604

def intersect(other, opts=OPTS)
  raise(InvalidOperation, "INTERSECT ALL not supported") if opts[:all]
  raise(InvalidOperation, "The :from_self=>false option to intersect is not supported") if opts[:from_self] == false
  raise(Error, "Attempt to INTERSECT on dataset with no columns: #{inspect}") if columns.empty?
  raise(Error, "Attempt to INTERSECT other dataset with no columns: #{other.inspect}") if other.columns.empty?

  cols = columns.zip(other.columns)
  from_self(alias: :l)
    .join(other){|lj, j, _| Sequel.&(*cols.map{|c1,c2| Sequel.expr(Sequel.qualify(lj, c2)=>Sequel.qualify(j, c1)) | {Sequel.qualify(lj, c2)=>nil, Sequel.qualify(j, c1)=>nil}})}
    .select_all(:l)
    .distinct
    .from_self(opts)
end

#join_table(type, table, expr = nil, options = OPTS, &block) ⇒ Object

Implicitly qualify tables if using the :search_path database option.



530
531
532
# File 'lib/sequel/adapters/shared/impala.rb', line 530

def join_table(type, table, expr=nil, options=OPTS, &block)
  super(type, db.implicit_qualify(table), expr, options, &block)
end

#supports_cte?(type = :select) ⇒ Boolean

Impala supports non-recursive common table expressions.

Returns:



619
620
621
# File 'lib/sequel/adapters/shared/impala.rb', line 619

def supports_cte?(type=:select)
  true
end

#supports_cte_in_subqueries?Boolean

Returns:



623
624
625
# File 'lib/sequel/adapters/shared/impala.rb', line 623

def supports_cte_in_subqueries?
  true
end

#supports_derived_column_lists?Boolean

Impala doesn’t support derived column lists when aliasing tables.

Returns:



629
630
631
# File 'lib/sequel/adapters/shared/impala.rb', line 629

def supports_derived_column_lists?
  false
end

#supports_intersect_except_all?Boolean

Impala doesn’t support EXCEPT or INTERSECT, but support is emulated for them. However, EXCEPT ALL and INTERSECT ALL are not emulated.

Returns:



635
636
637
# File 'lib/sequel/adapters/shared/impala.rb', line 635

def supports_intersect_except_all?
  false
end

#supports_is_true?Boolean

Impala only support IS NULL, not IS TRUE or IS FALSE.

Returns:



640
641
642
# File 'lib/sequel/adapters/shared/impala.rb', line 640

def supports_is_true?
  false
end

#supports_multiple_column_in?Boolean

Impala doesn’t support IN when used with multiple columns.

Returns:



645
646
647
# File 'lib/sequel/adapters/shared/impala.rb', line 645

def supports_multiple_column_in?
  false
end

#supports_regexp?Boolean

Impala supports regexps using the REGEXP operator.

Returns:



650
651
652
# File 'lib/sequel/adapters/shared/impala.rb', line 650

def supports_regexp?
  true
end

#supports_window_functions?Boolean

Impala supports window functions.

Returns:



655
656
657
# File 'lib/sequel/adapters/shared/impala.rb', line 655

def supports_window_functions?
  true
end

#to_parquet(table, options = OPTS) ⇒ Object

Create a parquet file from this dataset. table should be the table name to create. To specify a path for the parquet file, use the :location option.

Examples:

DB[:t].to_parquet(:p)
# CREATE TABLE `p` STORED AS parquet AS
# SELECT * FROM `t`

DB[:t].to_parquet(:p, :location=>'/a/b')
# CREATE TABLE `p` STORED AS parquet LOCATION '/a/b'
# SELECT * FROM `t`


672
673
674
# File 'lib/sequel/adapters/shared/impala.rb', line 672

def to_parquet(table, options=OPTS)
  db.create_table(table, options.merge(:as=>self, :stored_as=>:parquet))
end

#truncate_sqlObject

Emulate TRUNCATE by using INSERT OVERWRITE selecting all columns from the table, with WHERE false.



536
537
538
# File 'lib/sequel/adapters/shared/impala.rb', line 536

def truncate_sql
  unfiltered.delete_sql
end

#update(values = OPTS) ⇒ Object

UPDATE is emulated on Impala, and returns nil instead of the number of modified rows



678
679
680
681
# File 'lib/sequel/adapters/shared/impala.rb', line 678

def update(values=OPTS)
  super
  nil
end

#update_sql(values) ⇒ Object

Emulate UPDATE using INSERT OVERWRITE AS SELECT. For all columns used in the given values, use a CASE statement. In the CASE statement, set the value to the new value if the row matches WHERE conditions of the current dataset, otherwise use the existing value.



687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
# File 'lib/sequel/adapters/shared/impala.rb', line 687

def update_sql(values)
  sql = String.new
  sql << "INSERT OVERWRITE "
  source_list_append(sql, opts[:from])
  sql << " SELECT "
  comma = false

  if where = opts[:where]
    where = Sequel.lit(literal(where))
  else
    where = true
  end

  select_all.columns.each do |c|
    if comma
      sql <<  comma
    else
      comma = ', '
    end

    if values.has_key?(c)
      new_value = values[c]
      literal_append(sql, Sequel.case({where=>new_value}, c).as(c))
    else
      quote_identifier_append(sql, c)
    end
  end
  sql << " FROM "
  source_list_append(sql, opts[:from])
  sql
end

#with(name, dataset, opts = {}) ⇒ Object



719
720
721
722
723
724
725
726
# File 'lib/sequel/adapters/shared/impala.rb', line 719

def with(name, dataset, opts={})
  if has_cte?(dataset)
    s, ds = hoist_cte(dataset)
    s.with(name, ds, opts)
  else
    super
  end
end

#with_recursive(name, nonrecursive, recursive, opts = {}) ⇒ Object



728
729
730
731
732
733
734
735
736
737
738
# File 'lib/sequel/adapters/shared/impala.rb', line 728

def with_recursive(name, nonrecursive, recursive, opts={})
  if has_cte?(nonrecursive)
    s, ds = hoist_cte(nonrecursive)
    s.with_recursive(name, ds, recursive, opts)
  elsif has_cte?(recursive)
    s, ds = hoist_cte(recursive)
    s.with_recursive(name, nonrecursive, ds, opts)
  else
    super
  end
end