High Availability

So far we’ve only talked about primary shards which can be set using the shard_factor or num_shards storage parameter. But what if one of the nodes assigned a shard were to go down? We provide per-shard replication to provide high availability for your data in case of node failure/downtime.

The number of replicas per shard can be set per continuous view using the num_replicas storage parameter.

CREATE VIEW v WITH (num_shards=3, num_replicas=2) AS
  SELECT x::integer, COUNT(*) FROM stream GROUP BY x;

This will create a continuous view with 3 primary shards and 2 replicas for each primary shard. In the context of high availability, this means that each shards has a total of 3 independent copies in the cluster and so the cluster can tolerate the failure of up to 2 nodes.


If no num_replicas parameter is provided in the continuous view definition, pipeline_cluster.num_replicas is used to determine the number of replica shards to create.

PipelineDB Cluster will never co-locate a replica shard on the same node as the primary shard since it does not help with high availability. So num_replicas=N requires the cluster to have at least N+1 nodes.

How It Works

PipelineDB Cluster uses logical decoding to synchronize changes made to primary shards with their replicas. Each node in the cluster keeps a logical replication slot for each other node in the cluster and tails the WAL separately for each node. Whenever a node encounters a change made to a primary shard that is replicated on the node it is currently streaming the WAL for, it sends the changes down to the node. This replication is asynchronous so in case of node failure, some of the most recent changes might not be available on replica shards. However, replica shards acknowledge all changes made per transaction so we are guaranteed that replica shards never miss any changes and will be eventually consistent with primary shards.

Balancing Read Load

Read performance is affected by the read latency of the slowest node which contains a shard for the continuous view we’re reading. So to improve read performance we should try to balance the load across all nodes in the cluster. One way to do this is by enabling reads from replica shards. This behavior can be toggled by using the pipeline_cluster.primary_only configuration parameter.

However, since replication in PipelineDB Cluster is asynchronous, reading from replicas might mean that we end up reading stale data in case the replica hasn’t caught up with the primary shard.


By default pipeline_cluster.primary_only is set to False so reads will automatically be distributed to replica shards as well.

Handling Node Failures


When a node is down, read requests any shards located on it are re-routed to one of the shard’s replicas. pipeline_cluster.primary_only must be set to False for this to work.

In case all nodes for a specific shard ID are down, there is no way for the cluster to access data in that shard and so read requests will fail. You can set the pipeline_cluster.skip_unavailable configuration parameter to change this behavior and return only data for shards that are reachable.


At least N replicas per shard are needed to handle simultaneous failure of up to N nodes.


PipelineDB Cluster shards continuous views by partitioning groups onto a hash ring and making each primary shard responsible for a contiguous range in the ring. When a primary shard is unavailable, the partial aggregate result is re-routed to the next available primary shard in the hash ring. If all primary shards are unavailable, the partial aggregate is discarded. Since we always combine the data from all shards during read time, this re-routing means that no partial aggregate result is ever lost and we will always get accurate results.

When the sharding policy is write_optimized, this re-routing doesn’t affect the balancing of data since all primary shards can legally contain data for all groups. However, for read_optimized continuous views this re-routing can result in two different primary shards containing partial aggregates for the same group which violates the policy’s requirement (albeit not affecting correctness). Currently, PipelineDB Cluster doesn’t do anything about it so during node down times, read_optimized sharding policy will end up degrading. In future releases, we will automatically rebalance data for read_optimized continuous views so that each node only contains the groups it is responsible for.


The balanced column in pipeline_cluster.shards indicates whether a data is balanced properly for that shard or not. Trivially, all write_optimized shards are always balanced.