Sliding Window and Adaptive Counting

UniqueUser

This is what I wanted to have: a line chart of unique users for the last couple of minutes and a single value for the unique users over the last hour. Usually we need two algorithms for that:

  • Adaptive Counting
  • Sliding Window

Adaptive Counting is a streaming algorithm that estimates the number of distinct elements in a stream quickly and with little memory. It is based on the approach described in "Fast and Accurate Traffic Matrix Measurement Using Adaptive Cardinality Counting" by Cai, Pan, Kwok, and Hwang. I am using this implementation here. Especially important is that instances of AdaptiveCounting can be merged. We need that to calculate a cardinality from a set of “windowed” AdaptiveCounting instances.
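To make the merge property more concrete, here is a minimal sketch of a mergeable estimator. It is not the AdaptiveCounting class used in this post: `LinearCounter` is a hypothetical stand-in that implements plain linear counting over a bitmap, which is the kind of estimator adaptive counting falls back to for small cardinalities. The bitmap size and the use of MurmurHash3 are assumptions made for the example.

```scala
import scala.util.hashing.MurmurHash3

// Hypothetical stand-in for illustration only, not the AdaptiveCounting class.
// Linear counting: hash every item to one position of a bitmap and estimate
// the number of distinct items from the fraction of positions still empty.
final class LinearCounter(val bits: Array[Boolean]) {
  def this(size: Int) = this(new Array[Boolean](size))

  def offer(item: String): Unit = {
    val h = MurmurHash3.stringHash(item)
    bits(java.lang.Math.floorMod(h, bits.length)) = true
  }

  // Estimate n = -m * ln(V), with m positions and V the fraction of empty ones.
  // A completely full bitmap yields Infinity: it is too small for the stream.
  def cardinality: Double = {
    val empty = bits.count(!_)
    -bits.length * math.log(empty.toDouble / bits.length)
  }

  // Merging is a position-wise OR, so the result is the same bitmap we would
  // have obtained by offering both streams to a single counter.
  def merge(other: LinearCounter): LinearCounter = {
    require(other.bits.length == bits.length, "bitmaps must have the same size")
    new LinearCounter(bits.zip(other.bits).map { case (a, b) => a || b })
  }
}
```

Because merging never counts an item twice, the merged estimate for two overlapping streams stays close to the number of distinct items in their union instead of the sum of both estimates. That is the property the windowed counters rely on.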

Sliding Window is a kind of ring buffer holding instances of a certain counting object. Items are counted in one instance and at a certain point in time, the window gets advanced to the next counting instance. Since the oldest counter is replaced, the window has moved to a new position.
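The ring buffer idea can be sketched in a few lines. This is a simplified illustration, not the implementation used in this post: `SetWindow` is a hypothetical class whose slots are exact sets instead of estimators, so the effect of advancing the window is easy to check by hand.

```scala
import scala.collection.mutable

// Hypothetical, simplified window: every slot is an exact set of seen items.
final class SetWindow(slotCount: Int) {
  private val slots = Array.fill(slotCount)(mutable.Set.empty[String])
  private var head = 0 // slot that currently receives items

  def offer(item: String): Unit = slots(head) += item

  // The slot after head is the oldest one; replace it and make it the new head.
  def advance(): Unit = {
    head = (head + 1) % slotCount
    slots(head) = mutable.Set.empty[String]
  }

  // Distinct items per slot, oldest slot first, current slot last.
  def perSlot: Seq[Int] =
    (1 to slotCount).map(i => slots((head + i) % slotCount).size)

  // Distinct items over the whole window.
  def total: Int = slots.foldLeft(Set.empty[String])(_ ++ _).size
}
```

With three slots, offering "a" and "b", advancing once and then offering "b" and "c" gives a total of 3. After two more advances the slot holding "a" and "b" has been replaced and the total drops to 2.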

Here is an example implementation (yes, I know, there is room for improvement).
First the usage (e.g. in a Vert.x Verticle):

class UniqueUserVerticle extends Verticle {
  val uniqueUserCounter = new SlidingWindowCounter[UniqueUserCounter](10)

    ...
  // wherever items can be counted,
  // e.g. in a message handler
  uniqueUserCounter.getCountable.offer(ipNumber)
    ...

    ...
  // advance window every 6 minutes
  vertx.setPeriodic(60000*6, { timerID: Long =>
      uniqueUserCounter.advanceWindow()
  })

    ...
  // send both values every 5 seconds via Websocket
  vertx.setPeriodic(5000, { timerID: Long =>
    val sumValue  = uniqueUserCounter.getSumOfCountables
    val allValues = uniqueUserCounter.getAllCountables
      ... // publish to websocket
  })

}

So we have a window size (instance count) of 10 here. Advancing the window every 6 minutes gives a one-hour window. allValues is an array of instances that we can use to draw the line chart above. sumValue is a single instance merged from all 10, so it holds the overall unique user count over a one-hour time frame.
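The relation between window length, slot count and timer period is simple arithmetic, but it is easy to get wrong when one of the values changes. A small helper makes it explicit. `WindowMath` and `advanceInterval` are hypothetical names used only for this sketch.

```scala
import scala.concurrent.duration._

// Hypothetical helper, not part of the implementation shown in this post.
object WindowMath {
  // How often the window must be advanced so that `slots` slots
  // together cover a time span of `total`.
  def advanceInterval(total: FiniteDuration, slots: Int): FiniteDuration = {
    require(slots > 0, "need at least one slot")
    total / slots.toLong
  }
}
```

For a one-hour window with 10 slots, `WindowMath.advanceInterval(1.hour, 10).toMillis` is 360000, the same value as the `60000*6` passed to the timer above.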

The class SlidingWindowCounter may look like this:

trait Countable {
  def addThisToThatAndReturnThat(that: Countable): Countable
}

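import scala.reflect.{ClassTag, classTag}

// The declaration, fields and accessors down to the fold are reconstructed from
// the usage above and from the methods below; the original text is incomplete here.
class SlidingWindowCounter[T <: Countable : ClassTag](windowLengthInSlots: Int) {
  private val slots = new Array[T](windowLengthInSlots)
  private var headSlot = 0
  private var tailSlot = slotAfter(headSlot)
  slots(headSlot) = createNewInstance

  // the instance that currently counts
  def getCountable: T = slots(headSlot)

  // all instances in the window (slots that were never used are still null)
  def getAllCountables: Array[T] = slots

  // merge all instances of the window into a fresh one
  def getSumOfCountables: T = {
    val sumInstance = slots.foldLeft(createNewInstance) { (that, b) =>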
        val result =
          if (b != null)
            b.addThisToThatAndReturnThat(that)
          else
            that
        result.asInstanceOf[T]
    }
    sumInstance
  }

  def getSumOfCountablesThenAdvanceWindow: T = {
    val sumInstance = getSumOfCountables
    advanceWindow()
    sumInstance
  }

  def advanceWindow(): Unit = {
    slots(tailSlot) = createNewInstance
    advanceHead()
  }

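  // T needs a public no-argument constructor; Class.newInstance is deprecated since Java 9
  private def createNewInstance: T =
    classTag[T].runtimeClass.getDeclaredConstructor().newInstance().asInstanceOf[T]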

  private def slotAfter(slot: Int): Int = (slot + 1) % windowLengthInSlots

  private def advanceHead(): Unit = {
    headSlot = tailSlot
    tailSlot = slotAfter(tailSlot)
  }
}

The counting class UniqueUserCounter:

class UniqueUserCounter(val started: Long, val adaptiveCounting: AdaptiveCounting) extends Countable {
  def this() = this(System.currentTimeMillis(), new AdaptiveCounting(16))

  private def this(adaptiveCounting: AdaptiveCounting) = this(System.currentTimeMillis(), adaptiveCounting)

  def offer(item: AnyRef) = adaptiveCounting.offer(item)

  def getCardinality = adaptiveCounting.cardinality()

  override def addThisToThatAndReturnThat(that: Countable): Countable = {
    val thatOne = that.asInstanceOf[UniqueUserCounter]
    val ac = new AdaptiveCounting(thatOne.adaptiveCounting.merge(this.adaptiveCounting).asInstanceOf[AdaptiveCounting])
    new UniqueUserCounter(ac)
  }
}