Class: Mushy::Flux

Inherits:
Object
  • Object
show all
Defined in:
lib/mushy/flux.rb

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Flux

Returns a new instance of Flux.



13
14
15
# File 'lib/mushy/flux.rb', line 13

# Builds a new flux and immediately runs #guard, which fills in any of
# id, parent_fluxs, subscribed_to, masher and config that are still unset.
def initialize
  guard
end

Class Attribute Details

.all ⇒ Object

Returns the value of attribute all.



18
19
20
# File 'lib/mushy/flux.rb', line 18

# Class-level reader for @all.
# .inherited lazily initialises this to an Array and appends each subclass
# to it, so it is nil until the first subclass has been registered.
def all
  @all
end

Instance Attribute Details

#config ⇒ Object

Returns the value of attribute config.



9
10
11
# File 'lib/mushy/flux.rb', line 9

# Reader for @config, the configuration that #execute passes to the masher.
# #guard defaults it to an empty SymbolizedHash when it is unset.
def config
  @config
end

#flow ⇒ Object

Returns the value of attribute flow.



11
12
13
# File 'lib/mushy/flux.rb', line 11

# Reader for @flow.
# NOTE(review): presumably the flow this flux belongs to; nothing in this
# class assigns or reads it, so confirm against the callers.
def flow
  @flow
end

#id ⇒ Object

Returns the value of attribute id.



5
6
7
# File 'lib/mushy/flux.rb', line 5

# Reader for @id. #guard assigns a SecureRandom.uuid when no id has been set.
def id
  @id
end

#masher ⇒ Object

Returns the value of attribute masher.



10
11
12
# File 'lib/mushy/flux.rb', line 10

# Reader for @masher, the object whose #mash is used to resolve config
# against an event. #guard defaults it to Masher.new when it is unset.
def masher
  @masher
end

#parent_fluxs ⇒ Object

Returns the value of attribute parent_fluxs.



7
8
9
# File 'lib/mushy/flux.rb', line 7

# Reader for @parent_fluxs. #guard defaults it to an empty Array.
def parent_fluxs
  @parent_fluxs
end

#subscribed_to ⇒ Object

Returns the value of attribute subscribed_to.



8
9
10
# File 'lib/mushy/flux.rb', line 8

# Reader for @subscribed_to. #guard defaults it to an empty Array.
def subscribed_to
  @subscribed_to
end

#type ⇒ Object

Returns the value of attribute type.



6
7
8
# File 'lib/mushy/flux.rb', line 6

# Reader for @type.
# NOTE(review): not assigned or read anywhere in this class; presumably
# set by whatever builds the flux -- confirm against the callers.
def type
  @type
end

Class Method Details

.inherited(subclass) ⇒ Object



20
21
22
23
24
25
26
27
# File 'lib/mushy/flux.rb', line 20

# Inheritance hook: records every descendant in Mushy::Flux.all.
#
# When the hook fires on a subclass of Mushy::Flux (i.e. for a grandchild),
# the registration is forwarded to Mushy::Flux so that a single list holds
# all descendants.
#
# @param subclass [Class] the class that was just defined
def inherited(subclass)
  return Mushy::Flux.inherited(subclass) unless self == Mushy::Flux

  self.all ||= []
  self.all << subclass
end

Instance Method Details

#convert_this_to_an_array(value) ⇒ Object



157
158
159
160
161
162
163
# File 'lib/mushy/flux.rb', line 157

# Normalises a scalar, a comma-separated string, or a (nested) array of
# either into a flat array of non-empty, whitespace-stripped strings.
#
# @param value [Object] e.g. "a, b", [:a, "b,c"], or nil
# @return [Array<String>]
def convert_this_to_an_array(value)
  pieces = [value].flatten.flat_map do |item|
    item.to_s.split(',').map(&:strip)
  end

  pieces.reject { |piece| piece == '' }
end

#convert_to_symbolized_hash(event) ⇒ Object



165
166
167
168
169
# File 'lib/mushy/flux.rb', line 165

# Copies every key/value pair of +event+ into a fresh SymbolizedHash.
#
# @param event [#each] anything yielding key/value pairs
# @return [SymbolizedHash]
def convert_to_symbolized_hash(event)
  SymbolizedHash.new.tap do |copy|
    event.each { |key, value| copy[key] = value }
  end
end

#execute(incoming_event) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/mushy/flux.rb', line 38

# Runs this flux against an incoming event.
#
# The config is mashed against the event to look up :incoming_split.
# * Without a split, the event is run once through #execute_single_event
#   and that result is returned as-is.
# * With a split, the collection stored under that key of the event is
#   iterated and every element is run individually, using a copy of the
#   config with any 'join' entry removed. The per-element results are
#   wrapped by #join_these_results when config[:join] is set, and the
#   outcome is returned flattened.
#
# @param incoming_event [Hash, Object] plain Hashes are wrapped in a SymbolizedHash
# @return [Object] a single result, or a flat Array of results when splitting
def execute(incoming_event)
  guard

  incoming_event = SymbolizedHash.new(incoming_event) if incoming_event.is_a?(Hash)

  split_key = masher.mash(config, incoming_event)[:incoming_split]

  # Rebuild the config as a plain Hash, leaving out 'join' while splitting:
  # the join is applied once, after all split events have been processed.
  per_event_config = config
                       .reject { |key, _| split_key && key.to_s == 'join' }
                       .each_with_object({}) { |(key, value), kept| kept[key] = value }

  return execute_single_event(incoming_event, per_event_config) unless split_key

  results = incoming_event[split_key].map do |single_event|
    execute_single_event(single_event, per_event_config)
  end

  results = join_these_results(results.flatten, incoming_event, config[:join]) if config[:join]

  results.flatten
end

#execute_single_event(event, config) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/mushy/flux.rb', line 59

# Processes one event and shapes whatever #process returns.
#
# The config is mashed against the event, #process is called with the
# mashed config, and its output is normalised by #standardize_these and
# post-processed by #shape_these (which receives the un-mashed config).
#
# Return shape:
# * the first result when the mashed config carried a :join,
# * the full Array when :outgoing_split is set,
# * otherwise a single result if #process returned a Hash, else the Array.
#
# Error handling is driven by config[:error_strategy]:
# * blank    -> the error is re-raised,
# * 'ignore' -> an empty Array is returned,
# * other    -> { exception: <message> } is returned.
# Only StandardError is handled this way; interrupts, exit requests and
# other non-StandardError exceptions always propagate.
#
# @param event [Object] the event to process
# @param config [Hash] the flux configuration to apply
# @return [Object, Array]
def execute_single_event(event, config)
  mashed_config = masher.mash(config, event)

  # Remember the join before it is cleared for split processing.
  the_original_join = mashed_config[:join]
  mashed_config[:join] = nil if mashed_config[:incoming_split]

  results = process(event, mashed_config)

  # Capture this before standardising, which always yields an Array.
  returned_one_result = results.is_a?(Hash)

  results = standardize_these(results)
  results = shape_these(results, event, config)

  return results.first if the_original_join
  return results if mashed_config[:outgoing_split]

  returned_one_result ? results.first : results
rescue StandardError => e
  raise if config[:error_strategy].to_s == ''
  return [] if config[:error_strategy] == 'ignore'

  { exception: e.message }
end

#group_these_results(results, event, by) ⇒ Object



119
120
121
122
123
# File 'lib/mushy/flux.rb', line 119

# Buckets results by a field and emits one SymbolizedHash per bucket.
#
# +by+ has the form "group_field|result_key": results are grouped on
# result[group_field], and each group is returned as
# { result_key => [members] }.
#
# @param results [Array] results to group
# @param event [Object] unused here
# @param by [String] "group_field|result_key"
# @return [Array<SymbolizedHash>]
def group_these_results(results, event, by)
  group_field, result_key = by.split('|')

  results
    .group_by { |result| result[group_field] }
    .values
    .map { |members| SymbolizedHash.new({ result_key => members }) }
end

#guard ⇒ Object



30
31
32
33
34
35
36
# File 'lib/mushy/flux.rb', line 30

# Fills in defaults for any attribute that is still nil/false, leaving
# already-assigned values untouched (each line uses ||=).
# Called from #initialize and again at the start of #execute.
def guard
  self.id ||= SecureRandom.uuid           # random identifier
  self.parent_fluxs ||= []
  self.subscribed_to ||= []
  self.masher ||= Masher.new
  self.config ||= SymbolizedHash.new      # empty configuration
end

#ignore_these_results(results, event, by) ⇒ Object



138
139
140
141
142
143
# File 'lib/mushy/flux.rb', line 138

# Removes the named fields from every result, mutating the results in place.
#
# +by+ is parsed with #convert_this_to_an_array, the same way
# #merge_these_results parses its key list, so "a, b", "a,b" and
# ["a", "b"] are all accepted and surrounding whitespace is ignored.
#
# @param results [Array<Hash>] results to strip fields from
# @param event [Object] unused here
# @param by [String, Array, nil] field names to delete; blank means no-op
# @return [Array<Hash>] the same +results+ collection
def ignore_these_results(results, event, by)
  ignore_fields = convert_this_to_an_array(by)
  return results if ignore_fields.empty?

  results.each do |result|
    ignore_fields.each { |field| result.delete(field) }
  end
end

#join_these_results(results, event, by) ⇒ Object



129
130
131
# File 'lib/mushy/flux.rb', line 129

# Collapses all results into a single record stored under the key +by+.
#
# @param results [Array] results to bundle together
# @param event [Object] unused here
# @param by [Object] key under which the results are stored
# @return [Array<SymbolizedHash>] a one-element Array
def join_these_results(results, event, by)
  joined = SymbolizedHash.new({ by => results })
  [joined]
end

#limit_these_results(results, event, by) ⇒ Object



112
113
114
115
116
117
# File 'lib/mushy/flux.rb', line 112

# Keeps at most +by+ results, counted from the front.
#
# +by+ is coerced with #to_i, so nil and non-numeric strings count as 0.
# A zero or negative limit yields an empty Array.
#
# @param results [Array] results to truncate
# @param event [Object] unused here
# @param by [#to_i] maximum number of results to keep
# @return [Array] a new Array
def limit_these_results(results, event, by)
  maximum = by.to_i
  return [] unless maximum.positive?

  results.first(maximum)
end

#merge_these_results(results, event, by) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
# File 'lib/mushy/flux.rb', line 145

# Copies selected fields from the originating event onto each result.
#
# +by+ names the event keys to carry over (comma-separated string or
# array); a leading '*' means every key of the event. A field is only
# written when the result does not already hold a truthy value for it.
# Results are mutated in place.
#
# @param results [Array<Hash>] results to enrich
# @param event [Hash] the event whose fields are copied
# @param by [String, Array] keys to merge, or '*'
# @return [Array<Hash>] the enriched results
def merge_these_results(results, event, by)
  wanted = convert_this_to_an_array(by)
  wanted = event.keys.map(&:to_s) if wanted[0] == '*'

  results.map do |result|
    mergeable = event.select { |key, _| wanted.include?(key.to_s) }
    mergeable.each { |key, value| result[key] ||= value }
    result
  end
end

#model_these_results(results, event, by) ⇒ Object



133
134
135
136
# File 'lib/mushy/flux.rb', line 133

# Reshapes every result by mashing the model template +by+ against it.
# When +by+ has no (truthy) entries the results are returned untouched.
#
# @param results [Array] results to remodel
# @param event [Object] unused here
# @param by [#any?] the model template handed to masher.mash
# @return [Array]
def model_these_results(results, event, by)
  by.any? ? results.map { |result| masher.mash(by, result) } : results
end

#outgoing_split_these_results(results, event, by) ⇒ Object



125
126
127
# File 'lib/mushy/flux.rb', line 125

# Replaces each result with whatever Masher#dig finds at +by+ inside it,
# then flattens everything into a single Array.
#
# @param results [Array] results to split
# @param event [Object] unused here
# @param by [Object] the path/key handed to Masher#dig
# @return [Array]
def outgoing_split_these_results(results, event, by)
  extracted = results.map { |result| Masher.new.dig(by, result) }
  extracted.flatten
end

#process(event, config) ⇒ Object



171
172
173
# File 'lib/mushy/flux.rb', line 171

# Default processing step: hands the event back unchanged.
# NOTE(review): presumably meant to be overridden by subclasses -- confirm.
#
# @param event [Object] the event being processed
# @param config [Object] the mashed configuration (unused here)
# @return [Object] the same +event+ object
def process(event, config)
  event
end

#shape_these(results, event, config) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/mushy/flux.rb', line 92

# Applies the configured shaping steps to +results+, one after another.
#
# A step runs only when config holds a truthy value under its name; it is
# dispatched to "<step>_these_results" with that value. Steps run in the
# order given by config[:shaping] (comma-separated string or array) when
# present; steps not listed there run afterwards in the default order.
#
# @param results [Array] the standardised results
# @param event [Object] the originating event, passed to every step
# @param config [Hash] the flux configuration
# @return [Array] the shaped results
def shape_these(results, event, config)
  supported = [:merge, :outgoing_split, :group, :model, :ignore, :join, :sort, :limit]

  preferred_order =
    if config[:shaping]
      convert_this_to_an_array(config[:shaping]).map(&:to_sym)
    else
      supported
    end

  requested = supported.select { |step| config[step] }

  # Steps missing from the preferred order are ranked after every listed
  # one while keeping their relative default order.
  ordered = requested
              .each_with_index
              .sort_by { |step, position| preferred_order.index(step) || position + supported.count }
              .map { |step, _| step }

  ordered.reduce(results) do |shaped, step|
    send("#{step}_these_results".to_sym, shaped, event, config[step])
  end
end

#sort_these_results(results, event, by) ⇒ Object



108
109
110
# File 'lib/mushy/flux.rb', line 108

# Orders results ascending by the integer value of one field.
#
# Each result's value at +by+ is coerced with #to_i, so missing or
# non-numeric values sort as 0.
#
# @param results [Array<Hash>] results to sort
# @param event [Object] unused here
# @param by [Object] key of the field to sort on
# @return [Array<Hash>] a new, sorted Array
def sort_these_results(results, event, by)
  # sort_by takes a key-extracting block; Array#sort expects a two-argument
  # comparator, which a one-argument block cannot provide.
  results.sort_by { |result| result[by].to_i }
end

#standardize_these(results) ⇒ Object



85
86
87
88
89
90
# File 'lib/mushy/flux.rb', line 85

# Normalises whatever #process returned into a flat Array of
# SymbolizedHash records. Anything that is not a Hash is dropped.
#
# @param results [Hash, Array, Object] raw output of #process
# @return [Array] converted records, possibly empty
def standardize_these(results)
  hashes = [results].flatten.select { |candidate| candidate.is_a?(Hash) }
  hashes.map { |hash| convert_to_symbolized_hash(hash) }
end