Class: ServiceWave

Inherits:
Object
  • Object
show all
Defined in:
app/models/service_wave.rb

Overview

ServiceWave is basically responsible for multi-threaded execution of a list of services, all in the same priority. Generally it’s only called by Collection, nobody else needs it directly.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(service_objects, priority_level = nil, config = UmlautController.umlaut_config) ⇒ ServiceWave

Priority level is purely information, used for debug output.



13
14
15
16
17
18
19
20
21
22
23
# File 'app/models/service_wave.rb', line 13

def initialize(service_objects, priority_level = nil, config = UmlautController.umlaut_config)
  @services = service_objects
  @priority_level = priority_level
  @config = config
  @log_timing = config.lookup!("log_service_timing", true)

  # Don't forward exceptions, that'll interrupt other service processing.
  # Catch the exception, record it in the dispatch table, done. May want
  # to set this to true for debugging/development, but NOT for production.
  @forward_exceptions = false
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



10
11
12
# File 'app/models/service_wave.rb', line 10

def config
  @config
end

#priority_levelObject

Returns the value of attribute priority_level.



9
10
11
# File 'app/models/service_wave.rb', line 9

def priority_level
  @priority_level
end

#servicesObject

Returns the value of attribute services.



8
9
10
# File 'app/models/service_wave.rb', line 8

def services
  @services
end

Instance Method Details

#forward_exceptions=(f) ⇒ Object



160
161
162
# File 'app/models/service_wave.rb', line 160

def forward_exceptions=(f)
  @foward_excpetions = f    
end

#forward_exceptions?Boolean

Returns:

  • (Boolean)


157
158
159
# File 'app/models/service_wave.rb', line 157

def forward_exceptions?
  return @forward_exceptions
end

#handle(request, session_id) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'app/models/service_wave.rb', line 31

def handle(request, session_id)
  return if (@services.nil? || @services.empty?)

  bundle_start = Time.now
  Rails.logger.info(TermColor.color("Umlaut: Launching service wave #{@priority_level} #{'(non-threaded)' unless config.lookup!("threaded_service_wave", true) }", :yellow) + ", request #{request.id}") if @log_timing

  
  
  threads = []
  some_service_executed = false
  @services.each do | service |
    some_service_executed = true
    local_request = nil
    
    service_start = Time.now          

    if config.lookup!("threaded_service_wave", true)

    
      threads << Thread.new(request.id, service.clone) do | request_id, local_service |
        # Deal with ruby's brain dead thread scheduling by setting
        # bg threads to a lower priority so they don't interfere with fg
        # threads.
        Thread.current.priority = -1

        # Save some things in thread local hash useful for debugging
        Thread.current[:debug_name] = local_service.class.name
        Thread.current[:service] = service

        # Tell our AR extension not to allow implicit checkouts
        ActiveRecord::Base.forbid_implicit_checkout_for_thread! if ActiveRecord::Base.respond_to?("forbid_implicit_checkout_for_thread!")
        
        begin
          local_request = Request.connection_pool.with_connection do
            # We are attempting to pre-load all relationships, both for efficiency,
            # and so our thread can use them all without needing to checkout
            # an ActiveRecord connection. 
            req = Request.includes({:referent => :referent_values}, :service_responses, :dispatched_services).find(request_id)

            # It turns out even though referent.referent_values is loaded from the db, on first
            # access Rails will still access #connection, triggering a checkout. We force
            # that to happen here, in our with_connection, so it won't happen later. 
            #
            # Yeah, this is a hacky mess, ActiveRecord isn't happy using it the way we are. 
            # Should we just surround all of handle_wrapper in a with_connection checkout?
            # Maybe. But it would require more connections in the pool to have those
            # longer checkouts. 
            req.referent.referent_values
            req.service_responses
            req.dispatched_services

            req
          end

            

          if prepare_dispatch!(local_request, local_service)
            local_service.handle_wrapper(local_request)
          else
            Rails.logger.info("NOT launching service #{local_service.service_id},  level #{@priority_level}, request #{local_request.id}: not in runnable state") if @log_timing
          end
          
         
        rescue StandardError => e
          # We may not be able to access ActiveRecord because it may
          # have been an AR connection error, perhaps out of connections
          # in the pool. So log and record in non-AR ways. 
          # the code waiting on our thread will see exception
          # reported in Thread local var, and log it AR if possible. 
          
          
          # Log it too, although experience shows it may never make it to the 
          # log for mysterious reasons. 
          log_msg = TermColor.color("Umlaut: Threaded service raised exception.", :red, true) + " Service: #{service.service_id}, #{e.class} #{e.message}. Backtrace:\n  #{clean_backtrace(e).join("\n  ")}"
          Rails.logger.error(log_msg)
          
          # And stick it in a thread variable too
          Thread.current[:exception] = e    

          # And try to re-raise if it's one we really don't want to swallow. 
          # Sorry, a mess. 
          raise e if defined?(VCR::Errors::UnhandledHTTPRequestError) && e.kind_of?(VCR::Errors::UnhandledHTTPRequestError)
        ensure
          Rails.logger.info(TermColor.color("Umlaut: Completed service #{local_service.service_id}", :yellow)+ ",  level #{@priority_level}, request #{local_request && local_request.id}: in #{Time.now - service_start}.") if @log_timing
        end
      end
    else # not threaded
      begin
        if prepare_dispatch!(request, service)
            service.handle_wrapper(request)
        else
            Rails.logger.info("NOT launching service #{service.service_id},  level #{@priority_level}, request #{request.id}: not in runnable state") if @log_timing
        end
      ensure
        Rails.logger.info(TermColor.color("Umlaut: Completed service #{service.service_id}", :yellow)+ ",  level #{@priority_level}, request #{request && request.id}: in #{Time.now - service_start}.") if @log_timing
      end
    end
    
  end

  # Wait for all the threads to complete, if any. 
  threads.each do |aThread|
    aThread.join

    if aThread[:exception] 
      debugger if ENV["UMLAUT_AUTO_DEBUGGER"]
      begin
        request.dispatched(aThread[:service], DispatchedService::FailedFatal, aThread[:exception])
      rescue Exception => e
        debugger if ENV["UMLAUT_AUTO_DEBUGGER"]
        raise e
      end
    end

    # Okay, raise if exception, if desired.
    if ( aThread[:exception] && self.forward_exceptions? )        
      raise aThread[:exception]
    end
  end

  threads.clear # more paranoia


  Rails.logger.info(TermColor.color("Umlaut: Completed service wave #{@priority_level}", :yellow) + ", request #{request.id}: in #{Time.now - bundle_start}") if some_service_executed && @log_timing
end

#prepare_dispatch!(request, service) ⇒ Object

Safe to call in a thread. Returns true or false depending on whether dispatch should proceed.



27
28
29
# File 'app/models/service_wave.rb', line 27

def prepare_dispatch!(request, service)            
  return request.register_in_progress(service)                    
end