38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/mushy/flux.rb', line 38
# Runs this flux against an incoming event.
#
# After calling +guard+, a Hash event is wrapped in a SymbolizedHash. The
# config is then mashed against the event to look up +:incoming_split+.
#
# * No split configured: the event is processed once through
#   +execute_single_event+ and that single result is returned.
# * Split configured: the value stored in the event under the split key is
#   mapped element by element through +execute_single_event+. The 'join'
#   entry is withheld from the config handed to each of those runs. When
#   +config[:join]+ is set, the flattened results are passed to
#   +join_these_results+ together with the original event. Whatever remains
#   is flattened before being returned.
#
# @param incoming_event [Hash, Object] the event to process; only Hash
#   instances are converted to SymbolizedHash
# @return [Object] the first result when not splitting, otherwise the
#   flattened (and optionally joined) results
def execute(incoming_event)
  guard
  incoming_event = SymbolizedHash.new(incoming_event) if incoming_event.is_a?(Hash)

  split_key = masher.mash(config, incoming_event)[:incoming_split]

  # When splitting, joining is applied once to the combined results below,
  # so the 'join' setting is kept out of the per-event config.
  per_event_config = config
                     .reject { |key, _| split_key && key.to_s == 'join' }
                     .each_with_object({}) { |(key, value), kept| kept[key] = value }

  events =
    if split_key
      incoming_event[split_key]
    else
      [incoming_event]
    end

  results = events.map { |event| execute_single_event(event, per_event_config) }
  return results.first unless split_key

  join_key = config[:join]
  results = join_these_results(results.flatten, incoming_event, join_key) if join_key
  results.flatten
end
|