Class: Raindrops::Aggregate::PMQ
- Inherits:
-
Object
- Object
- Raindrops::Aggregate::PMQ
- Defined in:
- lib/raindrops/aggregate/pmq.rb
Overview
Aggregate + POSIX message queues support for Ruby 1.9+ and Linux
This class is duck-type compatible with Aggregate and allows us to aggregate and share statistics from multiple processes/threads aided POSIX message queues. This is designed to be used with the Raindrops::LastDataRecv Rack application, but can be used independently on compatible Runtimes.
Unlike the core of raindrops, this is only supported on Ruby 1.9+ and Linux 2.6+. Using this class requires the following additional RubyGems or libraries:
-
aggregate (tested with 0.2.2)
-
posix_mq (tested with 1.0.0)
Design
There is one master thread which aggregates statistics. Individual worker processes or threads will write to a shared POSIX message queue (default: “/raindrops”) that the master reads from. At a predefined interval, the master thread will write out to a shared, anonymous temporary file that workers may read from
Setting :worker_interval
and :master_interval
to 1
will result in perfect accuracy but at the cost of a high synchronization overhead. Larger intervals mean less frequent messaging for higher performance but lower accuracy.
Constant Summary collapse
- RDLOCK =
:stopdoc: These constants are for Linux. This is designed for aggregating TCP_INFO.
[ Fcntl::F_RDLCK ].pack("s @256".freeze).freeze
- WRLOCK =
[ Fcntl::F_WRLCK ].pack("s @256".freeze).freeze
- UNLOCK =
[ Fcntl::F_UNLCK ].pack("s @256".freeze).freeze
Instance Attribute Summary collapse
-
#nr_dropped ⇒ Object
readonly
returns the number of dropped messages sent to a POSIX message queue if non-blocking operation was desired with :lossy.
Instance Method Summary collapse
-
#<<(val) ⇒ Object
adds a sample to the underlying Aggregate object.
-
#aggregate ⇒ Object
Loads the last shared Aggregate from the master thread/process.
-
#count ⇒ Object
proxy for Aggregate#count.
-
#each ⇒ Object
proxy for Aggregate#each.
-
#each_nonzero ⇒ Object
proxy for Aggregate#each_nonzero.
-
#flush ⇒ Object
flushes the local queue of the worker process, sending all pending data to the master.
-
#flush_master ⇒ Object
Flushes the currently aggregate statistics to a temporary file.
-
#initialize(params = {}) ⇒ PMQ
constructor
Creates a new Raindrops::Aggregate::PMQ object.
-
#lock!(io, type) ⇒ Object
:nodoc:.
-
#master_loop ⇒ Object
Starts running a master loop, usually in a dedicated thread or process:.
-
#max ⇒ Object
proxy for Aggregate#max.
-
#mean ⇒ Object
proxy for Aggregate#mean.
-
#min ⇒ Object
proxy for Aggregate#min.
-
#mq_send(val) ⇒ Object
:nodoc:.
-
#outliers_high ⇒ Object
proxy for Aggregate#outliers_high.
-
#outliers_low ⇒ Object
proxy for Aggregate#outliers_low.
-
#std_dev ⇒ Object
proxy for Aggregate#std_dev.
-
#stop_master_loop ⇒ Object
stops the currently running master loop, may be called from any worker thread or process.
-
#sum ⇒ Object
proxy for Aggregate#sum.
-
#synchronize(io, type) ⇒ Object
we use both a mutex for thread-safety and fcntl lock for process-safety.
-
#to_s(*args) ⇒ Object
proxy for Aggregate#to_s.
Constructor Details
#initialize(params = {}) ⇒ PMQ
Creates a new Raindrops::Aggregate::PMQ object
Raindrops::Aggregate::PMQ.new(options = {}) -> aggregate
options
is a hash that accepts the following keys:
-
:queue - name of the POSIX message queue (default: “/raindrops”)
-
:worker_interval - interval to send to the master (default: 10)
-
:master_interval - interval to for the master to write out (default: 5)
-
:lossy - workers drop packets if master cannot keep up (default: false)
-
:aggregate - Aggregate object (default: Aggregate.new)
-
:mq_umask - umask for creatingthe POSIX message queue (default: 0666)
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/raindrops/aggregate/pmq.rb', line 64 def initialize(params = {}) opts = { :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops", :worker_interval => 10, :master_interval => 5, :lossy => false, :mq_attr => nil, :mq_umask => 0666, :aggregate => Aggregate.new, }.merge! params @master_interval = opts[:master_interval] @worker_interval = opts[:worker_interval] @aggregate = opts[:aggregate] @worker_queue = @worker_interval ? [] : nil @mutex = Mutex.new @mq_name = opts[:queue] mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr] Tempfile.open("raindrops_pmq") do |t| @wr = File.open(t.path, "wb") @rd = File.open(t.path, "rb") end @wr.sync = true @cached_aggregate = @aggregate flush_master @mq_send = if opts[:lossy] @nr_dropped = 0 mq.nonblock = true mq.method :trysend else mq.method :send end end |
Instance Attribute Details
#nr_dropped ⇒ Object (readonly)
returns the number of dropped messages sent to a POSIX message queue if non-blocking operation was desired with :lossy
48 49 50 |
# File 'lib/raindrops/aggregate/pmq.rb', line 48 def nr_dropped @nr_dropped end |
Instance Method Details
#<<(val) ⇒ Object
adds a sample to the underlying Aggregate object
99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/raindrops/aggregate/pmq.rb', line 99 def << val if q = @worker_queue q << val if q.size >= @worker_interval mq_send(q) or @nr_dropped += 1 q.clear end else mq_send(val) or @nr_dropped += 1 end end |
#aggregate ⇒ Object
Loads the last shared Aggregate from the master thread/process
150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/raindrops/aggregate/pmq.rb', line 150 def aggregate @cached_aggregate ||= begin flush Marshal.load(synchronize(@rd, RDLOCK) do |rd| dst = StringIO.new dst.binmode IO.copy_stream(rd, dst, rd.size, 0) dst.string end) end end |
#count ⇒ Object
proxy for Aggregate#count
214 |
# File 'lib/raindrops/aggregate/pmq.rb', line 214 def count; aggregate.count; end |
#each ⇒ Object
proxy for Aggregate#each
241 |
# File 'lib/raindrops/aggregate/pmq.rb', line 241 def each; aggregate.each { |*args| yield(*args) }; end |
#each_nonzero ⇒ Object
proxy for Aggregate#each_nonzero
244 |
# File 'lib/raindrops/aggregate/pmq.rb', line 244 def each_nonzero; aggregate.each_nonzero { |*args| yield(*args) }; end |
#flush ⇒ Object
flushes the local queue of the worker process, sending all pending data to the master. There is no need to call this explicitly as :worker_interval
defines how frequently your queue will be flushed
205 206 207 208 209 210 211 |
# File 'lib/raindrops/aggregate/pmq.rb', line 205 def flush if q = @local_queue && ! q.empty? mq_send q q.clear end nil end |
#flush_master ⇒ Object
Flushes the currently aggregate statistics to a temporary file. There is no need to call this explicitly as :worker_interval
defines how frequently your data will be flushed for workers to read.
165 166 167 168 169 170 171 172 |
# File 'lib/raindrops/aggregate/pmq.rb', line 165 def flush_master dump = Marshal.dump @aggregate synchronize(@wr, WRLOCK) do |wr| wr.truncate 0 wr.rewind wr.write(dump) end end |
#lock!(io, type) ⇒ Object
:nodoc:
182 183 184 185 186 |
# File 'lib/raindrops/aggregate/pmq.rb', line 182 def lock! io, type # :nodoc: io.fcntl Fcntl::F_SETLKW, type rescue Errno::EINTR retry end |
#master_loop ⇒ Object
Starts running a master loop, usually in a dedicated thread or process:
Thread.new { agg.master_loop }
Any worker can call agg.stop_master_loop
to stop the master loop (possibly causing the thread or process to exit)
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/raindrops/aggregate/pmq.rb', line 123 def master_loop buf = "" a = @aggregate nr = 0 mq = POSIX_MQ.new @mq_name, :r # this one is always blocking begin if (nr -= 1) < 0 nr = @master_interval flush_master end mq.shift(buf) data = begin Marshal.load(buf) or return rescue ArgumentError, TypeError next end Array === data ? data.each { |x| a << x } : a << data rescue Errno::EINTR rescue => e warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}" break end while true ensure flush_master end |
#max ⇒ Object
proxy for Aggregate#max
217 |
# File 'lib/raindrops/aggregate/pmq.rb', line 217 def max; aggregate.max; end |
#mean ⇒ Object
proxy for Aggregate#mean
226 |
# File 'lib/raindrops/aggregate/pmq.rb', line 226 def mean; aggregate.mean; end |
#min ⇒ Object
proxy for Aggregate#min
220 |
# File 'lib/raindrops/aggregate/pmq.rb', line 220 def min; aggregate.min; end |
#mq_send(val) ⇒ Object
:nodoc:
111 112 113 114 |
# File 'lib/raindrops/aggregate/pmq.rb', line 111 def mq_send(val) # :nodoc: @cached_aggregate = nil @mq_send.call Marshal.dump(val) end |
#outliers_high ⇒ Object
proxy for Aggregate#outliers_high
235 |
# File 'lib/raindrops/aggregate/pmq.rb', line 235 def outliers_high; aggregate.outliers_high; end |
#outliers_low ⇒ Object
proxy for Aggregate#outliers_low
232 |
# File 'lib/raindrops/aggregate/pmq.rb', line 232 def outliers_low; aggregate.outliers_low; end |
#std_dev ⇒ Object
proxy for Aggregate#std_dev
229 |
# File 'lib/raindrops/aggregate/pmq.rb', line 229 def std_dev; aggregate.std_dev; end |
#stop_master_loop ⇒ Object
stops the currently running master loop, may be called from any worker thread or process
176 177 178 179 180 |
# File 'lib/raindrops/aggregate/pmq.rb', line 176 def stop_master_loop sleep 0.1 until mq_send(false) rescue Errno::EINTR retry end |
#sum ⇒ Object
proxy for Aggregate#sum
223 |
# File 'lib/raindrops/aggregate/pmq.rb', line 223 def sum; aggregate.sum; end |
#synchronize(io, type) ⇒ Object
we use both a mutex for thread-safety and fcntl lock for process-safety
189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/raindrops/aggregate/pmq.rb', line 189 def synchronize io, type # :nodoc: @mutex.synchronize do begin type = type.dup lock! io, type yield io ensure lock! io, type.replace(UNLOCK) type.clear end end end |
#to_s(*args) ⇒ Object
proxy for Aggregate#to_s
238 |
# File 'lib/raindrops/aggregate/pmq.rb', line 238 def to_s(*args); aggregate.to_s(*args); end |