
Initiative: Parallel Deduplication


After a bit of research into getting as much juice out of PostgreSQL as possible when deduplicating NSW Valuer General data and breaking it into more usable tables, it seems any kind of mass parallel writing/insertion will require some nontrivial changes to how I'm handling that specific part of my pipeline. At the moment, I'm just synchronously iterating over a bunch of SQL scripts executing one after the other.

These changes are doable, but they will take a while. An insert in PostgreSQL runs sequentially, and you can't really do parallel writes within a single query. Because of that, much of the logic may need to be rewritten and coordinated to run in parallel from Python or something similar. How best to do that needs some thought.

However, it may be worth doing the following before lifting everything out of pure SQL.

Wider Parallelism Strategy

Mark tables used for staging data as UNLOGGED

I've already done this, and temporary tables in Postgres are unlogged by default; the main thing is to make sure the UNLOGGED keyword is there.

CREATE UNLOGGED TABLE IF NOT EXISTS nsw_vg_raw.land_value_row (
  ...
);

What does this do

It means they don't use the Write-Ahead Log (WAL); for the most part that means they trade crash safety for throughput and speed. In our case that's an entirely acceptable trade-off, as these aren't destination tables, they're just staging tables that act as a vehicle to populate the final data model. If they're crashing there's likely something wrong in the ingestion that needs fixing, and even with WAL a crash stops the pipeline either way. Skipping it just means we arrive at the destination faster.
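If a staging table already exists as a regular table, it can also be flipped in place rather than recreated. A minimal sketch, reusing the land_value_row table above (switching persistence rewrites the table, so do it before the bulk load):

ALTER TABLE nsw_vg_raw.land_value_row SET UNLOGGED;
-- ... bulk ingestion ...
-- Optionally restore WAL logging once the contents are worth keeping safe.
ALTER TABLE nsw_vg_raw.land_value_row SET LOGGED;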

Mark Pure SQL Functions as Parallel safe

I have a few user-defined functions like the one below. For PostgreSQL to parallelise anything that uses these functions, it needs to know it's safe to do so, and by default any user-defined function is deemed PARALLEL UNSAFE. All you need to do is mark it PARALLEL SAFE like so.

CREATE OR REPLACE FUNCTION pg_temp.sqm_area(
    area FLOAT,
    area_unit VARCHAR(1)
) RETURNS FLOAT AS $$
    SELECT CASE area_unit
        WHEN 'H' THEN area * 10000
        WHEN 'M' THEN area
        ELSE NULL
    END;
$$ LANGUAGE sql PARALLEL SAFE;

Obviously, only do this for functions that are genuinely parallel safe; most of mine should be, since they're just deterministic calculations like the one above.
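To audit which user-defined functions are still marked unsafe, the pg_proc catalog can be queried. A small sketch (the schema filter is an assumption, adjust it to wherever the functions actually live):

-- proparallel: 's' = safe, 'r' = restricted, 'u' = unsafe (the default)
SELECT n.nspname AS schema, p.proname AS function_name, p.proparallel
  FROM pg_proc p
  JOIN pg_namespace n ON n.oid = p.pronamespace
 WHERE n.nspname NOT IN ('pg_catalog', 'information_schema')
   AND p.proparallel <> 's';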

Where possible avoid SERIAL or BIGSERIAL

The problem with SERIAL and BIGSERIAL is that generating them has to be synchronised, since each value increments off the last known value. Even generating them with nextval('meta.source_source_id_seq') in a select statement is synchronous. UUIDs can be generated without any synchronisation, and without any fear of accidentally ending up with two of the same ID.

However, I think it would be preferable to use IDs from the source data where possible; that way no computation is needed to create the ID at all. Otherwise, use a UUID.

Examples

For Inserts

  • Old
    INSERT INTO nsw_vg_raw.land_value_row_source(land_value_row_id, source_id)
      SELECT land_value_row_id, nextval('meta.source_source_id_seq')
      FROM nsw_vg_raw.land_value_row a;
    
    INSERT INTO meta.source(source_id)
      SELECT source_id
      FROM nsw_vg_raw.land_value_row_source a;
  • New
    INSERT INTO nsw_vg_raw.land_value_row_source(land_value_row_id, source_id)
      SELECT land_value_row_id, uuid_generate_v4()
      FROM nsw_vg_raw.land_value_row a;
    
    INSERT INTO meta.source(source_id)
      SELECT source_id
      FROM nsw_vg_raw.land_value_row_source a;
    

Costs associated with UUIDs

They are bigger identifiers than bigints (8 bytes) and ints (4 bytes); a UUID is 16 bytes. I do wonder if that means worse index performance, since it's harder to keep everything in memory when you have a lot of rows. I think for meta.source, which is literally metadata, it should be fine, because users won't query this data often unless they want to inspect the source of their data, and that generally shouldn't be an expensive operation.

But for all those reasons, it would still be good to try to find an ID in the source data first.

Staging Tables need to be partitioned

You can partition tables; it's a lot to explain, but the PostgreSQL documentation on table partitioning covers it for future reference. In short, when you're joining or grouping data, the planner generally has enough information to skip large swaths of irrelevant data that would never match anyway.

For the land value staging data, it would look kind of like this:

CREATE UNLOGGED TABLE IF NOT EXISTS nsw_vg_raw.land_value_row (
  ...
) PARTITION BY HASH (property_id);

CREATE UNLOGGED TABLE nsw_vg_raw.land_value_row_p1 PARTITION OF nsw_vg_raw.land_value_row
    FOR VALUES WITH (MODULUS 8, REMAINDER 0);
CREATE UNLOGGED TABLE nsw_vg_raw.land_value_row_p2 PARTITION OF nsw_vg_raw.land_value_row
    FOR VALUES WITH (MODULUS 8, REMAINDER 1);
CREATE UNLOGGED TABLE nsw_vg_raw.land_value_row_p3 PARTITION OF nsw_vg_raw.land_value_row
    FOR VALUES WITH (MODULUS 8, REMAINDER 2);
CREATE UNLOGGED TABLE nsw_vg_raw.land_value_row_p4 PARTITION OF nsw_vg_raw.land_value_row
    FOR VALUES WITH (MODULUS 8, REMAINDER 3);
CREATE UNLOGGED TABLE nsw_vg_raw.land_value_row_p5 PARTITION OF nsw_vg_raw.land_value_row
    FOR VALUES WITH (MODULUS 8, REMAINDER 4);
CREATE UNLOGGED TABLE nsw_vg_raw.land_value_row_p6 PARTITION OF nsw_vg_raw.land_value_row
    FOR VALUES WITH (MODULUS 8, REMAINDER 5);
CREATE UNLOGGED TABLE nsw_vg_raw.land_value_row_p7 PARTITION OF nsw_vg_raw.land_value_row
    FOR VALUES WITH (MODULUS 8, REMAINDER 6);
CREATE UNLOGGED TABLE nsw_vg_raw.land_value_row_p8 PARTITION OF nsw_vg_raw.land_value_row
    FOR VALUES WITH (MODULUS 8, REMAINDER 7);

Non-parallelism Benefits of Partitioning

A naive join is roughly quadratic in the number of rows involved. If two tables are partitioned the same way, a partition-wise join only has to compare rows within matching partitions, which should cut the work down considerably, assuming the data is spread evenly across the partitions.

A Rows   B Rows   Rows Per Partition   Sync Complexity                 Workers   Concurrent Complexity
10000    10000    No Partition         10000 ** 2      = 100,000,000   1         100,000,000
10000    10000    No Partition         10000 ** 2      = 100,000,000   10        100,000,000
10000    10000    1000                 10 * (1000**2)  = 10,000,000    1         10,000,000
10000    10000    1000                 10 * (1000**2)  = 10,000,000    10        1,000,000
10000    10000    100                  100 * (100**2)  = 1,000,000     1         1,000,000
10000    10000    100                  100 * (100**2)  = 1,000,000     10        100,000

The NSW VG data

Reading this may require some familiarity with the dataset.

  • Land Values
    • Tables should probably be partitioned by property_id, here's why:
      • Populating valuations:
        • Rows should be unique by property_id and effective_date
        • We will have duplicates when the effective date is derived from base_date_N, especially if we are fetching publications a year apart from each other; we'll prioritise them on the basis of source_date.
        • When the data is partitioned by property_id, all possible duplicates end up in the same partition.
      • Populating legal_description:
        • We'll deduplicate this by keeping the oldest version of a consecutively recurring description for a specific property.
        • For that reason, any rows with the same property_id should be in the same partition, as the descriptions are tied to the property.
      • Populating property_area, same as above
      • Populating zone_observation, same as above
      • Populating strata_plan status, same as above
  • Property Sales
    • Tables should probably be partitioned by a hash over both property_id & strata_lot_no. Hopefully NULL works as a partition-friendly value, seeing as SELECT (NULL = NULL) is neither true nor false, but NULL (very unhelpful); there's a quick check for this in the sketch after this list.
      • This allows us to do all of the above for properties.
        • Sales are unique on a dealing-ID basis; this should still work within the context of a table partitioned by property_id and strata_lot_no.
      • In the event NULL values for strata_lot_no don't work out, partitioning that data by property_id alone should be fine.

If we can only partition by property_id and not the strata lot, some partitions may be over-represented because some properties are apartment buildings with many sales, but this should be spread randomly throughout the data, so it's probably not that big of an issue and very unlikely to be concentrated. It just may mean the data is somewhat unevenly distributed.
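As for whether NULL hashes cleanly, a quick throwaway check like the one below can settle it; the table here is purely illustrative, not the real staging schema:

-- Throwaway check: which hash partition does a NULL strata_lot_no land in?
CREATE TEMPORARY TABLE null_check (
    property_id INT,
    strata_lot_no INT
) PARTITION BY HASH (property_id, strata_lot_no);

CREATE TEMPORARY TABLE null_check_p0 PARTITION OF null_check
    FOR VALUES WITH (MODULUS 2, REMAINDER 0);
CREATE TEMPORARY TABLE null_check_p1 PARTITION OF null_check
    FOR VALUES WITH (MODULUS 2, REMAINDER 1);

INSERT INTO null_check VALUES (1, 10), (1, NULL), (2, NULL);

-- tableoid::regclass reports the partition each row was routed to.
SELECT tableoid::regclass AS routed_to, property_id, strata_lot_no
  FROM null_check;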

Use CTEs over Temporary tables

Once partitions have been properly set up, it should be simpler to do this than to create temporary tables (I need to fact-check this).

INSERT INTO nsw_vg.property_area(source_id, effective_date, property_id, property_area)
  SELECT source_id, effective_date, property_id, property_area 
    FROM nsw_vg_raw.land_value_row a

    -- once partitions have been set up these joins should be fast and parallel
    LEFT JOIN nsw_vg_raw.land_value_source b USING (property_id, source_date)
    LEFT JOIN nsw_vg_raw.land_value_effective_date c USING (property_id, source_date)
    WHERE property_area IS NOT NULL;

I have reason to believe the above will be lighter. Once our tables are partitioned and parallel-friendly, synchronously writing to temporary tables becomes the less efficient option (even if you end up repeating the join a few times), because populating the temporary table is a single serial step, and all of its insertions have to complete before the next operation can start.

It's possible a temporary table may still be faster if it is itself partitioned, but then you need to repeat your partitioning scheme for that temporary table.
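One way to fact-check this is to compare query plans. A quick sketch, assuming the staging tables from the insert above, that checks whether the planner actually produces a parallel, partition-wise plan for the join:

SET enable_partitionwise_join = on;

EXPLAIN (ANALYZE, BUFFERS)
SELECT source_id, effective_date, property_id, property_area
  FROM nsw_vg_raw.land_value_row a
  LEFT JOIN nsw_vg_raw.land_value_source b USING (property_id, source_date)
  LEFT JOIN nsw_vg_raw.land_value_effective_date c USING (property_id, source_date)
  WHERE property_area IS NOT NULL;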

Use the COPY command

Tbh this only makes sense for data that already exists as files on disk.

COPY nsw_lrs.legal_description (source_id, effective_date, property_id, legal_description, legal_description_kind)
FROM '/tmp/data.csv' CSV;
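COPY also works in the other direction, so intermediate results can be dumped out and bulk-loaded elsewhere. A sketch with illustrative column names (the real staging columns may differ):

-- Dump a projection of the staging data to disk...
COPY (
    SELECT source_id, effective_date, property_id, legal_description
      FROM nsw_vg_raw.land_value_row
     WHERE legal_description IS NOT NULL
) TO '/tmp/legal_descriptions.csv' CSV HEADER;

-- ...then bulk-load it into the destination table.
COPY nsw_lrs.legal_description (source_id, effective_date, property_id, legal_description)
  FROM '/tmp/legal_descriptions.csv' CSV HEADER;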

Disable Triggers while we are populating data

Something like this:

ALTER TABLE target_table DISABLE TRIGGER ALL;
-- Perform inserts
ALTER TABLE target_table ENABLE TRIGGER ALL;

I think setting the replication role to replica does the same thing, but it also disables foreign key enforcement, since those checks are implemented as internal triggers.

SET session_replication_role = 'replica';
-- ...
SET session_replication_role = 'origin';

Batching

I think you could get a similar benefit from partitioning the data, and perhaps combining batching with partitioning would give a similar result. I should look into whether this is really the case, but apparently batching reduces query complexity: spreading the same computation over multiple smaller batches lowers the total work significantly. I can see this working well with partitioned tables.

Via Offset

DO $$
DECLARE
    batch_size INT := 10000;
    batch_offset INT := 0;
BEGIN
    LOOP
        -- OFFSET is a reserved word, so the counter is named batch_offset instead.
        WITH batch AS (
            SELECT id FROM large_table
            ORDER BY id
            LIMIT batch_size OFFSET batch_offset
        )
        UPDATE large_table
        SET some_column = some_column + 1
        WHERE id IN (SELECT id FROM batch);

        EXIT WHEN NOT FOUND;
        batch_offset := batch_offset + batch_size;
    END LOOP;
END $$;

Via Partition

DO $$
DECLARE
    part_no INT := 0;
    total_parts INT := 8;
BEGIN
  LOOP
    EXIT WHEN part_no >= total_parts;

    WITH subset AS (
        SELECT property_id, source_id, a.zone_code, c.effective_date
          FROM nsw_vg_raw.land_value_row a
          LEFT JOIN nsw_vg_raw.land_value_source b USING (property_id, source_date)
          LEFT JOIN nsw_vg_raw.land_value_effective_date c USING (property_id, source_date)
          WHERE property_id % total_parts = part_no),
      lagged AS (
        SELECT property_id, source_id, zone_code, effective_date,
               LAG(zone_code) OVER (PARTITION BY property_id ORDER BY effective_date) AS prev_zone
          FROM subset)
    INSERT INTO nsw_lrs.zone_observation(property_id, source_id, zone_code, effective_date)
    SELECT property_id, source_id, zone_code, effective_date
      FROM lagged
      WHERE zone_code IS DISTINCT FROM prev_zone;

    part_no := part_no + 1;
  END LOOP;
END $$;

Maybe Look at Vacuuming?

Vacuuming in Postgres is just cleaning up dead rows left behind by updates and deletes. In my case that isn't really happening a ton, but I do wonder if it's something I need to do after I've created and dropped a few temporary tables...
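A quick way to tell whether it even matters is to look at dead tuple counts and vacuum the worst offenders. A small sketch:

-- How much dead data is actually lying around?
SELECT relname, n_live_tup, n_dead_tup
  FROM pg_stat_user_tables
  ORDER BY n_dead_tup DESC
  LIMIT 20;

-- Reclaim space and refresh planner statistics for a specific staging table.
VACUUM (ANALYZE, VERBOSE) nsw_vg_raw.land_value_row;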

Correctly configure the database

For the most part this is already done.

General parallelism

  1. Setup parallel workers.
    max_parallel_workers = 32
    max_parallel_workers_per_gather = 8
    parallel_setup_cost = 0.1
    parallel_tuple_cost = 0.1
    
    I just did 32 to match my number of cores, and 8 because I saw someone suggest a quarter of that (not sure why). There's a sketch for applying and checking settings like these after this list.
  2. I saw a suggestion to set shared_buffers to 1/4 of total memory
    shared_buffers = 32GB -- if you have 128GB of RAM
    
  3. Apparently you should scale this relative to your number of connections, since each sort or hash step in a query can use up to this much memory. I don't remember how I arrived at this number; maybe I should redo it.
    work_mem = 64MB  
    
  4. This is for maintenance operations (such as creating indexes and vacuuming, which removes dead rows).
    maintenance_work_mem = 1GB
    
  5. I don't entirely know how this works... but it's the per-session memory used for temporary tables, so bigger should help the staging work (I should really look into this).
    temp_buffers = 64MB
    
  6. I'm told to make this 50-75% of system memory. It doesn't actually allocate anything; it's just a hint to the planner about how much of the OS file cache is likely to be available. I have it set to something obscene and my system's memory pressure is fine while the database is idle.
    effective_cache_size = 64GB
    

WAL Config

What is WAL config?

  1. Look, I don't really get the specifics of WAL; I occasionally look it up, then immediately forget it. wal_buffers is the shared memory set aside for WAL that hasn't been flushed to disk yet, so it's probably not that important here.
    wal_buffers = 16MB
    
  2. Max WAL Size: prevent frequent checkpoints. A checkpoint is the point where dirty pages get flushed to disk so older WAL can be recycled; I keep forgetting this after looking it up, so it's probably also not that important here.
    max_wal_size = 1GB
    min_wal_size = 256MB
    
  3. Checkpoint Tuning: spread checkpoint writes out over time. Again, I don't fully understand it; it was recommended, I set it to this, and nothing bad happened.
    checkpoint_timeout = 10min
    checkpoint_completion_target = 0.9
    

I saw a recommendation to disable synchronous commit. I'm not that confident about it, but maybe it's worth considering once I've reduced the need for synchronisation, after partitioning and moving to IDs that don't need synchronising. Maybe it's worth doing for ingestions that aren't dependent on one another.

synchronous_commit = off

For Python code writing to the database, it's also worth adjusting the max connections; the default is 100.

max_connections = 200

Query Tuning

  1. These control how many FROM items and joins the planner will flatten and reorder when searching for a plan; raising them lets it consider more join orders in bigger queries. (It came from the same list of recommendations.)
    join_collapse_limit = 16
    from_collapse_limit = 16
    
  2. Partition wise joins
    enable_partitionwise_join = on;
    enable_partitionwise_aggregate = on;
    
  3. This enables just-in-time compilation: for expensive queries, parts of the execution (like expression evaluation) are compiled to native code at run time rather than interpreted, which can help large analytical scans.
    jit = on
    

AutoVacuum Config

I need to read more into this later, but this is just the background process that vacuums for you: it reclaims space from dead rows and keeps table statistics up to date. See the PostgreSQL autovacuum documentation for more.

  1. Increase Autovacuum Workers: Handle larger tables in parallel:
    autovacuum_max_workers = 6
    
  2. Aggressive Vacuuming: Ensure tables are vacuumed frequently:
    autovacuum_vacuum_cost_limit = 2000
    autovacuum_vacuum_scale_factor = 0.05
    autovacuum_analyze_scale_factor = 0.02
    
  3. Analyze Dead Tuples Faster
    autovacuum_vacuum_threshold = 50
    autovacuum_analyze_threshold = 50
    

Some other things I should look into

  1. Backend Flush: Optimize write-back performance:
    backend_flush_after = 512kB
    
  2. Default Statistics Target: Improve query planning:
    default_statistics_target = 200
    
  3. Log Temp Files: Track inefficient queries:
    log_temp_files = 128MB
    

Condition to move from Pure SQL

Only if all of the above is still slow should the queries be ported out of SQL and into Python and parallelised via multiprocessing across the partitions, because that will be a lot of work and it makes the queries harder for others to work with.

The Plan

1. Remove SERIAL IDs from the meta schema

This should be simple, but we'll want to add the UUID extension to Postgres like so (this just adds functions for generating UUIDs; the uuid column type is available regardless).

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
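As a sketch of where this lands, the source table can then default its key to a generated UUID. The column list here is a placeholder, not the real meta.source definition:

-- Sketch only: meta.source keyed by a UUID generated by uuid_generate_v4().
-- Everything other than source_id is a placeholder column.
CREATE TABLE IF NOT EXISTS meta.source (
    source_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);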

2. Replace Temporary Tables with Complementary Staging tables

At the moment the ingestion creates temporary tables as complementary tables for the data being ingested. While these can probably be partitioned, if they have a 1:1 relationship with existing data they may as well be part of the preexisting source tables that track a source_id for each row of staging data.

For land values this will look like this.

CREATE UNLOGGED TABLE IF NOT EXISTS nsw_vg_raw.land_value_row_complement(
    property_id INT NOT NULL,
    source_date DATE NOT NULL,
    effective_date DATE NOT NULL,
    source_id UUID UNIQUE NOT NULL,
    FOREIGN KEY (source_id) REFERENCES meta.source(source_id)
);

For PSI it is yet to be determined, but it should have effective_date, and it's worth considering where the consolidated description should be stored.

3. Create source meta data at ingestion

This work is complicated and can be done independently of the earlier and later steps.

Seeing as the ingestion of the land values is already done in multiprocessing batches, we can probably create the source metadata there as well. The benefit is creating a UUID per source file and populating source, file_source, and source_file_line simultaneously in an already parallel process.

Rationale: we can drop temporary tables like pg_temp.lv_unigested_files this way.

Clone this wiki locally