Class: Sourced::Decider

Inherits:
Object
  • Object
show all
Extended by:
Consumer
Includes:
Evolve, React, Sync
Defined in:
lib/sourced/decider.rb

Constant Summary collapse

PREFIX =
'decide'

Constants included from Sync

Sync::CallableInterface

Constants included from Evolve

Evolve::NOOP_HANDLER

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Consumer

consumer, consumer_info

Methods included from Sync

included, #run_sync_blocks

Methods included from React

included, #react

Methods included from Evolve

#evolve, included

Constructor Details

#initialize(id = SecureRandom.uuid, backend: Sourced.config.backend, logger: Sourced.config.logger) ⇒ Decider

Returns a new instance of Decider.



125
126
127
128
129
130
131
132
133
# File 'lib/sourced/decider.rb', line 125

def initialize(id = SecureRandom.uuid, backend: Sourced.config.backend, logger: Sourced.config.logger)
  @id = id
  @backend = backend
  @logger = logger
  @seq = 0
  @state = init_state(id)
  @uncommitted_events = []
  @__current_command = nil
end

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id.



123
124
125
# File 'lib/sourced/decider.rb', line 123

def id
  @id
end

#seqObject (readonly)

Returns the value of attribute seq.



123
124
125
# File 'lib/sourced/decider.rb', line 123

def seq
  @seq
end

#stateObject (readonly)

Returns the value of attribute state.



123
124
125
# File 'lib/sourced/decider.rb', line 123

def state
  @state
end

#uncommitted_eventsObject (readonly)

Returns the value of attribute uncommitted_events.



123
124
125
# File 'lib/sourced/decider.rb', line 123

def uncommitted_events
  @uncommitted_events
end

Class Method Details

.command(*args, &block) ⇒ Message

Define a command class, register a command handler and define a method to send the command Example:

command :add_item, name: String do |cmd|
  cmd.follow(ItemAdded, item_id: SecureRandom.uuid, name: cmd.payload.name)
end

# The exmaple above will define a command class ‘AddItem` in the current namespace: AddItem = Message.define(’namespace.add_item’, payload_schema: { name: String })

# Optionally you can pass an explicit command type string:

command :add_item, 'todos.items.add', name: String do |cmd|

# It will also register the command handler: decide AddItem do |cmd|

cmd.follow(ItemAdded, item_id: SecureRandom.uuid, name: cmd.payload.name)

end

# And an :add_item method to send the command: def add_item(name:)

issue_command AddItem, name:

end

This method can be used on Decider instances:

aggregate.add_item(name: 'Buy milk')

Payload schema is a Plumb Hash schema. See: github.com/ismasan/plumb#typeshash

The helper method will instantiate an instance of the command class and validate its attributes with #valid? Only valid commands will be issued to the handler. The method returns the command instance. If #valid? is false, then the command was not issued. Example:

cmd = aggregate.add_item(name: 10)
cmd.valid? # => false
cmd.errors # => { name: 'must be a String' }

Parameters:

  • cmd_name (Symbol)

    example: :add_item

  • payload_schema (Hash)

    A Plumb Hash schema. example: { name: String }

  • block (Proc)

    The command handling code

Returns:

  • (Message)

    the command instance, which can be #valid? or not

Raises:

  • (ArgumentError)


94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/sourced/decider.rb', line 94

def command(*args, &block)
  raise ArgumentError, 'command block expects signature (state, command)' unless block.arity == 2

  message_type = nil
  cmd_name = nil
  payload_schema = {}
  segments = name.split('::').map(&:downcase)

  case args
    in [cmd_name]
      message_type = [*segments, cmd_name].join('.')
    in [cmd_name, Hash => payload_schema]
      message_type = [*segments, cmd_name].join('.')
    in [cmd_name, String => message_type, Hash => payload_schema]
    in [cmd_name, String => message_type]
  else
    raise ArgumentError, "Invalid arguments for #{self}.command"
  end

  klass_name = cmd_name.to_s.split('_').map(&:capitalize).join
  cmd_class = Command.define(message_type, payload_schema:)
  const_set(klass_name, cmd_class)
  decide cmd_class, &block
  define_method(cmd_name) do |**payload|
    issue_command cmd_class, payload
  end
end

.decide(cmd_type, &block) ⇒ Object



47
48
49
50
# File 'lib/sourced/decider.rb', line 47

def decide(cmd_type, &block)
  handled_commands << cmd_type
  define_method(Sourced.message_method_name(PREFIX, cmd_type.name), &block)
end

.handle_command(cmd) ⇒ Object

The Decider interface

Parameters:



31
32
33
# File 'lib/sourced/decider.rb', line 31

def handle_command(cmd)
  load(cmd.stream_id).handle_command(cmd)
end

.handle_events(events) ⇒ Object

The Reactor interface

Parameters:



25
26
27
# File 'lib/sourced/decider.rb', line 25

def handle_events(events)
  load(events.first.stream_id).handle_events(events)
end

.handled_commandsObject



43
44
45
# File 'lib/sourced/decider.rb', line 43

def handled_commands
  @handled_commands ||= []
end

.handled_eventsObject

Register as a Reactor



21
# File 'lib/sourced/decider.rb', line 21

def handled_events = self.handled_events_for_react

.inherited(subclass) ⇒ Object



13
14
15
16
17
18
# File 'lib/sourced/decider.rb', line 13

def inherited(subclass)
  super
  handled_commands.each do |cmd_type|
    subclass.handled_commands << cmd_type
  end
end

.load(stream_id, upto: nil) ⇒ Decider

Load a Decider from event history

Parameters:

  • stream_id (String)

    the stream id

Returns:



39
40
41
# File 'lib/sourced/decider.rb', line 39

def load(stream_id, upto: nil)
  new(stream_id).load(upto:)
end

Instance Method Details

#==(other) ⇒ Object



139
140
141
# File 'lib/sourced/decider.rb', line 139

def ==(other)
  other.is_a?(self.class) && id == other.id && seq == other.seq
end

#apply(event_class, payload = {}) ⇒ Object



173
174
175
176
177
178
179
180
181
182
# File 'lib/sourced/decider.rb', line 173

def apply(event_class, payload = {})
  evt = __current_command.follow_with_attributes(
    event_class, 
    attrs: { seq: __next_sequence }, 
    metadata: { producer: self.class.consumer_info.group_id },
    payload:
  )
  uncommitted_events << evt
  evolve(state, [evt])
end

#catch_upObject



156
157
158
159
160
# File 'lib/sourced/decider.rb', line 156

def catch_up
  seq_was = seq
  load(after: seq_was)
  [seq_was, seq]
end

#commit {|output_events| ... } ⇒ Object

Yields:

  • (output_events)


184
185
186
187
188
189
# File 'lib/sourced/decider.rb', line 184

def commit(&)
  output_events = uncommitted_events.slice(0..-1)
  yield output_events
  @uncommitted_events = []
  output_events
end

#decide(command) ⇒ Object



166
167
168
169
170
171
# File 'lib/sourced/decider.rb', line 166

def decide(command)
  command = __set_current_command(command)
  send(Sourced.message_method_name(PREFIX, command.class.name), state, command)
  @__current_command = nil
  [state, uncommitted_events]
end

#events(upto: seq) ⇒ Object



162
163
164
# File 'lib/sourced/decider.rb', line 162

def events(upto: seq)
  backend.read_event_stream(id, upto:)
end

#handle_command(command) ⇒ Object



201
202
203
204
205
206
207
208
# File 'lib/sourced/decider.rb', line 201

def handle_command(command)
  # TODO: this might raise an exception from a worker
  # Think what to do with invalid commands here
  raise "invalid command #{command.inspect} #{command.errors.inspect}" unless command.valid?
  logger.info "#{self.inspect} Handling #{command.type}"
  decide(command)
  save
end

#handle_events(events) ⇒ Object

TODO: idempotent event and command handling Reactor interface Handle events, return new commands Workers will handle route these commands to their target Deciders



215
216
217
# File 'lib/sourced/decider.rb', line 215

def handle_events(events)
  react(events)
end

#init_state(id) ⇒ Object



143
144
145
# File 'lib/sourced/decider.rb', line 143

def init_state(id)
  nil
end

#inspectObject



135
136
137
# File 'lib/sourced/decider.rb', line 135

def inspect
  %(<#{self.class} id:#{id} seq:#{seq}>)
end

#load(after: nil, upto: nil) ⇒ Object



147
148
149
150
151
152
153
154
# File 'lib/sourced/decider.rb', line 147

def load(after: nil, upto: nil)
  events = backend.read_event_stream(id, after:, upto:)
  if events.any?
    @seq = events.last.seq 
    evolve(state, events)
  end
  self
end