Class: Stackify::MsgsQueue
- Includes:
- MonitorMixin
- Defined in:
- lib/stackify/msgs_queue.rb
Constant Summary collapse
- CHUNK_MIN_WEIGHT =
TODO: restrict possibility to work with class if app is off
50- ERROR_SIZE =
10- LOG_SIZE =
1- DELAY_WAITING =
2
Instance Method Summary collapse
- #add_msg(msg) ⇒ Object (also: #<<, #push)
-
#initialize ⇒ MsgsQueue
constructor
A new instance of MsgsQueue.
- #pop_all ⇒ Object
- #push_remained_msgs ⇒ Object
Constructor Details
#initialize ⇒ MsgsQueue
Returns a new instance of MsgsQueue.
10 11 12 13 |
# File 'lib/stackify/msgs_queue.rb', line 10 def initialize super(Stackify.configuration.queue_max_size) reset_current_chunk end |
Instance Method Details
#add_msg(msg) ⇒ Object Also known as: <<, push
30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/stackify/msgs_queue.rb', line 30 def add_msg msg self.synchronize do if msg.is_a?(Hash) @current_chunk_weight += msg['Ex'].nil? ? LOG_SIZE : ERROR_SIZE @current_chunk << msg if @current_chunk_weight >= CHUNK_MIN_WEIGHT push_current_chunk end else Stackify.log_internal_error "MsgsQueue: add_msg should get hash, but not a #{msg.class}" end end end |
#pop_all ⇒ Object
47 48 49 50 51 52 53 54 55 |
# File 'lib/stackify/msgs_queue.rb', line 47 def pop_all self.synchronize do msgs = [] until self.empty? do msgs << self.pop end msgs end end |
#push_remained_msgs ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/stackify/msgs_queue.rb', line 17 def push_remained_msgs wait_until_all_workers_will_add_msgs self.synchronize do push_current_chunk Stackify.shutdown_all if self.length > 0 Stackify.logs_sender.send_remained_msgs Stackify.internal_log :info, 'All remained logs are sent' Stackify.status = Stackify::STATUSES[:terminated] end end end |