Class: Stackify::MsgsQueue

Inherits:
SizedQueue
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/stackify/msgs_queue.rb

Constant Summary collapse

CHUNK_MIN_WEIGHT =

TODO: prevent this class from being used when the app is off

50
ERROR_SIZE =
10
LOG_SIZE =
1
DELAY_WAITING =
2

Instance Method Summary collapse

Constructor Details

#initialize ⇒ MsgsQueue

Returns a new instance of MsgsQueue.



10
11
12
13
# File 'lib/stackify/msgs_queue.rb', line 10

# Builds the queue, using the configured maximum size as the argument passed
# up the ancestor chain, and then starts with a fresh current chunk.
def initialize
  max_size = Stackify.configuration.queue_max_size
  super(max_size)
  # The chunk state must exist before the first add_msg call.
  reset_current_chunk
end

Instance Method Details

#add_msg(msg) ⇒ Object Also known as: <<, push



30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/stackify/msgs_queue.rb', line 30

# Adds a message to the chunk currently being accumulated.
#
# A message whose 'Ex' entry is nil adds LOG_SIZE to the chunk weight; any
# other message adds ERROR_SIZE. Once the accumulated weight reaches
# CHUNK_MIN_WEIGHT the chunk is handed to push_current_chunk.
#
# Anything that is not a Hash is not queued; it is reported through
# Stackify.log_internal_error instead.
#
# @param msg [Hash] the message to enqueue
def add_msg(msg)
  synchronize do
    unless msg.is_a?(Hash)
      next Stackify.log_internal_error("MsgsQueue: add_msg should get hash, but not a #{msg.class}")
    end

    weight = msg['Ex'].nil? ? LOG_SIZE : ERROR_SIZE
    @current_chunk_weight += weight
    @current_chunk << msg
    push_current_chunk if @current_chunk_weight >= CHUNK_MIN_WEIGHT
  end
end

#pop_all ⇒ Object



47
48
49
50
51
52
53
54
55
# File 'lib/stackify/msgs_queue.rb', line 47

# Drains the queue while holding the monitor.
#
# @return [Array] every item that was queued, in the order it was popped;
#   an empty array when nothing was queued
def pop_all
  synchronize do
    drained = []
    # pop is only called after empty? reports there is something to take.
    drained << pop until empty?
    drained
  end
end

#push_remained_msgs ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/stackify/msgs_queue.rb', line 17

# Flushes whatever is still pending.
#
# Calls wait_until_all_workers_will_add_msgs first, then, while holding the
# monitor, pushes the current chunk and calls Stackify.shutdown_all. If the
# queue then holds anything, the logs sender is asked to send the remaining
# messages, an info line is logged and the status is set to terminated.
#
# NOTE(review): the status is only set to terminated when the queue is
# non-empty - confirm that leaving it untouched for an empty queue is
# intended.
def push_remained_msgs
  wait_until_all_workers_will_add_msgs
  synchronize do
    push_current_chunk
    Stackify.shutdown_all
    # Nothing queued means nothing to send.
    next unless length > 0

    Stackify.logs_sender.send_remained_msgs
    Stackify.internal_log :info, 'All remained logs are sent'
    Stackify.status = Stackify::STATUSES[:terminated]
  end
end