
Rolling Aggregations for Real-Time AI

Rolling aggregations compute statistics over a continuously shifting window of data, enabling real-time behavioral change detection and anomaly detection for AI systems. This article explores three approaches to efficiently compute rolling aggregations: tiled window aggregations, incremental views (Feldera), and parallel pushdown aggregations in RonDB, comparing their computational complexity and latency. Hopsworks offers both shift-left (precompute) and shift-right (on-demand) options to suit different use cases.

Source: Hacker News AI. Author: LexSiga


TL;DR

Rolling aggregations are among the most useful input data for AI systems, enabling real-time behavioral change detection and anomaly detection. They capture recent trends and patterns in a compressed representation, enabling interactive AI systems to meet the strictest feature-freshness requirements. Rolling aggregations are used by both real-time ML systems (e.g., credit card fraud detection and personalized recommendations) and interactive agents (e.g., a compressed user history or a summary of recent activity).

Rolling Aggregations - the Queen of AI Aggregations

A rolling aggregation computes statistics over a continuously shifting window of data: at each point, you aggregate over the last N values or the most recent time period.

Figure 1. A rolling aggregation computes an aggregation function (e.g., SUM, AVG, MIN, MAX, STDDEV) over a window of time-series data (e.g., the last hour).
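To make the definition concrete, here is a minimal Python sketch of a time-based rolling aggregation. It assumes events arrive in timestamp order and that the window covers (now - window, now]; the `Event` fields and the `NaiveRollingWindow` class are hypothetical names. It deliberately recomputes every statistic over all events in the window on each arrival, which is the O(N)-per-event cost that makes naive rolling aggregation expensive.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class Event:
    ts: float      # event time in seconds (hypothetical field)
    amount: float  # value being aggregated, e.g., a transaction amount


class NaiveRollingWindow:
    """Rolling statistics over the time window (now - window, now]; window > 0."""

    def __init__(self, window: float):
        self.window = window
        self.events = deque()  # events currently inside the window, oldest first

    def add(self, event: Event) -> dict:
        self.events.append(event)
        # Evict events that have slid out of the window.
        # The new event itself always stays, so the deque never empties here.
        while self.events[0].ts <= event.ts - self.window:
            self.events.popleft()
        # Naive part: every statistic is recomputed over all N events in the window.
        values = [e.amount for e in self.events]
        return {
            "count": len(values),
            "sum": sum(values),
            "avg": sum(values) / len(values),
            "min": min(values),
            "max": max(values),
        }
```

Each call to `add` returns fresh statistics the moment an event arrives, which is exactly the freshness property that rolling aggregations are valued for.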

Traditional windowed aggregations in stream processing engines (e.g., Apache Flink) are not designed to compute rolling aggregations, because rolling aggregations are computationally expensive: for every new event, you have to recompute the aggregate over all N events in the bucket. Instead, these engines use tumbling or hopping (sliding) windows to group data into fixed intervals of time, such as 10 minutes or an hour, and compute aggregations over those windows.

Figure 2. Tumbling windows and hopping windows introduce a delay between when an event arrives and when an aggregation result is computed. Rolling aggregations are computed immediately when the event arrives.

Figure 2 shows that tumbling windows only output aggregations after the window length and the watermark delay have passed, while hopping windows wait until the hop size has passed. Tumbling and hopping windows make computing aggregations over streaming data computationally tractable, but they introduce latency: the output aggregations can be as stale as the window length plus the watermark delay (tumbling) or the hop size (hopping). In AI terms, the feature data they output is stale, and stale data makes interactive AI applications laggy rather than intelligent. In contrast, rolling aggregations output results immediately when a new event arrives, producing fresh feature data.
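The staleness difference can be sketched with a simple model. It assumes windows are aligned to time 0, that a window emits as soon as its end (plus any watermark delay) is reached, and that processing time is negligible. The function names are hypothetical and do not correspond to any Flink API.

```python
import math


def tumbling_emit_time(event_ts: float, window: float, watermark_delay: float) -> float:
    """Earliest time the tumbling window containing event_ts can emit its result."""
    window_end = (math.floor(event_ts / window) + 1) * window
    return window_end + watermark_delay


def hopping_emit_time(event_ts: float, hop: float, watermark_delay: float = 0.0) -> float:
    """Earliest time the first hopping window containing event_ts can emit.

    Hopping windows end every `hop` seconds; assuming the window length is at
    least `hop`, the next window end after event_ts already includes the event.
    """
    next_window_end = (math.floor(event_ts / hop) + 1) * hop
    return next_window_end + watermark_delay


def rolling_emit_time(event_ts: float) -> float:
    """A rolling aggregation is updated as soon as the event arrives."""
    return event_ts
```

For an event at t = 60 s, a 10-minute tumbling window with a 30 s watermark delay emits at 630 s (570 s of staleness), a hopping window with a 1-minute hop emits at 120 s (60 s of staleness), and a rolling aggregation reflects the event at 60 s. An event that arrives just after a tumbling window opens approaches the worst case of window length plus watermark delay.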

Figure 3. Brief history of increasing feature freshness for rolling aggregations.

In Figure 3, you can see a brief history of the journey from tumbling/hopping windows to solutions for computing rolling aggregations. The first approach to be adopted was tiled window aggregations, which combine stream processing with further computation at request time. Feldera later developed a lower-cost solution based on incremental views for stream processing. Most recently, with RonDB, we developed native database support for parallel processing of aggregations, avoiding the need for stream processing. We now describe these approaches.

Shift Left and Shift Right with Tiled Window Aggregations and Chronon

Airbnb's Chronon framework introduced the first solution for reducing the computational overhead of rolling aggregations, an approach called tiled time window aggregations. Tecton Rift (based on DuckDB) and Chalk.ai (based on Apache Velox) also provide variants of this solution for scalable rolling aggregations.

Say you want to compute a precise 240-hour aggregation. You could decompose the events into 24-hour tiles, computed daily at 12am, and then compute the 240-hour aggregation by composing the partial aggregates of those tiles. This works trivially for some aggregations (e.g., min, max, sum, count), but requires maintaining additional state for others (e.g., mean, which needs both a sum and a count per tile).
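A minimal sketch of a mergeable partial aggregate, assuming each tile stores (count, sum, min, max); the `PartialAgg` and `compose` names are hypothetical. It shows why mean needs extra state: composing tiles means merging their sums and counts, not averaging the per-tile means.

```python
import math
from dataclasses import dataclass
from functools import reduce


@dataclass(frozen=True)
class PartialAgg:
    """Per-tile state from which SUM, COUNT, MIN, MAX and AVG can be composed."""
    count: int = 0
    total: float = 0.0
    minimum: float = math.inf
    maximum: float = -math.inf

    @staticmethod
    def from_values(values) -> "PartialAgg":
        values = list(values)
        return PartialAgg(
            count=len(values),
            total=sum(values),
            minimum=min(values, default=math.inf),
            maximum=max(values, default=-math.inf),
        )

    def merge(self, other: "PartialAgg") -> "PartialAgg":
        # COUNT, SUM, MIN and MAX merge directly; AVG is derived from SUM and COUNT.
        return PartialAgg(
            count=self.count + other.count,
            total=self.total + other.total,
            minimum=min(self.minimum, other.minimum),
            maximum=max(self.maximum, other.maximum),
        )

    @property
    def mean(self) -> float:
        return self.total / self.count


def compose(tiles) -> PartialAgg:
    """Compose any number of tiles; the empty PartialAgg is the identity."""
    return reduce(PartialAgg.merge, tiles, PartialAgg())
```

With tiles built from [1, 2, 3] and [10], the composed mean is 16 / 4 = 4.0, whereas averaging the two tile means would incorrectly give 6.0.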

Now imagine a request arrives to compute a rolling aggregation at 1pm. Your tiles each cover 12am to 11:59pm. You will not yet have computed a tile for the current day's events (from 12am to 1pm), and you cannot use the tile for the oldest day in the interval, because it also contains events from before 1pm that fall outside the interval. The events that lie outside the tiles, from the current day and from 1pm onward on the oldest day, are called head and tail events, respectively. Tiled window aggregations are computed by composing the partial aggregates with on-demand aggregations over these unaligned head and tail events.

Figure 4. Tiled aggregation combines precomputed partial aggregates (tiles) with on-demand computation to compose aggregations from both partial aggregates and recent events (head/tail events that are not included in a tile).
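The composition step can be sketched in Python under a few stated assumptions: integer timestamps in seconds where 0 falls on a midnight, 24-hour tiles, a window that includes both of its endpoints, and SUM/COUNT as the aggregate. `build_tiles` and `tiled_window_sum_count` are hypothetical names, not Chronon's API. For simplicity, head and tail events are found by filtering a list; a real system would range-scan only the head and tail intervals, so request-time work stays at O(t) tiles plus O(m) raw events.

```python
from dataclasses import dataclass

DAY = 24 * 3600  # tile size in seconds; tiles are aligned to midnight (ts % DAY == 0)


@dataclass
class SumCount:
    total: float = 0.0
    count: int = 0

    def add(self, value: float) -> None:
        self.total += value
        self.count += 1

    def merge(self, other: "SumCount") -> None:
        self.total += other.total
        self.count += other.count


def build_tiles(events):
    """Shift-left step: one SumCount partial aggregate per 24-hour tile."""
    tiles = {}
    for ts, value in events:
        tiles.setdefault(ts // DAY * DAY, SumCount()).add(value)
    return tiles


def tiled_window_sum_count(tiles, raw_events, now: int, window: int) -> SumCount:
    """Shift-right step: aggregate over [now - window, now] at request time."""
    start = now - window
    first_tile = -(-start // DAY) * DAY  # first tile boundary at or after start
    head_start = now // DAY * DAY        # start of the current, still-open day
    result = SumCount()
    # O(t): merge precomputed tiles that lie entirely inside the window.
    for tile_start in range(first_tile, head_start, DAY):
        if tile_start in tiles:
            result.merge(tiles[tile_start])
    # O(m): aggregate the unaligned tail and head events on demand.
    for ts, value in raw_events:
        in_window = start <= ts <= now
        in_merged_tiles = first_tile <= ts < head_start
        if in_window and not in_merged_tiles:
            result.add(value)
    return result
```

With one event per hour and a request at 1pm on day 10, the 240-hour window composes 9 precomputed daily tiles (216 events) with 14 head events from the current day and 11 tail events from 1pm to midnight on day 0.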

More specifically, tiles partition a window of length N into M tiles, where M …

… >= '2024-07-01' (~50% selectivity)

padding: Simulates realistic row width (~130 bytes per row)

Q1: Simple Aggregation

SELECT COUNT(*), SUM(val1), SUM(val3), COUNT(val3) FROM table

Q2: Multi-Aggregate (6 aggregations)

At 10K rows, PA continues to perform well: other engines would have to transfer all rows to compute the 6 aggregates, while PA computes them on the data nodes in a single pass.

SELECT COUNT(*), SUM(val1), SUM(val2), SUM(val3), COUNT(val3), MIN(val4), MAX(val4) FROM table

Q3: Expression Aggregation

Arithmetic expressions (val1 * val3, val3 + val4) are computed directly on data nodes. This is an example of a large table where scan I/O dominates over computation.

SELECT COUNT(*), SUM(val1 * val3), SUM(val3 + val4), COUNT(val3 + val4) FROM table

Q4: Filtered Aggregation (Index Scan)

SELECT COUNT(*), SUM(val1), SUM(val3), COUNT(val3) FROM table WHERE filter_date >= '2024-07-01'

Benchmark Results for Q1-Q4

Table 1 shows the benchmark results for Q1-Q4 as we vary the number of rows each aggregation is computed over. For each query and row count, we measure throughput in queries per second (QPS) and latency in milliseconds.

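| Query | 100 rows | 500 rows | 10,000 rows | 100,000 rows | 500,000 rows |
|---|---|---|---|---|---|
| Q1 Simple Aggregation | 38,813 QPS / 0.9 ms | 24,528 QPS / 1.5 ms | 2,274 QPS / 17.3 ms | 177 QPS / 217.8 ms | 45.1 QPS / 658 ms |
| Q2 Multi Aggregation | 40,623 QPS / 0.8 ms | 23,228 QPS / 1.6 ms | 2,057 QPS / 18.9 ms | 148 QPS / 267.6 ms | 32.2 QPS / 1,212 ms |
| Q3 Expression Aggregation | 40,949 QPS / 0.8 ms | 23,073 QPS / 1.6 ms | 2,116 QPS / 18.6 ms | 142 QPS / 273.4 ms | 31.1 QPS / 1,266 ms |
| Q4 Filtered Aggregation | 31,524 QPS / 0.2 ms | 26,205 QPS / 0.3 ms | 4,540 QPS / 1.9 ms | 418 QPS / 19.4 ms | 58.3 QPS / 152.6 ms |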

Table 1. Benchmark results for Queries 1-4, varying the number of rows aggregations are computed over, and measuring latency and throughput in queries per second (QPS).

From the results with only 2 data nodes (RonDB can scale up to 144 data nodes), you can see that latency for Q1-Q3 rises from under 2 ms to roughly 17-19 ms once the number of rows per aggregation reaches 10,000. For Q4, however, latency stays very low at 10,000 rows (1.9 ms), because this query uses an index scan that pushes both range filtering and aggregation to the data nodes, eliminating row transfer entirely. The implication is that if your aggregations also include filters, pushdown aggregations can scale to much higher performance on the same hardware.
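To illustrate why pushdown keeps network I/O at O(1), here is a minimal Python sketch of a parallel pushdown aggregation for Q4. It assumes each partition of rows is owned by one data-node thread (simulated with a thread pool) and that rows hold (val1, val3, filter_date) with val1 never NULL. `Partial`, `scan_partition` and `pushdown_aggregate` are hypothetical names; this is not RonDB's implementation or API. Each partition filters and aggregates locally and returns only a fixed-size partial result for the coordinator to merge, whereas a client-side aggregation would first have to transfer every row.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import date


@dataclass
class Partial:
    """The only thing a data node sends back: a fixed-size partial result."""
    count: int = 0       # COUNT(*)
    sum_val1: int = 0    # SUM(val1); val1 is assumed NOT NULL
    sum_val3: int = 0    # SUM(val3); simplification: 0 instead of NULL if all NULL
    count_val3: int = 0  # COUNT(val3), which skips NULLs

    def merge(self, other: "Partial") -> "Partial":
        return Partial(
            self.count + other.count,
            self.sum_val1 + other.sum_val1,
            self.sum_val3 + other.sum_val3,
            self.count_val3 + other.count_val3,
        )


def scan_partition(rows, min_date: date) -> Partial:
    """Runs where the data lives: apply the range filter, then aggregate locally."""
    partial = Partial()
    for val1, val3, filter_date in rows:
        if filter_date < min_date:
            continue
        partial.count += 1
        partial.sum_val1 += val1
        if val3 is not None:
            partial.sum_val3 += val3
            partial.count_val3 += 1
    return partial


def pushdown_aggregate(partitions, min_date: date) -> Partial:
    """Coordinator: scan partitions in parallel, then merge r small partials."""
    with ThreadPoolExecutor(max_workers=len(partitions)) as pool:
        partials = list(pool.map(lambda rows: scan_partition(rows, min_date), partitions))
    result = Partial()
    for partial in partials:
        result = result.merge(partial)
    return result
```

Python threads are constrained by the GIL, so the sketch shows the data flow (filter and aggregate where the data lives, ship r small partials) rather than real parallel speedup.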

Comparative Guide for Rolling Aggregation

Table 2 provides an overview of the architectural design (shift-left or shift-right) and computational complexity of the technologies presented here.

| Technology | Shift Left (Streaming) | Shift Right (Inference Time) |
|---|---|---|
| Apache Flink | O(N), N = bucket size | - |
| Tiled Windows | O(N/t), N = bucket size, t = number of tiles | O(t) + O(m), t = number of tiles, m = number of head and tail events |
| Feldera | O(1) in bucket size | - |
| Hopsworks with RonDB Pushdown Aggregations | - | O(N/r) compute with r database node threads; O(1) network I/O from the DB |
| Client-Side Aggregations | - | O(N/r) compute with r client-side threads; O(N) network I/O from the DB |

Table 2. Different technologies for computing rolling aggregations. Shift-left precomputes rolling aggregates. Shift-right computes them on-demand. The computational complexity of each technology is also shown.

From Table 2, Feldera is the clear winner for shift-left rolling aggregations, with O(1) cost in the bucket size. For shift-right, RonDB outperforms naive client-side computation of aggregates by moving the aggregation into the database, which reduces network transfer from the database from O(N) to O(1).
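The O(1) update cost of incremental computation can be illustrated for a time-based window with a small Python sketch. This is not Feldera's API (Feldera maintains incremental views defined in SQL); the `IncrementalRollingWindow` class is hypothetical and assumes events arrive in timestamp order with a positive window. SUM and COUNT are updated by adding the new value and retracting expired ones, and MAX uses a monotonic deque, so each event is added and removed at most once: amortized O(1) work per event, independent of the bucket size.

```python
from collections import deque


class IncrementalRollingWindow:
    """Rolling SUM, COUNT and MAX over the window (now - window, now]."""

    def __init__(self, window: int):
        self.window = window
        self.events = deque()          # (ts, value) pairs still inside the window
        self.total = 0.0               # running SUM, updated incrementally
        self.max_candidates = deque()  # strictly decreasing values; front is MAX

    def add(self, ts: int, value: float) -> tuple:
        # Insert: O(1) for SUM/COUNT; amortized O(1) for the monotonic MAX deque.
        self.events.append((ts, value))
        self.total += value
        while self.max_candidates and self.max_candidates[-1][1] <= value:
            self.max_candidates.pop()
        self.max_candidates.append((ts, value))

        # Retract expired events; each event is retracted at most once.
        cutoff = ts - self.window
        while self.events[0][0] <= cutoff:
            _, old_value = self.events.popleft()
            self.total -= old_value
        while self.max_candidates[0][0] <= cutoff:
            self.max_candidates.popleft()

        # The newest event is always inside the window, so both deques are non-empty.
        return self.total, len(self.events), self.max_candidates[0][1]
```

Floating-point sums maintained by retraction can accumulate rounding error over long runs; integer or exact decimal arithmetic avoids this.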

Shift Left or Shift Right Rolling Aggregates in Hopsworks

Hopsworks is an open compute platform, where you can run your pipelines either inside or outside Hopsworks. Depending on your requirements, you can choose either shift-left or shift-right rolling aggregations.

For shift-left, Hopsworks has first-class integration with Feldera, but you can also bring your own Apache Flink platform if you prefer, or even Spark Streaming. Streaming pipelines compute rolling aggregates on event data and store them in Hopsworks, where RonDB serves them at sub-ms latency. Hopsworks also materializes the aggregates to its offline store (lakehouse tables) for later use as training data or historical context data for agents.

For shift-right, you define rolling aggregations as SQL transformations that are applied consistently in training and inference pipelines, without skew. In training pipelines, SQL transformations are applied by the Hopsworks Query Service for Python programs or by SparkSQL for Spark programs. If you want to avoid the complexity of a stream processing engine, use RonDB, provided it scales to meet your latency and throughput requirements.

Summary

Rolling aggregations are key to capturing behavior and anomaly signals fresh enough to build truly interactive AI systems. The first scalable solution for computing rolling aggregations, tiled window aggregations, has been superseded by incremental views. And now, with parallel pushdown aggregations, aggregations over larger data volumes can be computed on demand.

Hopsworks supports both shift-left rolling aggregations, with Feldera's efficient incremental views (O(1) updates), and shift-right rolling aggregations, with RonDB's parallel pushdown aggregations. This gives teams the flexibility to choose anything from the simplest operations (RonDB) to the largest scale with sub-millisecond freshness (Feldera).