Class: Scales::Worker::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/scales-worker/worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(type = Application::Rails) ⇒ Worker

Returns a new instance of Worker.



9
10
11
12
# File 'lib/scales-worker/worker.rb', line 9

# Builds a worker around the given application type.
#
# Remembers +type+ and the result of +type.app+, creates a Status for
# "localhost", starts with an empty thread pool, and registers an at_exit
# hook that calls +stop!+ on that status.
#
# @param type [#app] application adapter; defaults to Application::Rails
def initialize(type = Application::Rails)
  @type   = type
  @app    = type.app
  @status = Status.new("localhost")
  @pool   = []

  at_exit { @status.stop! }
end

Instance Attribute Details

#app ⇒ Object (readonly)

Returns the value of attribute app.



4
5
6
# File 'lib/scales-worker/worker.rb', line 4

# Reader for +@app+, which the constructor fills with the result of
# +type.app+.
#
# @return [Object] the stored application object
def app
  @app
end

#pool ⇒ Object (readonly)

Returns the value of attribute pool.



6
7
8
# File 'lib/scales-worker/worker.rb', line 6

# Reader for +@pool+, the array the constructor creates empty and that
# start_pool! appends its threads to.
#
# @return [Array] the stored pool
def pool
  @pool
end

#status ⇒ Object (readonly)

Returns the value of attribute status.



7
8
9
# File 'lib/scales-worker/worker.rb', line 7

# Reader for +@status+, the Status instance created by the constructor.
#
# @return [Object] the stored status object
def status
  @status
end

#type ⇒ Object (readonly)

Returns the value of attribute type.



5
6
7
# File 'lib/scales-worker/worker.rb', line 5

# Reader for +@type+, the application type handed to the constructor.
#
# @return [Object] the stored application type
def type
  @type
end

Instance Method Details

#parse(job) ⇒ Object



14
15
16
# File 'lib/scales-worker/worker.rb', line 14

# Turns a raw job into a request environment by delegating to Job.to_env.
#
# @param job [Object] the job payload, handed to Job.to_env unchanged
# @return [Object] whatever Job.to_env returns for +job+
def parse(job)
  Job.to_env(job)
end

#post_process!(job) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/scales-worker/worker.rb', line 33

# Drains the current thread's post-process queue, sending every queued path
# through the application.
#
# Paths are popped from the end of +Thread.current[:post_process_queue]+ until
# the queue is empty (or a nil/false entry is popped). Each path is turned into
# a request with Path.to_env, using the env parsed from +job+ as the base. The
# response is discarded; its body is closed when it responds to +close+.
#
# Does nothing when no queue has been set on the current thread, so the method
# can be called safely outside of process_request!.
#
# @param job [Object] the raw job whose parsed env seeds every request
# @return [nil]
def post_process!(job)
  return if Thread.current[:post_process_queue].nil?

  env = parse(job)

  # The thread-local is re-read on every pass so that entries added (or a queue
  # swapped in) while the application runs are still picked up.
  while (queue = Thread.current[:post_process_queue]) && (path = queue.pop)
    request = Path.to_env(path, env)

    begin
      response = @app.call(request)
      body = response.last
      body.close if body.respond_to?(:close)
    rescue Exception => e
      # Broad on purpose: a failing path is printed and the loop moves on to
      # the next entry instead of aborting the remaining work.
      puts e
    end
  end
end

#process!(job) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/scales-worker/worker.rb', line 18

# Runs one job through the application and packages the outcome.
#
# The job is parsed into an env, the status object is told that a request was
# taken from the queue, and the app is called with that env. The response is
# converted with Response.to_job *before* its body is closed: Rack requires
# +close+ to be called after the body has been consumed, and closing first can
# make a streaming or file-backed body unreadable.
# NOTE(review): this assumes Response.to_job reads the body - confirm.
#
# @param job [Object] the raw job, as accepted by #parse
# @return [Array] +[id, job]+ on success, where +id+ is +env['scales.id']+ and
#   +job+ is the result of Response.to_job; +[id, [500, {}, message]]+ when
#   calling the app, building the job or closing the body raises
def process!(job)
  env = parse(job)
  id  = env['scales.id']

  @status.took_request_from_queue!(env)

  begin
    response = @app.call(env)

    begin
      [id, Response.to_job(id, response)]
    ensure
      # Always release the body, even when serialising it failed.
      body = response.last
      body.close if body.respond_to?(:close)
    end
  rescue Exception => e
    # Broad on purpose: every failure is reported back as a 500 triple so the
    # caller always has a response to publish for this id.
    [id, [500, {}, e.to_s]]
  end
end

#process_request! ⇒ Object

Waits for a request, processes it, publishes the response, and returns.



48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/scales-worker/worker.rb', line 48

# Handles exactly one request from the request queue.
#
# Blocks on the thread's +:redis_blocking+ connection until a job arrives on
# Scales::Storage::REQUEST_QUEUE (brpop with timeout 0), resets the thread's
# post-process queue, runs process! and then post_process! for the job, tells
# the status object about the response, and publishes the JSON-encoded
# response on Scales::Storage::RESPONSE_CHANNEL through the thread's
# +:redis_nonblocking+ connection.
#
# @return [Array] +[id, response]+ as produced by process!
def process_request!
  listener = Thread.current[:redis_blocking]
  job      = listener.brpop(Scales::Storage::REQUEST_QUEUE, 0).last

  # Start every request with a fresh queue.
  Thread.current[:post_process_queue] = []

  id, response = process!(job)
  post_process!(job)

  @status.put_response_in_queue!(response)

  publisher = Thread.current[:redis_nonblocking]
  publisher.publish(Scales::Storage::RESPONSE_CHANNEL, JSON.generate(response))

  [id, response]
end

#start_pool!(size = Scales.config.worker_threads) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/scales-worker/worker.rb', line 61

# Spawns +size+ worker threads and then parks the calling thread.
#
# Turns on Thread.abort_on_exception, then starts +size+ threads and appends
# each one to +@pool+. Every thread opens two connections via
# Scales::Storage::Sync.new_connection! (stored thread-locally as
# +:redis_blocking+ and +:redis_nonblocking+) and calls process_request! in an
# endless loop. Anything raised by process_request! is logged through
# +@status.logger+; it is re-raised only when Scales.env is "test".
#
# The final bare +sleep+ has no timeout, so the caller stays suspended until it
# is woken or interrupted.
#
# @param size [Integer] number of threads; defaults to
#   Scales.config.worker_threads
# @return [Object] the result of +sleep+
def start_pool!(size = Scales.config.worker_threads)
  Thread.abort_on_exception = true

  size.times do
    worker = Thread.new do
      Thread.current[:redis_blocking]    = Scales::Storage::Sync.new_connection!
      Thread.current[:redis_nonblocking] = Scales::Storage::Sync.new_connection!

      loop do
        begin
          process_request!
        rescue Exception => error
          @status.logger.error(error.to_s)
          raise error if Scales.env == "test"
        end
      end
    end

    @pool << worker
  end

  sleep
end

#work! ⇒ Object

Loop the processing of requests



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/scales-worker/worker.rb', line 81

# Entry point of the worker: starts the status object, prints a start-up
# banner, and runs the thread pool until the process is interrupted.
#
# The banner lists the environment, application name, working directory, log
# path, thread count and Redis address, each line passed through
# String#green. When start_pool! is interrupted, +exit+ is called on every
# thread in +@pool+ and "Goodbye" is printed.
#
# @return [Object] the result of start_pool!, or of the final +puts+ after an
#   Interrupt
def work!
  @status.start!

  puts "Environment:    #{Scales.env}".green
  puts "Application:    #{@type.name}".green
  puts "Path:           #{Dir.pwd}".green
  puts "Log Path:       #{@status.log_path}".green

  config = Scales.config
  puts "Threads:        #{config.worker_threads}".green

  redis_address = "#{config.host}:#{config.port}/#{config.database}"
  puts "Redis:          #{redis_address}".green

  begin
    start_pool!
  rescue Interrupt
    @pool.each(&:exit)
    puts "Goodbye".green
  end
end