Class: Apphunkd::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/apphunkd/queue.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Queue

Returns a new instance of Queue.



8
9
10
11
# File 'lib/apphunkd/queue.rb', line 8

# Builds an empty queue.
#
# Sets up the lock first and then the (initially empty) list of pending
# items that the lock is meant to guard.
def initialize
  @mutex = Mutex.new
  @items = []
end

Instance Attribute Details

#items ⇒ Object

Returns the value of attribute items.



4
5
6
# File 'lib/apphunkd/queue.rb', line 4

# Reader for the @items instance variable, the array of pending messages.
#
# The array itself is returned, not a copy, so changes made through this
# reader are visible to the queue.
def items
  @items
end

#mutex ⇒ Object

Returns the value of attribute mutex.



5
6
7
# File 'lib/apphunkd/queue.rb', line 5

# Reader for the @mutex instance variable, the Mutex created in #initialize
# and used by #store to synchronize writes to @items.
def mutex
  @mutex
end

#worker ⇒ Object

Returns the value of attribute worker.



6
7
8
# File 'lib/apphunkd/queue.rb', line 6

# Reader for the @worker instance variable, which #activate! assigns.
# Evaluates to nil while @worker has not been assigned.
def worker
  @worker
end

Instance Method Details

#activate! ⇒ Object



13
14
15
# File 'lib/apphunkd/queue.rb', line 13

# Starts the background worker thread and remembers it in @worker.
#
# Calling this again while the current worker is still alive is a no-op that
# returns the existing thread; spawning a second one would orphan the first,
# which would then stay parked in its `sleep` forever. If the previous worker
# has died, a fresh one is created.
#
# @return [Thread] the live worker thread
def activate!
  return @worker if @worker && @worker.alive?

  @worker = initialize_worker
end

#initialize_worker ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/apphunkd/queue.rb', line 31

# Spawns the background thread that drains the queue and pushes every pending
# item to the remote service.
#
# Each pass of the loop takes the pending items via get_items_from_stack (only
# when @items is non-empty), handles them one by one, and then calls `sleep`
# without a timeout, parking the thread until another thread wakes it
# (#store does so through @worker.wakeup).
#
# Exceptions are rescued inside the loop, so a failing item or batch is logged
# and the thread keeps running. A dead worker would make every later
# Thread#wakeup raise ThreadError.
#
# NOTE(review): a wakeup that arrives while a batch is being processed may be
# lost, and re-queued items are only retried after the next wakeup - confirm
# that this is acceptable.
#
# @return [Thread] the newly started worker thread
def initialize_worker
  Thread.new do
    loop do
      begin
        unless @items.empty?
          get_items_from_stack.each do |item|
            begin
              process_queued_item(item)
            rescue => e
              # One bad item must not take the rest of the batch down with it.
              log_error "Error: #{e}"
            end
          end
        end
      rescue => e
        # Covers failures outside the per-item handling (e.g. fetching the batch).
        log_error "Error: #{e}"
      end
      sleep
    end
  end
end

# Pushes a single item to the remote service and logs, drops or re-queues it
# depending on the outcome.
def process_queued_item(item)
  result = push_item_to_remote_service(item)

  unless result.status == :ok
    put_item_to_stack(item)
    log_error "Could not push to Remote Service: Connection Error. Put item back in queue."
    return
  end

  response = result.response
  case response.code
  when '201'
    debug "Message successfully stored."
  when '400', '403'
    # Rejected by the service: retrying would not help, so the item is dropped.
    log_error "Remote Service refused to store item: #{response.code} / #{response.body}. Dropped."
  else
    put_item_to_stack(item)
    log_error "Remote Service went crazy. Put item back in queue: #{response.code} / #{response.body}"
  end
end
private :process_queued_item

#store(message = {}) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/apphunkd/queue.rb', line 17

# Queues a message for delivery and nudges the worker thread.
#
# The message is rejected when its :message or :token entry is blank.
# Otherwise it is stamped with :stored_at (the passed hash itself is modified)
# and appended to @items under the mutex.
#
# Before appending, a queue holding 9999 or more entries is cut down to its
# newest 9999, so at most 10000 entries are kept and the oldest are discarded.
#
# Waking the worker is best-effort: if no worker has been started or it is no
# longer alive, the message stays queued and a debug line is written instead
# of raising after the message was already accepted.
#
# @param message [Hash] expected to carry :message and :token
# @return [Boolean] false if the message was rejected, true once it is queued
def store(message = {})
  debug "Receive: #{message.inspect}"
  return false if message[:message].blank? || message[:token].blank?

  message[:stored_at] = Time.now

  @mutex.synchronize do
    @items = @items[-9999..-1] if @items.size >= 9999
    @items << message
  end

  woken = begin
    @worker && @worker.alive? && @worker.wakeup
  rescue ThreadError
    # The worker died between the alive? check and the wakeup call.
    nil
  end
  debug "No live worker to wake up; message stays queued." unless woken

  true
end