One of the fundamental goals of PipelineDB is to facilitate high-performance continuous aggregation, so not suprisingly aggregates are a central component of PipelineDB’s utility. Continuous aggregates can be very powerful–in the most general sense they make it possible to keep the amount of data persisted in PipelineDB constant relative to the amount of data that has been pushed through it. This can enable sustainable and very high data throughput on modest hardware.
Continuous aggregates are incrementally updated in real time as new events are read by the the continuous view that they’re a part of. For simple aggregates such as count and sum, it is easy to see how their results can be incrementally updated–just add the new value to the existing result.
But for more complicated aggregates, such as avg, stddev, percentile_cont, etc., more advanced infrastructure is required to support efficient incremental updates, and PipelineDB handles all of that complexity for you transparently.
Below you’ll find a description of all the aggregates that PipelineDB supports. A few of them behave slightly differently than their standard counterparts in order to efficiently operate on infinite streams of data. Such aggregates have been annotated with an explanation of how exactly their behavior differs.
Bloom Filter Aggregates¶
bloom_agg ( expression )
Adds all input values to a Bloom Filter
bloom_agg ( expression , p , n )
Adds all input values to a Bloom filter and sizes it according to the given parameters. p is the desired false-positive rate, and n is the expected number of unique elements to add.
bloom_union_agg ( bloom filter )
Takes the union of all input Bloom filters, resulting in a single Bloom filter containing all of the input Bloom filters’ information.
bloom_intersection_agg ( bloom filter )
Takes the intersection of all input Bloom filters, resulting in a single Bloom filter containing only the information shared by all of the input Bloom filters.
See Bloom Filter Functions for functionality that can be used to manipulate Bloom filters.
Count-Min Sketch Aggregates¶
cmsketch_agg ( expression )
Adds all input values to a Count-Min Sketch.
cmsketch_agg ( expression, epsilon, p )
Same as above, but accepts epsilon and p as parameters for the underlying cmsketch. epsilon determines the acceptable error rate of the cmsketch, and defaults to 0.002 (0.2%). p determines the confidence, and defaults to 0.995 (99.5%). Lower epsilon and p will result in smaller cmsketch structures, and vice versa.
cmsketch_merge_agg ( count-min sketch )
Merges all input Count-min sketches into a single one containing all of the information of the input Count-min sketches.
See Count-Min Sketch Functions for functionality that can be used to manipulate Count-Min sketches.
Filtered-Space Saving Aggregates¶
fss_agg ( expression , k )
Adds all input values to a Filtered-Space Saving Top-K data structure sized for the given k, incrementing each value’s count by 1 each time it is added.
fss_agg_weighted (expression, k, weight )
Adds all input values to an FSS sized for the given k, incrementing each value’s count by the given weight each time it is added.
fss_merge_agg ( fss )
Merges all FSS inputs into a single FSS.
See Filtered-Space Saving Functions for functionality that can be used to manipulate Filtered-Space Saving objects.
hll_agg ( expression )
Adds all input values to a HyperLogLog.
hll_agg ( expression, p )
Adds all input values to a HyperLogLog with the given p. A larger p reduces the HyperLogLog’s error rate, at the expense of a larger size.
hll_union_agg ( hyperloglog )
Takes the union of all input HyperLogLogs, resulting in a single HyperLogLog that contains all of the information of the input HyperLogLogs.
See HyperLogLog Functions for functionality that can be used to manipulate HyperLogLog objects.
tdigest_agg ( expression )
Adds all input values to a T-Digest.
tidgest_merge_agg ( tdigest )
Merges all input T-Digest’s into a single one representing all of the information contained in the input T-Digests.
See T-Digest Functions for functionality that can be used to manipulate T-Digest objects.
bucket_agg ( expression , bucket_id )
Adds 4-byte hashes of each input expression to the bucket with the given bucket_id. Each hash may only be present precisely once in one bucket at any given time. Buckets can therefore be thought of as exclusive sets of hashes of the input expressions.
bucket_agg ( expression , bucket_id , timestamp )
Same as above, but allows a timestamp expression to determine bucket entry order. That is, only a value’s latest entry will cause it to change buckets.
See Miscellaneous Functions for functionality that can be used to manipulate bucket_agg objects.
exact_count_distinct ( expression )
Counts the exact number of distinct values for the given expression. Since count distinct used in continuous views implicitly uses HyperLogLog for efficiency, exact_count_distinct can be used when the small margin of error inherent to using HyperLogLog is not acceptable.
exact_count_distinct must store all unique values observed in order to determine uniqueness, so it is not recommended for use when many unique values are expected.
first_values ( n ) WITHIN GROUP (ORDER BY sort_expression)
An ordered-set aggregate that stores the first n values ordered by the provided sort expression.
See also: PipelineDB-specific Functions, which explains some of the PipelineDB’s non-aggregate functionality for manipulating Bloom filters, Count-min sketches, HyperLogLogs and T-Digests. Also, check out Probabilistic Data Structures & Algorithms for more information about what they are and how you can leverage them.
keyed_max ( key, value )
Returns the value associated with the “highest” key.
keyed_min ( key, value )
Returns the value associated with the “lowest” key.
set_agg ( expression )
Adds all input values to a set.
See Miscellaneous Functions for functionality that can be used to manipulate sets.
Since PipelineDB can incrementally update aggregate values, it has the capability to combine existing aggregates using more information than simply their current raw values. For example, combining multiple averages isn’t simply a matter of taking the average of the averages. Their weights must be taken into account.
For this type of operation, PipelineDB exposes the special combine aggregate. Its description is as follows:
combine ( aggregate column )
Given an aggregate column, combines all values into a single value as if all of the individual aggregates’ inputs were aggregated a single time.
combine only works on aggregate columns that belong to continuous views.
Let’s look at an example:
pipeline=# CREATE CONTINUOUS VIEW v AS SELECT g::integer, AVG(x::integer) FROM stream GROUP BY g; CREATE CONTINUOUS VIEW pipeline=# INSERT INTO stream (g, x) VALUES (0, 10), (0, 10), (0, 10), (0, 10), (0, 10); INSERT 0 5 pipeline=# INSERT INTO stream (g, x) VALUES (1, 20); INSERT 0 1 pipeline=# SELECT * FROM v; g | avg ---+--------------------- 0 | 10.0000000000000000 1 | 20.0000000000000000 (2 rows) pipeline=# SELECT avg(avg) FROM v; avg --------------------- 15.0000000000000000 (1 row) pipeline=# -- But that didn't take into account that the value of 10 weighs much more, pipeline=# -- because it was inserted 5 times, whereas 20 was only inserted once. pipeline=# -- combine() will take this weight into account pipeline=# pipeline=# SELECT combine(avg) FROM v; combine --------------------- 11.6666666666666667 (1 row) pipeline=# -- There we go! This is the same average we would have gotten if we ran pipeline=# -- a single average on all 6 of the above inserted values, yet we only pipeline=# -- needed two rows to do it.
In addition to PipelineDB’s built-in aggregates, user-defined aggregates also work with continuous views. User-defined combinable aggregates can be created with PostgreSQL’s CREATE AGGREGATE command. To make an aggregate combinable, a combinefunc must be given. combineinfunc and transoutfunc are optional:
CREATE AGGREGATE name ( [ argmode ] [ argname ] arg_data_type [ , ... ] ) ( ... COMBINEFUNC = combinefunc, [ , COMBINEINFUNC = combineinfunc ] [ , TRANSOUTFUNC = transoutfunc ] )
combinefunc ( stype, stype )
A function that takes two transition states and returns a single transition state. For example, here’s an example of a combine function for an integer
CREATE FUNCTION avg_combine(state integer, incoming integer) RETURNS integer AS $$ BEGIN RETURN ARRAY[state + incoming, state + incoming]; END; $$ LANGUAGE plpgsql
The transition state is represented as a 2-element array containing the number of elements and their sum, which can be used to compute a final.
combineinfunc ( any )
A function that deserializes the aggregate’s transition state from an external to internal representation. Deserialization is only necessary when the transition state type is not a native type.
transoutfunc ( stype )
A function that serializes the aggregate’s transition state from an internal to external representation that can be stored in a table cell. Serialization is only necessary when the transition state type is not a native type.
array_agg ( expression )
Input values, including nulls, concatenated into an array
avg ( expression )
The average of all input values
bit_and ( expression )
The bitwise AND of all non-null input values, or null if none
bit_or ( expression )
The bitwise OR of all non-null input values, or null if none
bool_and ( expression )
True if all input values are true, otherwise false
bool_or ( expression )
True if at least one input value is true, otherwise false
count ( * )
Number of input rows
count ( DISTINCT expression)
Number of rows for which expression is distinct.
Counting the distinct number of expressions on an infinite stream would require infinite memory, so continuous views use HyperLogLog to accomplish distinct counting in constant space and time, at the expense of a small margin of error. Empirically, PipelineDB’s implementation of HyperLogLog has an error rate of ~0.81%. For example, count distinct might show
1008when the actual number of unique expressions was
count ( expression )
Number of rows for which expression is non-null.
every ( expression )
Equivalent to bool_and
json_agg ( expression )
Aggregates values as a JSON array
json_object_agg ( key, value )
Aggregates key-value pairs as a JSON object
jsonb_agg ( expression )
Aggregates values as a JSONB array
jsonb_object_agg ( key, value )
Aggregates key-value pairs as a JSONB object
max ( expression )
Maximum value of expression across all input values
min ( expression )
Minimum value of expression across all input values
string_agg ( expression, delimiter )
Input values concatenated into a string, separated by delimiter
sum ( expression )
Sum of expression across all input values
corr ( y, x )
covar_pop ( y, x )
covar_samp ( y, x )
regr_avgx ( y, x )
Average of the independent variable
regr_avgy ( y, x )
Average of the independent variable
regr_count ( y, x )
Number of input rows in which both expressions are non-null
regr_intercept ( y, x )
y-intercept of the least-squares-fit linear equation determined by the (x, y) pairs
regr_r2 ( y, x )
Square of the correlation coefficient
regr_slope ( y, x )
Slope of the least-squares-fit linear equation determined by the (x, y) pairs
regr_sxx ( y, x )
sum(X^2) - sum(X)^2/N– sum of squares of the independent variable
regr_sxy ( y, x )
sum(X*Y) - sum(X) * sum(Y)/N– sum of products of independent times dependent variable
regr_syy ( y, x )
sum(Y^2) - sum(Y)^2/N– sum of squares of the independent variable
stddev ( expression )
Sample standard deviation of the input values
stddev_pop ( expression )
Population standard deviation of the input values
variance ( expression )
Sample variance of the input values (square of the sample standard deviation)
var_pop ( expression )
Population variance of the input values (square of the population standard deviation)
ordered-set aggregates apply ordering to their input in order to obtain their results, so they use the
WITHIN GROUP clause. Its syntax is as follows:
aggregate_name ( [ expression [ , ... ] ] ) WITHIN GROUP ( order_by_clause )
Let’s look at a couple examples.
Compute the 99th percentile of value:
SELECT percentile_cont(0.99) WITHIN GROUP (ORDER BY value) FROM some_table;
Or with a continuous view:
CREATE CONTINUOUS VIEW percentile AS SELECT percentile_cont(0.99) WITHIN GROUP (ORDER BY value::float8) FROM some_stream;
percentile_cont ( fraction )
Continuous percentile: returns a value corresponding to the specified fraction in the ordering, interpolating between adjacent input items if needed
percentile_cont ( array of fractions )
Multiple continuous percentile: returns an array of results matching the shape of the fractions parameter, with each non-null element replaced by the value corresponding to that percentile
Computing percentiles on infinite streams would require infinite memory, so both forms of percentile_cont, when used by continuous views, use T-Digest as a way to estimate percentiles with a very high degree of accuracy. In general, percentiles in continuous views are more accurate the closer they are to the upper or lower bounds of
hypothetical-set aggregates take an expression and compute something about it within the context of a set of input rows. For example, rank(2) computes the
2 within the context of whatever the input rows end up being.
The hypothetical-set aggregates use the
WITHIN GROUP clause to define the input rows. Its syntax is as follows:
aggregate_name ( [ expression [ , ... ] ] ) WITHIN GROUP ( order_by_clause )
Here is an example of of a hypothetical-set aggregate being used by a continuous view:
CREATE CONTINUOUS VIEW continuous_rank AS SELECT rank(42) WITHIN GROUP (ORDER BY value::float8) FROM some_stream;
This continuous view will continuously update the rank of
42 given all of the events it has read.
rank ( arguments )
Rank of the hypothetical row, with gaps for duplicate rows
dense_rank ( arguments )
Rank of the hypothetical row, without gaps
Computing the hypothetical dense_rank of a value given an infinite stream of values would require infinite memory, so continuous views use HyperLogLog to do it in constant time and space, at the expense of a small margin of error. Empirically, PipelineDB’s implementation of HyperLogLog has an error rate of ~0.2%. In other words, dense_rank (1000) in a continuous view might show 998 when the actual number of unique lower-ranking values seen was
percent_rank ( arguments )
Relative rank of the hypothetical row, ranging from 0 to 1
cume_dist ( arguments )
Relative rank of the hypothetical row, ranging from 1/N to 1
mode ( )
Future releases of PipelineDB will include an implementation of an online mode estimation algorithm, but for now it’s not supported
percentile_disc ( arguments )
Given an input percentile (such as 0.99), percentile_disc returns the very first value in the input set that is within that percentile. This requires actually sorting the input set, which is obviously impractical on an infinite stream, and doesn’t even allow for a highly accurate estimation algorithm such as the one we use for percentile_cont.
xmlagg ( xml )
aggregate_name (DISTINCT expression)
countaggregate function is supported with a
DISTINCTexpression as noted above in the General Aggregates section. In future releases, we might leverage Bloom Filter to allow
DISTINCTexpressions for all aggregate functions.