class Raindrops::Aggregate::PMQ

Aggregate + POSIX message queues support for Ruby 1.9+ and Linux

This class is duck-type compatible with Aggregate and allows us to aggregate and share statistics from multiple processes/threads with the aid of POSIX message queues. It is designed to be used with the Raindrops::LastDataRecv Rack application, but can be used independently on compatible runtimes.

Unlike the core of raindrops, this is only supported on Ruby 1.9+ and Linux 2.6+. Using this class requires the following additional RubyGems or libraries:

  • aggregate

  • posix_mq

Design

There is one master thread which aggregates statistics. Individual worker processes or threads write to a shared POSIX message queue (default: “/raindrops”) that the master reads from. At a predefined interval, the master thread writes out to a shared, anonymous temporary file that workers may read from.
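
The flow above can be approximated with the Ruby standard library alone. The following is a minimal sketch, not the library's implementation: a Queue stands in for the POSIX message queue, a plain Array stands in for the Aggregate object, and the master writes a snapshot after every message instead of at an interval. All names are hypothetical.

```ruby
require "tempfile"

# Stand-ins (assumptions): Queue replaces the POSIX message queue and
# an Array of samples replaces the Aggregate object.
queue    = Queue.new
snapshot = Tempfile.new("pmq_design_sketch")
snapshot.binmode
samples  = []

master = Thread.new do
  while (batch = queue.pop)      # nil is this sketch's stop signal
    samples.concat(batch)
    # publish a fresh snapshot that any reader can load
    snapshot.truncate(0)
    snapshot.rewind
    snapshot.write(Marshal.dump(samples))
    snapshot.flush
  end
end

# each worker sends one batch of two samples
workers = 3.times.map { |i| Thread.new { queue << [i, i + 10] } }
workers.each(&:join)
queue << nil
master.join

# a reader sees whatever the master last wrote out
shared = Marshal.load(File.binread(snapshot.path))
snapshot.close!
```

Workers never touch the aggregate directly here; they only enqueue samples and read the master's last snapshot.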

Setting :worker_interval and :master_interval to 1 will result in perfect accuracy but at the cost of a high synchronization overhead. Larger intervals mean less frequent messaging for higher performance but lower accuracy.
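
To make the trade-off concrete, the arithmetic can be sketched as follows. The helper names are hypothetical, and the model assumes a worker sends one message each time its buffer reaches :worker_interval samples, leaving any partial batch buffered.

```ruby
# Hypothetical helper: messages a worker sends for a given number of
# samples when it batches worker_interval samples per message.
def messages_sent(samples, worker_interval)
  samples / worker_interval
end

# Samples still buffered in the worker and not yet visible to the master.
def samples_pending(samples, worker_interval)
  samples % worker_interval
end

[1, 10, 100].each do |interval|
  printf("interval=%-3d messages=%-4d pending=%d\n",
         interval,
         messages_sent(1005, interval),
         samples_pending(1005, interval))
end
```

With 1005 samples, an interval of 1 costs 1005 messages and leaves nothing pending, while an interval of 100 costs 10 messages but leaves 5 samples unreported until the next flush.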

Attributes

nr_dropped[R]

Returns the number of messages that were dropped instead of being sent to the POSIX message queue. Drops only occur when non-blocking operation was requested with :lossy.
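
The idea of counting drops on a non-blocking send can be illustrated without a POSIX message queue. In this sketch a SizedQueue with a capacity of 2 is an assumed stand-in for a full queue, and the counter mirrors what nr_dropped reports.

```ruby
# Assumption: SizedQueue (capacity 2) stands in for a POSIX message
# queue that fills up because nothing is reading from it.
queue      = SizedQueue.new(2)
nr_dropped = 0

5.times do |i|
  begin
    queue.push(i, true)   # non-blocking push; raises ThreadError when full
  rescue ThreadError
    nr_dropped += 1       # lossy behavior: count the drop and move on
  end
end
```

The sender never blocks; samples that do not fit are lost and only the count survives.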

Public Class Methods

new(params = {})

Creates a new Raindrops::Aggregate::PMQ object

Raindrops::Aggregate::PMQ.new(options = {})  -> aggregate

options is a hash that accepts the following keys:

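  • :queue - name of the POSIX message queue (default: ENV["RAINDROPS_MQUEUE"] if set, otherwise “/raindrops”)

  • :worker_interval - number of samples a worker buffers before sending them to the master (default: 10)

  • :master_interval - number of messages the master processes between writes to the shared file (default: 5)

  • :lossy - workers drop messages if the master cannot keep up (default: false)

  • :aggregate - Aggregate object (default: Aggregate.new)

  • :mq_attr - attributes passed to POSIX_MQ.new when opening the queue (default: nil)

  • :mq_umask - umask for creating the POSIX message queue (default: 0666)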

# File lib/raindrops/aggregate/pmq.rb, line 63
def initialize(params = {})
  opts = {
    :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
    :worker_interval => 10,
    :master_interval => 5,
    :lossy => false,
    :mq_attr => nil,
    :mq_umask => 0666,
    :aggregate => Aggregate.new,
  }.merge! params
  @master_interval = opts[:master_interval]
  @worker_interval = opts[:worker_interval]
  @aggregate = opts[:aggregate]
  @worker_queue = @worker_interval ? [] : nil
  @mutex = Mutex.new

  @mq_name = opts[:queue]
  mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr]
  Tempfile.open("raindrops_pmq") do |t|
    @wr = File.open(t.path, "wb")
    @rd = File.open(t.path, "rb")
  end
  @wr.sync = true
  @cached_aggregate = @aggregate
  flush_master
  @mq_send = if opts[:lossy]
    @nr_dropped = 0
    mq.nonblock = true
    mq.method :trysend
  else
    mq.method :send
  end
end

Public Instance Methods

<<(val)

adds a sample to the underlying Aggregate object

# File lib/raindrops/aggregate/pmq.rb, line 98
def << val
  if q = @worker_queue
    q << val
    if q.size >= @worker_interval
      mq_send(q) or @nr_dropped += 1
      q.clear
    end
  else
    mq_send(val) or @nr_dropped += 1
  end
end
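
The batching behavior of the method above can be tried in isolation. This is a minimal sketch under stated assumptions: BatchingWorker and its sender callable are hypothetical stand-ins, and the sender returns true when a batch was accepted.

```ruby
# Hypothetical stand-in for the worker side: buffers samples and hands
# each full batch to `sender`, counting batches reported as unsent.
class BatchingWorker
  attr_reader :nr_dropped

  def initialize(interval, sender)
    @interval   = interval
    @sender     = sender    # any callable returning true on success
    @buffer     = []
    @nr_dropped = 0
  end

  def <<(val)
    @buffer << val
    return self if @buffer.size < @interval
    # dup so the receiver keeps its copy after the buffer is cleared
    @sender.call(@buffer.dup) or @nr_dropped += 1
    @buffer.clear
    self
  end
end

sent   = []
worker = BatchingWorker.new(3, ->(batch) { sent << batch; true })
7.times { |i| worker << i }
```

After seven samples with an interval of 3, two batches have been sent and the seventh sample is still buffered.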
aggregate()

Loads the last shared Aggregate from the master thread/process

# File lib/raindrops/aggregate/pmq.rb, line 149
def aggregate
  @cached_aggregate ||= begin
    flush
    Marshal.load(synchronize(@rd, RDLOCK) do |rd|
      dst = StringIO.new
      dst.binmode
      IO.copy_stream(rd, dst, rd.size, 0)
      dst.string
    end)
  end
end
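
The read path above (lock, copy the whole file from offset 0, unmarshal) can be sketched with the standard library. Assumptions: File#flock with a shared lock stands in for the synchronize helper, whose definition is not shown here, and a Hash stands in for the marshaled Aggregate.

```ruby
require "stringio"
require "tempfile"

loaded = nil

# Assumption: a Hash stands in for the marshaled Aggregate object.
Tempfile.create("pmq_read_sketch") do |f|
  f.binmode
  f.write(Marshal.dump({ :count => 3, :sum => 42 }))
  f.flush

  File.open(f.path, "rb") do |rd|
    rd.flock(File::LOCK_SH)     # shared lock: several readers may hold it
    begin
      dst = StringIO.new
      dst.binmode
      # copy rd.size bytes starting at offset 0, regardless of rd's position
      IO.copy_stream(rd, dst, rd.size, 0)
      loaded = Marshal.load(dst.string)
    ensure
      rd.flock(File::LOCK_UN)
    end
  end
end
```

Passing an explicit source offset to IO.copy_stream means the reader never has to rewind its handle between loads.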
count()

proxy for Aggregate#count

# File lib/raindrops/aggregate/pmq.rb, line 213
def count; aggregate.count; end
each() { |*args| ... }

proxy for Aggregate#each

# File lib/raindrops/aggregate/pmq.rb, line 240
def each; aggregate.each { |*args| yield(*args) }; end
each_nonzero() { |*args| ... }

proxy for Aggregate#each_nonzero

# File lib/raindrops/aggregate/pmq.rb, line 243
def each_nonzero; aggregate.each_nonzero { |*args| yield(*args) }; end
flush()

flushes the local queue of the worker process, sending all pending data to the master. There is no need to call this explicitly, as :worker_interval defines how frequently your queue will be flushed.

# File lib/raindrops/aggregate/pmq.rb, line 204
def flush
  # the worker buffer is kept in @worker_queue (see initialize)
  q = @worker_queue
  if q && ! q.empty?
    mq_send q
    q.clear
  end
  nil
end
flush_master()

Flushes the currently aggregated statistics to a temporary file. There is no need to call this explicitly, as :master_interval defines how frequently your data will be flushed for workers to read.

# File lib/raindrops/aggregate/pmq.rb, line 164
def flush_master
  dump = Marshal.dump @aggregate
  synchronize(@wr, WRLOCK) do |wr|
    wr.truncate 0
    wr.rewind
    wr.write(dump)
  end
end
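
The truncate, rewind, write sequence matters because a shorter snapshot must not leave bytes from a longer one behind. A minimal sketch, assuming File#flock with an exclusive lock in place of the synchronize helper (not shown here) and a hypothetical write_snapshot helper:

```ruby
require "tempfile"

# Hypothetical helper: replace the file's contents with a marshaled
# snapshot while holding an exclusive lock.
def write_snapshot(wr, obj)
  dump = Marshal.dump(obj)
  wr.flock(File::LOCK_EX)   # assumption: flock as the locking primitive
  begin
    wr.truncate(0)          # drop the previous snapshot entirely
    wr.rewind               # write from the start of the file
    wr.write(dump)
    wr.flush
  ensure
    wr.flock(File::LOCK_UN)
  end
end

result = nil
Tempfile.create("pmq_write_sketch") do |wr|
  wr.binmode
  write_snapshot(wr, (1..100).to_a)   # long snapshot first
  write_snapshot(wr, [1, 2])          # shorter one must fully replace it
  result = Marshal.load(File.binread(wr.path))
end
```

Marshaling before taking the lock keeps the time spent holding it short.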
master_loop()

Starts running a master loop, usually in a dedicated thread or process:

Thread.new { agg.master_loop }

Any worker can call agg.stop_master_loop to stop the master loop (possibly causing the thread or process to exit)

# File lib/raindrops/aggregate/pmq.rb, line 122
def master_loop
  buf = ""
  a = @aggregate
  nr = 0
  mq = POSIX_MQ.new @mq_name, :r # this one is always blocking
  begin
    if (nr -= 1) < 0
      nr = @master_interval
      flush_master
    end
    mq.shift(buf)
    data = begin
      Marshal.load(buf) or return
    rescue ArgumentError, TypeError
      next
    end
    Array === data ? data.each { |x| a << x } : a << data
  rescue Errno::EINTR
  rescue => e
    warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}"
    break
  end while true
  ensure
    flush_master
end
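
The message handling in the loop above can be exercised without a queue. In this sketch, run_master is a hypothetical function: an Array of marshaled strings plays the role of the queue, an Array plays the role of the Aggregate, and a counter stands in for calls to flush_master.

```ruby
# Hypothetical stand-in for the master loop: `messages` plays the queue,
# `total` the Aggregate, and `flushes` counts would-be flush_master calls.
def run_master(messages, master_interval)
  total   = []
  flushes = 0
  nr      = 0
  messages.each do |buf|
    if (nr -= 1) < 0
      nr = master_interval
      flushes += 1
    end
    data = begin
      Marshal.load(buf) or break   # false (or nil) ends the loop
    rescue ArgumentError, TypeError
      next                         # skip anything that is not valid Marshal data
    end
    Array === data ? total.concat(data) : total << data
  end
  [total, flushes + 1]             # + 1 for the final flush on exit
end

messages = [1, "x", [2, 3], false, 4].map { |m| Marshal.dump(m) }
messages[1] = "not marshal data"   # simulate a corrupt message
total, flushes = run_master(messages, 2)
```

The corrupt message is skipped, the array is expanded into individual samples, and the marshaled false stops the loop before the trailing 4 is read.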
max()

proxy for Aggregate#max

# File lib/raindrops/aggregate/pmq.rb, line 216
def max; aggregate.max; end
mean()

proxy for Aggregate#mean

# File lib/raindrops/aggregate/pmq.rb, line 225
def mean; aggregate.mean; end
min()

proxy for Aggregate#min

# File lib/raindrops/aggregate/pmq.rb, line 219
def min; aggregate.min; end
outliers_high()

proxy for Aggregate#outliers_high

# File lib/raindrops/aggregate/pmq.rb, line 234
def outliers_high; aggregate.outliers_high; end
outliers_low()

proxy for Aggregate#outliers_low

# File lib/raindrops/aggregate/pmq.rb, line 231
def outliers_low; aggregate.outliers_low; end
std_dev()

proxy for Aggregate#std_dev

# File lib/raindrops/aggregate/pmq.rb, line 228
def std_dev; aggregate.std_dev; end
stop_master_loop()

stops the currently running master loop. This may be called from any worker thread or process.

# File lib/raindrops/aggregate/pmq.rb, line 175
def stop_master_loop
  sleep 0.1 until mq_send(false)
  rescue Errno::EINTR
    retry
end
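
The retry idiom used above, sleeping between attempts until the stop signal is accepted, can be seen in isolation. The try_send lambda is a hypothetical sender that refuses the first two attempts, standing in for a non-blocking send on a full queue.

```ruby
# Hypothetical sender that fails twice before succeeding.
attempts = 0
try_send = lambda do |_msg|
  attempts += 1
  attempts >= 3
end

# false is the stop signal; keep retrying until it is accepted
sleep 0.01 until try_send.call(false)
```

The short sleep gives the master time to drain the queue between attempts.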
sum()

proxy for Aggregate#sum

# File lib/raindrops/aggregate/pmq.rb, line 222
def sum; aggregate.sum; end
to_s(*args)

proxy for Aggregate#to_s

# File lib/raindrops/aggregate/pmq.rb, line 237
def to_s(*args); aggregate.to_s(*args); end
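
The proxy methods in this class are written out one by one. As a point of comparison only, the same forwarding pattern can be expressed with the standard Forwardable module. StatsProxy is hypothetical, and an Array stands in for the Aggregate object.

```ruby
require "forwardable"

# Hypothetical wrapper showing the same proxying via Forwardable.
class StatsProxy
  extend Forwardable
  # each call is forwarded to whatever #aggregate returns at that moment
  def_delegators :aggregate, :count, :max, :min, :sum

  def initialize(samples)
    @samples = samples
  end

  # Assumption: an Array stands in for the Aggregate object.
  def aggregate
    @samples
  end
end

stats = StatsProxy.new([3, 1, 2])
```

Because the delegation target is a method rather than an instance variable, every call goes through aggregate, just as the hand-written proxies do.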