Clustering

PipelineDB Cluster is designed to supercharge the PipelineDB core with maximum scale and performance, without fundamentally changing any client interfaces. PipelineDB Cluster’s clustering engine is a core component of how this is achieved.

While the PipelineDB core is very good at scaling work across CPUs, clustering takes parallelization further by making it possible to seamlessly scale work across multiple hardware servers that appear to be a single, logical PipelineDB server. Here you’ll find information about how all of this works internally.

Architecture Overview

PipelineDB Cluster relies heavily on the PipelineDB core by extending it. Once the PipelineDB Cluster extension has been installed on a group of PipelineDB servers, those servers can easily be formed into a cluster (see Deployment).

PipelineDB Cluster’s clustering engine uses a shared-nothing architecture that keeps metadata in sync using Two Phase Commits. Any node can accept any read or write request, although we recommend putting a reverse proxy in front of the cluster for simplicity. See Load Balancing for more information.

PipelineDB Cluster’s clustering engine aggressively takes advantage of two important properties of PipelineDB:

PipelineDB is used as a high-throughput streaming analytics engine, not as a distributed database for storing granular data.

In practice, this means that most datasets stored in the database are relatively small, because they are continuously distilled from incoming data. As a result, PipelineDB Cluster focuses primarily on distributing computation and less on distributing data. The following property clarifies why this approach is effective.

All PipelineDB aggregates can be incrementally updated

Empirically nearly all continuous views are defined by some kind of aggregate query, whether it be simple or highly complex. Continuous aggregation requires that existing aggregates can be incrementally updated with new information. Taking this a step further, PipelineDB supports combining aggregates with other aggregates as if the final output came from all original inputs.

PipelineDB Cluster’s clustering engine takes advantage of this by distributing continuous views across multiple nodes that each get some subset of the incoming data. These “partial” continuous views are then combined as necessary at read-time with no loss of information, resulting in an architecture that requires almost no sharing of information between nodes.
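To make this concrete, here is a plain SQL sketch (not PipelineDB Cluster internals, and using made-up partial results) showing that summing the partial COUNT(*) values produced independently by two nodes yields the same answer as if a single node had counted all of the original rows:

-- Illustration only: the partial counts below are hypothetical.
WITH partial_counts (x, cnt) AS (
  VALUES (0, 3),  -- node A saw 3 rows for group x = 0
         (0, 2)   -- node B saw 2 rows for group x = 0
)
SELECT x, SUM(cnt) AS count  -- 5, as if one node had seen all 5 rows
FROM partial_counts
GROUP BY x;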

Important

PipelineDB Cluster only distributes continuous views across nodes. All non-continuous view DML and DDL queries are simply replicated to all nodes in the cluster using Two Phase Commits.


Sharded Continuous Views

PipelineDB Cluster scales continuous computation across servers by sharding continuous views. Each shard is completely independent of all other shards. If necessary, shards are combined a final time at continuous view read-time, which is explained in the Combining Shards section.

Each node in a cluster can be responsible for at most one shard of each continuous view. That is, a continuous view with n shards requires at least n nodes. However, there is no limit on how many different continuous views a given node can have shards for.

The number of shards to create for a continuous view can be specified in two ways. First, the num_shards storage parameter can be used:

CREATE CONTINUOUS VIEW v WITH (num_shards=8) AS
  SELECT x::integer, COUNT(*) FROM stream GROUP BY x;

The second way to determine the number of shards to create is with the shard_factor storage parameter. This parameter specifies the fraction of PipelineDB Cluster nodes to spread the continuous view over, as an integer percentage in the range [1, 100]. For example, with a cluster of 16 nodes, the following definition would create a continuous view with 8 shards:

CREATE CONTINUOUS VIEW v WITH (shard_factor=50) AS
  SELECT x::integer, COUNT(*) FROM stream GROUP BY x;

The shard factor may also be set globally in pipelinedb.conf using the pipeline_cluster.continuous_view_shard_factor configuration parameter.
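For example, a global default could be set in pipelinedb.conf like this (the value shown is purely illustrative):

# pipelinedb.conf: spread each continuous view over half of the cluster's nodes by default
pipeline_cluster.continuous_view_shard_factor = 50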

Note

If neither the num_shards nor the shard_factor parameter is set in the continuous view definition, pipeline_cluster.continuous_view_shard_factor is used to determine the number of shards to create.

Shard Metadata

Shard placements are chosen to distribute shards uniformly across nodes, and their associated metadata is available in the pipeline_cluster.shards catalog view. Its schema is as follows:

\d pipeline_cluster.shards
  View "pipeline_cluster.shards"
     Column      |   Type   | Modifiers
-----------------+----------+-----------
 shard           | text     |
 owner           | text     |
 continuous_view | text     |
 shard_id        | smallint |
 shard_type      | text     |
 sharding_policy | text     |
 is_local        | boolean  |
 balanced        | boolean  |
 reachable       | boolean  |

Routing

Since any PipelineDB node can accept any read or write request, it is often necessary to route rows between nodes before combining them with on-disk data. However, routing only happens after a continuous view’s query plan has been executed on whatever incoming rows are locally available, so only the aggregate result of those rows actually needs to be routed. This is designed to both minimize network overhead and distribute work.

Since continuous views are nearly always defined by aggregate queries with groupings, and since groupings are by definition unique, groupings are the basis for determining how rows are ultimately routed between nodes. PipelineDB Cluster supports two sharding policies: read_optimized and write_optimized, which will be explained in the following sections.

Sharding policies may be set globally using the pipeline_cluster.sharding_policy configuration parameter in pipelinedb.conf, or on a per-continuous view basis using the sharding_policy parameter:

CREATE CONTINUOUS VIEW v0 WITH (sharding_policy='read_optimized') AS
  SELECT x::integer, COUNT(*) FROM stream GROUP BY x;

CREATE CONTINUOUS VIEW v1 WITH (sharding_policy='write_optimized') AS
  SELECT x::integer, COUNT(*) FROM stream GROUP BY x;

read_optimized (default)

Read-optimized continuous views are designed to minimize the size of the underlying on-disk data, making reads as fast as possible. This is accomplished by keeping only disjoint subsets of a continuous view’s groups on each shard: after initial partial aggregation, each resulting group is routed to the node assigned to it, so each group exists on exactly one node.

Consider the following simple example:

CREATE CONTINUOUS VIEW v WITH (sharding_policy='read_optimized', num_shards=2) AS
  SELECT x::integer, COUNT(*) FROM stream GROUP BY x;

Now let’s assume that the following inserts are processed by a node other than the node that is assigned the resulting grouping:

INSERT INTO stream (x) VALUES (0), (0), (0), (0), (0);

Assuming the worker process reads these five rows in a single batch, only the aggregate row (0, 5) would be routed to the grouping’s designated node, and subsequently combined with on-disk data as usual.
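Reading the continuous view from any node should then return the fully combined result. For example, with output shown in psql’s format and values following directly from the inserts above:

SELECT * FROM v;

 x | count
---+-------
 0 |     5
(1 row)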

Note

read_optimized is PipelineDB Cluster’s default sharding policy.

write_optimized

Write-optimized continuous views are designed to maximize write throughput by allowing any shard to maintain a local version of any group. Routing is then only necessary if a node that produces a partial aggregate result for a given group does not have any shards for the group’s continuous view.

Since a group may exist on multiple servers with write-optimized grouping, groups must be combined into a single group at read-time, thereby incurring a read-time performance impact in exchange for increased write throughput. How groups are combined at read-time is explained in the next section.


Combining Shards

Since continuous view shards are independent of each other, it may be necessary to combine data from multiple shards at continuous view read time in order to provide a finalized query result. This is a fundamental design property that takes advantage of the fact that continuous views are typically already aggregated, and therefore relatively cheap to read. Consider the following example continuous view:

CREATE CONTINUOUS VIEW v WITH (sharding_policy='write_optimized', num_shards=16) AS
  SELECT x::integer, AVG(y::integer) FROM stream GROUP BY x;

Since it uses the write_optimized sharding policy, multiple local copies of each grouped average may exist. At read time, these copies are combined with no loss of information, producing a finalized result with exactly one row per group.
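To make the “no loss of information” claim concrete, here is a plain SQL sketch of the idea (not PipelineDB Cluster’s actual internal representation, and using made-up partial states): each local copy of a group effectively carries a partial sum and count, and merging those partial states yields the same average as if every row had been aggregated in one place.

-- Illustration only: the partial states below are hypothetical.
WITH partial_states (x, sum_y, count_y) AS (
  VALUES (1, 10, 4),  -- local copy of group x = 1 on one shard
         (1, 26, 6)   -- local copy of group x = 1 on another shard
)
SELECT x, SUM(sum_y)::numeric / SUM(count_y) AS avg
FROM partial_states
GROUP BY x;  -- 36 / 10 = 3.6, exactly AVG(y) over all 10 original rows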