Class: Sourced::Decider
- Inherits: Object
- Extended by: Consumer
- Defined in: lib/sourced/decider.rb
Constant Summary
- PREFIX = 'decide'
Constants included from Sync
Constants included from Evolve
Instance Attribute Summary
- #id ⇒ Object (readonly): Returns the value of attribute id.
- #seq ⇒ Object (readonly): Returns the value of attribute seq.
- #state ⇒ Object (readonly): Returns the value of attribute state.
- #uncommitted_events ⇒ Object (readonly): Returns the value of attribute uncommitted_events.
Class Method Summary
- .command(*args, &block) ⇒ Message: Define a command class, register a command handler, and define a method to send the command.
- .decide(cmd_type, &block) ⇒ Object
- .handle_command(cmd) ⇒ Object: The Decider interface.
- .handle_events(events) ⇒ Object: The Reactor interface.
- .handled_commands ⇒ Object
- .handled_events ⇒ Object: Register as a Reactor.
- .inherited(subclass) ⇒ Object
- .load(stream_id, upto: nil) ⇒ Decider: Load a Decider from event history.
Instance Method Summary
- #==(other) ⇒ Object
- #apply(event_class, payload = {}) ⇒ Object
- #catch_up ⇒ Object
- #commit {|output_events| ... } ⇒ Object
- #decide(command) ⇒ Object
- #events(upto: seq) ⇒ Object
- #handle_command(command) ⇒ Object
- #handle_events(events) ⇒ Object: Reactor interface. Handles events and returns new commands; workers route these commands to their target Deciders. TODO: idempotent event and command handling.
- #init_state(id) ⇒ Object
- #initialize(id = SecureRandom.uuid, backend: Sourced.config.backend, logger: Sourced.config.logger) ⇒ Decider (constructor): A new instance of Decider.
- #inspect ⇒ Object
- #load(after: nil, upto: nil) ⇒ Object
Methods included from Consumer
Methods included from Sync
Methods included from React
Methods included from Evolve
Constructor Details
#initialize(id = SecureRandom.uuid, backend: Sourced.config.backend, logger: Sourced.config.logger) ⇒ Decider
Returns a new instance of Decider.
# File 'lib/sourced/decider.rb', line 125
def initialize(id = SecureRandom.uuid, backend: Sourced.config.backend, logger: Sourced.config.logger)
  @id = id
  @backend = backend
  @logger = logger
  @seq = 0
  @state = init_state(id)
  @uncommitted_events = []
  @__current_command = nil
end
Instance Attribute Details
#id ⇒ Object (readonly)
Returns the value of attribute id.
# File 'lib/sourced/decider.rb', line 123
def id
  @id
end
#seq ⇒ Object (readonly)
Returns the value of attribute seq.
# File 'lib/sourced/decider.rb', line 123
def seq
  @seq
end
#state ⇒ Object (readonly)
Returns the value of attribute state.
# File 'lib/sourced/decider.rb', line 123
def state
  @state
end
#uncommitted_events ⇒ Object (readonly)
Returns the value of attribute uncommitted_events.
# File 'lib/sourced/decider.rb', line 123
def uncommitted_events
  @uncommitted_events
end
Class Method Details
.command(*args, &block) ⇒ Message
Defines a command class, registers a command handler, and defines a method to send the command. Note that the implementation shown in the source listing for this method requires the handler block to accept two arguments, (state, command), and raises ArgumentError otherwise, so the one-argument blocks in these examples need a leading state parameter. Example:
  command :add_item, name: String do |cmd|
    cmd.follow(ItemAdded, item_id: SecureRandom.uuid, name: cmd.payload.name)
  end
  # The example above will define a command class `AddItem` in the current namespace:
  AddItem = Message.define('namespace.add_item', payload_schema: { name: String })
  # Optionally you can pass an explicit command type string:
  command :add_item, 'todos.items.add', name: String do |cmd|
  # It will also register the command handler:
  decide AddItem do |cmd|
    cmd.follow(ItemAdded, item_id: SecureRandom.uuid, name: cmd.payload.name)
  end
  # And an :add_item method to send the command:
  def add_item(name:)
    issue_command AddItem, name:
  end
This method can be used on Decider instances:
aggregate.add_item(name: 'Buy milk')
Payload schema is a Plumb Hash schema. See: github.com/ismasan/plumb#typeshash
The helper method instantiates the command class and validates its attributes with #valid?. Only valid commands are issued to the handler. The method always returns the command instance; if #valid? is false, the command was not issued. Example:
cmd = aggregate.add_item(name: 10)
cmd.valid? # => false
cmd.errors # => { name: 'must be a String' }
# File 'lib/sourced/decider.rb', line 94
def command(*args, &block)
  raise ArgumentError, 'command block expects signature (state, command)' unless block.arity == 2

  = nil
  cmd_name = nil
  payload_schema = {}
  segments = name.split('::').map(&:downcase)

  case args
  in [cmd_name]
    = [*segments, cmd_name].join('.')
  in [cmd_name, Hash => payload_schema]
    = [*segments, cmd_name].join('.')
  in [cmd_name, String => , Hash => payload_schema]
  in [cmd_name, String => ]
  else
    raise ArgumentError, "Invalid arguments for #{self}.command"
  end

  klass_name = cmd_name.to_s.split('_').map(&:capitalize).join
  cmd_class = Command.define(, payload_schema:)
  const_set(klass_name, cmd_class)
  decide cmd_class, &block
  define_method(cmd_name) do |**payload|
    issue_command cmd_class, payload
  end
end
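The naming rules used by .command can be tried in isolation. The following is a minimal sketch, not part of Sourced: `derive_command_names` is a hypothetical helper that only mirrors the `split`/`map`/`join` expressions from the listing above, and `Todos::List` is an assumed class name.

```ruby
# Hypothetical helper (not part of Sourced) mirroring how .command derives
# the command class name and the default type string.
def derive_command_names(owner_name, cmd_name)
  # 'Todos::List' becomes ['todos', 'list']
  segments = owner_name.split('::').map(&:downcase)
  # Default type string: namespace segments plus the command name, dot-separated
  type = [*segments, cmd_name].join('.')
  # :add_item becomes 'AddItem'
  klass_name = cmd_name.to_s.split('_').map(&:capitalize).join
  [klass_name, type]
end

klass_name, type = derive_command_names('Todos::List', :add_item)
puts klass_name # AddItem
puts type       # todos.list.add_item
```

Passing an explicit type string, as in the `'todos.items.add'` example, bypasses the derived default but leaves the class name derivation unchanged.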
.decide(cmd_type, &block) ⇒ Object
# File 'lib/sourced/decider.rb', line 47
def decide(cmd_type, &block)
  handled_commands << cmd_type
  define_method(Sourced.(PREFIX, cmd_type.name), &block)
end
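To illustrate the registration pattern, here is a self-contained sketch that does not use Sourced. `MiniDecider`, `TodoList`, and the `AddItem` struct are stand-ins, and `handler_name` is a hypothetical replacement for the Sourced helper whose name is missing from the listing above; its naming scheme is an assumption.

```ruby
class MiniDecider
  PREFIX = 'decide'

  class << self
    def handled_commands
      @handled_commands ||= []
    end

    # Assumption: stands in for the Sourced helper that builds the handler
    # method name; the real naming scheme may differ.
    def handler_name(prefix, class_name)
      "__#{prefix}_#{class_name}"
    end

    # Record the command type and define an instance method holding the block.
    def decide(cmd_type, &block)
      handled_commands << cmd_type
      define_method(handler_name(PREFIX, cmd_type.name), &block)
    end
  end

  # Dispatch to the handler registered for the command's class.
  def decide(state, command)
    send(self.class.handler_name(PREFIX, command.class.name), state, command)
  end
end

AddItem = Struct.new(:name)

class TodoList < MiniDecider
  decide AddItem do |state, cmd|
    state << cmd.name
  end
end

items = []
TodoList.new.decide(items, AddItem.new('Buy milk'))
puts items.inspect
```

Because the block becomes an ordinary instance method, it runs with the decider instance as `self`.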
.handle_command(cmd) ⇒ Object
The Decider interface
# File 'lib/sourced/decider.rb', line 31
def handle_command(cmd)
  load(cmd.stream_id).handle_command(cmd)
end
.handle_events(events) ⇒ Object
The Reactor interface
# File 'lib/sourced/decider.rb', line 25
def handle_events(events)
  load(events.first.stream_id).handle_events(events)
end
.handled_commands ⇒ Object
# File 'lib/sourced/decider.rb', line 43
def handled_commands
  @handled_commands ||= []
end
.handled_events ⇒ Object
Register as a Reactor
# File 'lib/sourced/decider.rb', line 21
def handled_events = self.handled_events_for_react
.inherited(subclass) ⇒ Object
# File 'lib/sourced/decider.rb', line 13
def inherited(subclass)
  super
  handled_commands.each do |cmd_type|
    subclass.handled_commands << cmd_type
  end
end
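One consequence of this hook is that a subclass receives a copy of the parent's command list at the moment it is defined. The sketch below reproduces that behaviour with stand-in classes (`Base` and `Child` are hypothetical, and symbols stand in for command classes).

```ruby
class Base
  class << self
    def handled_commands
      @handled_commands ||= []
    end

    # Copy the parent's registered commands into the new subclass.
    def inherited(subclass)
      super
      handled_commands.each do |cmd_type|
        subclass.handled_commands << cmd_type
      end
    end
  end
end

Base.handled_commands << :add_item
class Child < Base; end

# Registered on the parent after Child was defined, so Child does not see it.
Base.handled_commands << :remove_item

puts Child.handled_commands.inspect
puts Base.handled_commands.inspect
```

Each class keeps its own array, so commands registered on a subclass do not leak back into the parent either.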
.load(stream_id, upto: nil) ⇒ Decider
Load a Decider from event history
# File 'lib/sourced/decider.rb', line 39
def load(stream_id, upto: nil)
  new(stream_id).load(upto:)
end
Instance Method Details
#==(other) ⇒ Object
# File 'lib/sourced/decider.rb', line 139
def ==(other)
  other.is_a?(self.class) && id == other.id && seq == other.seq
end
#apply(event_class, payload = {}) ⇒ Object
# File 'lib/sourced/decider.rb', line 173
def apply(event_class, payload = {})
  evt = __current_command.follow_with_attributes(
    event_class,
    attrs: { seq: __next_sequence },
    metadata: { producer: self.class.consumer_info.group_id },
    payload:
  )
  uncommitted_events << evt
  evolve(state, [evt])
end
#catch_up ⇒ Object
# File 'lib/sourced/decider.rb', line 156
def catch_up
  seq_was = seq
  load(after: seq_was)
  [seq_was, seq]
end
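The following sketch shows how catch_up might behave against a backend, under stated assumptions: `MemoryBackend`, `Event`, and `MiniDecider` are hypothetical stand-ins, only the `read_event_stream(id, after:, upto:)` call shape is taken from the listings on this page, and `evolve` is a simplified placeholder for the method provided by the Evolve module.

```ruby
Event = Struct.new(:seq, :name)

# Assumption: an in-memory stand-in for the configured backend.
class MemoryBackend
  def initialize
    @streams = Hash.new { |hash, key| hash[key] = [] }
  end

  def append(id, event)
    @streams[id] << event
  end

  def read_event_stream(id, after: nil, upto: nil)
    @streams[id].select do |e|
      (after.nil? || e.seq > after) && (upto.nil? || e.seq <= upto)
    end
  end
end

class MiniDecider
  attr_reader :id, :seq, :state

  def initialize(id, backend:)
    @id = id
    @backend = backend
    @seq = 0
    @state = []
  end

  # Read events in the requested range and fold them into the state.
  def load(after: nil, upto: nil)
    events = @backend.read_event_stream(id, after: after, upto: upto)
    if events.any?
      @seq = events.last.seq
      evolve(state, events)
    end
    self
  end

  # Load only events newer than the current sequence number.
  def catch_up
    seq_was = seq
    load(after: seq_was)
    [seq_was, seq]
  end

  private

  # Assumption: placeholder for Evolve#evolve, which is not shown here.
  def evolve(state, events)
    events.each { |e| state << e.name }
  end
end

backend = MemoryBackend.new
backend.append('list-1', Event.new(1, 'created'))
decider = MiniDecider.new('list-1', backend: backend).load
backend.append('list-1', Event.new(2, 'item_added'))
range = decider.catch_up
puts range.inspect # [1, 2]
```

The returned pair is the sequence number before and after catching up, so equal values mean no new events were found.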
#commit {|output_events| ... } ⇒ Object
# File 'lib/sourced/decider.rb', line 184
def commit(&)
  output_events = uncommitted_events.slice(0..-1)
  yield output_events
  @uncommitted_events = []
  output_events
end
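A minimal sketch of the same buffer-and-flush pattern, using a hypothetical `Buffer` class and symbols in place of events. It copies the pending events, hands the copy to the block, and clears the buffer only after the block returns.

```ruby
class Buffer
  attr_reader :uncommitted_events

  def initialize(events)
    @uncommitted_events = events
  end

  def commit
    # slice(0..-1) returns a shallow copy of the whole array
    output_events = uncommitted_events.slice(0..-1)
    yield output_events
    # Reached only if the block did not raise
    @uncommitted_events = []
    output_events
  end
end

persisted = []
buffer = Buffer.new(%i[item_added item_removed])
returned = buffer.commit { |events| persisted.concat(events) }

puts returned.inspect
puts buffer.uncommitted_events.inspect
```

If the block raises, for example because persistence fails, the buffer is left untouched and the events remain available for a retry.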
#decide(command) ⇒ Object
# File 'lib/sourced/decider.rb', line 166
def decide(command)
  command = __set_current_command(command)
  send(Sourced.(PREFIX, command.class.name), state, command)
  @__current_command = nil
  [state, uncommitted_events]
end
#events(upto: seq) ⇒ Object
# File 'lib/sourced/decider.rb', line 162
def events(upto: seq)
  backend.read_event_stream(id, upto:)
end
#handle_command(command) ⇒ Object
# File 'lib/sourced/decider.rb', line 201
def handle_command(command)
  # TODO: this might raise an exception from a worker
  # Think what to do with invalid commands here
  raise "invalid command #{command.inspect} #{command.errors.inspect}" unless command.valid?
  logger.info "#{self.inspect} Handling #{command.type}"
  decide(command)
  save
end
#handle_events(events) ⇒ Object
Reactor interface. Handles events and returns new commands; workers route these commands to their target Deciders. TODO: idempotent event and command handling.
# File 'lib/sourced/decider.rb', line 215
def handle_events(events)
  react(events)
end
#init_state(id) ⇒ Object
# File 'lib/sourced/decider.rb', line 143
def init_state(id)
  nil
end
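Since the constructor assigns `@state = init_state(id)` and the default returns nil, a subclass would presumably override init_state to supply its starting state. The sketch below shows that idea with stand-in classes; `MiniDecider` and `Cart` are hypothetical, and the hash layout is only an example.

```ruby
class MiniDecider
  attr_reader :id, :state

  def initialize(id)
    @id = id
    # Same hook call as in the constructor listing
    @state = init_state(id)
  end

  # Default: no initial state
  def init_state(_id)
    nil
  end
end

class Cart < MiniDecider
  # Assumed override: build a fresh state object for this stream id
  def init_state(id)
    { id: id, items: [] }
  end
end

cart = Cart.new('cart-1')
puts cart.state.inspect
```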
#inspect ⇒ Object
# File 'lib/sourced/decider.rb', line 135
def inspect
  %(<#{self.class} id:#{id} seq:#{seq}>)
end
#load(after: nil, upto: nil) ⇒ Object
# File 'lib/sourced/decider.rb', line 147
def load(after: nil, upto: nil)
  events = backend.read_event_stream(id, after:, upto:)
  if events.any?
    @seq = events.last.seq
    evolve(state, events)
  end
  self
end