Class: BundleRequests::Consumer
- Inherits:
-
Object
- Object
- BundleRequests::Consumer
- Defined in:
- lib/bundle_requests/consumer.rb
Instance Method Summary collapse
- #call_bundle_api(rack_input, default_env = {}) ⇒ Object
-
#consumer_code ⇒ Object
-
$waiting_threads.
-
- #distribute_result_and_wakeup_those_threads(threads, result) ⇒ Object
- #gather_all_requests(threads) ⇒ Object
- #generate_config_hash(options) ⇒ Object
- #get_configuration ⇒ Object
- #get_waiting_threads ⇒ Object
-
#initialize(app, config) ⇒ Consumer
constructor
A new instance of Consumer.
- #pop_some_waiting_threads ⇒ Object
Constructor Details
#initialize(app, config) ⇒ Consumer
Returns a new instance of Consumer.
3 4 5 6 7 8 |
# File 'lib/bundle_requests/consumer.rb', line 3 def initialize(app , config) @app = app $waiting_threads = Queue.new generate_config_hash(config) Thread.new{consumer_code} end |
Instance Method Details
#call_bundle_api(rack_input, default_env = {}) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/bundle_requests/consumer.rb', line 62 def call_bundle_api(rack_input,default_env={}) env = default_env # if this doesnt work assign myenv to some env of any threads env['PATH_INFO'] = $configuration['bundle_api'] env['QUERY_STRING'] = '' env['REQUEST_METHOD'] = 'POST' env['CONTENT_LENGTH'] = {'requests' => rack_input}.to_json.length env['rack.input'] = StringIO.new({'requests' => rack_input}.to_json) request = Rack::Request.new(env) Rails.logger.info("new Environment is #{env}") Rails.logger.info("New request content are #{request}") result = @app.call(env) Rails.logger.info("Result are #{result}") result end |
#consumer_code ⇒ Object
-
$waiting_threads
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/bundle_requests/consumer.rb', line 23 def consumer_code while true c = $waiting_threads.length if c < $configuration["max_waiting_thread"] puts "#{c} threads are waiting so sleeping" sleep($configuration["wait_time"]) next if $waiting_threads.length == 0 end puts "Started request processing----------------------------------" threads = pop_some_waiting_threads rack_input = gather_all_requests(threads) result = call_bundle_api(rack_input,threads[0]['request']) # may through exception if 0 threads are present remove it afterwards distribute_result_and_wakeup_those_threads(threads, result) puts "Completed proccessing requests------------------------------" end end |
#distribute_result_and_wakeup_those_threads(threads, result) ⇒ Object
77 78 79 80 81 82 83 84 |
# File 'lib/bundle_requests/consumer.rb', line 77 def distribute_result_and_wakeup_those_threads(threads, result) for index in 0...threads.length t = threads[index] # hardcoded response for development. Replace it with result[index] t["response"] = [200,{"Content-Type" => "application/json"},["Status Ok"]] t.wakeup end end |
#gather_all_requests(threads) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/bundle_requests/consumer.rb', line 50 def gather_all_requests(threads) rack_input = [] threads.each do |t| e = t['request'] req = Rack::Request.new(e) Rails.logger.debug req.inspect rack_input << JSON.parse(req.body.read) end Rails.logger.info rack_input rack_input end |
#generate_config_hash(options) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/bundle_requests/consumer.rb', line 86 def generate_config_hash() if $configuration.nil? config = { "incoming_request" => "/api", "bundle_api" => "/bundle_api", "wait_time" => 10, "thread_wait_after_closing_entrance" => 2, "max_waiting_thread" => 16 } .each do |key,value| if !value.nil? config[key] = value end end $configuration = config end end |
#get_configuration ⇒ Object
14 15 16 |
# File 'lib/bundle_requests/consumer.rb', line 14 def get_configuration $configuration end |
#get_waiting_threads ⇒ Object
10 11 12 |
# File 'lib/bundle_requests/consumer.rb', line 10 def get_waiting_threads $waiting_threads end |
#pop_some_waiting_threads ⇒ Object
40 41 42 43 44 45 46 47 48 |
# File 'lib/bundle_requests/consumer.rb', line 40 def pop_some_waiting_threads threads = [] $waiting_threads.length.times do t = $waiting_threads.pop threads << t end Rails.logger.info("Currently proccessing #{threads.length} threads") threads end |