Class: EasyQ::Service
- Inherits:
-
Object
- Object
- EasyQ::Service
- Defined in:
- lib/easy_q/easy_q_service.rb
Instance Attribute Summary collapse
-
#shut_down ⇒ Object
Returns the value of attribute shut_down.
Instance Method Summary collapse
- #empty(queue_name) ⇒ Object
- #flush(queue_name = nil) ⇒ Object
-
#initialize(options = {}) ⇒ Service
constructor
A new instance of Service.
- #peek(queue_name, number = 10, direction = 'top') ⇒ Object
- #pop(queue_name) ⇒ Object
- #push(options = {}) ⇒ Object
- #start ⇒ Object
- #stats ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Service
Returns a new instance of Service.
12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/easy_q/easy_q_service.rb', line 12 def initialize(={}) @shut_down = false @settings = @mutex = Mutex.new @logger = Logger.new $stderr @cache = [] ActiveRecord::Base.logger = @logger ActiveRecord::Base.colorize_logging = false ActiveRecord::Base.establish_connection([:database]) end |
Instance Attribute Details
#shut_down ⇒ Object
Returns the value of attribute shut_down.
10 11 12 |
# File 'lib/easy_q/easy_q_service.rb', line 10 def shut_down @shut_down end |
Instance Method Details
#empty(queue_name) ⇒ Object
58 59 60 61 |
# File 'lib/easy_q/easy_q_service.rb', line 58 def empty(queue_name) Message.delete_all "queue = '#{queue_name}'" return nil end |
#flush(queue_name = nil) ⇒ Object
63 64 65 66 67 68 69 70 71 72 |
# File 'lib/easy_q/easy_q_service.rb', line 63 def flush(queue_name = nil) if queue_name.nil? Message.delete_all ["(created_at + ttl) < ?)", queue_name, Time.now] else Message.delete_all ["(queue = ? and (created_at + ttl) < ?)", queue_name, Time.now] end return nil end |
#peek(queue_name, number = 10, direction = 'top') ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/easy_q/easy_q_service.rb', line 44 def peek(queue_name, number = 10, direction = 'top') return_value = [] case direction when 'top' results = Message.find(:all, :limit => number, :conditions => ["queue = ?",queue_name]) when 'bottom' results = Message.find(:all, :limit => number, :conditions => ["queue = ?",queue_name], :order => :created_at) end results.each do |r| return_value << r.to_hash end return return_value end |
#pop(queue_name) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/easy_q/easy_q_service.rb', line 91 def pop(queue_name) @mutex.synchronize do = Message.find(:first, :conditions => ["(queue = ? and (created_at + ttl) > ?) or (queue = ? and ttl is null)", queue_name, Time.now, queue_name] ) if !.nil? .destroy return .to_hash else return nil end end end |
#push(options = {}) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/easy_q/easy_q_service.rb', line 74 def push(={}) raise Exception, "missing options[:queue]" if !.key?(:queue) raise Exception, "missing options[:body]" if !.key?(:body) if .key?(:queue) raise Exception, "options[:queue] cannot be empty" if [:queue].empty? end if .key?(:body) raise Exception, "options[:body] cannot be empty" if [:body].empty? raise Exception, "options[:body] can only be a String" if [:body].class.to_s != "String" end Message.create() return nil end |
#start ⇒ Object
24 25 26 27 28 29 30 31 32 |
# File 'lib/easy_q/easy_q_service.rb', line 24 def start @mutex.synchronize do #to do - add code to check if service is already running uri = "druby://#{@settings[:service][:address]}:#{@settings[:service][:port]}" DRb.install_acl(ACL.new(@settings[:service][:acl],ACL::DENY_ALLOW)) @drb_service = DRb.start_service(uri, self) puts "service started and listening at #{uri}" end end |
#stats ⇒ Object
40 41 42 |
# File 'lib/easy_q/easy_q_service.rb', line 40 def stats return Message.count(:group=>:queue) end |
#stop ⇒ Object
34 35 36 37 38 |
# File 'lib/easy_q/easy_q_service.rb', line 34 def stop @mutex.synchronize do @shut_down = true end end |