Class: Dataflow::Nodes::JoinNode

Inherits:
ComputeNode show all
Defined in:
lib/dataflow/nodes/join_node.rb

Overview

Performs a join operation on 2 dependencies.

Constant Summary collapse

VALID_TYPES =
%w(inner left).freeze

Constants included from SchemaMixin

SchemaMixin::SAMPLE_DATA_OUTPUT, SchemaMixin::SEPARATOR

Instance Method Summary collapse

Methods inherited from ComputeNode

#all_dependencies, #compute, #data_node, data_node_opts, #dependencies, dependency_opts, ensure_data_node_exists, ensure_dependencies, #execute_local_batch_computation, #execute_local_computation, #execution_valid?, #explain_update, #force_computing_lock_release!, #locked_for_computing?, #make_batch_params, #needs_automatic_recomputing?, #recompute, #schema, #set_defaults, #updated?, #updated_at, #updated_at=

Methods included from SchemaMixin

#infer_partial_schema, #infer_schema, #sample_data, #schema_inferrer

Methods included from Dataflow::Node

#all_dependencies, find, #metadata, #recompute, #required_by, #validate!

Instance Method Details

#compute_impl ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/dataflow/nodes/join_node.rb', line 46

# Computes the join, choosing between a SQL join and the inherited
# (software) implementation.
#
# The SQL path is taken only when this node's +db_backend+ is
# +:postgresql+ and every dependency after the first one is also on
# +:postgresql+ with the same +db_name+ as this node. On that path
# +updated_at+ is set to the current time once the join has executed.
# In every other case the work is delegated to +super+.
#
# NOTE(review): the first dependency is not inspected here (the check
# starts at index 1) -- confirm that this is intentional.
#
# @return [Object] the assigned timestamp on the SQL path, otherwise
#   whatever +super+ returns
def compute_impl
  sql_joinable = db_backend == :postgresql &&
                 dependencies[1..-1].all? { |dep| dep.db_backend == :postgresql && dep.db_name == db_name }

  # Fall back to the software join implemented by the parent class.
  return super unless sql_joinable

  execute_sql_join
  self.updated_at = Time.now
end

#required_schema ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/dataflow/nodes/join_node.rb', line 33

# Builds the schema of the join result from the schemas of the two
# dependencies.
#
# Each dependency's schema (treated as empty when nil) is restricted to
# the keys listed in +select_keys1+ / +select_keys2+ when that list is
# present; keys are compared by their string form. The second schema is
# then merged over the first.
#
# @return [Hash] the merged schema, or an empty hash unless there are
#   exactly two dependencies
def required_schema
  return {} unless dependencies.count == 2

  # Keeps only the wanted keys; an absent/blank list means "keep all".
  restrict = lambda do |schema, wanted_keys|
    schema ||= {}
    next schema unless wanted_keys.present?

    schema.select { |key, _value| wanted_keys.include?(key.to_s) }
  end

  left_schema = restrict.call(dependencies.first.schema, select_keys1)
  right_schema = restrict.call(dependencies.second.schema, select_keys2)
  left_schema.merge(right_schema)
end

#valid_for_computation? ⇒ Boolean

Returns:

  • (Boolean)


24
25
26
27
28
29
30
31
# File 'lib/dataflow/nodes/join_node.rb', line 24

# Checks that the node is ready to be computed.
#
# The entries of +other_keys1+ and +other_keys2+ are matched with each
# other, so both lists must have the same number of entries; an error is
# recorded on +:other_keys2+ when they do not. The final verdict is left
# to +super+.
#
# @return [Boolean] the result of the inherited validation
def valid_for_computation?
  key_counts_match = other_keys1.count == other_keys2.count
  errors.add(:other_keys2, "#{self.class} other_keys2 must match other_keys1's length") unless key_counts_match

  super
end