Class: Dynflow::Dispatcher::ClientDispatcher

Inherits:
Abstract
  • Object
show all
Defined in:
lib/dynflow/dispatcher/client_dispatcher.rb

Defined Under Namespace

Modules: TrackedRequest Classes: PingCache

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Abstract

#connector, #respond

Methods inherited from Actor

#behaviour_definition, #finish_termination, #terminating?

Methods included from MethodicActor

#on_message

Methods included from Actor::LogWithFullBacktrace

#log

Constructor Details

#initialize(world, ping_cache_age) ⇒ ClientDispatcher

Returns a new instance of ClientDispatcher.



107
108
109
110
111
112
113
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 107

# Builds a dispatcher bound to +world+ with an empty set of tracked requests.
#
# @param world [World] verified with +Type!+ before it is stored
# @param ping_cache_age forwarded, together with +world+, to PingCache.new
def initialize(world, ping_cache_age)
  @world = Type!(world, World)
  @tracked_requests = {}
  @last_id_suffix = 0
  @terminated = nil
  @ping_cache = PingCache.new(world, ping_cache_age)
end

Instance Attribute Details

#ping_cache ⇒ Object (readonly)

Returns the value of attribute ping_cache.



106
107
108
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 106

# Read-only accessor for the PingCache created in #initialize.
#
# @return [PingCache] the value of the @ping_cache instance variable
def ping_cache
  @ping_cache
end

Instance Method Details

#add_ping_cache_record(id) ⇒ Object

Records when the world with the provided id was last seen, using a PingCache.

Parameters:

  • id (String)

    Id of the world

See Also:



186
187
188
189
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 186

# Adds a record for the world +id+ to the ping cache, logging a DEBUG line
# beforehand.
#
# @param id [String] id of the world
# @return [Object] whatever the ping cache's +add_record+ returns
def add_ping_cache_record(id)
  debug_line = "adding ping cache record for #{id}"
  log(Logger::DEBUG, debug_line)
  @ping_cache.add_record(id)
end

#dispatch_request(request, client_world_id, request_id) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 134

# Wraps +request+ in an Envelope and sends it through the connector.
#
# The envelope's third field is chosen by the request type:
# * Execution / Planning -> AnyExecutor
# * Event                -> find_executor(event.execution_plan_id)
# * Ping / Status        -> the first value bound from the request
#
# When the envelope's receiver is a Dispatcher::UnknownWorld, an Event whose
# +optional+ flag is set is answered with Failed and discarded (logged at
# DEBUG); any other request raises Dynflow::Error.
#
# Errors reaching the method-level rescue are logged at ERROR and, when the
# envelope already exists, answered with Failed carrying the error message.
# NOTE(review): an error raised before the envelope is built is only logged;
# no response is sent from here -- confirm callers rely on the timeout.
#
# @param request the request to dispatch
# @param client_world_id second field of the built Envelope
#   (presumably the sender id -- confirm against Envelope)
# @param request_id first field of the built Envelope
def dispatch_request(request, client_world_id, request_id)
  discard_if_unknown = false
  target = match request,
                 (on ~Execution | ~Planning do |_execution|
                    AnyExecutor
                  end),
                 (on ~Event do |event|
                    discard_if_unknown = event.optional
                    find_executor(event.execution_plan_id)
                  end),
                 (on Ping.(~any, ~any) | Status.(~any, ~any) do |target_id, _|
                    target_id
                  end)
  envelope = Envelope[request_id, client_world_id, target, request]

  # Common case: the receiver is known, so hand the envelope over and wait.
  unless Dispatcher::UnknownWorld === envelope.receiver_id
    return connector.send(envelope).value!
  end

  unless discard_if_unknown
    raise Dynflow::Error, "Could not find an executor for #{envelope}"
  end

  message = "Could not find an executor for optional #{envelope}, discarding."
  log(Logger::DEBUG, message)
  respond(envelope, Failed[message])
rescue => e
  log(Logger::ERROR, e)
  respond(envelope, Failed[e.message]) if envelope
end

#dispatch_response(envelope) ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 161

# Applies a response envelope to the tracked request it belongs to.
# Envelopes whose request id is not tracked are ignored.
#
# Handling by message type:
# * Accepted        -> calls accept! on the tracked request
# * Failed          -> resolves the request with a Dynflow::Error built from
#                      the message's +error+
# * Done            -> resolves the request
# * Pong            -> records the sender in the ping cache, then resolves
# * ExecutionStatus -> removes the tracked request and calls success! with
#                      the bound value
#
# @param envelope the response envelope
def dispatch_response(envelope)
  request_id = envelope.request_id
  return unless @tracked_requests.key?(request_id)

  match envelope.message,
        (on ~Accepted do
           @tracked_requests[request_id].accept!
         end),
        (on ~Failed do |failure|
           resolve_tracked_request(request_id, Dynflow::Error.new(failure.error))
         end),
        (on Done do
           resolve_tracked_request(request_id)
         end),
        (on Pong do
           add_ping_cache_record(envelope.sender_id)
           resolve_tracked_request(request_id)
         end),
        (on ExecutionStatus.(~any) do |steps|
           @tracked_requests.delete(request_id).success!(steps)
         end)
end

#publish_request(future, request, timeout) ⇒ Object



115
116
117
118
119
120
121
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 115

# Publishes +request+ on behalf of this world: inside the ping-caching
# wrapper the request is tracked, and the tracked request's id is used when
# dispatching it.
#
# @param future passed to with_ping_request_caching and track_request
# @param request the request to publish
# @param timeout passed to track_request
def publish_request(future, request, timeout)
  with_ping_request_caching(request, future) do
    track_request(future, request, timeout) { |tracked| dispatch_request(request, @world.id, tracked.id) }
  end
end

#start_termination(*args) ⇒ Object



127
128
129
130
131
132
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 127

# Runs the inherited termination start, fails every tracked request with a
# fresh 'Dispatcher terminated' Dynflow::Error, forgets all tracked requests
# and finishes termination.
#
# @param args forwarded unchanged to the inherited implementation
def start_termination(*args)
  super
  # Iterate over a snapshot of the values; each request gets its own error.
  @tracked_requests.values.each do |pending|
    pending.fail!(Dynflow::Error.new('Dispatcher terminated'))
  end
  @tracked_requests.clear
  finish_termination
end

#timeout(request_id) ⇒ Object



123
124
125
# File 'lib/dynflow/dispatcher/client_dispatcher.rb', line 123

# Resolves the tracked request +request_id+ with a "Request timeout"
# Dynflow::Error.
#
# @param request_id id of the request that timed out
def timeout(request_id)
  timeout_error = Dynflow::Error.new("Request timeout")
  resolve_tracked_request(request_id, timeout_error)
end