Class: MQueue::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/m_queue/queue.rb

Defined Under Namespace

Classes: MQueueError, NoServersLeft

Constant Summary collapse

LOG_DIR =
File.join(MQUEUE_ROOT, 'log')
@@queues =
[]

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.inherited(subclass) ⇒ Object



10
11
12
# File 'lib/m_queue/queue.rb', line 10

# Class-level hook: records every class that inherits from this one in the
# @@queues registry.
#
# @param subclass [Class] the class that was just defined
# @return [Array] the @@queues registry, now including +subclass+
def inherited(subclass)
  # Chain up first so any other `inherited` hook in the ancestry still runs.
  super
  @@queues << subclass
end

.poll ⇒ Object



47
48
49
50
51
52
53
54
55
# File 'lib/m_queue/queue.rb', line 47

# Reads the entry stored under +queue_name+ from the server that
# +send_to_server+ yields.
#
# Any StandardError raised by the lookup is printed with +puts+ and swallowed,
# so the block then evaluates to nil.
#
# NOTE(review): the meaning of the +true+ argument is defined by
# +send_to_server+, which is not shown here -- confirm against that helper.
#
# @return [Object] whatever +send_to_server+ returns for the block
def poll
  send_to_server(true) do |srv|
    begin
      srv[queue_name]
    rescue => err
      puts err
    end
  end
end

.publish(msg) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
# File 'lib/m_queue/queue.rb', line 35

# Publishes +msg+ to this queue.
#
# When the RAILS_ENV constant is defined and equals 'development' or 'test',
# nothing is sent to a server: a fresh instance handles the message
# immediately and the result of its +on_message+ is returned.
#
# Otherwise the message is serialized with +to_yaml+ and stored under
# +queue_name+ on the server yielded by +send_to_server+.
#
# @param msg [Object] the message to publish
# @return [Object] +msg+ itself on the server path, or the +on_message+
#   result on the development/test path
def publish(msg)
  handle_inline = defined?(RAILS_ENV) &&
                  (RAILS_ENV == 'development' || RAILS_ENV == 'test')
  return new.on_message(msg) if handle_inline

  send_to_server do |srv|
    srv[queue_name] = msg.to_yaml
  end
  msg
end

.queue_name ⇒ Object



57
58
59
# File 'lib/m_queue/queue.rb', line 57

# Key under which this queue's messages are stored: the receiver's own +name+.
#
# @return [Object] the value of +name+ on the receiver
def queue_name
  name
end

.queues ⇒ Object



14
15
16
# File 'lib/m_queue/queue.rb', line 14

# Returns the @@queues class variable, the registry that the +inherited+
# hook appends subclasses to. The object is returned directly, not a copy.
def queues
  @@queues
end

.reload! ⇒ Object



69
70
71
72
73
# File 'lib/m_queue/queue.rb', line 69

# Calls +reload!+ on every configured server that responds to it; servers
# without that method are skipped.
#
# @return [Object] the +servers+ collection (the result of +each+)
def reload!
  servers.each do |srv|
    next unless srv.respond_to?('reload!')

    srv.reload!
  end
end

.run ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/m_queue/queue.rb', line 18

# Worker loop: polls this queue and hands each message to a fresh instance.
#
# Every iteration calls +poll+. A truthy result is passed to +new.process+;
# a nil/false result makes the loop sleep for two seconds before polling
# again. The loop contains no +break+, so it runs until something outside
# it stops it.
def run
  loop do
    msg = poll
    if msg
      new.process(msg)
    else
      # Fixed idle delay. The earlier back-off counter was never passed to
      # +sleep+, so it had no effect and has been removed.
      sleep 2
    end
  end
end

.servers ⇒ Object



65
66
67
# File 'lib/m_queue/queue.rb', line 65

# Returns the @@servers class variable, initializing it to an empty array
# the first time it is read while unset (or nil/false).
def servers
  @@servers ||= []
end

.servers=(srvs) ⇒ Object



61
62
63
# File 'lib/m_queue/queue.rb', line 61

# Replaces the @@servers class variable with +srvs+. No validation or
# copying is performed; the given object is stored as-is.
def servers=(srvs)
  @@servers = srvs
end

Instance Method Details

#logger ⇒ Object



113
114
115
116
117
118
# File 'lib/m_queue/queue.rb', line 113

# Lazily builds and memoizes this instance's Logger.
#
# On first use the LOG_DIR directory is created if missing, and a Logger is
# opened on "<queue_name>.log" inside it, where queue_name comes from the
# instance's class. Later calls return the same Logger.
#
# @return [Logger] the memoized logger
def logger
  @logger ||= begin
    FileUtils.mkdir_p(LOG_DIR)
    log_path = File.join(LOG_DIR, self.class.queue_name + '.log')
    Logger.new(log_path)
  end
end

#on_message(msg) ⇒ Object



120
121
122
# File 'lib/m_queue/queue.rb', line 120

# Message handler placeholder. This base implementation always raises;
# the error text says the method must be implemented.
#
# @param msg [Object] the message to handle (unused here)
# @raise [RuntimeError] always
def on_message(msg)
  raise RuntimeError, 'You must implement on_message.'
end

#process(msg) ⇒ Object

self



101
102
103
104
105
106
107
108
109
110
111
# File 'lib/m_queue/queue.rb', line 101

# Handles one raw message taken from the queue.
#
# Calls +reload!+ first, then YAML-decodes +msg+ and passes the result to
# +on_message+. Any StandardError raised while decoding or handling is
# printed to standard output (message, class and backtrace) rather than
# propagated. +reload!+ runs outside that rescue, so its errors do propagate.
#
# NOTE(review): YAML.load can instantiate arbitrary objects on some
# Ruby/Psych versions; if queue contents can come from an untrusted source,
# consider YAML.safe_load -- confirm the trust model before changing.
#
# @param msg [String] YAML text as stored by +publish+
# @return [Object, nil] the +on_message+ result, or nil when an error was
#   caught
def process(msg)
  reload!
  begin
    payload = YAML.load(msg)
    on_message(payload)
  rescue => err
    trace = (err.backtrace || []).join("\n")
    puts "\n#{err.message} - (#{err.class})\n#{trace}"
  end
end

#reload! ⇒ Object



124
125
126
# File 'lib/m_queue/queue.rb', line 124

# Asks ActiveRecord to verify its active connections, but only when the
# ActiveRecord constant is defined; otherwise does nothing.
#
# NOTE(review): verify_active_connections! may not exist on newer
# ActiveRecord releases -- confirm against the version in use.
#
# @return [Object, nil] the result of verify_active_connections!, or nil
#   when ActiveRecord is not loaded
def reload!
  return unless defined?(ActiveRecord)

  ActiveRecord::Base.verify_active_connections!
end