Class: RightScale::Dispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/right_agent/dispatcher.rb

Overview

Dispatching of payload to specified actor

Defined Under Namespace

Classes: Dispatched

Constant Summary collapse

RESPONSE_QUEUE =

Response queue name

"response"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(agent) ⇒ Dispatcher

Initialize dispatcher

Parameters

agent(Agent)

Agent using this dispatcher; uses its identity, broker, registry, and following options:

:dup_check(Boolean):: Whether to check for and reject duplicate requests, e.g., due to retries,
  but only for requests that are dispatched from non-shared queues
:secure(Boolean):: true indicates to use Security features of RabbitMQ to restrict agents to themselves
:single_threaded(Boolean):: true indicates to run all operations in one thread; false indicates
  to do requested work on event machine defer thread and all else, such as pings, on main thread
:threadpool_size(Integer):: Number of threads in event machine thread pool, defaults to 20


129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/right_agent/dispatcher.rb', line 129

# Initialize dispatcher
#
# === Parameters
# agent(Agent):: Agent using this dispatcher; uses its identity, broker, registry, and following options:
#   :dup_check(Boolean):: Whether to check for and reject duplicate requests
#   :secure(Boolean):: Stored and later used as the :no_declare option when publishing results
#   :single_threaded(Boolean):: Whether to run all operations in one thread
#   :threadpool_size(Integer):: Number of threads in event machine thread pool, defaults to 20
def initialize(agent)
  @agent = agent
  @broker, @registry, @identity = agent.broker, agent.registry, agent.identity

  opts = agent.options
  @secure          = opts[:secure]
  @single_threaded = opts[:single_threaded]
  @dup_check       = opts[:dup_check]
  @pending_dispatches = 0

  # Event machine is held in an instance variable so that it can be substituted
  @em = EM
  pool_size = opts[:threadpool_size] || 20
  @em.threadpool_size = pool_size.to_i
  reset_stats

  # Only access following from primary thread
  if @dup_check
    @dispatched = Dispatched.new
  end
end

Instance Attribute Details

#brokerObject (readonly)

(RightAMQP::HABrokerClient) High availability AMQP broker client



114
115
116
# File 'lib/right_agent/dispatcher.rb', line 114

# === Return
# (RightAMQP::HABrokerClient):: High availability AMQP broker client
def broker; @broker; end

#emObject

(EM) Event machine class (exposed for unit tests)



117
118
119
# File 'lib/right_agent/dispatcher.rb', line 117

# === Return
# (EM):: Event machine class (exposed for unit tests)
def em; @em; end

#identityObject (readonly)

(String) Identity of associated agent



111
112
113
# File 'lib/right_agent/dispatcher.rb', line 111

# === Return
# (String):: Identity of associated agent
def identity; @identity; end

#registryObject (readonly)

(ActorRegistry) Registry for actors



108
109
110
# File 'lib/right_agent/dispatcher.rb', line 108

# === Return
# (ActorRegistry):: Registry for actors
def registry; @registry; end

Instance Method Details

#dispatch(request, shared = false) ⇒ Object

Dispatch request to appropriate actor for servicing. Handle returning of result to requester, including logging any exceptions. Reject requests whose TTL has expired or that are duplicates of work already dispatched, but do not do duplicate checking if being dispatched from a shared queue. Work is done in background defer thread if single threaded option is false.

Parameters

request(Request|Push)

Packet containing request

shared(Boolean)

Whether being dispatched from a shared queue

Return

r(Result)

Result from dispatched request, nil if not dispatched because there is no actor for it, it is a duplicate, or its TTL has expired



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/right_agent/dispatcher.rb', line 159

# Dispatch request to appropriate actor for servicing
# Handle returning of result to requester including logging any exceptions
# Reject requests whose TTL has expired or that are duplicates of work already dispatched
# but do not do duplicate checking if being dispatched from a shared queue
# Work is done in background defer thread if single threaded option is false
#
# === Parameters
# request(Request|Push):: Packet containing request
# shared(Boolean):: Whether being dispatched from a shared queue
#
# === Return
# (Object|nil):: nil if the request is not dispatched because there is no actor for it,
#   its TTL has expired, or it is a duplicate; otherwise whatever the event machine
#   next_tick or defer call returns
def dispatch(request, shared = false)

  # Determine which actor this request is for
  # First segment of the request type after the leading '/' selects the actor,
  # the second selects the method and defaults to :index when absent
  prefix, method = request.type.split('/')[1..-1]
  method ||= :index
  actor = @registry.actor_for(prefix)
  token = request.token
  # Request is counted in the stats even if it is subsequently rejected
  received_at = @requests.update(method, (token if request.kind_of?(Request)))
  if actor.nil?
    Log.error("No actor for dispatching request <#{request.token}> of type #{request.type}")
    return nil
  end

  # Reject this request if its TTL has expired
  if (expires_at = request.expires_at) && expires_at > 0 && received_at.to_i >= expires_at
    @rejects.update("expired (#{method})")
    Log.info("REJECT EXPIRED <#{token}> from #{request.from} TTL #{RightSupport::Stats.elapsed(received_at.to_i - expires_at)} ago")
    if request.is_a?(Request)
      # For agents that do not know about non-delivery, use error result
      non_delivery = if request.recv_version < 13
        OperationResult.error("Could not deliver request (#{OperationResult::TTL_EXPIRATION})")
      else
        OperationResult.non_delivery(OperationResult::TTL_EXPIRATION)
      end
      result = Result.new(token, request.reply_to, non_delivery, @identity, request.from, request.tries, request.persistent)
      exchange = {:type => :queue, :name => RESPONSE_QUEUE, :options => {:durable => true, :no_declare => @secure}}
      @broker.publish(exchange, result, :persistent => true, :mandatory => true)
    end
    return nil
  end

  # Reject this request if it is a duplicate
  if @dup_check && !shared && request.kind_of?(Request)
    if @dispatched.fetch(token)
      @rejects.update("duplicate (#{method})")
      Log.info("REJECT DUP <#{token}> of self")
      return nil
    end
    request.tries.each do |t|
      if @dispatched.fetch(t)
        @rejects.update("retry duplicate (#{method})")
        Log.info("REJECT RETRY DUP <#{token}> of <#{t}>")
        return nil
      end
    end
  end

  # Set once the pending count has been decremented for this dispatch so that
  # a failure in the actor followed by the callback does not decrement it twice
  pending_released = false

  # Proc for performing request in actor
  operation = lambda do
    begin
      @pending_dispatches += 1
      @last_request_dispatch_time = received_at.to_i
      @dispatched.store(token) if @dup_check && !shared && request.kind_of?(Request) && token
      # NOTE(review): method name is taken from the request type and __send__ bypasses
      # method visibility, so any method of the actor is reachable this way; confirm that
      # request types are trusted or restrict dispatch to the actor's exposed methods
      actor.__send__(method, request.payload)
    rescue Exception => e
      # Release here rather than waiting for the callback so that the count is
      # still corrected if handling of the exception itself fails
      @pending_dispatches = [@pending_dispatches - 1, 0].max
      pending_released = true
      handle_exception(actor, method, request, e)
    end
  end

  # Proc for sending response
  callback = lambda do |r|
    begin
      @pending_dispatches = [@pending_dispatches - 1, 0].max unless pending_released
      if request.kind_of?(Request)
        duration = @requests.finish(received_at, token)
        r = Result.new(token, request.reply_to, r, @identity, request.from, request.tries, request.persistent, duration)
        exchange = {:type => :queue, :name => RESPONSE_QUEUE, :options => {:durable => true, :no_declare => @secure}}
        @broker.publish(exchange, r, :persistent => true, :mandatory => true, :log_filter => [:tries, :persistent, :duration])
      end
    rescue RightAMQP::HABrokerClient::NoConnectedBrokers => e
      Log.error("Failed to publish result of dispatched request #{request.trace}", e)
    rescue Exception => e
      Log.error("Failed to publish result of dispatched request #{request.trace}", e, :trace)
      @exceptions.track("publish response", e)
    end
    r # For unit tests
  end

  # Process request and send response, if any
  if @single_threaded
    @em.next_tick { callback.call(operation.call) }
  else
    @em.defer(operation, callback)
  end
end

#dispatch_ageObject

Determine age of youngest request dispatch

Return

(Integer|nil)

Age in seconds of youngest dispatch, or nil if none



250
251
252
# File 'lib/right_agent/dispatcher.rb', line 250

# Determine age of youngest request dispatch
#
# === Return
# (Integer|nil):: Age in seconds since the last recorded request dispatch time,
#   or nil if no dispatch time has been recorded or no dispatches are pending
def dispatch_age
  return nil unless @last_request_dispatch_time && @pending_dispatches > 0
  Time.now.to_i - @last_request_dispatch_time
end

#stats(reset = false) ⇒ Object

Get dispatcher statistics

Parameters

reset(Boolean)

Whether to reset the statistics after getting the current ones

Return

stats(Hash)

Current statistics:

"cached"(Hash|nil):: Number of dispatched requests cached and age of youngest and oldest,
  or nil if empty
"exceptions"(Hash|nil):: Exceptions raised per category, or nil if none
  "total"(Integer):: Total for category
  "recent"(Array):: Most recent as a hash of "count", "type", "message", "when", and "where"
"pending"(Hash|nil):: Pending request "total" and "youngest age", or nil if none
"rejects"(Hash|nil):: Request reject activity stats with keys "total", "percent", "last", and "rate"
  with percentage breakdown per reason ("duplicate (<method>)", "retry duplicate (<method>)", or
  "expired (<method>)"), or nil if none
"requests"(Hash|nil):: Request activity stats with keys "total", "percent", "last", and "rate"
  with percentage breakdown per request type, or nil if none
"response time"(Float):: Average number of seconds to respond to a request recently


273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# File 'lib/right_agent/dispatcher.rb', line 273

# Get dispatcher statistics
#
# === Parameters
# reset(Boolean):: Whether to reset the statistics after getting the current ones
#
# === Return
# (Hash):: Current statistics with keys "cached", "exceptions", "pending", "rejects",
#   "requests", and "response time"; "pending" is nil when no dispatches are pending
#   and "cached" is nil when duplicate checking is disabled
def stats(reset = false)
  pending_stats = nil
  if @pending_dispatches > 0
    pending_stats = {"total" => @pending_dispatches, "youngest age" => dispatch_age}
  end
  cached_stats = @dup_check ? @dispatched.stats : nil

  # Snapshot is built before any reset so that the caller sees the values being cleared
  current = {
    "cached"        => cached_stats,
    "exceptions"    => @exceptions.stats,
    "pending"       => pending_stats,
    "rejects"       => @rejects.all,
    "requests"      => @requests.all,
    "response time" => @requests.avg_duration
  }
  reset_stats if reset
  current
end