PipelineDB Cluster is designed to supercharge the PipelineDB core with maximum scale and performance, without fundamentally changing any client interfaces. PipelineDB Cluster’s clustering engine is a core component of how this is achieved.
While the PipelineDB core is very good at scaling work across CPUs, clustering takes parallelization further by making it possible to seamlessly scale work across multiple hardware servers that appear to be a single, logical PipelineDB server. Here you’ll find information about how all of this works internally.
PipelineDB Cluster is built as an extension of the PipelineDB core and relies heavily on it. Once the PipelineDB Cluster extension has been installed on a group of PipelineDB servers, those servers can easily be formed into a cluster (see Deployment).
PipelineDB Cluster’s clustering engine uses a shared-nothing architecture that keeps metadata in sync using two-phase commit. Any node can accept any read or write request, although we recommend putting a reverse proxy in front of the cluster for simplicity. See Load Balancing for more information.
PipelineDB Cluster’s clustering engine aggressively takes advantage of two important properties of PipelineDB:
PipelineDB is used as a high-throughput streaming analytics engine, not as a distributed database for storing granular data.
In practice, this means that most datasets stored in the database are relatively small, because they are continuously distilled from incoming data. As a result, PipelineDB Cluster focuses primarily on distributing computation rather than on distributing data. The following property explains why this approach is effective.
All PipelineDB aggregates can be incrementally updated
Empirically, nearly all continuous views are defined by some kind of aggregate query, whether simple or highly complex. Continuous aggregation requires that existing aggregates can be incrementally updated with new information. Taking this a step further, PipelineDB supports combining aggregates with other aggregates, producing the same output as a single aggregate computed over all of the original inputs.
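The combine property can be illustrated with AVG. The following Python sketch assumes that a partial average is kept as a running sum and count (a transition state) rather than as a finalized value; `AvgState` and `avg_of` are hypothetical names used only for illustration, not PipelineDB internals.

```python
from dataclasses import dataclass


@dataclass
class AvgState:
    """Hypothetical partial state for AVG: a running sum and count."""
    total: float = 0.0
    count: int = 0

    def update(self, value: float) -> None:
        # Incremental update: fold one new input into the existing state.
        self.total += value
        self.count += 1

    def combine(self, other: "AvgState") -> "AvgState":
        # Combining two partial states is equivalent to having seen both inputs.
        return AvgState(self.total + other.total, self.count + other.count)

    def finalize(self) -> float:
        # Only at the very end is the state turned into an actual average.
        return self.total / self.count if self.count else float("nan")


def avg_of(values) -> AvgState:
    # Build a partial state from a batch of raw inputs.
    state = AvgState()
    for v in values:
        state.update(v)
    return state


a = avg_of([1, 2, 3])  # partial state from one subset of the inputs
b = avg_of([10])       # partial state from another subset
print(a.combine(b).finalize())  # prints 4.0, the same as AVG over [1, 2, 3, 10]
```

Averaging the two finalized averages instead would give (2.0 + 10.0) / 2 = 6.0, which is wrong; this is why partial states, not final values, are what get combined.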
PipelineDB Cluster’s clustering engine takes advantage of this by distributing continuous views across multiple nodes that each get some subset of the incoming data. These “partial” continuous views are then combined as necessary at read-time with no loss of information, resulting in an architecture that requires almost no sharing of information between nodes.
PipelineDB Cluster only distributes continuous views across nodes. All other DML and DDL queries, i.e. those that do not target continuous views, are simply replicated to every node in the cluster using two-phase commit.
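To illustrate the general two-phase commit pattern used for replication, here is a minimal in-memory Python sketch. `Node` and `replicate_ddl` are hypothetical; a real implementation would also need durable logging and coordinator crash recovery, and this is not a description of PipelineDB Cluster's actual code.

```python
class Node:
    """Hypothetical cluster node that stores committed DDL in a list."""

    def __init__(self, name, will_prepare=True):
        self.name = name
        self.will_prepare = will_prepare  # simulate a node that votes "no"
        self.catalog = []                 # committed DDL statements
        self.prepared = None              # statement staged in phase one

    def prepare(self, stmt):
        # Phase one: stage the change and vote on whether it can commit.
        if not self.will_prepare:
            return False
        self.prepared = stmt
        return True

    def commit(self):
        # Phase two: make the staged change visible.
        self.catalog.append(self.prepared)
        self.prepared = None

    def abort(self):
        # Discard anything staged in phase one.
        self.prepared = None


def replicate_ddl(nodes, stmt):
    # Phase one: every node must prepare successfully.
    prepared = []
    for node in nodes:
        if node.prepare(stmt):
            prepared.append(node)
        else:
            # A single "no" vote aborts the statement on every node.
            for p in prepared:
                p.abort()
            return False
    # Phase two: all nodes voted "yes", so commit everywhere.
    for node in nodes:
        node.commit()
    return True
```

The key property is all-or-nothing: either every node applies the statement or none does, which keeps replicated metadata identical across the cluster.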
Since any PipelineDB node can accept any read or write request, rows often need to be routed between nodes before they are combined with on-disk data. However, routing only happens after a continuous view’s query plan has been executed on the incoming rows that are already available locally. This means that only the aggregate result of those incoming rows actually needs to be routed, which both minimizes network overhead and distributes work across nodes.
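A minimal Python sketch of why aggregating before routing saves network traffic, assuming a view equivalent to `SELECT x, COUNT(*) ... GROUP BY x`. The functions `partial_count` and `rows_to_route` are hypothetical stand-ins for executing the view's plan and collecting its output.

```python
from collections import Counter


def partial_count(rows):
    # Stand-in for executing the view's plan (SELECT x, COUNT(*) ... GROUP BY x)
    # on a batch of incoming rows before any routing happens.
    return Counter(row["x"] for row in rows)


def rows_to_route(batch):
    # One (group, partial count) pair per distinct group leaves this node,
    # no matter how many raw rows the batch contained.
    return sorted(partial_count(batch).items())


batch = [{"x": 0}] * 5 + [{"x": 1}] * 2
routed = rows_to_route(batch)
print(f"{len(batch)} raw rows -> {len(routed)} routed rows: {routed}")
# prints: 7 raw rows -> 2 routed rows: [(0, 5), (1, 2)]
```

The amount of routed data grows with the number of distinct groups in a batch, not with the number of raw rows.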
Since continuous views are nearly always defined by aggregate queries with groupings, and since groupings are by definition unique, groupings are the basis for determining how rows are ultimately routed between nodes. PipelineDB Cluster supports two sharding policies, read_optimized and write_optimized, which are explained in the following sections.
Sharding policies may be set globally using the pipeline_cluster.sharding_policy configuration parameter in the server's configuration file, or on a per-continuous view basis using the sharding_policy parameter:
CREATE CONTINUOUS VIEW v0 WITH (sharding_policy='read_optimized') AS
  SELECT x::integer, COUNT(*) FROM stream GROUP BY x;

CREATE CONTINUOUS VIEW v1 WITH (sharding_policy='write_optimized') AS
  SELECT x::integer, COUNT(*) FROM stream GROUP BY x;
Read-optimized continuous views are designed to minimize the size of the underlying on-disk data, thus making reads as fast as possible. This is accomplished by keeping only a disjoint subset of the continuous view’s groups on each shard: after initial partial aggregation, each resulting group is routed to the node that is assigned that group. Each group therefore exists on exactly one node.
Consider the following simple example:
CREATE CONTINUOUS VIEW v WITH (sharding_policy='read_optimized', shards=2) AS SELECT x::integer, COUNT(*) FROM stream GROUP BY x;
Now assume that the following insert is processed by a node other than the one assigned to the resulting group:
INSERT INTO stream (x) VALUES (0), (0), (0), (0), (0);
Assuming the worker process reads all five rows in a single batch, only the aggregate row (0, 5) would be routed to the group’s designated node, where it is combined with on-disk data as usual.
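The read-optimized example above can be sketched in Python as follows. The placement function `owner_of` (a CRC32 hash of the group key modulo the shard count) and the `ReadOptimizedShard` class are hypothetical; the documentation does not specify how PipelineDB Cluster assigns groups to nodes, only that each group has exactly one owner.

```python
import zlib
from collections import Counter

NUM_SHARDS = 2  # hypothetical: matches shards=2 in the example view


def owner_of(group_key, num_shards=NUM_SHARDS):
    # Hypothetical placement function: a stable hash of the group key selects
    # exactly one owning shard, so groups on different shards are disjoint.
    return zlib.crc32(repr(group_key).encode()) % num_shards


class ReadOptimizedShard:
    def __init__(self, shard_id):
        self.shard_id = shard_id
        self.on_disk = Counter()  # group key -> combined COUNT(*)

    def merge(self, group_key, partial_count):
        # Combine a routed partial result with the existing on-disk row.
        if owner_of(group_key) != self.shard_id:
            raise ValueError(f"shard {self.shard_id} does not own group {group_key!r}")
        self.on_disk[group_key] += partial_count


def ingest(cluster, xs):
    # 1. Partially aggregate the batch on whichever node received it.
    partials = Counter(xs)
    # 2. Route each aggregate row, not each raw row, to the group's owner.
    for key, count in partials.items():
        cluster[owner_of(key)].merge(key, count)


cluster = [ReadOptimizedShard(i) for i in range(NUM_SHARDS)]
ingest(cluster, [0, 0, 0, 0, 0])  # only (0, 5) is routed
```

Because each group lives on a single shard, reading a group never requires combining copies from other shards.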
read_optimized is PipelineDB Cluster’s default sharding policy.
Write-optimized continuous views are designed to maximize write throughput by allowing any shard to maintain a local version of any group. Routing is then necessary only if the node that produces a partial aggregate result for a given group does not host any shards of that group’s continuous view.
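A minimal Python sketch of write-optimized ingestion. It assumes a hypothetical `WriteOptimizedNode` with a `has_shard` flag and a hypothetical fallback that sends partial results from a shardless node to the first node that does host a shard; how PipelineDB Cluster actually picks the target in that case is not described here.

```python
from collections import Counter


class WriteOptimizedNode:
    def __init__(self, node_id, has_shard):
        self.node_id = node_id
        self.has_shard = has_shard  # does this node host a shard of the view?
        self.local = Counter()      # group key -> this shard's local COUNT(*)


def ingest(cluster, receiving_node, xs):
    # Partially aggregate the batch on the node that received it.
    partials = Counter(xs)
    if receiving_node.has_shard:
        # No routing needed: any shard may keep a local copy of any group.
        target = receiving_node
    else:
        # Hypothetical fallback: route to the first node hosting a shard.
        target = next(n for n in cluster if n.has_shard)
    target.local.update(partials)  # Counter.update adds counts
    return target.node_id


a, b, c = WriteOptimizedNode(0, True), WriteOptimizedNode(1, True), WriteOptimizedNode(2, False)
cluster = [a, b, c]
ingest(cluster, a, [0, 0, 0])  # stays on node 0
ingest(cluster, b, [0, 0])     # stays on node 1: a second copy of group 0
```

After these two writes, group 0 exists on two nodes with counts 3 and 2, so a read must add them together to obtain 5.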
Because a group may exist on multiple servers under the write_optimized policy, its copies must be combined into a single group at read time, trading some read performance for increased write throughput. How groups are combined at read time is explained in the next section.
Since continuous view shards are independent of each other, it may be necessary to combine data from multiple shards at continuous view read time in order to provide a finalized query result. This is a fundamental design property that takes advantage of the fact that continuous views are typically already aggregated, and therefore relatively cheap to read. Consider the following example continuous view:
CREATE CONTINUOUS VIEW v WITH (sharding_policy='write_optimized', shards=16) AS SELECT x::integer, AVG(y::integer) FROM stream GROUP BY x;
Because v uses the write_optimized sharding policy, up to 16 local copies of each group’s average may exist, at most one per shard. At read time, these copies are combined with no loss of information, producing a finalized result with exactly one row per group.
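A minimal Python sketch of read-time combination for v, assuming each shard exposes its partial average for a group as a hypothetical `(x, partial_sum, partial_count)` tuple. This row layout and `combine_at_read_time` are illustrative assumptions, not PipelineDB Cluster's storage format.

```python
from collections import defaultdict


def combine_at_read_time(shard_rows):
    """Combine per-shard partial averages into one finalized row per group.

    shard_rows: iterable of (x, partial_sum, partial_count) from every shard.
    """
    totals = defaultdict(lambda: [0, 0])  # x -> [sum, count] across shards
    for x, partial_sum, partial_count in shard_rows:
        totals[x][0] += partial_sum
        totals[x][1] += partial_count
    # Finalize only after all copies of a group have been merged.
    return sorted((x, s / c) for x, (s, c) in totals.items())


rows = [
    (1, 6, 3),   # shard A: group x=1 saw inputs 1, 2, 3
    (1, 10, 1),  # shard B: group x=1 saw input 10
    (2, 5, 1),   # shard B: group x=2 saw input 5
]
print(combine_at_read_time(rows))  # prints [(1, 4.0), (2, 5.0)]
```

Since the combined rows are already aggregated, the read-time work scales with the number of groups times the number of shards holding them, which is typically far smaller than the number of raw input rows.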