Module: Pacer::Routes::RouteOperations

Includes:
BulkOperations
Included in:
Pacer::Route, Wrappers::ElementWrapper
Defined in:
lib/pacer/transform/lookup_ids.rb,
lib/pacer/transform/map.rb,
lib/pacer/transform/join.rb,
lib/pacer/transform/reduce.rb,
lib/pacer/visitors/section.rb,
lib/pacer/transform/process.rb,
lib/pacer/filter/loop_filter.rb,
lib/pacer/filter/uniq_filter.rb,
lib/pacer/side_effect/as_var.rb,
lib/pacer/transform/flat_map.rb,
lib/pacer/transform/identity.rb,
lib/pacer/transform/parallel.rb,
lib/pacer/filter/block_filter.rb,
lib/pacer/filter/range_filter.rb,
lib/pacer/filter/uniq_section.rb,
lib/pacer/filter/where_filter.rb,
lib/pacer/side_effect/counted.rb,
lib/pacer/side_effect/visitor.rb,
lib/pacer/filter/future_filter.rb,
lib/pacer/filter/object_filter.rb,
lib/pacer/transform/make_pairs.rb,
lib/pacer/side_effect/aggregate.rb,
lib/pacer/side_effect/is_unique.rb,
lib/pacer/transform/stream_sort.rb,
lib/pacer/transform/stream_uniq.rb,
lib/pacer/transform/sort_section.rb,
lib/pacer/filter/java_loop_filter.rb,
lib/pacer/side_effect/group_count.rb,
lib/pacer/transform/count_section.rb,
lib/pacer/transform/has_count_cap.rb,
lib/pacer/transform/gather_section.rb,
lib/pacer/filter/limit_section_filter.rb,
lib/pacer/route/mixin/route_operations.rb,
lib/pacer/transform/intersect_sections.rb

Overview

Additional convenience and data analysis methods that can be mixed into routes if they support the full route interface.

Instance Method Summary collapse

Methods included from BulkOperations

#bulk_job, #bulk_map

Instance Method Details

#aggregate(into = nil, &block) ⇒ Object



3
4
5
6
7
8
9
# File 'lib/pacer/side_effect/aggregate.rb', line 3

# Chains an Aggregate side effect onto this route.
#
# When +into+ is a Symbol, the side effect is chained onto a section of
# that name (created with Aggregate::ElementSet as its visitor target)
# instead of directly onto this route. If a block is given it is passed
# as the :into option in place of +into+.
def aggregate(into = nil, &block)
  pipe_type = ::Pacer::SideEffect::Aggregate
  source = into.is_a?(Symbol) ? section(into, pipe_type::ElementSet) : self
  target = block || into
  source.chain_route(side_effect: pipe_type, into: target)
end

#all(opts = {}, &block) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
# File 'lib/pacer/filter/java_loop_filter.rb', line 8

# Runs java_loop with the given options and block.
#
# With opts[:include_self] set, two branches are built - one chaining
# identity, one chaining java_loop - and the result of merge_exhaustive
# on them is returned. Otherwise java_loop is returned directly.
def all(opts = {}, &block)
  return java_loop(opts, &block) unless opts[:include_self]

  with_self = branch { |this| this.identity }
  with_loop = with_self.branch { |this| this.java_loop(opts, &block) }
  with_loop.merge_exhaustive
end

#as(section_name = nil) ⇒ Object

see #as_var for the old as implementation



9
10
11
# File 'lib/pacer/visitors/section.rb', line 9

# Delegates to #section with the given section name.
def as(section_name = nil)
  section(section_name)
end

#as_var(name) ⇒ Object

Store the current intermediate element in the route’s vars hash by the given name so that it is accessible subsequently in the processing of the route.

Deprecated.



8
9
10
11
# File 'lib/pacer/side_effect/as_var.rb', line 8

# Deprecated. Creates a section named +name+ (visitor target:
# AsVar::SingleElementSet) and chains an AsVar side effect onto it with
# :variable_name set to +name+.
def as_var(name)
  var_pipe = ::Pacer::SideEffect::AsVar
  named_section = section(name, var_pipe::SingleElementSet)
  named_section.chain_route(side_effect: var_pipe, variable_name: name)
end

#at(pos) ⇒ Object



21
22
23
# File 'lib/pacer/filter/range_filter.rb', line 21

# Chains a :range filter with :index set to +pos+.
def at(pos)
  chain_route(filter: :range, index: pos)
end

#breadth_first(opts = {}, &block) ⇒ Object



48
49
50
51
52
53
54
55
56
# File 'lib/pacer/filter/loop_filter.rb', line 48

# Adds one branch per depth from opts[:min_depth] (default 0) through
# opts[:max_depth] (default 10), inclusive; each branch applies #repeat
# with that depth and the given block. Returns merge_exhaustive of the
# branched route.
def breadth_first(opts = {}, &block)
  shallowest = opts.fetch(:min_depth, 0)
  deepest_level = opts.fetch(:max_depth, 10)
  branched = (shallowest..deepest_level).inject(self) do |route, depth|
    route.branch { |b| b.repeat(depth, &block) }
  end
  branched.merge_exhaustive
end

#compactObject



20
21
22
# File 'lib/pacer/filter/object_filter.rb', line 20

# Shorthand for is_not(nil).
def compact
  is_not(nil)
end

#countObject Also known as: run!



8
9
10
# File 'lib/pacer/side_effect/counted.rb', line 8

# Returns the result of calling #count on the #counted route.
def count
  counting_route = counted
  counting_route.count
end

#count_section(section = nil, &block) ⇒ Object



4
5
6
7
# File 'lib/pacer/transform/count_section.rb', line 4

# Chains a CountSection transform, passing the block as :key_block and
# the section name as :section; the element type is set to :array.
def count_section(section = nil, &block)
  settings = {
    transform: Pacer::Transform::CountSection,
    key_block: block,
    section: section,
    element_type: :array
  }
  chain_route(settings)
end

#countedObject



4
5
6
# File 'lib/pacer/side_effect/counted.rb', line 4

# Chains the :counted side effect onto this route.
def counted
  chain_route(side_effect: :counted)
end

#custom_sort_section(section = nil, &block) ⇒ Object

Deprecated: use sort_section



16
17
18
# File 'lib/pacer/transform/sort_section.rb', line 16

# Deprecated: use sort_section.
#
# Chains a :sort_section transform with the block passed as
# :custom_sort_block.
def custom_sort_section(section = nil, &block)
  settings = { transform: :sort_section, custom_sort_block: block, section: section }
  chain_route(settings)
end

#deepest(&block) ⇒ Object



8
9
10
# File 'lib/pacer/filter/loop_filter.rb', line 8

# Builds a #loop from the block and returns the result of calling
# deepest! on it.
def deepest(&block)
  looping = loop(&block)
  looping.deepest!
end

#difference_sections(section) ⇒ Object

See details on intersect_sections. Emits only elements that are not in other sections.



39
40
41
# File 'lib/pacer/transform/intersect_sections.rb', line 39

# Chains an IntersectSections transform over +section+ with the
# :difference operation.
def difference_sections(section)
  settings = { transform: Pacer::Transform::IntersectSections, section: section, operation: :difference }
  chain_route(settings)
end

#edges_route?Boolean

Returns true if this route contains only edges.

Returns:

  • (Boolean)


87
88
89
# File 'lib/pacer/route/mixin/route_operations.rb', line 87

# True when this object is a Pacer::Core::Graph::EdgesRoute.
def edges_route?
  is_a?(Pacer::Core::Graph::EdgesRoute)
end

#fast_group_count(hash_map = nil) ⇒ Object



3
4
5
# File 'lib/pacer/side_effect/group_count.rb', line 3

# Chains the :group_count side effect, passing +hash_map+ as :hash_map.
def fast_group_count(hash_map = nil)
  chain_route(side_effect: :group_count, hash_map: hash_map)
end

#flat_map(opts = {}, &block) ⇒ Object



4
5
6
# File 'lib/pacer/transform/flat_map.rb', line 4

# Maps with the block, then calls scatter(opts) on the mapped route.
def flat_map(opts = {}, &block)
  mapped = map(&block)
  mapped.scatter(opts)
end

#frequency_counts(*props) ⇒ Object



50
51
52
53
54
# File 'lib/pacer/route/mixin/route_operations.rb', line 50

# Returns a hash (default value 0) mapping each count produced by
# group_count(*props) to the number of keys that had that count.
def frequency_counts(*props)
  tally = Hash.new(0)
  group_count(*props).each do |_key, occurrences|
    tally[occurrences] += 1
  end
  tally
end

#frequency_groups(*props) ⇒ Object



44
45
46
47
48
# File 'lib/pacer/route/mixin/route_operations.rb', line 44

# Returns a hash mapping each count produced by group_count(*props) to
# the array of keys that had that count. Looking up a missing count
# stores and returns a new empty array.
def frequency_groups(*props)
  groups = Hash.new { |hash, count| hash[count] = [] }
  group_count(*props).each do |key, occurrences|
    groups[occurrences] << key
  end
  groups
end

#gather_section(section = nil, opts = {}) ⇒ Object



4
5
6
7
8
# File 'lib/pacer/transform/gather_section.rb', line 4

# Chains a :gather_section transform for +section+ with element type
# :array. A wrapper built by WrapperSelector from this route's graph,
# element_type and extensions is passed as :build_wrapper. Keys in +opts+
# are merged in first, so the fixed keys set here override them.
def gather_section(section = nil, opts = {})
  element_wrapper = Pacer::Wrappers::WrapperSelector.build(graph, element_type, extensions)
  fixed = {
    element_type: :array,
    transform: :gather_section,
    section: section,
    build_wrapper: element_wrapper
  }
  chain_route(opts.merge(fixed))
end

#group_count(*props) ⇒ Object Also known as: frequencies

Creates a hash where the key is the properties and return value of the given block, and the value is the number of times each key was found in the results set.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/pacer/route/mixin/route_operations.rb', line 15

# Counts how often each key occurs among the elements yielded by #each
# and returns the counts as a hash (default value 0).
#
# The key for an element depends on the arguments:
# * block only             - the block's return value;
# * props and a block      - an array of the property values followed by
#                            the block's return value;
# * exactly one prop       - that property's value (not wrapped in an array);
# * several props          - an array of the property values;
# * neither props nor block - the element itself.
#
# Property names are converted with to_s and read via getProperty.
def group_count(*props)
  counts = Hash.new(0)
  names = props.map { |prop| prop.to_s }
  if block_given?
    if names.empty?
      each { |element| counts[yield(element)] += 1 }
    else
      each do |element|
        key = names.map { |name| element.getProperty(name) }
        key << yield(element)
        counts[key] += 1
      end
    end
  elsif names.count == 1
    only_name = names.first
    each { |element| counts[element.getProperty(only_name)] += 1 }
  elsif names.empty?
    each { |element| counts[element] += 1 }
  else
    each do |element|
      counts[names.map { |name| element.getProperty(name) }] += 1
    end
  end
  counts
end

#has?(element) ⇒ Boolean

Returns:

  • (Boolean)


8
9
10
# File 'lib/pacer/route/mixin/route_operations.rb', line 8

# True when any element of this route is == to +element+.
def has?(element)
  any? do |candidate|
    candidate == element
  end
end

#has_count?(opts = {}) ⇒ Boolean

Returns:

  • (Boolean)


8
9
10
# File 'lib/pacer/transform/has_count_cap.rb', line 8

# Returns the first element produced by has_count_route(opts).
def has_count?(opts = {})
  capped = has_count_route(opts)
  capped.first
end

#has_count_route(opts = {}) ⇒ Object



4
5
6
# File 'lib/pacer/transform/has_count_cap.rb', line 4

# Chains a :has_count_cap transform with element type :object; entries
# in +opts+ are merged over those defaults.
def has_count_route(opts = {})
  defaults = { transform: :has_count_cap, element_type: :object }
  chain_route(defaults.merge(opts))
end

#identityObject



4
5
6
# File 'lib/pacer/transform/identity.rb', line 4

# Chains a step named 'identity' that uses
# com.tinkerpop.pipes.IdentityPipe as its pipe class.
def identity
  identity_pipe = com.tinkerpop.pipes.IdentityPipe
  chain_route(route_name: 'identity', pipe_class: identity_pipe)
end

#inspect_class_nameString

Creates a terse, human-friendly name for the class based on its element type, function and info.

Returns:

  • (String)


110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/pacer/route/mixin/route_operations.rb', line 110

# Creates a terse, human-friendly name for the class based on its element
# type, function and info.
#
# The element type is abbreviated (:vertex => 'V', :edge => 'E',
# :object => 'Obj', :mixed => 'Elem'; anything else is capitalized).
# When a function is set, the last segment of its name is appended after
# a '-', with a trailing "Filter" or "Route" removed. When @info is set it
# is appended after a space.
#
# Returns a String.
def inspect_class_name
  s = case element_type
      when :vertex
        'V'
      when :edge
        'E'
      when :object
        'Obj'
      when :mixed
        'Elem'
      else
        element_type.to_s.capitalize
      end
  if function
    # Group the alternation so the end anchor applies to both suffixes;
    # previously /Filter|Route$/ stripped the first "Filter" found anywhere
    # in the name (e.g. "FilteredRoute" became "edRoute").
    short_name = function.name.split('::').last.sub(/(?:Filter|Route)\z/, '')
    s = "#{s}-#{short_name}"
  end
  s = "#{s} #{ @info }" if @info
  s
end

#intersect_sections(section) ⇒ Object

Set operations on sections like intersect_sections, difference_sections, etc. allow a route to do things like produce a stream of the set of employees who worked in multiple groups within a company, in a single traversal. To take that example further, imagine that we define the following traversals:

company.groups.employees

Give me all of the employees who have worked in every group in the company:

company.groups.as(:g).employees.intersect_sections(:g)

To understand how this works, we can think about the underlying structure of the graph, with a company with 2 groups which share employee e2 and also have their own employees:

(c)-->(g1)=-->(e1)
 |         '-->(e2)
 |             /
 |        .---`
 '---(g2)=--->(e3)

The simple route company.groups.employees.paths will produce:

[c, g1, e1]
[c, g1, e2]
[c, g2, e2]
[c, g2, e3]

By adding a section to the groups with groups.as(:section_name), we set up the mechanism allowing us to respond to the event when the group in the path changes in subsequent stages of the route. That is what .intersect_sections(:section_name) does. It can conceptually keep a set of employees from each group and then when the source data has been exhausted, it can do a set intersection on those groups, then use the resulting set as its source of resulting data.



34
35
36
# File 'lib/pacer/transform/intersect_sections.rb', line 34

# Chains an IntersectSections transform over +section+ with the
# :intersection operation.
def intersect_sections(section)
  settings = { transform: Pacer::Transform::IntersectSections, section: section, operation: :intersection }
  chain_route(settings)
end

#is(value) ⇒ Object



4
5
6
7
8
9
10
# File 'lib/pacer/filter/object_filter.rb', line 4

# Chains a filter on +value+: a SectionFilter (with :section) when
# +value+ is a Symbol, otherwise an ObjectFilter (with :value).
def is(value)
  return chain_route(filter: Pacer::Filter::SectionFilter, section: value) if value.is_a?(Symbol)

  chain_route(filter: Pacer::Filter::ObjectFilter, value: value)
end

#is_not(value) ⇒ Object



12
13
14
15
16
17
18
# File 'lib/pacer/filter/object_filter.rb', line 12

# Negated counterpart of #is: chains the same filters with :negate set
# to true - a SectionFilter for a Symbol, an ObjectFilter otherwise.
def is_not(value)
  return chain_route(filter: Pacer::Filter::SectionFilter, section: value, negate: true) if value.is_a?(Symbol)

  chain_route(filter: Pacer::Filter::ObjectFilter, value: value, negate: true)
end

#is_unique(&block) ⇒ Object

This method adds the IsUniquePipe to the pipeline and whenever the pipeline is built, yields the pipe to the block given here.

See #unique? below for example usage.



8
9
10
# File 'lib/pacer/side_effect/is_unique.rb', line 8

# Chains the :is_unique side effect, passing the block as
# :on_build_pipe.
def is_unique(&block)
  settings = { side_effect: :is_unique, on_build_pipe: block }
  chain_route(settings)
end

#java_loop(opts = {}, &block) ⇒ Object



4
5
6
# File 'lib/pacer/filter/java_loop_filter.rb', line 4

# Chains a JavaLoopFilter with the block as :looping_route. These two
# keys are merged over +opts+, so they win on conflict.
def java_loop(opts = {}, &block)
  fixed = { filter: Pacer::Filter::JavaLoopFilter, looping_route: block }
  chain_route(opts.merge(fixed))
end

#join(&block) ⇒ Object



4
5
6
# File 'lib/pacer/transform/join.rb', line 4

# Chains a Join transform, passing the block as :block.
def join(&block)
  settings = { transform: Pacer::Transform::Join, block: block }
  chain_route(settings)
end

#left_difference_sections(section) ⇒ Object

See details on intersect_sections. Emits only elements that are in the first section and not in any subsequent ones.



44
45
46
# File 'lib/pacer/transform/intersect_sections.rb', line 44

# Chains an IntersectSections transform over +section+ with the
# :left_difference operation.
def left_difference_sections(section)
  settings = { transform: Pacer::Transform::IntersectSections, section: section, operation: :left_difference }
  chain_route(settings)
end

#limit(max) ⇒ Object Also known as: take



11
12
13
# File 'lib/pacer/filter/range_filter.rb', line 11

# Chains a :range filter with :limit set to +max+.
def limit(max)
  chain_route(filter: :range, limit: max)
end

#limit_section(section = nil, max) ⇒ Object



4
5
6
# File 'lib/pacer/filter/limit_section_filter.rb', line 4

# Chains a LimitSectionFilter with :section_max set to +max+ for the
# given section. Because +section+ is an optional leading parameter, a
# single argument is taken as +max+.
def limit_section(section = nil, max)
  settings = { filter: Pacer::Filter::LimitSectionFilter, section_max: max, section: section }
  chain_route(settings)
end

#lookahead(opts = {}, &block) ⇒ Object



4
5
6
# File 'lib/pacer/filter/future_filter.rb', line 4

# Chains a :future filter with the block as :block; entries in +opts+
# are merged over those defaults.
def lookahead(opts = {}, &block)
  defaults = { filter: :future, block: block }
  chain_route(defaults.merge(opts))
end

#lookup_ids(*args) ⇒ Object

args is (optional) extensions followed by an (optional) options hash



6
7
8
9
10
11
12
13
# File 'lib/pacer/transform/lookup_ids.rb', line 6

# Chains a :lookup_ids transform producing :vertex elements.
#
# args is (optional) extensions followed by an (optional) options hash.
# A trailing Hash is removed from +args+ and merged over the defaults;
# the remaining arguments are passed as :extensions.
def lookup_ids(*args)
  opts = args.last.is_a?(Hash) ? args.pop : {}
  defaults = { transform: :lookup_ids, element_type: :vertex, extensions: args, wrapper: nil }
  chain_route(defaults.merge(opts))
end

#loop(opts = {}, &block) ⇒ Object



4
5
6
# File 'lib/pacer/filter/loop_filter.rb', line 4

# Chains a LoopFilter with the block as :looping_route. These two keys
# are merged over +opts+, so they win on conflict.
def loop(opts = {}, &block)
  fixed = { filter: Pacer::Filter::LoopFilter, looping_route: block }
  chain_route(opts.merge(fixed))
end

#make_pairs(other = nil, &block) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/pacer/transform/make_pairs.rb', line 4

# Pairs every element of this route with every element of +other+.
#
# * A block is not supported yet and fails with 'not implemented yet'.
# * Without +other+ (and without a block) a Pacer::ClientError is raised.
# * The resulting element type is :path when +other+ is a Route and both
#   this route's and the other route's element types are :vertex or
#   :edge; otherwise it is :object.
# * +other+ is materialised with to_a; when that is empty, empty(self) is
#   returned, otherwise a flat_map emitting [element, other_element]
#   pairs.
def make_pairs(other = nil, &block)
  # Block form is reserved for a future implementation.
  fail 'not implemented yet' if block
  fail Pacer::ClientError, 'No source for pairs given to make_pairs' unless other

  graph_types = [:vertex, :edge]
  pair_type =
    if other.is_a?(Route) && [element_type, other.element_type].all? { |t| graph_types.include? t }
      :path
    else
      :object
    end
  partners = other.to_a
  return empty(self) if partners.empty?

  flat_map(element_type: pair_type, route_name: 'make_pairs') do |el|
    partners.map { |o| [el, o] }
  end
end

#map(opts = {}, &block) ⇒ Object



4
5
6
# File 'lib/pacer/transform/map.rb', line 4

# Chains a :map transform with the block as :block, element type
# :object and no extensions; entries in +opts+ are merged over those
# defaults.
def map(opts = {}, &block)
  defaults = { transform: :map, block: block, element_type: :object, extensions: [] }
  chain_route(defaults.merge(opts))
end

#mixed_route?Boolean

Returns true if this route could contain both vertices and edges.

Returns:

  • (Boolean)


77
78
79
# File 'lib/pacer/route/mixin/route_operations.rb', line 77

# True when this object is a Pacer::Core::Graph::MixedRoute.
def mixed_route?
  is_a?(Pacer::Core::Graph::MixedRoute)
end

#most_frequent(range = 0, include_counts = false) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/pacer/route/mixin/route_operations.rb', line 56

# Returns the most frequently occurring element(s) of this route.
#
# Elements are counted with group_count and ordered by descending count;
# +range+ is then used to index that ordered list (an Integer picks a
# single entry, anything else - e.g. a Range - picks a slice).
#
# With include_counts the raw [element, count] entry or slice is
# returned; an Integer index with no entry yields []. Without
# include_counts, an Integer index returns just the element (nil when
# absent), and a slice is converted to a route based on this one (an
# empty route when the slice is nil).
def most_frequent(range = 0, include_counts = false)
  ranked = group_count.sort_by { |_key, count| -count }
  result = ranked[range]
  # Integer replaces the Fixnum constant, which was deprecated in Ruby 2.4
  # and removed in 3.2 (where referencing it raises NameError).
  single = range.is_a? Integer
  if include_counts
    if !result && single
      []
    else
      result
    end
  elsif single
    result.first if result
  elsif result
    result.collect { |key, _count| key }.to_route(:based_on => self)
  else
    [].to_route(:based_on => self)
  end
end

#neg_lookahead(opts = {}, &block) ⇒ Object



8
9
10
# File 'lib/pacer/filter/future_filter.rb', line 8

# Chains a :future filter with the block as :neg_block; entries in
# +opts+ are merged over those defaults.
def neg_lookahead(opts = {}, &block)
  defaults = { filter: :future, neg_block: block }
  chain_route(defaults.merge(opts))
end

#offset(amount) ⇒ Object Also known as: drop



16
17
18
# File 'lib/pacer/filter/range_filter.rb', line 16

# Chains a :range filter with :offset set to +amount+.
def offset(amount)
  chain_route(filter: :range, offset: amount)
end

#pages(elements_per_page = 1000) {|page| ... } ⇒ Object

Yields:

  • (page)


91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/pacer/route/mixin/route_operations.rb', line 91

# Iterates this route in pages of +elements_per_page+ elements, yielding
# each page (an Array) to the block.
#
# Returns an array holding the block's return value for every page,
# including the final, possibly shorter, page.
def pages(elements_per_page = 1000)
  page = []
  results = []
  idx = 0
  each do |e|
    page << e
    idx += 1
    if idx % elements_per_page == 0
      results << yield(page)
      # Start a fresh array instead of clearing, so a page retained by the
      # block is not mutated afterwards.
      page = []
    end
  end
  # The trailing partial page used to be yielded without recording the
  # block's result, so callers lost the last entry of the returned array.
  results << yield(page) unless page.empty?
  results
end

#parallel(opts = {}, &block) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
# File 'lib/pacer/transform/parallel.rb', line 4

# Fans this route out over several branches and gathers the results.
#
# The route is capped with channel_cap (buffer: opts[:in_buffer],
# defaulting to the thread count). Each branch applies the block to a
# channel_reader and caps the result with channel_cap. The branches are
# then merged with merge_exhaustive, gathered, and passed to
# channel_fan_in (buffer: opts[:out_buffer], defaulting to the thread
# count; based_on: the block applied to this route).
#
# opts[:threads] defaults to 2.
#
# NOTE(review): the inclusive range 0..threads creates threads + 1
# branches - confirm whether that is intended.
def parallel(opts = {}, &block)
  worker_count = opts.fetch(:threads, 2)
  source = channel_cap(buffer: opts.fetch(:in_buffer, worker_count))
  fanned_out = (0..worker_count).inject(source) do |route, _index|
    route.branch do |input|
      block.call(input.channel_reader).channel_cap
    end
  end
  gathered = fanned_out.merge_exhaustive.gather
  gathered.channel_fan_in(buffer: opts.fetch(:out_buffer, worker_count),
                          based_on: block.call(self))
end

#process(opts = {}, &block) ⇒ Object



4
5
6
# File 'lib/pacer/transform/process.rb', line 4

# Chains a :process transform with the block as :block; entries in
# +opts+ are merged over those defaults.
def process(opts = {}, &block)
  defaults = { transform: :process, block: block }
  chain_route(defaults.merge(opts))
end

#range(from, to) ⇒ Object



4
5
6
7
8
9
# File 'lib/pacer/filter/range_filter.rb', line 4

# Chains a :range filter. :begin is set only when +from+ is truthy and
# :end only when +to+ is truthy.
def range(from, to)
  bounds = { filter: :range }
  bounds[:begin] = from if from
  bounds[:end] = to if to
  chain_route(bounds)
end

#reducer(opts = {}, &block) ⇒ Object



4
5
6
# File 'lib/pacer/transform/reduce.rb', line 4

# Chains a :reduce transform with the block as :reduce; entries in
# +opts+ are merged over those defaults.
def reducer(opts = {}, &block)
  defaults = { transform: :reduce, reduce: block }
  chain_route(defaults.merge(opts))
end

#reject(&block) ⇒ Object



8
9
10
# File 'lib/pacer/filter/block_filter.rb', line 8

# Chains a :block filter named 'Reject' with the block as :block and
# :invert set to true.
def reject(&block)
  chain_route(filter: :block, block: block, invert: true, route_name: 'Reject')
end

#repeat(arg, &block) ⇒ Object

Apply the given path fragment multiple times in succession. If a Range or Array of numbers is given, the results are a combination of the results from all of the specified repetition levels. That is useful if a pattern may be nested to varying depths.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/pacer/filter/loop_filter.rb', line 16

# Apply the given path fragment multiple times in succession.
#
# +arg+ selects how the block is applied:
# * 0          - the identity route is returned and the block is not used;
# * an Integer - the block is yielded that many times in sequence, each
#                call receiving the previous call's result (starting with
#                this route);
# * a Range    - a loop is built whose control block loops below the
#                range's start, loops and emits inside the range, emits at
#                its end, and stops beyond it. An exclusive range is first
#                converted to the equivalent inclusive one.
#
# Any other argument fails with ArgumentError.
def repeat(arg, &block)
  case arg
  when 0
    identity
  when Integer
    # Integer replaces the Fixnum constant, which was deprecated in Ruby 2.4
    # and removed in 3.2 (where referencing it raises NameError).
    (0...arg).inject(self) do |route_end, _count|
      yield route_end
    end
  when Range
    if arg.exclude_end?
      range = arg.begin..(arg.end - 1)
    else
      range = arg
    end
    r = self.loop(&block).while do |_e, depth, _p|
      if depth < range.begin
        :loop
      elsif depth < range.end
        :loop_and_emit
      elsif depth == range.end
        :emit
      else
        false
      end
    end
    r.while_description = "repeat #{ arg.inspect }"
    r
  else
    fail ArgumentError, "Invalid repeat range"
  end
end

#right_difference_sections(section) ⇒ Object

See details on intersect_sections. Emits only elements that are in the last section and not in the right difference of the previous section.

This one can be confusing if you’ve got more than 2 sections…



51
52
53
# File 'lib/pacer/transform/intersect_sections.rb', line 51

# Chains an IntersectSections transform over +section+ with the
# :right_difference operation.
def right_difference_sections(section)
  settings = { transform: Pacer::Transform::IntersectSections, section: section, operation: :right_difference }
  chain_route(settings)
end

#section(section_name = nil, visitor_target = nil) ⇒ Object



4
5
6
# File 'lib/pacer/visitors/section.rb', line 4

# Chains the :section visitor with the given section name and visitor
# target.
def section(section_name = nil, visitor_target = nil)
  settings = { visitor: :section, section_name: section_name, visitor_target: visitor_target }
  chain_route(settings)
end

#select(&block) ⇒ Object



4
5
6
# File 'lib/pacer/filter/block_filter.rb', line 4

# Chains a :block filter named 'Select' with the block as :block.
def select(&block)
  chain_route(filter: :block, block: block, route_name: 'Select')
end

#sort_section(section = nil, opts = {}, &block) ⇒ Object

Arity 2 uses custom sort logic. Arity 1 uses sort_by logic.



5
6
7
8
9
10
11
12
13
# File 'lib/pacer/transform/sort_section.rb', line 5

# Chains a :sort_section transform for +section+, merged over +opts+.
#
# Arity 2 uses custom sort logic. Arity 1 uses sort_by logic: a block of
# arity 2 is passed as :custom_sort_block, any other block as
# :sort_by_block. Without a block neither key is set.
def sort_section(section = nil, opts = {}, &block)
  settings = { transform: :sort_section }
  if block
    block_key = block.arity == 2 ? :custom_sort_block : :sort_by_block
    settings[block_key] = block
  end
  settings[:section] = section
  chain_route(opts.merge(settings))
end

#stream_sort(buffer = 1000, silo = 100) ⇒ Object

Deprecated: use sort_section instead.



4
5
6
# File 'lib/pacer/transform/stream_sort.rb', line 4

# Deprecated: use sort_section instead.
#
# Chains a :stream_sort transform with the given :buffer and :silo
# values.
def stream_sort(buffer = 1000, silo = 100)
  chain_route(transform: :stream_sort, buffer: buffer, silo: silo)
end

#stream_uniq(buffer = 1000) ⇒ Object

Deprecated: use uniq_section instead.



4
5
6
# File 'lib/pacer/transform/stream_uniq.rb', line 4

# Deprecated: use uniq_section instead.
#
# Chains a :stream_uniq transform with the given :buffer value.
def stream_uniq(buffer = 1000)
  chain_route(transform: :stream_uniq, buffer: buffer)
end

#uniqObject

Do not return duplicate elements.



8
9
10
# File 'lib/pacer/filter/uniq_filter.rb', line 8

# Do not return duplicate elements.
#
# Chains a step named 'uniq' that uses DuplicateFilterPipe as its pipe
# class.
def uniq
  chain_route(pipe_class: DuplicateFilterPipe, route_name: 'uniq')
end

#uniq_in_section(section = nil) ⇒ Object



4
5
6
# File 'lib/pacer/filter/uniq_section.rb', line 4

# Chains a UniqueSectionFilter for the given section.
def uniq_in_section(section = nil)
  settings = { filter: Pacer::Filter::UniqueSectionFilter, section: section }
  chain_route(settings)
end

#unique?Boolean

This method builds a pipe and attaches the IsUniquePipe to the end, then iterates the pipeline until the pipe reports a non-unique element or the end is reached.

Returns:

  • (Boolean)


14
15
16
17
18
19
20
# File 'lib/pacer/side_effect/is_unique.rb', line 14

# Iterates an #is_unique route, capturing the pipe handed to the build
# callback. Returns false as soon as that pipe's isUnique is falsy after
# an element; returns true if iteration finishes without that happening.
def unique?
  uniqueness_pipe = nil
  checked_route = is_unique { |pipe| uniqueness_pipe = pipe }
  checked_route.each { return false unless uniqueness_pipe.isUnique }
  true
end

#unique_pathObject

Filter out any element where its path would contain the same element twice.



13
14
15
# File 'lib/pacer/filter/uniq_filter.rb', line 13

# Filter out any element where its path would contain the same element
# twice.
#
# Chains a step named 'unique_path' that uses CyclicPathFilterPipe as its
# pipe class.
def unique_path
  chain_route(pipe_class: CyclicPathFilterPipe, route_name: 'unique_path')
end

#unjoinObject



8
9
10
# File 'lib/pacer/transform/join.rb', line 8

# Maps each element to its :components entry, then scatters the result
# using this route's extensions, graph and element_type.
def unjoin
  components = map { |group| group[:components] }
  components.scatter(extensions: extensions, graph: graph, element_type: element_type)
end

#unless(str, values = {}) ⇒ Object



19
20
21
22
23
24
25
# File 'lib/pacer/filter/where_filter.rb', line 19

# Negated #where: wraps +str+ as "not (...)" and passes it to #where
# together with +values+. Returns this route unchanged when +str+ is
# nil/false or consists only of whitespace.
def unless(str, values = {})
  has_condition = str && str !~ /\A\s*\Z/
  return self unless has_condition

  where("not (#{str})", values)
end

#vertices_route?Boolean

Returns true if this route contains only vertices.

Returns:

  • (Boolean)


82
83
84
# File 'lib/pacer/route/mixin/route_operations.rb', line 82

# True when this object is a Pacer::Core::Graph::VerticesRoute.
def vertices_route?
  is_a?(Pacer::Core::Graph::VerticesRoute)
end

#visitor(visitor) ⇒ Object



4
5
6
# File 'lib/pacer/side_effect/visitor.rb', line 4

# Chains the :visitor side effect, passing +visitor+ as :visitor.
def visitor(visitor)
  settings = { side_effect: :visitor, visitor: visitor }
  chain_route(settings)
end

#where(str, values = {}, *more, &block) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
# File 'lib/pacer/filter/where_filter.rb', line 7

# Filters this route with a where statement.
#
# * A String or Symbol that is not blank chains a :where filter with the
#   statement (converted with to_s) and +values+.
# * A blank String or Symbol returns this route unchanged.
# * Any other first argument forwards all arguments and the block to
#   #filter.
def where(str, values = {}, *more, &block)
  textual = str.is_a?(String) || str.is_a?(Symbol)
  return filter(str, values, *more, &block) unless textual
  return self if str =~ /\A\s*\Z/

  chain_route(filter: :where, where_statement: str.to_s, values: values)
end