This is what I wanted to have: a line chart of unique users for the last couple of minutes, and a single value for the unique users over the last hour. Usually we need two algorithms for that:
- Adaptive Counting
- Sliding Window
Adaptive Counting is a streaming algorithm that estimates the number of distinct elements in a stream quickly and with little memory. It is based on the paper "Fast and Accurate Traffic Matrix Measurement Using Adaptive Cardinality Counting" by Cai, Pan, Kwok, and Hwang. I am using this implementation here. Most importantly, instances of AdaptiveCounting can be merged. We need that to calculate a single cardinality from a set of "windowed" AdaptiveCounting instances.
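To see why mergeability matters, here is a minimal sketch that uses an exact `Set[String]` as a stand-in for the estimator. That is an assumption made for illustration only: the real AdaptiveCounting is probabilistic and far more memory-friendly. The names `WindowMergeSketch`, `naiveSum`, and `mergedCardinality` are hypothetical and not part of any library.

```scala
object WindowMergeSketch {
  // Each window holds the distinct users seen while it was active.
  // A plain Set is an exact stand-in for a mergeable cardinality estimator.
  type Window = Set[String]

  // Adding per-window counts double-counts users seen in several windows.
  def naiveSum(windows: Seq[Window]): Int = windows.map(_.size).sum

  // Merging first (set union here) and counting afterwards avoids that.
  def mergedCardinality(windows: Seq[Window]): Int =
    windows.foldLeft(Set.empty[String])(_ union _).size
}
```

A user who shows up in three windows is still one user for the hour, which is why the per-window counts cannot simply be added up.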
Sliding Window is a kind of ring buffer holding instances of a counting object. Items are counted in one instance, and at fixed intervals the window is advanced to the next counting instance. Because the new instance replaces the oldest one, the window has moved to a new position.
I’m showing an example implementation here (yes, I know, there is room for improvement).
First the usage (e.g. in a Vert.x Verticle):
    class UniqueUserVerticle extends Verticle {
      val uniqueUserCounter = new SlidingWindowCounter[UniqueUserCounter](10)
      ...
      // somewhere where I can count things,
      // e.g. in a message handler
      uniqueUserCounter.getCountable.offer(ipNumber)
      ...
      ...
      // advance window every 6 minutes
      vertx.setPeriodic(60000*6, { timerID: Long =>
        uniqueUserCounter.advanceWindow()
      })
      ...
      // send both values every 5 seconds via WebSocket
      vertx.setPeriodic(5000, { timerID: Long =>
        val sumValue = uniqueUserCounter.getSumOfCountables
        val allValues = uniqueUserCounter.getAllCountables
        ...
        // publish to websocket
      })
    }
So we have a window size (instance count) of 10 here. Advancing it every 6 minutes gives a 1-hour window. allValues is an Array of instances that we can use to draw the line chart above. sumValue is a single instance merged from all 10, so it holds the overall unique user count over a one-hour time frame.
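The window arithmetic and the step from counter instances to chart data can be sketched as follows. This is only an illustration: `WindowSpanSketch`, `SlotReading`, and `chartPoints` are hypothetical names, and the sort assumes that the slots come back in ring-buffer order rather than chronological order.

```scala
object WindowSpanSketch {
  // Total time covered by the window: slot count times the advance interval.
  def coveredMillis(slots: Int, advanceEveryMillis: Long): Long =
    slots * advanceEveryMillis

  // One reading per slot: when the slot was started and how many
  // distinct users it has counted so far.
  final case class SlotReading(started: Long, cardinality: Long)

  // Orders the readings by start time so the line chart runs oldest to newest.
  def chartPoints(readings: Seq[SlotReading]): Seq[(Long, Long)] =
    readings.sortBy(_.started).map(r => (r.started, r.cardinality))
}
```

With 10 slots advanced every 6 minutes, `coveredMillis(10, 60000L * 6)` yields 3,600,000 ms, which is the one-hour window described above.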
The class SlidingWindowCounter may look like this:
    trait Countable {
      def addThisToThatAndReturnThat(that: Countable): Countable
    }

    class SlidingWindowCounter[T
      ...
          val result = if (b != null) b.addThisToThatAndReturnThat(that) else that
          result.asInstanceOf[T]
        }
        sumInstance
      }

      def getSumOfCountablesThenAdvanceWindow: T = {
        val sumInstance = getSumOfCountables
        slots(tailSlot) = createNewInstance
        advanceHead()
        sumInstance
      }

      def advanceWindow(): Unit = {
        slots(tailSlot) = createNewInstance
        advanceHead()
      }

      private def createNewInstance = classTag[T].runtimeClass.newInstance.asInstanceOf[T]

      private def slotAfter(slot: Int): Int = (slot + 1) % windowLengthInSlots

      private def advanceHead(): Unit = {
        headSlot = tailSlot
        tailSlot = slotAfter(tailSlot)
      }
    }
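Part of the class declaration is missing from the listing above, so here is a self-contained sketch of how the whole class might fit together. The type bound, the constructor, the slot fields, `getCountable`, `getAllCountables`, and the start of `getSumOfCountables` are my assumptions, inferred from the usage and the surviving method bodies; they are not the original code. `ExactCounter` is a hypothetical stand-in that counts exactly with a `Set`, so the sketch runs without the AdaptiveCounting library.

```scala
import scala.reflect.{ClassTag, classTag}

// Same contract as in the listing above.
trait Countable {
  def addThisToThatAndReturnThat(that: Countable): Countable
}

// Assumed reconstruction: T needs a public no-argument constructor.
class SlidingWindowCounter[T <: Countable : ClassTag](windowLengthInSlots: Int) {
  require(windowLengthInSlots > 0, "window needs at least one slot")

  // Every slot starts with a fresh counter.
  private val slots: Array[T] = Array.fill(windowLengthInSlots)(createNewInstance)
  private var headSlot = 0                    // slot currently counting
  private var tailSlot = slotAfter(headSlot)  // oldest slot, replaced next

  def getCountable: T = slots(headSlot)
  def getAllCountables: Array[T] = slots.clone()

  // Merges all slots into one instance; null marks "nothing merged yet".
  def getSumOfCountables: T =
    slots.foldLeft(null.asInstanceOf[T]) { (b, that) =>
      val result = if (b != null) b.addThisToThatAndReturnThat(that) else that
      result.asInstanceOf[T]
    }

  // Replaces the oldest slot with a fresh counter and makes it the head.
  def advanceWindow(): Unit = {
    slots(tailSlot) = createNewInstance
    headSlot = tailSlot
    tailSlot = slotAfter(tailSlot)
  }

  private def createNewInstance: T =
    classTag[T].runtimeClass.getDeclaredConstructor().newInstance().asInstanceOf[T]

  private def slotAfter(slot: Int): Int = (slot + 1) % windowLengthInSlots
}

// Hypothetical exact counter for demonstration; not a streaming estimator.
class ExactCounter extends Countable {
  private var seen = Set.empty[String]
  def offer(item: String): Unit = seen += item
  def cardinality: Int = seen.size
  override def addThisToThatAndReturnThat(that: Countable): Countable = {
    val merged = new ExactCounter
    merged.seen = this.seen union that.asInstanceOf[ExactCounter].seen
    merged
  }
}
```

Offering items to `getCountable`, calling `advanceWindow()` a few times, and reading `getSumOfCountables.cardinality` shows the behavior: items stay in the sum until their slot is the oldest one and gets replaced.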
The counting class UniqueUserCounter:
    class UniqueUserCounter(val started: Long, val adaptiveCounting: AdaptiveCounting) extends Countable {

      def this() = this(System.currentTimeMillis(), new AdaptiveCounting(16))

      private def this(adaptiveCounting: AdaptiveCounting) =
        this(System.currentTimeMillis(), adaptiveCounting)

      def offer(item: AnyRef) = adaptiveCounting.offer(item)

      def getCardinality = adaptiveCounting.cardinality()

      override def addThisToThatAndReturnThat(that: Countable): Countable = {
        val thatOne = that.asInstanceOf[UniqueUserCounter]
        val ac = new AdaptiveCounting(thatOne.adaptiveCounting.merge(this.adaptiveCounting).asInstanceOf[AdaptiveCounting])
        new UniqueUserCounter(ac)
      }
    }