Class: Dynflow::Dispatcher::ClientDispatcher
Defined Under Namespace
Modules: TrackedRequest Classes: PingCache
Instance Attribute Summary collapse
-
#ping_cache ⇒ Object
readonly
Returns the value of attribute ping_cache.
Instance Method Summary collapse
-
#add_ping_cache_record(id) ⇒ Object
Records when was the world with provided id last seen using a PingCache.
- #dispatch_request(request, client_world_id, request_id) ⇒ Object
- #dispatch_response(envelope) ⇒ Object
-
#initialize(world, ping_cache_age) ⇒ ClientDispatcher
constructor
A new instance of ClientDispatcher.
- #publish_request(future, request, timeout) ⇒ Object
- #start_termination(*args) ⇒ Object
- #timeout(request_id) ⇒ Object
Methods inherited from Abstract
Methods inherited from Actor
#behaviour_definition, #finish_termination, #terminating?
Methods included from MethodicActor
Methods included from Actor::LogWithFullBacktrace
Constructor Details
#initialize(world, ping_cache_age) ⇒ ClientDispatcher
Returns a new instance of ClientDispatcher.
107 108 109 110 111 112 113 |
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 107 def initialize(world, ping_cache_age) @world = Type! world, World @last_id_suffix = 0 @tracked_requests = {} @terminated = nil @ping_cache = PingCache.new world, ping_cache_age end |
Instance Attribute Details
#ping_cache ⇒ Object (readonly)
Returns the value of attribute ping_cache.
106 107 108 |
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 106 def ping_cache @ping_cache end |
Instance Method Details
#add_ping_cache_record(id) ⇒ Object
Records when was the world with provided id last seen using a PingCache
186 187 188 189 |
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 186 def add_ping_cache_record(id) log Logger::DEBUG, "adding ping cache record for #{id}" @ping_cache.add_record id end |
#dispatch_request(request, client_world_id, request_id) ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 134 def dispatch_request(request, client_world_id, request_id) ignore_unknown = false executor_id = match request, (on ~Execution | ~Planning do |execution| AnyExecutor end), (on ~Event do |event| ignore_unknown = event.optional find_executor(event.execution_plan_id) end), (on Ping.(~any, ~any) | Status.(~any, ~any) do |receiver_id, _| receiver_id end) envelope = Envelope[request_id, client_world_id, executor_id, request] if Dispatcher::UnknownWorld === envelope.receiver_id raise Dynflow::Error, "Could not find an executor for #{envelope}" unless ignore_unknown = "Could not find an executor for optional #{envelope}, discarding." log(Logger::DEBUG, ) return respond(envelope, Failed[]) end connector.send(envelope).value! rescue => e log(Logger::ERROR, e) respond(envelope, Failed[e.]) if envelope end |
#dispatch_response(envelope) ⇒ Object
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 161 def dispatch_response(envelope) return unless @tracked_requests.key?(envelope.request_id) match envelope., (on ~Accepted do @tracked_requests[envelope.request_id].accept! end), (on ~Failed do |msg| resolve_tracked_request(envelope.request_id, Dynflow::Error.new(msg.error)) end), (on Done do resolve_tracked_request(envelope.request_id) end), (on Pong do add_ping_cache_record(envelope.sender_id) resolve_tracked_request(envelope.request_id) end), (on ExecutionStatus.(~any) do |steps| @tracked_requests.delete(envelope.request_id).success! steps end) end |
#publish_request(future, request, timeout) ⇒ Object
115 116 117 118 119 120 121 |
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 115 def publish_request(future, request, timeout) with_ping_request_caching(request, future) do track_request(future, request, timeout) do |tracked_request| dispatch_request(request, @world.id, tracked_request.id) end end end |
#start_termination(*args) ⇒ Object
127 128 129 130 131 132 |
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 127 def start_termination(*args) super @tracked_requests.values.each { |tracked_request| tracked_request.fail!(Dynflow::Error.new('Dispatcher terminated')) } @tracked_requests.clear finish_termination end |