Repartition

The Repartition processor changes how pipeline data is partitioned. The processor redistributes data across partitions, increasing or decreasing the number of partitions as needed. The processor can randomly redistribute the data across the partitions or can redistribute the data by specified fields.

Spark runs a Transformer pipeline just as it runs any other application, splitting the data into partitions and performing operations on the partitions in parallel. Spark automatically handles the partitioning of data for you. However, at times you might need to control the size and number of partitions. When you need to change the partitioning, use the Repartition processor in the pipeline.

When you configure the Repartition processor, you select the repartition method to use and specify how to create partitions.

You can use multiple Repartition processors in a pipeline. However, as a best practice, design your pipeline to use as few Repartition processors as possible. The Repartition processor causes Spark to shuffle the data, redistributing the data so that it's grouped differently across the partitions, which can be an expensive operation.

Repartition Use Cases

You might use the Repartition processor to repartition pipeline data for the following use cases:
  • Redistribute data that has become skewed

    Use the Repartition processor to redistribute data across partitions when data becomes skewed, or unevenly distributed across partitions. Data can become skewed after Spark performs operations such as joins or aggregations. Heavily skewed data can cause out-of-memory errors during the pipeline run.

    When you redistribute skewed data, you can often configure the processor to use the existing number of partitions. You don't need to increase or decrease the number of partitions. You just need to redistribute the data evenly across the existing partitions.

  • Increase the number of partitions when an out-of-memory error occurs

    Use the Repartition processor to increase the number of partitions when the pipeline fails due to an out-of-memory error. For example, if a processor creates additional data, partitions might exceed the memory limit.

    An out-of-memory error indicates the stage that encountered the error. To resolve the issue, add the Repartition processor before that stage to increase the number of partitions. When you define the number of partitions, consider the size and configuration of the cluster and the amount of data being processed.

  • Change the number of partitions that are written to file systems

    When writing to a File destination, Spark creates one output file for each partition. Use the Repartition processor before a File destination to control the number of output files.

  • Partition by field to improve the performance of downstream analytic queries

    Use the Repartition processor to partition the data by field, placing records with the same value for the specified field in the same partition. Repartitioning by field is particularly useful when a pipeline writes to a destination system that data scientists use to run analytic queries.
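
To make the idea of skew concrete, the following minimal sketch compares the largest partition to the average partition size. The function name skew_ratio and the sample record counts are hypothetical and exist only for illustration. They are not part of Transformer or Spark.

```python
def skew_ratio(partition_sizes):
    """Return the largest partition size divided by the mean size.

    A value of 1.0 means the records are spread perfectly evenly.
    Larger values mean one partition holds far more than its share.
    """
    if not partition_sizes or sum(partition_sizes) == 0:
        return 1.0
    mean = sum(partition_sizes) / len(partition_sizes)
    return max(partition_sizes) / mean


# Hypothetical record counts per partition, for example after a join.
skewed = [9000, 400, 300, 300]
balanced = [2500, 2500, 2500, 2500]

print(skew_ratio(skewed))
print(skew_ratio(balanced))
```

In this model, redistributing the skewed data across the same four partitions brings the ratio back toward 1.0 without changing the partition count.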

Repartition Methods

The Repartition processor provides the following repartition methods:
  • Repartition by Number - Attempts to distribute data evenly across the specified number of partitions. Use to change the number of partitions or to redistribute data across the same number of partitions.
  • Coalesce by Number - Attempts to minimize the shuffling of data while reducing the number of partitions. Use only to reduce the number of partitions.
  • Repartition by Field Hash - Attempts to distribute data evenly across partitions while grouping data by the values of the specified fields. Use to group records with the same field values in the same partition. You specify the fields to repartition by.
  • Repartition by Field Range - Attempts to distribute data evenly across partitions by placing value ranges of the specified fields in the same partition. Use only when the values of specified fields are already ordered. You specify the fields to repartition by.

After you select the repartition method, you configure how to create the partitions. The processor can create partitions based on a specified number or on a specified maximum number of records allowed in the partitions.

Creating partitions based on a maximum number of records requires the processor to count the records first to determine how many partitions to create. This additional processing can adversely affect pipeline performance.
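
As a rough illustration of why a record count is needed, the sketch below derives a partition count from a total record count and a per-partition maximum. The ceiling-division formula and the function name partitions_needed are assumptions made for illustration. The exact calculation that the processor performs is not documented here.

```python
def partitions_needed(record_count, max_records_per_partition):
    """Smallest number of partitions such that no partition needs to
    hold more than max_records_per_partition records (assumed formula)."""
    if max_records_per_partition <= 0:
        raise ValueError("max_records_per_partition must be positive")
    # Integer ceiling division; always return at least one partition.
    needed = (record_count + max_records_per_partition - 1) // max_records_per_partition
    return max(1, needed)


# Hypothetical batch sizes.
print(partitions_needed(1_000_000, 250_000))
print(partitions_needed(1_000_001, 250_000))
```

The record count is an input to the calculation, which is why this option adds a counting pass that the Number of Partitions option avoids.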

Repartition by Number

When repartitioning by number, the processor creates the specified number of partitions, and then attempts to redistribute the data evenly across the partitions, shuffling data as necessary.

You can increase or decrease the number of partitions when you repartition by number. You can also use the same number of partitions to redistribute the data to reduce skew. Note that when you only need to decrease the number of partitions, the Coalesce by Number method can be more efficient.

You specify how the partitions are created:
  • Number of Partitions - The processor creates the specified number of partitions and then randomly redistributes the data across the partitions.
  • Max Records per Partition - The processor performs a record count to determine how many partitions are needed and creates the partitions. Then, it redistributes the data across the partitions, honoring the maximum record requirement.
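
The following sketch models repartitioning by number as a full shuffle in which every record is dealt out to the new partitions in turn. It is a simplified, deterministic stand-in for the random redistribution described above, not the processor's or Spark's actual implementation. The function name and sample data are hypothetical.

```python
from itertools import chain


def repartition_round_robin(partitions, num_partitions):
    """Deal every record out to num_partitions new partitions in turn.

    Every record may move, which is what makes this a full shuffle.
    """
    new_partitions = [[] for _ in range(num_partitions)]
    for i, record in enumerate(chain.from_iterable(partitions)):
        new_partitions[i % num_partitions].append(record)
    return new_partitions


# Hypothetical skewed input: 100 records, 90 of them in one partition.
skewed = [list(range(0, 90)), list(range(90, 95)), list(range(95, 100))]

even = repartition_round_robin(skewed, 4)
sizes = [len(p) for p in even]
print(sizes)
```

Passing the existing partition count as num_partitions models the skew-reduction case, where the number of partitions stays the same and only the distribution changes.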

Coalesce by Number

When coalescing by number, the processor creates the specified number of partitions and then randomly redistributes the data in an attempt to balance the data across the partitions. Use only to decrease the number of partitions.

Instead of shuffling all of the data, coalescing rebalances data between partitions only when necessary. This can result in better pipeline performance.

You specify how the partitions are created:
  • Number of Partitions - The processor creates the specified number of partitions and then randomly redistributes the data to balance the data across the partitions.
  • Max Records per Partition - The processor performs a count to determine how many partitions are needed and creates the partitions. Then, it redistributes the data evenly across the partitions, honoring the maximum record requirement.
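
To contrast coalescing with a full shuffle, the sketch below merges whole source partitions into fewer target partitions, so records that start in the same partition stay together. This is a simplified model under that assumption. Spark's actual coalesce logic also takes factors such as data locality into account, and the function name here is hypothetical.

```python
def coalesce(partitions, num_partitions):
    """Merge whole partitions into fewer groups; never increases the count."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if num_partitions >= len(partitions):
        # Coalescing only reduces, so there is nothing to do here.
        return [list(p) for p in partitions]
    merged = [[] for _ in range(num_partitions)]
    for i, part in enumerate(partitions):
        # Each source partition moves as a unit to one target partition.
        merged[i * num_partitions // len(partitions)].extend(part)
    return merged


# Hypothetical input with six small partitions.
parts = [[1, 2], [3], [4, 5, 6], [7], [8, 9], [10]]

print(coalesce(parts, 2))
print(len(coalesce(parts, 10)))
```

Because partitions are merged rather than split, the result can remain uneven when the input is uneven. That is the trade-off for moving less data.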

Repartition by Field Hash

When repartitioning by field hash, the processor creates partitions and redistributes the data as evenly as possible using the specified fields. The processor places records with the same value for the specified fields in the same partition, when possible.

Repartitioning by field hash is particularly useful when a pipeline writes to a destination system used by data scientists who run analytic queries on the data. To increase the analytics performance, partition the data by fields that the data scientists frequently include in the WHERE or GROUP BY clauses in their queries. For example, if the data scientists plan to run queries where the region field is a certain value, sending all records with the same region value to the same output file improves the efficiency of those queries.

Data is distributed based on the partition creation type that you select:
  • Number of Partitions - The processor creates the specified number of partitions, then groups data with the same values in the same partition. However, if you specify fewer partitions than discrete values, then partitions can contain multiple groups of values.

    For example, if you partition by a Month field, but only specify 10 partitions, then some partitions will contain data with multiple months.

  • Max Records per Partition - The processor performs a count to determine how many partitions are needed and creates the partitions. Then, it groups data with the same values in the same partition. When there are more records with the same values than the specified maximum, those records are distributed across multiple partitions, honoring the maximum record requirement.
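
The Month example can be sketched with a simple hash partitioner. The code below assigns each record to a partition from a hash of the field value, so equal values always land together. It uses zlib.crc32 as a stand-in hash function, which is an assumption for illustration only and not the hash that Spark uses. The record layout and function name are hypothetical.

```python
import zlib
from collections import defaultdict


def hash_partition(records, field, num_partitions):
    """Group records by hash(field value) modulo num_partitions."""
    partitions = defaultdict(list)
    for record in records:
        key = str(record[field]).encode("utf-8")
        index = zlib.crc32(key) % num_partitions
        partitions[index].append(record)
    return dict(partitions)


months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
# Hypothetical records: three per month.
records = [{"month": m, "amount": i} for i, m in enumerate(months * 3)]

parts = hash_partition(records, "month", 10)
for index in sorted(parts):
    print(index, sorted({r["month"] for r in parts[index]}))
```

With 12 distinct months and only 10 partitions, at least one partition must hold more than one month, while each individual month still appears in exactly one partition.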

Repartition by Field Range

When repartitioning by field range, the processor creates partitions and redistributes the data as evenly as possible using the specified fields. The processor places records with values in the same range for the specified fields in the same partition, when possible. Use when the data in the specified fields is ordered.

Data is distributed based on the partition creation type that you select:
  • Number of Partitions - The processor creates the specified number of partitions and distributes records with field values within similar ranges in the same partition.
  • Max Records per Partition - The processor performs a count to determine how many partitions are needed and creates the partitions. Then, it writes records with field values in the appropriate ranges to the partitions.
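
Range partitioning can be sketched as choosing boundary values and then placing each record in the partition whose range contains its field value. The code below is a minimal model that picks boundaries from the full sorted data. A real engine typically estimates boundaries from a sample, so treat the boundary selection and both function names as assumptions for illustration.

```python
import bisect


def range_boundaries(values, num_partitions):
    """Pick num_partitions - 1 split points from the sorted values."""
    ordered = sorted(values)
    if not ordered:
        return []
    step = len(ordered) / num_partitions
    return [ordered[int(step * i)] for i in range(1, num_partitions)]


def range_partition(values, boundaries):
    """Place each value in the partition whose range contains it."""
    partitions = [[] for _ in range(len(boundaries) + 1)]
    for value in values:
        partitions[bisect.bisect_right(boundaries, value)].append(value)
    return partitions


# Hypothetical ordered field values 1..100 split into four ranges.
values = list(range(1, 101))
bounds = range_boundaries(values, 4)
parts = range_partition(values, bounds)

print(bounds)
print([len(p) for p in parts])
```

Each partition holds one contiguous range of values, so every value in one partition is smaller than every value in the next.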

Configuring a Repartition Processor

Configure a Repartition processor to change how pipeline data is partitioned.

  1. In the Properties panel, on the General tab, configure the following properties:
    General Property - Description
    Name - Stage name.
    Description - Optional description.
    Cache Data - Caches data processed for a batch so the data can be reused for multiple downstream stages. Use to improve performance when the stage passes data to multiple stages.

    Caching can limit pushdown optimization when the pipeline runs in ludicrous mode.

  2. On the Repartition tab, configure the following properties:
    Repartition Property - Description
    Repartition Method - How to repartition data:
    • Repartition by Number - Attempts to distribute data evenly across the specified number of partitions. Use to change the number of partitions or to redistribute data across the same number of partitions.
    • Coalesce by Number - Attempts to minimize the shuffling of data while reducing the number of partitions. Use only to reduce the number of partitions.
    • Repartition by Field Hash - Attempts to distribute data evenly across partitions while grouping data by the values of the specified fields. Use to group records with the same field values in the same partition. You specify the fields to repartition by.
    • Repartition by Field Range - Attempts to distribute data evenly across partitions by placing value ranges of the specified fields in the same partition. Use only when the values of specified fields are already ordered. You specify the fields to repartition by.
    Create Partitions By - Method to create partitions:
    • Number of Partitions - Creates the specified number of partitions.
    • Max Records per Partition - Creates partitions based on the specified maximum number of records to allow in a partition.

      When using this method, the processor performs a count of the records to determine how many partitions to create. This processing can adversely affect pipeline performance.

    Number of Partitions - Number of partitions to create.
    Max Records per Partition - Maximum number of records to allow in each partition.
    Fields to Partition By - List of fields to partition by. Available when repartitioning by field hash or field range.

    Click the Add icon to specify another field to partition by.
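
The dependencies between these properties can be summarized in a small validation sketch. The dictionary keys (method, create_by, num_partitions, max_records, fields) and the function name are hypothetical and do not reflect how Transformer stores stage configuration. Only the property values and the rule that fields apply to the two field-based methods come from the tables above.

```python
FIELD_METHODS = {"Repartition by Field Hash", "Repartition by Field Range"}
ALL_METHODS = FIELD_METHODS | {"Repartition by Number", "Coalesce by Number"}


def validate_repartition_config(config):
    """Return a list of problems found in a hypothetical config dict."""
    problems = []

    method = config.get("method")
    if method not in ALL_METHODS:
        problems.append(f"unknown repartition method: {method!r}")

    create_by = config.get("create_by")
    if create_by == "Number of Partitions":
        count = config.get("num_partitions")
        if not isinstance(count, int) or count < 1:
            problems.append("num_partitions must be a positive integer")
    elif create_by == "Max Records per Partition":
        limit = config.get("max_records")
        if not isinstance(limit, int) or limit < 1:
            problems.append("max_records must be a positive integer")
    else:
        problems.append(f"unknown partition creation type: {create_by!r}")

    # Fields are needed only for the two field-based methods.
    if method in FIELD_METHODS and not config.get("fields"):
        problems.append("at least one field to partition by is required")

    return problems


ok_config = {"method": "Repartition by Number",
             "create_by": "Number of Partitions",
             "num_partitions": 8}
bad_config = {"method": "Repartition by Field Hash",
              "create_by": "Max Records per Partition",
              "max_records": 0}

print(validate_repartition_config(ok_config))
print(validate_repartition_config(bad_config))
```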