How We Scaled SigView Beyond 10 Billion Daily Events
At Sigmoid, our aim is to help our customers analyze large volumes of data in near real time. When we started working in advertising technology, we realized our platform needed to scale to process and visualize tens of billions of events per day. Last month, during the holiday season, we successfully scaled our platform beyond 8 billion events per day at peak.
Holiday season is a frenzied period for digital marketing companies, as digital spends are at their peak. Analysts and campaign managers want to analyze the data in real time so that they don’t end up missing any opportunity. For one of our customers, who runs a video exchange, the team wanted to divert more inventory to better-performing advertisers and to monitor advertiser spend to understand any dips and anomalies. But to make the most of that information, they needed it reliably in real time.
Keeping these requirements in mind, we wanted to create an analytics platform that could easily scale to tens of billions of auctions per day while enabling real-time querying on hundreds of billions of auctions per month. Our first checkpoint was to test:
- An ingestion pipeline supporting peaks of ~10B auctions per day
- Real-time querying on ~200B auctions per month
In this post we will focus briefly on both of these separate yet important goals.
Data ingestion has four different aspects that need to be addressed when developing a pipeline to handle these large volumes:
- Latency: How soon is the data visible for analysis?
- Complexity: What transformations*, joins and aggregations can be applied on the data?
- Reliability: Is there any loss of data as it moves through the pipeline?
- Scalability: Is the system fully scale-out? Can increases in volume be handled by adding more machines?
*Attribution is typically considered a difficult problem to solve at scale, as attribution data is often delayed and records that are a few hours old need to be updated.
Sigmoid’s SaaS product was built to connect to various data sources such as Amazon S3 and Kafka at user-defined, periodic intervals, and it uses Apache Spark for all its computation and transformations. Our ability to elastically scale and keep the dashboard within strict SLAs comes from our battle-tested ETL engine written on top of Apache Spark.
Apache Spark, coupled with an elastic cloud setup, allowed us to dynamically scale our ETL pipelines to meet the ever-changing scale of the advertising world, going from 0.7B to 7B auctions within minutes.
The data in this case was shared via S3 buckets and ingested hourly into our platform within the following hour. Each hour’s data varied between ~200 GB and ~600 GB, and the entire processing and transformation had to be completed within minutes.
The critical stage of data ingestion in our platform is data preparation, which ensures the data goes through all transformations and is ready for fast querying. Data ingested into the system is converted into a columnar format by a step called Wrangling. This step is also responsible for running data validations, maintaining the data schema, and executing complex user-defined ETL pipelines (for example, attribution pipelines).
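To make the wrangling step concrete, here is a minimal pure-Python sketch of the idea: validating incoming records against a schema and pivoting row-oriented records into column arrays. The schema, field names, and helper are hypothetical, not SigView’s actual implementation.

```python
# Hypothetical schema for illustration; real RTB records have 60-65 fields.
SCHEMA = {"auction_id": str, "advertiser": str, "spend": float}

def wrangle(records):
    """Validate rows against SCHEMA, then pivot them into a columnar layout."""
    columns = {name: [] for name in SCHEMA}
    for rec in records:
        # Data validation: every field must be present with the right type.
        for name, typ in SCHEMA.items():
            if not isinstance(rec.get(name), typ):
                raise ValueError(f"bad value for {name!r} in {rec!r}")
        # Columnar pivot: append each field to its column array.
        for name in SCHEMA:
            columns[name].append(rec[name])
    return columns

rows = [
    {"auction_id": "a1", "advertiser": "acme", "spend": 1.25},
    {"auction_id": "a2", "advertiser": "zenith", "spend": 0.80},
]
cols = wrangle(rows)
```

In production this pivot would target an on-disk columnar format rather than in-memory lists, but the shape of the transformation is the same.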
For querying at such high volumes we use our Apache Spark-based analytical engine, NitroDB. NitroDB was built to query and analyze data at the scale of hundreds of terabytes in seconds using commodity cloud hardware. Since disk I/O speed is usually the bottleneck, we kept a simple principle in mind while building the analytical engine: “Touch as little data as possible”. This was enabled in two steps:
1. Intelligent Data Storage
As mentioned in the previous part, data is stored in a columnar format in HDFS. Columnar format allows us to skip columns that are not required; less data means less time spent on I/O reads. A typical RTB record has about 60-65 columns, but a typical analysis can be broken down into a GROUP BY over one column, a WHERE clause on another, and an AGGREGATE over a third. By following columnar storage we therefore need to touch only 3-4 columns instead of the full set of 65.
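The GROUP BY / WHERE / AGGREGATE pattern above can be sketched against an in-memory columnar store; note that only the three referenced column arrays are ever read, regardless of how many columns the store holds. Column names here are hypothetical.

```python
# Columnar store: one array per column. A query touching 3 columns
# never reads the other ~60, which is the whole point of the layout.
store = {
    "advertiser": ["acme", "acme", "zenith", "zenith"],
    "country":    ["US",   "UK",   "US",     "US"],
    "spend":      [1.0,    2.0,    3.0,      4.0],
    # ... imagine ~60 more columns this query never touches
}

def query(store, group_col, where_col, where_val, agg_col):
    """SELECT group_col, SUM(agg_col) WHERE where_col = where_val GROUP BY group_col."""
    groups = {}
    # Exactly three column arrays are pulled from storage:
    for g, w, v in zip(store[group_col], store[where_col], store[agg_col]):
        if w == where_val:
            groups[g] = groups.get(g, 0.0) + v
    return groups

result = query(store, "advertiser", "country", "US", "spend")
# result: {"acme": 1.0, "zenith": 7.0}
```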
2. Intelligent Data Processing
Internally we run an indexer step after the wrangling step to generate a mix of materialized views and primary indexes on the ingested data and store them as segments distributed across the cluster. We create materialized views for frequently accessed data and indexes to allow fast searches, sequential access, insertions, and deletions on the entire data set.
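A minimal sketch of what the indexer produces, using hypothetical structures: a materialized view that pre-aggregates a frequent query at ingest time, and a primary index that maps a record key to its position for fast point lookups.

```python
rows = [
    {"auction_id": "a1", "advertiser": "acme",   "spend": 1.0},
    {"auction_id": "a2", "advertiser": "zenith", "spend": 2.0},
    {"auction_id": "a3", "advertiser": "acme",   "spend": 3.0},
]

# Materialized view: total spend per advertiser, computed once at ingest,
# so the frequent query becomes a dictionary lookup instead of a scan.
spend_by_advertiser = {}
for r in rows:
    adv = r["advertiser"]
    spend_by_advertiser[adv] = spend_by_advertiser.get(adv, 0.0) + r["spend"]

# Primary index: auction_id -> row position, for O(1) record lookup.
index = {r["auction_id"]: i for i, r in enumerate(rows)}
```

In the real system these structures are persisted as segments distributed across the cluster; the sketch only shows the access pattern they enable.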
For instance, in this particular case, query pattern analysis concluded that the most important metrics in December were ‘Bid Count’ and ‘Advertiser Spend’ over the dimensions related to advertiser and location.
Based on this query-pattern analysis, automated projections were generated and materialized views were created over them, so that the most frequent queries returned within seconds across a range of time periods. We later shared these results with our customer, who appreciated the improvement in these queries.
In another example, since click data is often delayed by a few hours, and in some cases even days, an indexed view lets us easily locate a particular record and change it without going through the entire data set of hundreds of billions of rows. This enables fast ingestion and updating of records.
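Continuing the earlier sketch, a delayed click record can be applied through the index without any scan; the dependent aggregate is updated in the same step. Names and structures remain illustrative, not the production implementation.

```python
rows = [
    {"auction_id": "a1", "advertiser": "acme", "clicks": 0},
    {"auction_id": "a2", "advertiser": "acme", "clicks": 0},
]
index = {r["auction_id"]: i for i, r in enumerate(rows)}
clicks_by_advertiser = {"acme": 0}

def apply_delayed_click(auction_id):
    """Update a record that may be hours or days old, with no full scan."""
    row = rows[index[auction_id]]   # direct O(1) lookup via the index
    row["clicks"] += 1
    # Keep the materialized aggregate consistent with the base record.
    clicks_by_advertiser[row["advertiser"]] += 1

apply_delayed_click("a2")
```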
Only the index segments and views required by a query are touched, which reduces the I/O load considerably. Moreover, since fewer CPU cores are required to process each query, more queries can run in parallel on the same computing resources.
Like the rest of our data pipeline, our query engine is also built on top of Apache Spark. The query engine translates queries into equivalent Spark DataFrame queries, which are then executed on the Spark cluster. The data manager layer also determines which projections and indexes to touch based on the query. In certain cases, multiple projection plans can be generated from a single query; here the query engine supports sampling of results to help the query planner decide on a better set of projections. Once the query is executed on the cluster, the result is handed over to the application server for visualization.
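The query-to-DataFrame translation can be illustrated with a toy translator that emits the equivalent Spark DataFrame expression as text. The query shape, field names, and emitted API calls are a simplified assumption; the real engine constructs the DataFrame operations directly rather than generating code.

```python
def to_dataframe_expr(query):
    """Render a simple analytical query as a Spark DataFrame expression string."""
    expr = 'spark.read.parquet("{}")'.format(query["table"])
    if "where" in query:
        col, val = query["where"]
        expr += '.filter(col("{}") == "{}")'.format(col, val)
    if "group_by" in query:
        expr += '.groupBy("{}")'.format(query["group_by"])
    if "agg" in query:
        fn, col = query["agg"]
        expr += '.agg({}("{}"))'.format(fn, col)
    return expr

q = {"table": "auctions", "where": ("country", "US"),
     "group_by": "advertiser", "agg": ("sum", "spend")}
print(to_dataframe_expr(q))
```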
By utilizing some intelligent data manipulation techniques, we were able not only to scale the platform beyond 10 billion data points daily but also to give our customers a competitive cost advantage by using commodity AWS machines for both compute and storage. We have delivered an almost 50% cost advantage to our partners over traditional solutions and, more importantly, a more reliable system in which both the data and its freshness can be trusted.