Class: Sad::Server
- Inherits:
-
Object
- Object
- Sad::Server
- Defined in:
- lib/sad/server.rb
Class Method Summary collapse
- .fetch(queue) ⇒ Object
- .fetch_with_interval(queue) ⇒ Object
- .payload_call(payload) ⇒ Object
- .register_signal ⇒ Object
- .run(queue) ⇒ Object
- .shutdown ⇒ Object
- .shutdown! ⇒ Object
- .shutdown? ⇒ Boolean
- .status ⇒ Object
Class Method Details
.fetch(queue) ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/sad/server.rb', line 12 def fetch(queue) ::Sad::Procline.set("Waiting for #{queue}") request = ::Sad::Config.redis.blpop(queue, 30) request.callback{|_, data| ::Sad::Procline.set("Fetched #{queue} - #{Time.now.strftime('%Y-%m-%d %H:%M:%S')}") if data ::Sad.logger.info '-'*15 + data.inspect + '-'*15 payload = Payload.decode(data) payload_call(payload) end fetch_with_interval(queue) } request.errback{ ::Sad.logger.error "error with redis request.\n#{request.inspect}" fetch_with_interval(queue) } end |
.fetch_with_interval(queue) ⇒ Object
33 34 35 36 37 |
# File 'lib/sad/server.rb', line 33 def fetch_with_interval(queue) EM.add_timer(::Sad::Config.interval){ fetch(queue) unless shutdown? } end |
.payload_call(payload) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/sad/server.rb', line 39 def payload_call(payload) # 如果该任务有延时执行要求, # 则在定时器执行时将其延时的key删掉, # 再重新入队 if payload.sad_args['delay'] and payload.sad_args['delay'] != '' and payload.sad_args['delay'] != 0 EM.add_timer(payload.sad_args['delay'].to_i){ payload.sad_args.delete('delay') payload.enqueue } else begin payload.perform rescue Exception => e ::Sad.logger.error("#{e.to_s}$/#{e.backtrace.join($/)}") end end end |
.register_signal ⇒ Object
57 58 59 60 61 |
# File 'lib/sad/server.rb', line 57 def register_signal trap('TERM') { shutdown! } trap('INT') { shutdown! } trap('QUIT') { shutdown } end |
.run(queue) ⇒ Object
4 5 6 7 8 9 10 |
# File 'lib/sad/server.rb', line 4 def run(queue) ::Sad.logger.info("#{'#'*5} Sad server start. #{'#'*5}") @_shutdown = false register_signal Sad.on_before_start.call if Sad.on_before_start fetch(Sad::Config.queue(queue)) end |
.shutdown ⇒ Object
72 73 74 75 |
# File 'lib/sad/server.rb', line 72 def shutdown @_shutdown = true EM.stop end |
.shutdown! ⇒ Object
63 64 65 66 |
# File 'lib/sad/server.rb', line 63 def shutdown! EM.stop exit(0) end |
.shutdown? ⇒ Boolean
68 69 70 |
# File 'lib/sad/server.rb', line 68 def shutdown? @_shutdown end |
.status ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/sad/server.rb', line 77 def status if EventMachine.instance_eval {defined? :threadqueue} and EventMachine.instance_eval {defined? :resultqueue} content = { 'threadqueue' => EventMachine.instance_eval {@threadqueue and @threadqueue.size}, 'resultqueue' => EventMachine.instance_eval {@resultqueue and @resultqueue.size}, 'num_waiting' => EventMachine.instance_eval {@threadqueue and @threadqueue.num_waiting} } return content else {} end end |