Thursday, March 3, 2011

Bandwidth estimator using a simple Kalman filter, Part 2 of 2

I realized that I didn't run through the simple Kalman filter algorithm in the last posting. So, let's do that now before the promised source code review. The Kalman filter I used is about as simple as it can get:

Weight calculation:
weight = bandwidth_variance / (bandwidth_variance + measurement_variance)

The weight is computed from two quantities: the bandwidth variance (in the code, the variance over the recent history of measurements) and the measurement variance (an estimate of the noise in any single measurement).


Estimate calculation:
estimate = estimate + weight*(bandwidth_measurement - estimate)



A higher measurement variance pushes the weight toward zero, which, when applied to the estimate, means that we rely more on our existing estimate than on the new measurement. That makes sense. Essentially, the weight reflects how much we trust the measurement relative to the estimate. The measurement variance is an estimate of the variability of the measurement and as such is a bit of a fudge factor. The measurement variability depends on a number of factors, some of which are tied to the length of the pipe (or the number of hops away from the host). This can certainly be quantified and would result in a more accurate computed value for the bandwidth estimate (a todo item).

Okay, now let's look at the source code.

Inside the main routine:

The snippet below shows the setup of the test object and the endless while loop that performs the measurement and displays the results. The processing module is set via a class template parameter on the BWTest object and can be any one of the derived implementations: Kalman, Average, None or All. The interface for BWTest is very simple: it performs the bandwidth test and then dumps out the current processed value of the bandwidth of the pipe you are measuring. Internally, the status dump is delegated to the processing module (this is the templated object).

BWBase *bwtest = NULL;
// pick the processor at runtime; each branch instantiates a different template
if (Kalman::name() == string(module)) {
  bwtest = new BWTest<Kalman>(target,small,large);
}
else if (Average::name() == string(module)) {
  bwtest = new BWTest<Average>(target,small,large);
}
else if (All::name() == string(module)) {
  bwtest = new BWTest<All>(target,small,large);
}
else {
  //default: no processing
  bwtest = new BWTest<None>(target,small,large);
}

//iterate through test loop
while (bwtest != NULL && bwtest->next()) {
  bwtest->print();
  sleep(interval);
}
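For readers who have not seen this pattern, the following is a minimal sketch of how a test class templated on its processor can sit behind a non-templated base pointer, which is what lets the loop above work with any module. BWTestSketch, NoneSketch, the "none" name string and the placeholder measurement of 42 are all hypothetical; the real classes in the tarball take the target and packet sizes and carry more state.

```cpp
#include <iostream>
#include <string>

// Stand-in for the non-templated base the main loop holds a pointer to.
class BWBase {
public:
  virtual ~BWBase() {}
  virtual bool next() = 0;
  virtual void print() = 0;
};

// Hypothetical processor: remembers only the latest value.
struct NoneSketch {
  static std::string name() { return "none"; }  // assumed name string
  NoneSketch() : _cur(0.f) {}
  void push(float /*x*/, float y) { _cur = y; }
  float get_cur() const { return _cur; }
  void print() const { std::cout << name() << ": " << _cur << "\n"; }
  float _cur;
};

// The processor type P is fixed at compile time; print() is delegated to it.
template <class P>
class BWTestSketch : public BWBase {
public:
  bool next() {
    _proc.push(0.f, 42.f);  // placeholder for a real measurement
    return true;
  }
  void print() { _proc.print(); }
  float current() const { return _proc.get_cur(); }
private:
  P _proc;
};
```

The string comparison in main is then the only place where the module is chosen at runtime; everything after that goes through the BWBase pointer.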


Inside BWTest, two tests are performed: one for the large packet and one for the small packet. The large test is dispatched first so that if there are any initial slow->fast path algorithms on the transiting router, these will be performed on the larger (slower) packet rather than the smaller packet (i.e. the fast-path cache is updated by the first test). If anything, performing the test in this way will have a tendency to overestimate the bandwidth.

bool
next()
{
  unsigned short length;
  int short_lat = 0, long_lat = 0;
  _probe.send(_target,_id, _pkt_size_large);
  _probe.receive(_id,long_lat,length);

  _probe.send(_target,_id, _pkt_size_small);
  _probe.receive(_id,short_lat,length);

  ++_id;

  update(_pkt_size_small,_pkt_size_large,short_lat,long_lat);
  return true;
}

The two packet sizes and their measured latencies are then passed to the update() method, which computes the instantaneous bandwidth estimate and subsequently passes it down to the processor. It's important to note that no history is stored in the BWTest object; this is the responsibility of the processor class.

Note that the variable _id is not really used in this implementation. The Probe class is designed to perform concurrent measurements as long as the calling class keeps track of outstanding _id values. In this implementation it's sufficient to simply perform the tests sequentially.

You can get an idea of the object layout below. As you saw in the source snippet above, the BWTest is templatized on the Processor class. The Processor class is an abstract class that has 4 implemented versions: All, None, Average and Kalman. The public methods (all that are shown below) are displayed in the diagram.



All processors derive from this abstract base class. It is another simple class: the external interface supports pushing values into the Processor (push()), printing the current state of the test (print()), and retrieving it (get_cur()).

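class Processor
{
protected:
  // only derived classes may construct or destroy a Processor
  Processor() {}
  virtual ~Processor() {}

public:
  // feed one measurement pair into the processor
  virtual void
  push(float x, float y) = 0;

  // dump the current state
  virtual void
  print() = 0;

  // return the current processed value
  virtual float
  get_cur() = 0;
};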
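To show what a concrete processor might look like against this interface, here is a minimal sketch of a running average. ProcessorSketch and RunningAverage are hypothetical stand-ins; the Average class in the tarball may well be implemented differently (for example over a fixed window).

```cpp
#include <cstdio>

// Stand-in for the abstract interface above, with a public interface.
class ProcessorSketch {
public:
  virtual ~ProcessorSketch() {}
  virtual void push(float x, float y) = 0;
  virtual void print() = 0;
  virtual float get_cur() = 0;
};

// Hypothetical processor: mean of all valid measurements seen so far.
class RunningAverage : public ProcessorSketch {
public:
  RunningAverage() : _sum(0.), _count(0) {}

  void push(float /*x*/, float y) {
    if (y <= 0.f) {
      return;  // skip failed measurements, like the check in Kalman::push()
    }
    _sum += y;
    ++_count;
  }

  void print() {
    std::printf("average: %f over %u samples\n", get_cur(), _count);
  }

  // Returns 0 until at least one valid measurement has been pushed.
  float get_cur() {
    return _count ? float(_sum / _count) : 0.f;
  }

private:
  double _sum;      // accumulated in double to limit rounding drift
  unsigned _count;  // number of valid measurements
};
```

All the history lives in the processor, so the calling test object only ever needs push(), print() and get_cur().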

The Kalman implementation snippet is below (remember that Kalman, like None, All and Average, is a child class of Processor). This is where the estimation is performed. As you can see, the Kalman class maintains a history of the measurements and can print out its status or return the current value via exposed accessors.

void
Kalman::push(float x_meas, float y_meas)
{
  if (y_meas <= 0.) {
    _failed = true;
    return;
  }

  _coll[_ct++ % K_ARRAY_SIZE] = y_meas;

  if (_start == true) {
    _start = false;
    _estimate = y_meas;
  }
  /*
    noise:
    _estimate;  LAST MEASUREMENT
    _weight;    COMPUTED FROM HISTORY OF MEASUREMENTS
  */

  float bandwidth_var = variance(_coll);
  if (bandwidth_var < 0.) {
    _failed = true;
    return;
  }
 
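  //todo:: improvement: replace this with computed value.
  // Stand-in for the measurement variance: fill a scratch array with
  // y_meas * _measurement_range plus uniform jitter in [0, y_meas],
  // then take its variance.
  ArrayColl coll;
  for (int i = 0; i < K_ARRAY_SIZE; ++i) {
    coll[i] = (y_meas * _measurement_range) +
      y_meas * float(rand()) / float(RAND_MAX);
  }
  float measurement_var = variance(coll);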

  _failed = false;
  _weight = bandwidth_var / ( bandwidth_var + measurement_var);
}



So, that's it pretty much in the proverbial nutshell. Download the tarball for more fun.
