Class: Raindrops::Aggregate::PMQ

Inherits:
Object
Defined in:
lib/raindrops/aggregate/pmq.rb

Overview

Aggregate + POSIX message queues support for Ruby 1.9+ and Linux

This class is duck-type compatible with Aggregate and allows us to aggregate and share statistics from multiple processes/threads aided by POSIX message queues. This is designed to be used with the Raindrops::LastDataRecv Rack application, but can be used independently on compatible runtimes.

Unlike the core of raindrops, this is only supported on Ruby 1.9+ and Linux 2.6+. Using this class requires the following additional RubyGems or libraries:

  • aggregate (tested with 0.2.2)

  • posix_mq (tested with 1.0.0)

Design

There is one master thread which aggregates statistics. Individual worker processes or threads write to a shared POSIX message queue (default: “/raindrops”) that the master reads from. At a predefined interval, the master thread writes out to a shared, anonymous temporary file that workers may read from.

Setting :worker_interval and :master_interval to 1 will result in perfect accuracy but at the cost of a high synchronization overhead. Larger intervals mean less frequent messaging for higher performance but lower accuracy.
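The design above can be modeled in a single Ruby process with only the standard library. This is a hypothetical sketch, not raindrops code:

  • a Thread::Queue stands in for the POSIX message queue;

  • a Hash of :count and :sum stands in for the Aggregate object;

  • a local String stands in for the shared temporary file.

Workers send Marshal-dumped batches of worker_interval samples, and a dumped false stops the master.

```ruby
# Hypothetical in-process model of the PMQ design (stdlib only).
# Thread::Queue stands in for the POSIX message queue, and a Hash with
# :count and :sum stands in for the Aggregate object.
queue = Thread::Queue.new
stats = { :count => 0, :sum => 0 }
snapshot = Marshal.dump(stats) # what workers would read from the shared file
worker_interval = 10           # samples buffered per message

master = Thread.new do
  loop do
    data = Marshal.load(queue.pop)
    break if data == false # stop signal, as sent by stop_master_loop
    Array(data).each do |x|
      stats[:count] += 1
      stats[:sum] += x
    end
    snapshot = Marshal.dump(stats) # publish after every message in this model
  end
end

workers = Array.new(3) do |w|
  Thread.new do
    buf = []
    25.times do |i|
      buf << w * 100 + i
      if buf.size >= worker_interval
        queue << Marshal.dump(buf) # one message per full batch
        buf.clear
      end
    end
    queue << Marshal.dump(buf) unless buf.empty? # send the partial batch
  end
end

workers.each(&:join)
queue << Marshal.dump(false)
master.join

result = Marshal.load(snapshot)
puts "count=#{result[:count]} sum=#{result[:sum]}"
```

Unlike PMQ, this model publishes a snapshot after every message, and it only shares data between threads, not processes.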

Constant Summary

These constants are for Linux. This is designed for aggregating TCP_INFO.

RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256".freeze).freeze
WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256".freeze).freeze
UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256".freeze).freeze
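Each constant is a 256-byte buffer shaped like Linux's struct flock:

  • the pack directive "s" stores the lock type as a native 16-bit short (l_type, the first field on Linux);

  • "@256" pads with NUL bytes up to offset 256, which is larger than the struct.

The zeroed fields leave l_whence as SEEK_SET, with l_start and l_len at 0, which locks the whole file. The sketch below assumes Linux. It rebuilds the buffers and checks that layout; the locks hash exists only for illustration.

```ruby
require "fcntl"

# Rebuild the three lock buffers the way PMQ does (assumes Linux layout).
locks = {
  "RDLOCK" => [ Fcntl::F_RDLCK ].pack("s @256"),
  "WRLOCK" => [ Fcntl::F_WRLCK ].pack("s @256"),
  "UNLOCK" => [ Fcntl::F_UNLCK ].pack("s @256"),
}

locks.each do |name, buf|
  l_type = buf.unpack1("s")                          # first field: lock type
  rest_zero = buf.byteslice(2..).bytes.all?(&:zero?) # NUL padding up to 256
  puts "#{name}: #{buf.bytesize} bytes, l_type=#{l_type}, rest zero=#{rest_zero}"
end
```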

Constructor Details

#initialize(params = {}) ⇒ PMQ

Creates a new Raindrops::Aggregate::PMQ object

Raindrops::Aggregate::PMQ.new(options = {})  -> aggregate

options is a hash that accepts the following keys:

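  • :queue - name of the POSIX message queue (default: ENV["RAINDROPS_MQUEUE"] or “/raindrops”)

  • :worker_interval - number of samples a worker buffers before sending them to the master as one message (default: 10); nil sends every sample immediately

  • :master_interval - how many messages the master reads between writes of the shared statistics (default: 5)

  • :lossy - workers drop messages if the master cannot keep up (default: false)

  • :aggregate - Aggregate object (default: Aggregate.new)

  • :mq_attr - queue attributes passed to POSIX_MQ.new when creating the queue (default: nil)

  • :mq_umask - permission mode passed to POSIX_MQ.new when creating the POSIX message queue (default: 0666)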
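initialize fills in its defaults with Hash#merge!, so every key the caller passes wins, including an explicit nil. The helper resolve_pmq_options below is hypothetical and not part of raindrops. It mirrors the defaults shown in the constructor code so the resolution can run on its own. It leaves out :aggregate, which needs the aggregate gem. Passing :worker_interval => nil is how a caller disables local batching.

```ruby
# Hypothetical helper mirroring the option defaults in PMQ#initialize.
# The :aggregate default (Aggregate.new) is left out because it needs
# the aggregate gem.
def resolve_pmq_options(params = {}, env = ENV)
  {
    :queue => env["RAINDROPS_MQUEUE"] || "/raindrops",
    :worker_interval => 10,
    :master_interval => 5,
    :lossy => false,
    :mq_attr => nil,
    :mq_umask => 0666,
  }.merge!(params) # caller-supplied keys override, even when nil
end

defaults = resolve_pmq_options({}, {})
custom = resolve_pmq_options({ :worker_interval => nil, :lossy => true },
                             { "RAINDROPS_MQUEUE" => "/myapp" })

p defaults[:queue]         # => "/raindrops"
p custom[:queue]           # => "/myapp" (from the environment)
p custom[:worker_interval] # => nil (no local batching)
```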



# File 'lib/raindrops/aggregate/pmq.rb', line 64

def initialize(params = {})
  opts = {
    :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
    :worker_interval => 10,
    :master_interval => 5,
    :lossy => false,
    :mq_attr => nil,
    :mq_umask => 0666,
    :aggregate => Aggregate.new,
  }.merge! params
  @master_interval = opts[:master_interval]
  @worker_interval = opts[:worker_interval]
  @aggregate = opts[:aggregate]
  @worker_queue = @worker_interval ? [] : nil
  @mutex = Mutex.new

  @mq_name = opts[:queue]
  mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr]
  Tempfile.open("raindrops_pmq") do |t|
    @wr = File.open(t.path, "wb")
    @rd = File.open(t.path, "rb")
  end
  @wr.sync = true
  @cached_aggregate = @aggregate
  flush_master
  @mq_send = if opts[:lossy]
    @nr_dropped = 0
    mq.nonblock = true
    mq.method :trysend
  else
    mq.method :send
  end
end
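The constructor opens two independent handles on one temporary file:

  • @wr, for the master's writes, with sync enabled so each write bypasses Ruby's userspace buffer;

  • @rd, for the workers' reads.

A minimal standard-library sketch of the same pattern follows. The file prefix and the cleanup at the end are assumptions of the demo, not PMQ behavior.

```ruby
require "tempfile"

# Two independent handles on one temporary file, as in PMQ#initialize.
wr = rd = nil
Tempfile.open("raindrops_pmq_demo") do |t|
  wr = File.open(t.path, "wb")
  rd = File.open(t.path, "rb")
end
wr.sync = true # bypass Ruby's write buffer so readers see each write at once

wr.write(Marshal.dump({ :count => 3 }))

# The read handle sees the new bytes without being reopened.
data = Marshal.load(rd.read)
p data

# Cleanup for this demo only.
path = wr.path
wr.close
rd.close
begin
  File.unlink(path)
rescue Errno::ENOENT
  # already removed
end
```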

Instance Attribute Details

#nr_dropped ⇒ Object (readonly)

Returns the number of messages dropped because the POSIX message queue was full. Drops are only counted when non-blocking operation was requested with :lossy; otherwise this returns nil.



# File 'lib/raindrops/aggregate/pmq.rb', line 48

def nr_dropped
  @nr_dropped
end

Instance Method Details

#<<(val) ⇒ Object

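Adds a sample. Samples are buffered locally and sent to the master over the POSIX message queue once :worker_interval of them have accumulated, or one at a time if :worker_interval is nil. The master then adds them to the underlying Aggregate object.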



# File 'lib/raindrops/aggregate/pmq.rb', line 99

def << val
  if q = @worker_queue
    q << val
    if q.size >= @worker_interval
      mq_send(q) or @nr_dropped += 1
      q.clear
    end
  else
    mq_send(val) or @nr_dropped += 1
  end
end
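The worker side of << can be modeled without a message queue. In this hypothetical sketch, BatchingSender is an illustrative class rather than raindrops API.

  • The block passed to BatchingSender replaces @mq_send and returns false to simulate a send dropped in :lossy mode.

  • It batches samples like << does.

  • It counts drops like nr_dropped, starting at 0 here, whereas PMQ only sets that counter with :lossy.

  • It has a flush that sends any partial batch.

```ruby
# Hypothetical model of PMQ's worker-side batching (no POSIX MQ needed).
class BatchingSender
  attr_reader :nr_dropped

  def initialize(worker_interval, &send_fn)
    @worker_interval = worker_interval
    @queue = worker_interval ? [] : nil # nil: send every sample at once
    @send_fn = send_fn                  # returns false when a send is dropped
    @nr_dropped = 0
  end

  def <<(val)
    if (q = @queue)
      q << val
      if q.size >= @worker_interval
        send_batch(q) # Marshal.dump copies the batch before it is cleared
        q.clear
      end
    else
      send_batch(val)
    end
    self
  end

  # Send whatever is still buffered.
  def flush
    if (q = @queue) && !q.empty?
      send_batch(q)
      q.clear
    end
    nil
  end

  private

  def send_batch(payload)
    @send_fn.call(Marshal.dump(payload)) or @nr_dropped += 1
  end
end

sent = []
attempts = 0
sender = BatchingSender.new(10) do |msg|
  attempts += 1
  next false if attempts == 2 # simulate one send dropped on a full queue
  sent << Marshal.load(msg)
  true
end

25.times { |i| sender << i }
sender.flush
```

With 25 samples and a batch size of 10, the second batch is dropped and flush sends the last 5 samples. As a result, sent holds two batches and nr_dropped is 1.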

#aggregate ⇒ Object

Loads the last Aggregate shared by the master thread/process. It calls flush first, and the loaded object is cached until this worker sends its next message.



# File 'lib/raindrops/aggregate/pmq.rb', line 150

def aggregate
  @cached_aggregate ||= begin
    flush
    Marshal.load(synchronize(@rd, RDLOCK) do |rd|
      dst = StringIO.new
      dst.binmode
      IO.copy_stream(rd, dst, rd.size, 0)
      dst.string
    end)
  end
end
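aggregate reads the snapshot with IO.copy_stream(rd, dst, rd.size, 0). Supplying a source offset means the copy starts at byte 0 without changing the shared read handle's position, so no rewind is needed between reads. The hypothetical SnapshotReader below mirrors that read path and the ||= cache. Its invalidate! method plays the role of mq_send setting @cached_aggregate to nil.

```ruby
require "stringio"
require "tempfile"

# Hypothetical reader mirroring PMQ#aggregate's read path and cache.
class SnapshotReader
  def initialize(path)
    @rd = File.open(path, "rb")
    @cached = nil
  end

  def value
    @cached ||= begin
      dst = StringIO.new
      dst.binmode
      # Copy the whole file from offset 0 without moving @rd's position.
      IO.copy_stream(@rd, dst, @rd.size, 0)
      Marshal.load(dst.string)
    end
  end

  # Equivalent of mq_send setting @cached_aggregate = nil.
  def invalidate!
    @cached = nil
  end

  def close
    @rd.close
  end
end

file = Tempfile.new("snapshot_demo")
file.binmode
file.write(Marshal.dump([1, 2, 3]))
file.flush

reader = SnapshotReader.new(file.path)
first = reader.value

File.binwrite(file.path, Marshal.dump([4, 5])) # master publishes a new dump
still_cached = reader.value                    # served from the cache
reader.invalidate!
refreshed = reader.value                       # re-read from the file

reader.close
file.close!
p [first, still_cached, refreshed]
```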

#count ⇒ Object

proxy for Aggregate#count



# File 'lib/raindrops/aggregate/pmq.rb', line 214

def count; aggregate.count; end

#each ⇒ Object

proxy for Aggregate#each



# File 'lib/raindrops/aggregate/pmq.rb', line 241

def each; aggregate.each { |*args| yield(*args) }; end

#each_nonzero ⇒ Object

proxy for Aggregate#each_nonzero



# File 'lib/raindrops/aggregate/pmq.rb', line 244

def each_nonzero; aggregate.each_nonzero { |*args| yield(*args) }; end

#flush ⇒ Object

Flushes the local queue of the worker process, sending all pending data to the master. There is no need to call this explicitly, as :worker_interval defines how frequently your queue will be flushed.



# File 'lib/raindrops/aggregate/pmq.rb', line 205

def flush
  if (q = @worker_queue) && ! q.empty?
    mq_send q
    q.clear
  end
  nil
end

#flush_master ⇒ Object

Flushes the currently aggregated statistics to a temporary file. There is no need to call this explicitly, as :master_interval defines how frequently your data will be flushed for workers to read.



# File 'lib/raindrops/aggregate/pmq.rb', line 165

def flush_master
  dump = Marshal.dump @aggregate
  synchronize(@wr, WRLOCK) do |wr|
    wr.truncate 0
    wr.rewind
    wr.write(dump)
  end
end
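flush_master replaces the snapshot in place using two steps, and both are needed.

  • truncate(0) discards the previous dump, which may be longer than the new one.

  • rewind is needed because truncating does not move the write position. Without it, the next write would land at the old offset behind a run of NUL bytes, and Marshal.load would reject the file.

The standard-library sketch below repeats the sequence without PMQ's locks, then shows what happens when rewind is skipped. rewrite_snapshot is a hypothetical helper name.

```ruby
require "tempfile"

# Same steps as PMQ#flush_master, minus the Mutex/fcntl locking.
def rewrite_snapshot(wr, obj)
  dump = Marshal.dump(obj)
  wr.truncate(0) # drop the previous dump, which may be longer
  wr.rewind      # truncate does not move the write position
  wr.write(dump)
end

tmp = Tempfile.new("flush_master_demo")
wr = File.open(tmp.path, "wb")
wr.sync = true

rewrite_snapshot(wr, (1..100).to_a) # a long dump
rewrite_snapshot(wr, [42])          # a short dump replaces it entirely
good = File.binread(tmp.path)

# Skipping rewind: the file is emptied, but the write lands at the old
# position, leaving a run of NUL bytes in front of the new dump.
wr.truncate(0)
wr.write(Marshal.dump([7]))
bad = File.binread(tmp.path)

wr.close
tmp.close!
p good.bytesize, bad.bytesize, bad.start_with?("\0".b)
```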

#lock!(io, type) ⇒ Object

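Internal helper (marked :nodoc: in the source). It acquires or releases an fcntl(2) record lock of the given type on io, waiting if necessary (F_SETLKW) and retrying when interrupted by a signal (Errno::EINTR).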



# File 'lib/raindrops/aggregate/pmq.rb', line 182

def lock! io, type # :nodoc:
  io.fcntl Fcntl::F_SETLKW, type
rescue Errno::EINTR
  retry
end

#master_loop ⇒ Object

Starts running a master loop, usually in a dedicated thread or process:

Thread.new { agg.master_loop }

Any worker can call agg.stop_master_loop to stop the master loop, possibly causing the thread or process to exit.



# File 'lib/raindrops/aggregate/pmq.rb', line 123

def master_loop
  buf = ""
  a = @aggregate
  nr = 0
  mq = POSIX_MQ.new @mq_name, :r # this one is always blocking
  begin
    if (nr -= 1) < 0
      nr = @master_interval
      flush_master
    end
    mq.shift(buf)
    data = begin
      Marshal.load(buf) or return
    rescue ArgumentError, TypeError
      next
    end
    Array === data ? data.each { |x| a << x } : a << data
  rescue Errno::EINTR
  rescue => e
    warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}"
    break
  end while true
ensure
  flush_master
end
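The master's dispatch logic can be exercised without a POSIX queue. In this hypothetical sketch, run_master works as follows:

  • it takes a pre-filled Thread::Queue of Marshal-dumped strings in place of mq.shift;

  • it appends a copy of the collected samples to published wherever master_loop would call flush_master;

  • it keeps master_loop's countdown, so the first snapshot is written before any message arrives and each later one after master_interval + 1 further messages, undecodable ones included;

  • a dumped false ends the loop, and the ensure clause writes a final snapshot.

PMQ's Errno::EINTR and catch-all rescues are left out.

```ruby
# Hypothetical model of PMQ#master_loop's dispatch (no POSIX MQ needed).
def run_master(queue, master_interval, published)
  samples = []
  nr = 0
  loop do
    if (nr -= 1) < 0
      nr = master_interval
      published << samples.dup # where master_loop calls flush_master
    end
    buf = queue.pop # stands in for mq.shift(buf)
    data = begin
      Marshal.load(buf) or break # a dumped false (or nil) ends the loop
    rescue ArgumentError, TypeError
      next # skip messages that are not valid Marshal data
    end
    Array === data ? data.each { |x| samples << x } : samples << data
  end
  samples
ensure
  published << samples.dup # final snapshot, like master_loop's ensure
end

queue = Thread::Queue.new
queue << Marshal.dump([1, 2]) # a worker batch
queue << "garbage"            # not Marshal data: skipped
queue << Marshal.dump(3)      # a single sample (worker_interval nil)
queue << Marshal.dump([4])
queue << Marshal.dump(5)
queue << Marshal.dump(false)  # what stop_master_loop sends

published = []
samples = run_master(queue, 2, published)
p published
```

Here the snapshots are [], [1, 2, 3] and [1, 2, 3, 4, 5]:

  • the first is written before any message;

  • the second after three messages, one of them the skipped "garbage";

  • the last by the ensure clause.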

#max ⇒ Object

proxy for Aggregate#max



# File 'lib/raindrops/aggregate/pmq.rb', line 217

def max; aggregate.max; end

#mean ⇒ Object

proxy for Aggregate#mean



# File 'lib/raindrops/aggregate/pmq.rb', line 226

def mean; aggregate.mean; end

#min ⇒ Object

proxy for Aggregate#min



# File 'lib/raindrops/aggregate/pmq.rb', line 220

def min; aggregate.min; end

#mq_send(val) ⇒ Object

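Internal helper (marked :nodoc: in the source). It invalidates the cached aggregate, then sends val to the POSIX message queue as a Marshal dump. Its return value is falsy when a :lossy send is dropped.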



# File 'lib/raindrops/aggregate/pmq.rb', line 111

def mq_send(val) # :nodoc:
  @cached_aggregate = nil
  @mq_send.call Marshal.dump(val)
end

#outliers_high ⇒ Object

proxy for Aggregate#outliers_high



# File 'lib/raindrops/aggregate/pmq.rb', line 235

def outliers_high; aggregate.outliers_high; end

#outliers_low ⇒ Object

proxy for Aggregate#outliers_low



# File 'lib/raindrops/aggregate/pmq.rb', line 232

def outliers_low; aggregate.outliers_low; end

#std_dev ⇒ Object

proxy for Aggregate#std_dev



# File 'lib/raindrops/aggregate/pmq.rb', line 229

def std_dev; aggregate.std_dev; end

#stop_master_loop ⇒ Object

Stops the currently running master loop. This may be called from any worker thread or process.



# File 'lib/raindrops/aggregate/pmq.rb', line 176

def stop_master_loop
  sleep 0.1 until mq_send(false)
rescue Errno::EINTR
  retry
end
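stop_master_loop sends a dumped false. In :lossy mode a full queue makes mq_send return a falsy value, so the method keeps retrying every 0.1 seconds. The hypothetical model below works like this:

  • a SizedQueue of capacity 1 plays the full queue;

  • push(msg, true) raises ThreadError instead of blocking, and the try_send lambda turns that into false;

  • the master thread frees a slot after a short delay, so the stop message goes through after a few retries.

```ruby
# Hypothetical model of stop_master_loop's retry loop (stdlib only).
queue = SizedQueue.new(1)           # stands in for a full POSIX queue
queue.push(Marshal.dump([1, 2, 3])) # a pending worker batch fills it

# Non-blocking send: false on a full queue, like a dropped :lossy send.
try_send = lambda do |msg|
  begin
    queue.push(msg, true) # raises ThreadError instead of blocking when full
    true
  rescue ThreadError
    false
  end
end

master = Thread.new do
  sleep 0.25              # the master is busy for a moment
  queue.pop               # handles the pending batch, freeing a slot
  Marshal.load(queue.pop) # then receives the stop signal
end

retries = 0
until try_send.call(Marshal.dump(false))
  retries += 1
  sleep 0.1 # same back-off as stop_master_loop
end

stop_signal = master.value
p [stop_signal, retries]
```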

#sum ⇒ Object

proxy for Aggregate#sum



# File 'lib/raindrops/aggregate/pmq.rb', line 223

def sum; aggregate.sum; end

#synchronize(io, type) ⇒ Object

We use both a mutex for thread safety and an fcntl lock for process safety.



# File 'lib/raindrops/aggregate/pmq.rb', line 189

def synchronize io, type # :nodoc:
  @mutex.synchronize do
    begin
      type = type.dup
      lock! io, type
      yield io
    ensure
      lock! io, type.replace(UNLOCK)
      type.clear
    end
  end
end
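synchronize needs both locks because POSIX record locks taken with fcntl(2) are owned by the process. Two threads of one process never block each other through them, so the Mutex serializes threads while the record lock serializes processes.

The sketch below repeats the lock!/synchronize sequence on a temporary file using the standard fcntl extension; with_lock and LOCK_MUTEX are illustrative names. It assumes Linux, where l_type is the first field of struct flock. Like synchronize, it passes a dup of the frozen lock buffer to IO#fcntl.

```ruby
require "fcntl"
require "tempfile"

# Linux struct flock layout: l_type (short) first, remaining fields zero.
RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256").freeze
WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256").freeze
UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256").freeze
LOCK_MUTEX = Mutex.new

def lock!(io, type)
  io.fcntl(Fcntl::F_SETLKW, type) # wait until the record lock is granted
rescue Errno::EINTR
  retry
end

# Mutex serializes threads; the fcntl lock serializes processes.
def with_lock(io, type)
  LOCK_MUTEX.synchronize do
    buf = type.dup # the constants are frozen; IO#fcntl gets a mutable copy
    begin
      lock!(io, buf)
      yield io
    ensure
      lock!(io, buf.replace(UNLOCK))
    end
  end
end

tmp = Tempfile.new("lock_demo")
wr = File.open(tmp.path, "wb") # F_WRLCK requires a descriptor open for writing
rd = File.open(tmp.path, "rb") # F_RDLCK requires a descriptor open for reading
wr.sync = true

with_lock(wr, WRLOCK) { |io| io.write("snapshot") }
text = with_lock(rd, RDLOCK) { |io| io.read }
p text

wr.close
rd.close
tmp.close!
```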

#to_s(*args) ⇒ Object

proxy for Aggregate#to_s



# File 'lib/raindrops/aggregate/pmq.rb', line 238

def to_s(*args); aggregate.to_s(*args); end