Class: Sourced::Router

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/sourced/router.rb

Constant Summary collapse

PID =
Process.pid

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(backend: Sourced.config.backend, logger: Sourced.config.logger) ⇒ Router

Returns a new instance of Router.



45
46
47
48
49
50
51
# File 'lib/sourced/router.rb', line 45

# Builds a router with empty registries.
#
# @param backend [Object] stored in @backend; defaults to Sourced.config.backend
# @param logger [Object] stored in @logger; defaults to Sourced.config.logger
def initialize(backend: Sourced.config.backend, logger: Sourced.config.logger)
  # Command class => decider; populated by #register.
  @decider_lookup = {}

  # Injected collaborators.
  @logger = logger
  @backend = backend

  # Reactor registries; #register adds to one or the other.
  @sync_reactors = Set.new
  @async_reactors = Set.new
end

Instance Attribute Details

#async_reactors ⇒ Object (readonly)

Returns the value of attribute async_reactors.



43
44
45
# File 'lib/sourced/router.rb', line 43

# Reader for the asynchronous reactor registry.
#
# @return [Object] the current value of @async_reactors
def async_reactors = @async_reactors

#backend ⇒ Object (readonly)

Returns the value of attribute backend.



43
44
45
# File 'lib/sourced/router.rb', line 43

# Reader for the backend this router was constructed with.
#
# @return [Object] the current value of @backend
def backend = @backend

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



43
44
45
# File 'lib/sourced/router.rb', line 43

# Reader for the logger this router was constructed with.
#
# @return [Object] the current value of @logger
def logger = @logger

#sync_reactors ⇒ Object (readonly)

Returns the value of attribute sync_reactors.



43
44
45
# File 'lib/sourced/router.rb', line 43

# Reader for the synchronous reactor registry.
#
# @return [Object] the current value of @sync_reactors
def sync_reactors = @sync_reactors

Class Method Details

.async_reactors ⇒ Object



30
31
32
# File 'lib/sourced/router.rb', line 30

# Class-level shortcut: forwards to #async_reactors on the singleton instance.
#
# @return [Object] whatever instance.async_reactors returns
def async_reactors = instance.async_reactors

.dispatch_next_command ⇒ Object



22
23
24
# File 'lib/sourced/router.rb', line 22

# Class-level shortcut: forwards to #dispatch_next_command on the singleton instance.
#
# @return [Object] whatever instance.dispatch_next_command returns
def dispatch_next_command = instance.dispatch_next_command

.handle_and_ack_events_for_reactor(reactor, events) ⇒ Object



34
35
36
# File 'lib/sourced/router.rb', line 34

# Class-level shortcut: forwards to #handle_and_ack_events_for_reactor on the
# singleton instance.
#
# @param reactor [Object] passed through unchanged
# @param events [Object] passed through unchanged
# @return [Object] whatever the instance method returns
def handle_and_ack_events_for_reactor(reactor, events) = instance.handle_and_ack_events_for_reactor(reactor, events)

.handle_command(command) ⇒ Object



18
19
20
# File 'lib/sourced/router.rb', line 18

# Class-level shortcut: forwards to #handle_command on the singleton instance.
#
# @param command [Object] passed through unchanged
# @return [Object] whatever instance.handle_command returns
def handle_command(command) = instance.handle_command(command)

.handle_events(events) ⇒ Object



26
27
28
# File 'lib/sourced/router.rb', line 26

# Class-level shortcut: forwards to #handle_events on the singleton instance.
#
# @param events [Object] passed through unchanged
# @return [Object] whatever instance.handle_events returns
def handle_events(events) = instance.handle_events(events)

.handle_next_event_for_reactor(reactor, process_name = nil) ⇒ Object



38
39
40
# File 'lib/sourced/router.rb', line 38

# Class-level shortcut: forwards to #handle_next_event_for_reactor on the
# singleton instance.
#
# @param reactor [Object] passed through unchanged
# @param process_name [Object, nil] passed through unchanged; nil when omitted
# @return [Object] whatever the instance method returns
def handle_next_event_for_reactor(reactor, process_name = nil) = instance.handle_next_event_for_reactor(reactor, process_name)

.register ⇒ Object



14
15
16
# File 'lib/sourced/router.rb', line 14

# Class-level shortcut: forwards every argument (positional, keyword and block)
# to #register on the singleton instance.
#
# @return [Object] whatever instance.register returns
def register(...) = instance.register(...)

Instance Method Details

#dispatch_next_command ⇒ Object



131
132
133
134
135
136
# File 'lib/sourced/router.rb', line 131

# Asks the backend for its next command and routes it through #handle_command.
#
# The block is handed to backend.next_command; whether and when it runs is
# decided by the backend.
#
# @return [Object] whatever backend.next_command returns
def dispatch_next_command
  # TODO: error handling
  backend.next_command { |command| handle_command(command) }
end

#handle_and_ack_events_for_reactor(reactor, events) ⇒ Object

When in sync mode, we want both events and any resulting commands to be processed synchronously and in the same transaction in which the events are appended to the store. We could handle commands in threads or fibers if they belong to different streams than the events, but we need to make sure to raise exceptions in the main thread so that the transaction is rolled back.



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/sourced/router.rb', line 112

# Runs the reactor over the events inside backend.ack_on, then immediately
# handles every command the reactor produced.
#
# The ack is keyed by the reactor's consumer group id and the id of the last
# event in the batch.
#
# @param reactor [Object] must respond to #consumer_info and #handle_events
# @param events [Object] batch of events; events.last must respond to #id
# @return [Object] whatever backend.ack_on returns
def handle_and_ack_events_for_reactor(reactor, events)
  group_id = reactor.consumer_info.group_id
  last_event_id = events.last.id

  backend.ack_on(group_id, last_event_id) do
    produced = reactor.handle_events(events)
    next unless produced && produced.any?

    # TODO: commands may or may not belong to the same stream as the events.
    # Same-stream commands must be dispatched in order to preserve per-stream
    # ordering; commands for other streams could be dispatched in parallel
    # or pushed onto a command bus.
    # TODO: exceptions raised here still need to be handled.
    # TODO: this path is not tested.
    produced.each do |command|
      log_event(' -> produced command', reactor, command)
      handle_command(command)
    end
  end
end

#handle_command(command) ⇒ Object



69
70
71
72
# File 'lib/sourced/router.rb', line 69

# Looks up the decider registered for the command's class and hands it the command.
#
# @param command [Object] its class is the key into @decider_lookup
# @return [Object] whatever the decider's #handle_command returns
# @raise [KeyError] when no decider is registered for command.class
def handle_command(command)
  @decider_lookup.fetch(command.class).handle_command(command)
end

#handle_events(events) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/sourced/router.rb', line 74

# Hands the events to every sync reactor whose handled_events intersect the
# classes of the given events.
#
# @param events [Object] collection of events; must respond to #map
# @return [Object] the collection of reactors that matched
def handle_events(events)
  classes_seen = events.map(&:class)
  interested = sync_reactors.select { |reactor| reactor.handled_events.intersect?(classes_seen) }

  # TODO: reactors can return commands to run next. How to handle that safely
  # is still undecided, and it could potentially lead to infinite recursion.
  interested.each { |reactor| handle_and_ack_events_for_reactor(reactor, events) }
end

#handle_next_event_for_reactor(reactor, process_name = nil) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/sourced/router.rb', line 88

# Reserves the next event for the reactor from the backend, lets the reactor
# handle it, and schedules any commands the reactor produced.
#
# @param reactor [Object] must respond to #handle_events
# @param process_name [Object, nil] only forwarded to log_event
# @return [Object] whatever backend.reserve_next_for_reactor returns
def handle_next_event_for_reactor(reactor, process_name = nil)
  backend.reserve_next_for_reactor(reactor) do |event|
    # One event at a time here, so the reactor should return a single
    # command, or nothing.
    log_event('handling event', reactor, event, process_name)
    commands = reactor.handle_events([event])

    # "Nothing" may be nil, so the check is nil-safe, matching the guard in
    # #handle_and_ack_events_for_reactor.
    # Scheduling will run a new decider, which may be expensive, time out,
    # or raise an exception.
    # TODO: handle decider errors
    backend.schedule_commands(commands) if commands&.any?

    # The event is the block's value, handed back to the backend.
    event
  end
end

#register(thing) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/sourced/router.rb', line 53

# Registers +thing+ as a decider, a reactor, or both, depending on which
# interfaces it matches.
#
# A decider is indexed under each of its handled_commands. A reactor is added
# to the async or sync set according to consumer_info.async.
#
# @param thing [Object] candidate decider and/or reactor
# @return [Object, nil] the reactor set +thing+ was added to, or nil when
#   +thing+ does not match ReactorInterface
def register(thing)
  thing.handled_commands.each { |command_type| @decider_lookup[command_type] = thing } if DeciderInterface === thing
  return unless ReactorInterface === thing

  registry = thing.consumer_info.async ? @async_reactors : @sync_reactors
  registry << thing
end