Class: Scales::Worker::Worker
- Inherits:
-
Object
- Object
- Scales::Worker::Worker
- Defined in:
- lib/scales-worker/worker.rb
Instance Attribute Summary collapse
-
#app ⇒ Object
readonly
Returns the value of attribute app.
-
#pool ⇒ Object
readonly
Returns the value of attribute pool.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
-
#type ⇒ Object
readonly
Returns the value of attribute type.
Instance Method Summary collapse
-
#initialize(type = Application::Rails) ⇒ Worker
constructor
A new instance of Worker.
- #parse(job) ⇒ Object
- #post_process!(job) ⇒ Object
- #process!(job) ⇒ Object
-
#process_request! ⇒ Object
Wait for a request, process it, publish the response, and return the [id, response] pair.
- #start_pool!(size = Scales.config.worker_threads) ⇒ Object
-
#work! ⇒ Object
Start the worker: print the runtime settings, then process requests in the thread pool until interrupted.
Constructor Details
#initialize(type = Application::Rails) ⇒ Worker
Returns a new instance of Worker.
9 10 11 12 |
# File 'lib/scales-worker/worker.rb', line 9
# Builds a worker around the given application type.
#
# Stores the type, the application it exposes through +type.app+, a Status
# created for "localhost" and an empty thread pool. An +at_exit+ hook is
# registered that calls +stop!+ on the status object when the process ends.
#
# @param type [#app] object whose +app+ is the application to serve
#   (defaults to Application::Rails)
def initialize(type = Application::Rails)
  @type   = type
  @app    = type.app
  @status = Status.new("localhost")
  @pool   = []

  at_exit do
    @status.stop!
  end
end
Instance Attribute Details
#app ⇒ Object (readonly)
Returns the value of attribute app.
4 5 6 |
# File 'lib/scales-worker/worker.rb', line 4
# Read-only access to the application object stored in +@app+.
#
# @return [Object] the value of +@app+
def app
  @app
end
#pool ⇒ Object (readonly)
Returns the value of attribute pool.
6 7 8 |
# File 'lib/scales-worker/worker.rb', line 6
# Read-only access to the collection stored in +@pool+ (the worker threads
# appended by #start_pool!).
#
# @return [Object] the value of +@pool+
def pool
  @pool
end
#status ⇒ Object (readonly)
Returns the value of attribute status.
7 8 9 |
# File 'lib/scales-worker/worker.rb', line 7
# Read-only access to the status object stored in +@status+.
#
# @return [Object] the value of +@status+
def status
  @status
end
#type ⇒ Object (readonly)
Returns the value of attribute type.
5 6 7 |
# File 'lib/scales-worker/worker.rb', line 5
# Read-only access to the application type stored in +@type+.
#
# @return [Object] the value of +@type+
def type
  @type
end
Instance Method Details
#parse(job) ⇒ Object
14 15 16 |
# File 'lib/scales-worker/worker.rb', line 14
# Turns a raw job into a request environment by delegating to Job.to_env.
#
# @param job [Object] the job payload taken from the request queue
# @return [Object] whatever Job.to_env produces for +job+
def parse(job)
  Job.to_env(job)
end
#post_process!(job) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/scales-worker/worker.rb', line 33
# Drains the current thread's +:post_process_queue+, calling the application
# once for every path popped from it.
#
# Each path is combined with the environment parsed from +job+ through
# Path.to_env. The response is discarded; its body is closed when it responds
# to +close+. Anything raised by the application call is printed with +puts+
# and the loop carries on with the next path.
#
# @param job [Object] the job whose environment the follow-up requests reuse
# @return [nil]
def post_process!(job)
  base_env = parse(job)

  # The queue is looked up again on every pass and popped from the end; the
  # loop stops at the first nil/false value.
  while (path = Thread.current[:post_process_queue].pop)
    follow_up = Path.to_env(path, base_env)

    begin
      body = @app.call(follow_up).last
      body.close if body.respond_to?(:close)
    rescue Exception => error
      puts error
    end
  end
end
#process!(job) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/scales-worker/worker.rb', line 18
# Runs one queued job through the application and builds the reply.
#
# The job is parsed into an environment, the status object is told that the
# request was taken from the queue, and the application is called. The
# response body is closed (when it responds to +close+) only AFTER
# Response.to_job has been given the response: Rack requires +close+ to be
# called once the body has been consumed, and closing first can invalidate
# bodies that are read lazily (presumably Response.to_job reads the body to
# serialise it -- confirm against its implementation).
#
# @param job [Object] raw job payload, handed to #parse
# @return [Array] +[id, response]+ where +id+ is +env['scales.id']+ and
#   +response+ is the result of Response.to_job, or the triple
#   +[500, {}, error_message]+ when anything raises
def process!(job)
  env = parse(job)
  id  = env['scales.id']
  @status.took_request_from_queue!(env)

  begin
    response = @app.call(env)
    begin
      [id, Response.to_job(id, response)]
    ensure
      # Runs on success and on failure, so the body is always released.
      body = response.last
      body.close if body.respond_to?(:close)
    end
  rescue Exception => e
    # NOTE(review): the rescue is deliberately left as broad as the original
    # so that every application failure still produces a 500 reply.
    [id, [500, {}, e.to_s]]
  end
end
#process_request! ⇒ Object
Wait for a request, process it, publish the response, and return the [id, response] pair
48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/scales-worker/worker.rb', line 48
# Handles exactly one request on the current thread.
#
# Blocks on the thread's +:redis_blocking+ connection until a job arrives on
# the request queue, resets the thread's post-process queue, processes the
# job, runs the post-processing, reports the response to the status object
# and publishes it as JSON on the response channel through the thread's
# +:redis_nonblocking+ connection.
#
# @return [Array] the +[id, response]+ pair produced by #process!
def process_request!
  current = Thread.current

  # brpop with a timeout of 0 waits indefinitely; the last element of its
  # result is used as the job.
  job = current[:redis_blocking].brpop(Scales::Storage::REQUEST_QUEUE, 0).last

  # Every request starts with an empty post-process queue.
  current[:post_process_queue] = []

  id, response = process!(job)
  post_process!(job)

  @status.put_response_in_queue!(response)
  payload = JSON.generate(response)
  current[:redis_nonblocking].publish(Scales::Storage::RESPONSE_CHANNEL, payload)

  [id, response]
end
#start_pool!(size = Scales.config.worker_threads) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/scales-worker/worker.rb', line 61
# Spawns the worker threads and then parks the calling thread.
#
# Turns on Thread.abort_on_exception, starts +size+ threads and appends each
# to +@pool+. Every thread opens two connections of its own (stored as the
# thread-locals +:redis_blocking+ and +:redis_nonblocking+) and then calls
# #process_request! in an endless loop. Anything raised while handling a
# request is written to the status logger; it is re-raised only when
# Scales.env is "test". The method finishes with a bare +sleep+, so it does
# not return on its own.
#
# @param size [Integer] number of threads to start
#   (defaults to Scales.config.worker_threads)
def start_pool!(size = Scales.config.worker_threads)
  Thread.abort_on_exception = true

  size.times do
    worker = Thread.new do
      # One connection per purpose, created in this order.
      [:redis_blocking, :redis_nonblocking].each do |slot|
        Thread.current[slot] = Scales::Storage::Sync.new_connection!
      end

      loop do
        begin
          process_request!
        rescue Exception => failure
          @status.logger.error(failure.to_s)
          raise failure if Scales.env == "test"
        end
      end
    end

    @pool << worker
  end

  sleep
end
#work! ⇒ Object
Start the worker: print the runtime settings, then process requests in the thread pool until interrupted
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/scales-worker/worker.rb', line 81
# Entry point of the worker.
#
# Starts the status object, prints a banner describing the runtime settings
# (environment, application type, working directory, log path, thread count
# and Redis location) and then starts the thread pool. When an Interrupt
# reaches this method, every thread in the pool is asked to exit and a
# farewell line is printed.
def work!
  @status.start!

  # Each banner line is coloured with String#green and printed immediately.
  announce = lambda { |text| puts text.green }

  announce.call("Environment: #{Scales.env}")
  announce.call("Application: #{@type.name}")
  announce.call("Path: #{Dir.pwd}")
  announce.call("Log Path: #{@status.log_path}")
  announce.call("Threads: #{Scales.config.worker_threads}")
  announce.call("Redis: #{Scales.config.host}:#{Scales.config.port}/#{Scales.config.database}")

  begin
    start_pool!
  rescue Interrupt
    @pool.each(&:exit)
    announce.call("Goodbye")
  end
end