31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
# File 'lib/aggregate_streams/handle.rb', line 31
def handle(message_data)
logger.trace { "Handling message (Stream: #{message_data.stream_name}, Global Position: #{message_data.global_position})" }
Retry.(MessageStore::ExpectedVersion::Error, millisecond_intervals: [0, 10, 100, 1000]) do
stream_id = Messaging::StreamName.get_id(message_data.stream_name)
aggregation, version = store.fetch(stream_id, include: :version)
if aggregation.processed?(message_data)
logger.info(tag: :ignored) { "Message already handled (Stream: #{message_data.stream_name}, Global Position: #{message_data.global_position})" }
return
end
raw_input_data = Messaging::Message::Transform::MessageData.read(message_data)
input_metadata = Messaging::Message::Metadata.build(raw_input_data[:metadata])
output_metadata = raw_metadata(input_metadata)
write_message_data = MessageStore::MessageData::Write.new
SetAttributes.(write_message_data, message_data, copy: [:type, :data])
write_message_data.metadata = output_metadata
input_category = Messaging::StreamName.get_category(message_data.stream_name)
write_message_data = transform(write_message_data, input_category)
if write_message_data
assure_message_data(write_message_data)
else
logger.info(tag: :ignored) { "Message ignored (Stream: #{message_data.stream_name}, Global Position: #{message_data.global_position})" }
return
end
stream_name = stream_name(stream_id)
write.(write_message_data, stream_name, expected_version: version)
logger.info do
message_type = message_data.type
unless write_message_data.type == message_type
message_type = "#{write_message_data.type} ← #{message_type}"
end
"Message copied (Message Type: #{message_type}, Stream: #{message_data.stream_name}, Global Position: #{message_data.global_position})"
end
end
end
|