In practice:
trait CountingSketcher[S] {
  def emptySketch: S
  def update(a: S, s: String): S
  def union(a: S, b: S): S
  def getEstimate(a: S): Double
  def serialize(a: S): Array[Byte]
  def deserialize(xs: Array[Byte]): S
}
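These operations compose nicely: sketches can be built independently over chunks of data, merged with union, and estimated once at the end. The helper below is purely illustrative (its name and the chunking are not part of any library).

// Illustrative only: fold each chunk into its own sketch, merge, estimate.
def approxDistinct[S](sk: CountingSketcher[S])(chunks: Seq[Seq[String]]): Double = {
  val perChunk = chunks.map(c => c.foldLeft(sk.emptySketch)((acc, s) => sk.update(acc, s)))
  val merged = perChunk.foldLeft(sk.emptySketch)((a, b) => sk.union(a, b))
  sk.getEstimate(merged)
}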
In order to build unique counts with a sketching algorithm, we proceed in two stages.
Daily aggregation: take web browsing events of the form (timestamp, user id, country, device type, url tokens), aggregate them, and store one row per (date, country, url tokens, sketch of user ids).
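As a rough illustration only (the field names below are assumptions, not the real schema), those two shapes correspond to record types like:

import java.sql.{Date, Timestamp}

case class BrowsingEvent(ts: Timestamp, userId: String, country: String,
                         deviceType: String, urlTokens: String)

case class DailyUrlSketch(dt: Date, country: String, urlTokens: String,
                          userSketch: Array[Byte]) // serialized sketch of user ids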
Further, on-demand aggregation: given a keyword such as laptop, filter the stored rows on url tokens, union the matching sketches and read off the number of unique users.
There are approx_count_distinct() methods in many databases: Postgres, Apache Spark, Redis...
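For instance, Spark ships approx_count_distinct() (HyperLogLog++ based) as a built-in aggregate; applied to a DataFrame of the raw events (called events, as later in this post) it might look like the snippet below. Note that it returns a plain number, not a mergeable sketch, which is the limitation the stored sketch column avoids.

import org.apache.spark.sql.functions.{approx_count_distinct, to_date}

// Built-in alternative: per-group approximate distinct counts.
// The result cannot be re-aggregated across groups afterwards.
val dailyCounts = events
  .groupBy(to_date($"ts") as "dt", $"country", $"url_tokens")
  .agg(approx_count_distinct($"user_id") as "unique_users")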
A concrete buffer type for our own sketch:
case class Theta(theta: Double, hashes: Set[Long])
Theta acts as an instance of S.
hashes stores a few of the smallest hash values.
theta starts at 1.0 and lowers as we keep adding values to the sketch. It acts as a threshold: hash values at or above it are not added to hashes.
class Sketcher extends CountingSketcher[Theta] {
  val emptySketch = Theta(1.0, Set.empty[Long])

  def update(a: Theta, s: String): Theta = {
    // A manipulation on the hash bits of s that decides whether:
    // 1. The value of a.theta needs to be lowered.
    // 2. A new element needs to be added to a.hashes.
    // 3. Hashes at or above a.theta are discarded.
    ??? // actual bit manipulation elided
  }

  def union(a: Theta, b: Theta): Theta = {
    val th = math.min(a.theta, b.theta)
    val hs = (a.hashes ++ b.hashes).filter(_ < th)
    Theta(th, hs)
  }

  def getEstimate(a: Theta): Double = a.hashes.size.toDouble / a.theta

  // Byte-level encoding elided here.
  def serialize(a: Theta): Array[Byte] = ???
  def deserialize(xs: Array[Byte]): Theta = ???
}
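To make the update step more concrete, here is a small self-contained KMV-style variant of the same idea. Everything in it (KmvSketch, k, hash01) is hypothetical and simplified; it is not the exact algorithm of production libraries such as Apache DataSketches.

import scala.util.hashing.MurmurHash3

case class KmvSketch(theta: Double, hashes: Set[Double])

object KmvSketch {
  val k = 1024                              // nominal number of retained hashes
  val empty = KmvSketch(1.0, Set.empty[Double])

  // Map a string to a pseudo-uniform value in [0, 1].
  def hash01(s: String): Double =
    (MurmurHash3.stringHash(s) & 0x7fffffff).toDouble / Int.MaxValue

  def update(a: KmvSketch, s: String): KmvSketch = {
    val h = hash01(s)
    if (h >= a.theta) a                     // at or above the threshold: ignore
    else {
      val hs = a.hashes + h
      if (hs.size <= k) a.copy(hashes = hs) // still room: theta stays put
      else {
        // Too many retained values: lower theta to the largest retained hash
        // and drop everything at or above the new threshold.
        val th = hs.toSeq.sorted.apply(k)
        KmvSketch(th, hs.filter(_ < th))
      }
    }
  }

  def estimate(a: KmvSketch): Double = a.hashes.size / a.theta
}

The estimator has the same shape as getEstimate above: the number of retained hashes divided by the fraction of the hash space that was kept.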
Spark provides a base class org.apache.spark.sql.expressions.Aggregator
that, for practical purposes, looks as follows:
import org.apache.spark.sql.{Encoder, Encoders}

abstract class Aggregator[-IN, BUF, OUT] {
  def zero: BUF
  def reduce(buffer: BUF, value: IN): BUF
  def merge(b1: BUF, b2: BUF): BUF
  def finish(reduction: BUF): OUT
  // Will leave these out of the discussion
  def bufferEncoder: Encoder[BUF]
  def outputEncoder: Encoder[OUT]
}
Now we can define an aggregator that turns a column of string values into a BinaryType column of serialized sketches:
import org.apache.spark.sql.expressions.Aggregator

val sk = new Sketcher

object SketchPreaggregator
    extends Aggregator[String, Theta, Array[Byte]] {
  def zero: Theta = sk.emptySketch
  def reduce(b: Theta, a: String): Theta = sk.update(b, a)
  def merge(b1: Theta, b2: Theta): Theta = sk.union(b1, b2)
  def finish(reduction: Theta): Array[Byte] = sk.serialize(reduction)
}
Next, we can further aggregate a column of sketches via:
object SketchAggregator
    extends Aggregator[Array[Byte], Theta, Array[Byte]] {
  def zero: Theta = sk.emptySketch
  def reduce(b: Theta, a: Array[Byte]): Theta = sk.union(b, sk.deserialize(a))
  def merge(b1: Theta, b2: Theta): Theta = sk.union(b1, b2)
  def finish(reduction: Theta): Array[Byte] = sk.serialize(reduction)
}
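Both objects also need the two encoders that were left out of the Aggregator excerpt above, otherwise they will not compile. One minimal option (Kryo for the Theta buffer is just one possible choice), using the Encoder and Encoders imported earlier, is to add:

// Inside both SketchPreaggregator and SketchAggregator:
def bufferEncoder: Encoder[Theta] = Encoders.kryo[Theta]
def outputEncoder: Encoder[Array[Byte]] = Encoders.BINARY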
Finally, we can register some UDFs and UDAFs based on the above and use them on DataFrames.
import org.apache.spark.sql.functions.{to_date, udaf, udf}
import spark.implicits._ // for the $"..." column syntax, assuming a SparkSession named spark

val stringsToSketch = udaf(SketchPreaggregator)
val aggSketches = udaf(SketchAggregator)
val getEstimate = udf((xs: Array[Byte]) => sk.getEstimate(sk.deserialize(xs)))
val urlSketches = events
  .groupBy(to_date($"ts") as "dt", $"country", $"url_tokens")
  .agg(stringsToSketch($"user_id") as "sketch")

val estimates = urlSketches
  .filter($"url_tokens" like "%laptop%")
  .agg(aggSketches($"sketch") as "sketch")
  .withColumn("unique_users", getEstimate($"sketch"))
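If the same functionality is also needed from Spark SQL, the functions can be registered by name (the names below are arbitrary, and spark is again the active SparkSession):

spark.udf.register("strings_to_sketch", stringsToSketch)
spark.udf.register("agg_sketches", aggSketches)
spark.udf.register("sketch_estimate", getEstimate)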
Previous approach:
Current development:
The goal is to report the number of users in common for each pair of segments defined on our Platform, on a daily basis.
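Using only the operations already defined on CountingSketcher, one simple way to approximate the overlap of two segment sketches is inclusion-exclusion; this is an illustration only, since real sketch libraries also offer dedicated intersection operations.

// |A ∩ B| ≈ estimate(A) + estimate(B) - estimate(A ∪ B)
def approxOverlap[S](sk: CountingSketcher[S])(a: S, b: S): Double =
  sk.getEstimate(a) + sk.getEstimate(b) - sk.getEstimate(sk.union(a, b))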
A few links: