Class: EasyQ::Service

Inherits:
Object
  • Object
show all
Defined in:
lib/easy_q/easy_q_service.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Service

Returns a new instance of Service.



12
13
14
15
16
17
18
19
20
21
22
# File 'lib/easy_q/easy_q_service.rb', line 12

# Builds the service: stores the options, prepares internal state and points
# ActiveRecord at the configured database.
#
# @param options [Hash] service settings; options[:database] is handed to
#   ActiveRecord::Base.establish_connection, the whole hash is kept in @settings
# @return [Service] a new instance of Service
def initialize(options = {})
  # Plain instance state.
  @settings  = options
  @shut_down = false
  @cache     = []
  @mutex     = Mutex.new
  @logger    = Logger.new($stderr)

  # ActiveRecord shares this service's logger; the logger is assigned before
  # the connection is established, as in the original ordering.
  active_record = ActiveRecord::Base
  active_record.logger = @logger
  active_record.colorize_logging = false
  active_record.establish_connection(options[:database])
end

Instance Attribute Details

#shut_down ⇒ Object

Returns the value of attribute shut_down.



10
11
12
# File 'lib/easy_q/easy_q_service.rb', line 10

# Reader for the shutdown flag. The constructor sets it to false and #stop
# sets it to true.
#
# @return [Object] the current value of @shut_down
def shut_down
  @shut_down
end

Instance Method Details

#empty(queue_name) ⇒ Object



58
59
60
61
# File 'lib/easy_q/easy_q_service.rb', line 58

# Deletes every message stored under the given queue name.
#
# The queue name is passed as a bound parameter rather than interpolated into
# the SQL fragment, so names containing quotes (or hostile input arriving from
# a remote caller) cannot alter the statement.
#
# @param queue_name [String] name of the queue to empty
# @return [nil]
def empty(queue_name)
  Message.delete_all ["queue = ?", queue_name]
  nil
end

#flush(queue_name = nil) ⇒ Object



63
64
65
66
67
68
69
70
71
72
# File 'lib/easy_q/easy_q_service.rb', line 63

# Deletes messages whose created_at + ttl lies before the current time.
#
# With no queue name, the delete is applied across all queues; with a queue
# name it is restricted to that queue. The no-queue branch previously had an
# unbalanced parenthesis and supplied two bind values for a single
# placeholder, so it could not execute.
#
# NOTE(review): rows with a NULL ttl are presumably left untouched because
# the SQL comparison is not true for them; confirm against the database used.
#
# @param queue_name [String, nil] queue to flush, or nil for every queue
# @return [nil]
def flush(queue_name = nil)
  now = Time.now

  if queue_name.nil?
    Message.delete_all ["(created_at + ttl) < ?", now]
  else
    Message.delete_all ["(queue = ? and (created_at + ttl) < ?)", queue_name, now]
  end

  nil
end

#peek(queue_name, number = 10, direction = 'top') ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/easy_q/easy_q_service.rb', line 44

# Returns up to +number+ messages from a queue, as hashes, without removing
# them.
#
# NOTE(review): 'top' issues the query with no explicit ordering and 'bottom'
# orders ascending by created_at; confirm that this matches the intended ends
# of the queue.
#
# @param queue_name [String] queue to inspect
# @param number [Integer] maximum number of messages to return
# @param direction [String] either 'top' or 'bottom'
# @return [Array<Hash>] each found message converted with #to_hash
# @raise [ArgumentError] if direction is neither 'top' nor 'bottom'
#   (previously this surfaced as a NoMethodError on nil)
def peek(queue_name, number = 10, direction = 'top')
  finder_options = { :limit => number, :conditions => ["queue = ?", queue_name] }

  case direction
  when 'top'
    # no explicit ordering requested
  when 'bottom'
    finder_options[:order] = :created_at
  else
    # Reject unknown directions before touching the database.
    raise ArgumentError, "direction must be 'top' or 'bottom', got #{direction.inspect}"
  end

  Message.find(:all, finder_options).map(&:to_hash)
end

#pop(queue_name) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/easy_q/easy_q_service.rb', line 91

# Removes one message from the queue and returns it as a hash.
#
# Runs under the service mutex. The lookup matches messages in the queue whose
# created_at + ttl is later than the current time, or whose ttl is null; the
# first match is destroyed and then converted with #to_hash.
#
# @param queue_name [String] queue to take a message from
# @return [Hash, nil] the destroyed message as a hash, or nil when nothing matched
def pop(queue_name)
  @mutex.synchronize do
    candidate = Message.find(
      :first,
      :conditions => [
        "(queue = ? and (created_at + ttl) > ?) or (queue = ? and ttl is null)",
        queue_name, Time.now, queue_name
      ]
    )

    # Nothing to hand out: the block (and therefore the method) yields nil.
    next nil if candidate.nil?

    candidate.destroy
    candidate.to_hash
  end
end

#push(options = {}) ⇒ Object

Raises:

  • (Exception)


74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/easy_q/easy_q_service.rb', line 74

# Validates a message description and stores it via Message.create.
#
# Validation failures raise ArgumentError. ArgumentError descends from
# Exception, so callers that rescue Exception keep working. The String check
# on the body runs before the emptiness check so that a nil or non-String body
# yields the intended message instead of a NoMethodError.
#
# @param options [Hash] attributes passed on to Message.create
# @option options [String] :queue name of the target queue (required, non-empty)
# @option options [String] :body message payload (required, non-empty String)
# @return [nil]
# @raise [ArgumentError] if :queue or :body is missing, empty, or :body is not a String
def push(options = {})
  raise ArgumentError, "missing options[:queue]" unless options.key?(:queue)
  raise ArgumentError, "missing options[:body]" unless options.key?(:body)

  queue = options[:queue]
  body = options[:body]

  # nil has no #empty?, so it is treated as empty explicitly.
  queue_blank = queue.nil? || (queue.respond_to?(:empty?) && queue.empty?)
  raise ArgumentError, "options[:queue] cannot be empty" if queue_blank

  raise ArgumentError, "options[:body] can only be a String" unless body.is_a?(String)
  raise ArgumentError, "options[:body] cannot be empty" if body.empty?

  Message.create(options)
  nil
end

#start ⇒ Object



24
25
26
27
28
29
30
31
32
# File 'lib/easy_q/easy_q_service.rb', line 24

# Exposes this object as a DRb service, under the service mutex.
#
# Reads the address, port and ACL list from @settings[:service], installs the
# ACL with ACL::DENY_ALLOW ordering, starts the DRb service with this object
# as the front object, and prints the listening URI.
#
# @return [nil]
def start
  @mutex.synchronize do
    # TODO: add code to check if service is already running
    service = @settings[:service]
    uri = "druby://#{service[:address]}:#{service[:port]}"

    access_list = ACL.new(service[:acl], ACL::DENY_ALLOW)
    DRb.install_acl(access_list)

    @drb_service = DRb.start_service(uri, self)
    puts "service started and listening at #{uri}"
  end
end

#stats ⇒ Object



40
41
42
# File 'lib/easy_q/easy_q_service.rb', line 40

# Reports message counts grouped by queue.
#
# @return [Object] whatever Message.count returns for :group => :queue
#   (presumably a mapping of queue name to count; confirm against the
#   ActiveRecord version in use)
def stats
  Message.count(:group => :queue)
end

#stop ⇒ Object



34
35
36
37
38
# File 'lib/easy_q/easy_q_service.rb', line 34

# Raises the shutdown flag while holding the service mutex.
#
# Only the flag is changed here; nothing else is stopped by this method.
#
# @return [true] the value assigned to @shut_down
def stop
  @mutex.synchronize { @shut_down = true }
end