Class: Sourced::Router
- Inherits: Object (Object → Sourced::Router)
- Includes: Singleton
- Defined in: lib/sourced/router.rb
Constant Summary
- PID = Process.pid
Instance Attribute Summary
- #async_reactors ⇒ Object (readonly): Returns the value of attribute async_reactors.
- #backend ⇒ Object (readonly): Returns the value of attribute backend.
- #logger ⇒ Object (readonly): Returns the value of attribute logger.
- #sync_reactors ⇒ Object (readonly): Returns the value of attribute sync_reactors.
Class Method Summary
- .async_reactors ⇒ Object
- .dispatch_next_command ⇒ Object
- .handle_and_ack_events_for_reactor(reactor, events) ⇒ Object
- .handle_command(command) ⇒ Object
- .handle_events(events) ⇒ Object
- .handle_next_event_for_reactor(reactor, process_name = nil) ⇒ Object
- .register ⇒ Object
Instance Method Summary
- #dispatch_next_command ⇒ Object
- #handle_and_ack_events_for_reactor(reactor, events) ⇒ Object: When in sync mode, both events and any resulting commands should be processed synchronously, in the same transaction in which the events are appended to the store.
- #handle_command(command) ⇒ Object
- #handle_events(events) ⇒ Object
- #handle_next_event_for_reactor(reactor, process_name = nil) ⇒ Object
- #initialize(backend: Sourced.config.backend, logger: Sourced.config.logger) ⇒ Router (constructor): A new instance of Router.
- #register(thing) ⇒ Object
Constructor Details
#initialize(backend: Sourced.config.backend, logger: Sourced.config.logger) ⇒ Router
Returns a new instance of Router.
# File 'lib/sourced/router.rb', line 45
def initialize(backend: Sourced.config.backend, logger: Sourced.config.logger)
  @backend = backend
  @logger = logger
  @decider_lookup = {}
  @sync_reactors = Set.new
  @async_reactors = Set.new
end
Instance Attribute Details
#async_reactors ⇒ Object (readonly)
Returns the value of attribute async_reactors.
# File 'lib/sourced/router.rb', line 43
def async_reactors
  @async_reactors
end
#backend ⇒ Object (readonly)
Returns the value of attribute backend.
# File 'lib/sourced/router.rb', line 43
def backend
  @backend
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
# File 'lib/sourced/router.rb', line 43
def logger
  @logger
end
#sync_reactors ⇒ Object (readonly)
Returns the value of attribute sync_reactors.
# File 'lib/sourced/router.rb', line 43
def sync_reactors
  @sync_reactors
end
Class Method Details
.async_reactors ⇒ Object
# File 'lib/sourced/router.rb', line 30
def async_reactors
  instance.async_reactors
end
.dispatch_next_command ⇒ Object
# File 'lib/sourced/router.rb', line 22
def dispatch_next_command
  instance.dispatch_next_command
end
.handle_and_ack_events_for_reactor(reactor, events) ⇒ Object
# File 'lib/sourced/router.rb', line 34
def handle_and_ack_events_for_reactor(reactor, events)
  instance.handle_and_ack_events_for_reactor(reactor, events)
end
.handle_command(command) ⇒ Object
# File 'lib/sourced/router.rb', line 18
def handle_command(command)
  instance.handle_command(command)
end
.handle_events(events) ⇒ Object
# File 'lib/sourced/router.rb', line 26
def handle_events(events)
  instance.handle_events(events)
end
.handle_next_event_for_reactor(reactor, process_name = nil) ⇒ Object
# File 'lib/sourced/router.rb', line 38
def handle_next_event_for_reactor(reactor, process_name = nil)
  instance.handle_next_event_for_reactor(reactor, process_name)
end
.register ⇒ Object
# File 'lib/sourced/router.rb', line 14
def register(...)
  instance.register(...)
end
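Every class method listed here forwards to the Singleton instance. As a minimal sketch of that pattern, the following uses a hypothetical MiniRouter class (a stand-in, not Sourced::Router itself) to show how `...` forwards all arguments from the class-level method to the instance:

```ruby
require 'singleton'

# Hypothetical stand-in for a singleton router; not the Sourced class.
class MiniRouter
  include Singleton

  class << self
    # Forward every argument (positional, keyword, block) to the instance.
    def register(...)
      instance.register(...)
    end
  end

  attr_reader :registered

  def initialize
    @registered = []
  end

  # Remember what was registered; return self for convenience.
  def register(thing)
    @registered << thing
    self
  end
end

MiniRouter.register(:my_decider)
MiniRouter.instance.registered
```

Because Singleton builds the instance by calling the constructor without arguments, the constructor has to work with no arguments. Sourced::Router#initialize satisfies this by giving backend: and logger: default values.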
Instance Method Details
#dispatch_next_command ⇒ Object
# File 'lib/sourced/router.rb', line 131
def dispatch_next_command
  backend.next_command do |cmd|
    # TODO: error handling
    handle_command(cmd)
  end
end
#handle_and_ack_events_for_reactor(reactor, events) ⇒ Object
When in sync mode, both events and any resulting commands should be processed synchronously, in the same transaction in which the events are appended to the store. Commands could be handled in threads or fibers if they belong to different streams than the events, but exceptions must still be raised in the main thread so that the transaction is rolled back.
# File 'lib/sourced/router.rb', line 112
def handle_and_ack_events_for_reactor(reactor, events)
  backend.ack_on(reactor.consumer_info.group_id, events.last.id) do
    commands = reactor.handle_events(events)
    if commands && commands.any?
      # TODO: Commands may or may not belong to the same stream as events.
      # If they belong to the same stream,
      # they need to be dispatched in order to preserve per-stream order.
      # If they belong to different streams, they can be dispatched in parallel
      # or put in a command bus.
      # TODO2: we also need to handle exceptions here
      # TODO3: this is not tested
      commands.each do |cmd|
        log_event(' -> produced command', reactor, cmd)
        handle_command(cmd)
      end
    end
  end
end
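The listing does not show what ack_on does when its block raises. The sketch below assumes one plausible contract: the offset for a consumer group is recorded only when the block completes. FakeBackend is a hypothetical in-memory stand-in, and the real Sourced backend may behave differently.

```ruby
# Hypothetical in-memory backend; Sourced's real ack_on may behave differently.
class FakeBackend
  attr_reader :acked

  def initialize
    @acked = {}
  end

  # Assumption: the offset is recorded only if the block does not raise.
  def ack_on(group_id, event_id)
    result = yield
    @acked[group_id] = event_id
    result
  end
end

backend = FakeBackend.new
backend.ack_on('mailer', 10) { :ok }

begin
  backend.ack_on('mailer', 11) { raise 'command failed' }
rescue RuntimeError
  # The failure reaches the caller and the recorded offset stays at 10.
end

backend.acked
```

Under this assumption, a command that fails inside the block leaves the events unacknowledged, which matches the stated goal of rolling back the whole transaction.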
#handle_command(command) ⇒ Object
# File 'lib/sourced/router.rb', line 69
def handle_command(command)
  decider = @decider_lookup.fetch(command.class)
  decider.handle_command(command)
end
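Since the lookup uses Hash#fetch without a default, a command whose class was never registered raises KeyError rather than returning nil. A small illustration with hypothetical command classes and a symbol standing in for the decider:

```ruby
# Hypothetical command classes, for illustration only.
PlaceOrder = Class.new
CancelOrder = Class.new

# A symbol stands in for a registered decider.
decider_lookup = { PlaceOrder => :order_decider }

found = decider_lookup.fetch(PlaceOrder)

error = nil
begin
  decider_lookup.fetch(CancelOrder)
rescue KeyError => e
  # No decider was registered for CancelOrder.
  error = e
end
```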
#handle_events(events) ⇒ Object
# File 'lib/sourced/router.rb', line 74
def handle_events(events)
  event_classes = events.map(&:class)
  reactors = sync_reactors.filter do |r|
    r.handled_events.intersect?(event_classes)
  end
  # TODO
  # Reactors can return commands to run next
  # I need to think about how best to handle this safely
  # Also this could potentially lead to infinite recursion!
  reactors.each do |r|
    handle_and_ack_events_for_reactor(r, events)
  end
end
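The reactor selection can be sketched in isolation. The names below (DemoReactor, ItemAdded, OrderPlaced, matching_reactors) are hypothetical, and array intersection with `&` is used in place of `intersect?` so the sketch does not depend on the collection type returned by handled_events:

```ruby
# Hypothetical event classes and reactor stand-in.
ItemAdded = Class.new
OrderPlaced = Class.new
DemoReactor = Struct.new(:name, :handled_events)

# Keep the reactors that handle at least one event class in the batch.
def matching_reactors(reactors, events)
  event_classes = events.map(&:class)
  reactors.select { |r| (r.handled_events & event_classes).any? }
end

reactors = [
  DemoReactor.new('inventory', [ItemAdded]),
  DemoReactor.new('billing', [OrderPlaced]),
  DemoReactor.new('audit', [ItemAdded, OrderPlaced])
]

matched = matching_reactors(reactors, [ItemAdded.new])
matched.map(&:name)
```

Note that in the listing each selected reactor receives the whole events batch, not only the events it declared, so a reactor is expected to cope with event types it does not handle.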
#handle_next_event_for_reactor(reactor, process_name = nil) ⇒ Object
# File 'lib/sourced/router.rb', line 88
def handle_next_event_for_reactor(reactor, process_name = nil)
  backend.reserve_next_for_reactor(reactor) do |event|
    # We're dealing with one event at a time now
    # So reactors should return a single command, or nothing
    log_event('handling event', reactor, event, process_name)
    commands = reactor.handle_events([event])
    if commands.any?
      # This will run a new decider
      # which may be expensive, timeout, or raise an exception
      # TODO: handle decider errors
      backend.schedule_commands(commands)
    end

    event
  end
end
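To make the asynchronous flow concrete, here is a sketch built on a hypothetical QueueBackend. The method names follow the listing, but their behaviour is assumed, in particular that reserve_next_for_reactor returns the block's value, or nil when nothing is pending.

```ruby
# Hypothetical in-memory backend; method names follow the listing,
# behaviour is assumed.
class QueueBackend
  attr_reader :commands

  def initialize(events)
    @events = events.dup
    @commands = []
  end

  # Assumption: yields the next pending event, or returns nil when none is left.
  def reserve_next_for_reactor(_reactor)
    event = @events.shift
    return nil unless event

    yield event
  end

  def schedule_commands(commands)
    @commands.concat(commands)
  end
end

# Stand-in reactor: turns each event into one command symbol.
reactor = Object.new
def reactor.handle_events(events)
  events.map { |e| :"notify_#{e}" }
end

backend = QueueBackend.new(%i[item_added order_placed])

# Same shape as #handle_next_event_for_reactor, minus logging.
handle_next = lambda do
  backend.reserve_next_for_reactor(reactor) do |event|
    commands = reactor.handle_events([event])
    backend.schedule_commands(commands) if commands.any?
    event
  end
end

handled = []
while (event = handle_next.call)
  handled << event
end
```

Unlike the synchronous path, commands are only scheduled here. A separate step such as #dispatch_next_command would later pick them up and run the decider.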
#register(thing) ⇒ Object
# File 'lib/sourced/router.rb', line 53
def register(thing)
  if DeciderInterface === thing
    thing.handled_commands.each do |cmd_type|
      @decider_lookup[cmd_type] = thing
    end
  end

  return unless ReactorInterface === thing

  if thing.consumer_info.async
    @async_reactors << thing
  else
    @sync_reactors << thing
  end
end
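The branching in #register can be tried out with a simplified registry. Everything below is a hypothetical stand-in: MiniRegistry replaces the router, and respond_to? checks replace the DeciderInterface and ReactorInterface matches, whose definitions are not shown in this page.

```ruby
require 'set'

# Hypothetical value object with the two fields the listing reads.
DemoConsumerInfo = Struct.new(:group_id, :async)

# Hypothetical registry mirroring the branching in #register.
class MiniRegistry
  attr_reader :decider_lookup, :sync_reactors, :async_reactors

  def initialize
    @decider_lookup = {}
    @sync_reactors = Set.new
    @async_reactors = Set.new
  end

  def register(thing)
    # Assumption: duck typing stands in for DeciderInterface === thing.
    if thing.respond_to?(:handled_commands)
      thing.handled_commands.each { |cmd_type| @decider_lookup[cmd_type] = thing }
    end

    # Assumption: duck typing stands in for ReactorInterface === thing.
    return unless thing.respond_to?(:consumer_info)

    if thing.consumer_info.async
      @async_reactors << thing
    else
      @sync_reactors << thing
    end
  end
end

AddItem = Class.new
CartDecider = Struct.new(:handled_commands)
Mailer = Struct.new(:consumer_info)

registry = MiniRegistry.new
decider = CartDecider.new([AddItem])
mailer = Mailer.new(DemoConsumerInfo.new('mailer', true))

registry.register(decider)
registry.register(mailer)
```

An object that satisfies both interfaces is registered twice: once as the decider for each of its commands and once as a reactor. Registering a second decider for the same command type replaces the first.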