Class: BundleRequests::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/bundle_requests/consumer.rb

Instance Method Summary collapse

Constructor Details

#initialize(app, config) ⇒ Consumer

Returns a new instance of Consumer.



3
4
5
6
7
8
# File 'lib/bundle_requests/consumer.rb', line 3

def initialize(app , config)
  @app = app
  $waiting_threads = Queue.new
  generate_config_hash(config)
  Thread.new{consumer_code}
end

Instance Method Details

#call_bundle_api(rack_input, default_env = {}) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/bundle_requests/consumer.rb', line 62

def call_bundle_api(rack_input,default_env={})
  env = default_env             # if this doesnt work assign myenv to some env of any threads
  env['PATH_INFO'] = $configuration['bundle_api']
  env['QUERY_STRING'] = ''
  env['REQUEST_METHOD'] = 'POST'
  env['CONTENT_LENGTH'] = {'requests' => rack_input}.to_json.length
  env['rack.input'] = StringIO.new({'requests' => rack_input}.to_json)
  request = Rack::Request.new(env)
  Rails.logger.info("new Environment is #{env}")
  Rails.logger.info("New request content are #{request}")
  result = @app.call(env)
  Rails.logger.info("Result are #{result}")
  result
end

#consumer_codeObject

  • $waiting_threads



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/bundle_requests/consumer.rb', line 23

def consumer_code
  while true
    c = $waiting_threads.length
    if c < $configuration["max_waiting_thread"]
      puts "#{c} threads are waiting so sleeping"
      sleep($configuration["wait_time"])
      next if $waiting_threads.length == 0
    end
    puts "Started request processing----------------------------------"
    threads = pop_some_waiting_threads
    rack_input = gather_all_requests(threads)
    result = call_bundle_api(rack_input,threads[0]['request'])      # may through exception if 0 threads are present remove it afterwards
    distribute_result_and_wakeup_those_threads(threads, result)
    puts "Completed proccessing requests------------------------------"
  end      
end

#distribute_result_and_wakeup_those_threads(threads, result) ⇒ Object



77
78
79
80
81
82
83
84
# File 'lib/bundle_requests/consumer.rb', line 77

def distribute_result_and_wakeup_those_threads(threads, result)
  for index in 0...threads.length
    t = threads[index]
    # hardcoded response for development. Replace it with result[index]
    t["response"] = [200,{"Content-Type" => "application/json"},["Status Ok"]]
    t.wakeup
  end
end

#gather_all_requests(threads) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
# File 'lib/bundle_requests/consumer.rb', line 50

def gather_all_requests(threads)
  rack_input = []
  threads.each do |t|
    e = t['request']
    req = Rack::Request.new(e)
    Rails.logger.debug req.inspect
    rack_input << JSON.parse(req.body.read)
  end
  Rails.logger.info rack_input
  rack_input
end

#generate_config_hash(options) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/bundle_requests/consumer.rb', line 86

def generate_config_hash(options)
  if $configuration.nil?
    config = {
      "incoming_request" => "/api",
      "bundle_api" => "/bundle_api",
      "wait_time" => 10,
      "thread_wait_after_closing_entrance" => 2,
      "max_waiting_thread" => 16
    }

    options.each do |key,value|
      if !value.nil?
        config[key] = value
      end
    end
    $configuration =  config
  end
end

#get_configurationObject



14
15
16
# File 'lib/bundle_requests/consumer.rb', line 14

def get_configuration
  $configuration
end

#get_waiting_threadsObject



10
11
12
# File 'lib/bundle_requests/consumer.rb', line 10

def get_waiting_threads
  $waiting_threads
end

#pop_some_waiting_threadsObject



40
41
42
43
44
45
46
47
48
# File 'lib/bundle_requests/consumer.rb', line 40

def pop_some_waiting_threads
  threads = []
  $waiting_threads.length.times do
    t = $waiting_threads.pop
    threads << t
  end
  Rails.logger.info("Currently proccessing #{threads.length} threads")
  threads
end