Initiative: Parallel Deduplication
After a bit of research into getting as much juice out of PostgreSQL as possible when deduplicating NSW Valuer General data and breaking it into more usable tables, it seems any kind of mass parallel writing/insertion will require some nontrivial changes to how I'm handling that specific part of my pipeline. At the moment, I'm just synchronously iterating over a bunch of SQL scripts executing one after the other.
These changes are doable, but they will take a while. An insertion query in PostgreSQL runs sequentially, and you can't really do parallel writes within the same query. Because of that, much of the logic may need to be rewritten and potentially coordinated to run in parallel from Python or something similar. How best to do that deserves some thought.
However, it may be worth doing the following first, before lifting everything out of pure SQL.
I've already done this, and temporary tables in Postgres are unlogged by default anyway, but basically ensure the `UNLOGGED` keyword is there:
CREATE UNLOGGED TABLE IF NOT EXISTS nsw_vg_raw.land_value_row (
...
);
It means they don't use the Write-Ahead Log (WAL); for the most part that means they trade crash safety for throughput and speed. In our case that's an entirely acceptable trade-off, as these aren't the destination, they're just staging tables that act as a vehicle to populate the final data model. If they're crashing there's likely something wrong in the ingestion that needs fixing, and even with WAL a crash stops the pipeline either way. Skipping it just means we arrive at the same destination faster.
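For reference, if a staging table was already created without the keyword, it can be flipped in place rather than recreated (this rewrites the table, so it's best done while the table is still empty). A minimal sketch:

-- Convert an existing plain table to unlogged; the table is rewritten,
-- so run this before loading data into it.
ALTER TABLE nsw_vg_raw.land_value_row SET UNLOGGED;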
I have a few user-defined functions like the one below. In order for PostgreSQL to parallelise anything that uses these functions, it needs to know it's safe to do so. By default, any user-defined function in SQL is deemed `PARALLEL UNSAFE`. All you need to do is mark it as `PARALLEL SAFE`, like so:
CREATE OR REPLACE FUNCTION pg_temp.sqm_area(
area FLOAT,
area_unit VARCHAR(1)
) RETURNS FLOAT AS $$
SELECT CASE area_unit
WHEN 'H' THEN area * 10000
WHEN 'M' THEN area
ELSE NULL
END;
$$ LANGUAGE sql PARALLEL SAFE;
Obviously, only do this for functions that actually are parallel safe; most of mine should be, as they're just deterministic calculations like the one above.
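Existing functions can also be flipped without redefining them, and you can check what the planner currently assumes via `pg_proc.proparallel` ('s' = safe, 'r' = restricted, 'u' = unsafe). A small sketch using the function above:

-- Mark an existing function as parallel safe without redefining it.
ALTER FUNCTION pg_temp.sqm_area(FLOAT, VARCHAR) PARALLEL SAFE;

-- Check how functions are currently marked.
SELECT proname, proparallel
FROM pg_proc
WHERE proname = 'sqm_area';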
The problem with `SERIAL` or `BIGSERIAL` is that generating them has to be synchronised, since each value increments off the last known value. Even generating them with `nextval('meta.source_source_id_seq')` in a select statement is synchronous. UUIDs can be generated without any synchronisation and without any fear of accidentally producing two of the same ID. However, I think it would be preferable to use IDs from the source data where possible; that way no computation is needed to create the ID at all. Otherwise, use a UUID.
- In `meta.source` we should probably use a UUID.
- In `nsw_vg_raw.land_value_row` we can probably use the `property_id` and `source_date`.
  - Same goes for `nsw_vg_raw.land_value_row_source`.
- Old

  INSERT INTO nsw_vg_raw.land_value_row_source(land_value_row_id, source_id)
  SELECT land_value_row_id, nextval('meta.source_source_id_seq')
  FROM nsw_vg_raw.land_value_row a;

  INSERT INTO meta.source(source_id)
  SELECT source_id FROM nsw_vg_raw.land_value_row_source a;

- New

  INSERT INTO nsw_vg_raw.land_value_row_source(land_value_row_id, source_id)
  SELECT land_value_row_id, uuid_generate_v4()
  FROM nsw_vg_raw.land_value_row a;

  INSERT INTO meta.source(source_id)
  SELECT source_id FROM nsw_vg_raw.land_value_row_source a;
UUIDs are bigger identifiers than bigints (8 bytes) and ints (4 bytes); a UUID is 16 bytes. I do wonder if that means worse index performance, since it's harder to keep everything in memory when you have a lot of rows. I think for `meta.source`, which is literally metadata, it should be fine, because the user won't be querying this data often unless they want to inspect the source of their data, and that generally shouldn't be an expensive operation.

For all those reasons, it would still be good to try and find an ID in the source data.
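For the land value staging table, that could just mean a composite key over columns we already have (a sketch only; columns other than `property_id` and `source_date` are omitted here):

-- Natural key taken straight from the source data: no sequence or UUID
-- needs to be generated while inserting staging rows.
CREATE UNLOGGED TABLE IF NOT EXISTS nsw_vg_raw.land_value_row (
    property_id INT NOT NULL,
    source_date DATE NOT NULL,
    -- ...remaining columns...
    PRIMARY KEY (property_id, source_date)
);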
You can partition tables; it's a lot to explain, but here are the docs for future reference. In short, when you're grouping or joining data, the planner generally has enough information to skip large swaths of irrelevant partitions that wouldn't match anyway.
For the land value staging data, it would look kind of like this:
CREATE UNLOGGED TABLE IF NOT EXISTS nsw_vg_raw.land_value_row (
  ...
) PARTITION BY HASH (property_id);

CREATE TABLE nsw_vg_raw.land_value_row_p0 PARTITION OF nsw_vg_raw.land_value_row
    FOR VALUES WITH (MODULUS 8, REMAINDER 0);
CREATE TABLE nsw_vg_raw.land_value_row_p1 PARTITION OF nsw_vg_raw.land_value_row
    FOR VALUES WITH (MODULUS 8, REMAINDER 1);
CREATE TABLE nsw_vg_raw.land_value_row_p2 PARTITION OF nsw_vg_raw.land_value_row
    FOR VALUES WITH (MODULUS 8, REMAINDER 2);
CREATE TABLE nsw_vg_raw.land_value_row_p3 PARTITION OF nsw_vg_raw.land_value_row
    FOR VALUES WITH (MODULUS 8, REMAINDER 3);
CREATE TABLE nsw_vg_raw.land_value_row_p4 PARTITION OF nsw_vg_raw.land_value_row
    FOR VALUES WITH (MODULUS 8, REMAINDER 4);
CREATE TABLE nsw_vg_raw.land_value_row_p5 PARTITION OF nsw_vg_raw.land_value_row
    FOR VALUES WITH (MODULUS 8, REMAINDER 5);
CREATE TABLE nsw_vg_raw.land_value_row_p6 PARTITION OF nsw_vg_raw.land_value_row
    FOR VALUES WITH (MODULUS 8, REMAINDER 6);
CREATE TABLE nsw_vg_raw.land_value_row_p7 PARTITION OF nsw_vg_raw.land_value_row
    FOR VALUES WITH (MODULUS 8, REMAINDER 7);
Seeing as a naive JOIN scales with the product of the two tables' row counts, if you have two tables partitioned the same way, a partition-wise join should cut the total work considerably, provided the data is spread evenly across the partitions. Roughly:
| A Rows | B Rows | Rows Per Partition | Sync Complexity | Workers | Concurrent Complexity |
|---|---|---|---|---|---|
| 10000 | 10000 | No Partition | 10000 ** 2 = 100,000,000 | 1 | 100,000,000 |
| 10000 | 10000 | No Partition | 10000 ** 2 = 100,000,000 | 10 | 100,000,000 |
| 10000 | 10000 | 1000 | 10 * (1000 ** 2) = 10,000,000 | 1 | 10,000,000 |
| 10000 | 10000 | 1000 | 10 * (1000 ** 2) = 10,000,000 | 10 | 1,000,000 |
| 10000 | 10000 | 100 | 100 * (100 ** 2) = 1,000,000 | 1 | 1,000,000 |
| 10000 | 10000 | 100 | 100 * (100 ** 2) = 1,000,000 | 10 | 100,000 |
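Once both sides are partitioned the same way, it's worth confirming the planner actually performs a partition-wise join (it's off by default; see the config section further down). A quick check, assuming `nsw_vg_raw.land_value_source` has been given the same hash partitioning on `property_id`:

SET enable_partitionwise_join = on;

-- The plan should show one join per pair of matching partitions
-- instead of a single join over the whole tables.
EXPLAIN (COSTS OFF)
SELECT *
FROM nsw_vg_raw.land_value_row a
LEFT JOIN nsw_vg_raw.land_value_source b USING (property_id, source_date);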
Reading this may require some familiarity with the dataset.
- Land Values
  - Tables should probably be partitioned by `property_id`, here's why:
    - Populating valuations:
      - Rows should be unique by `property_id` and `effective_date`.
      - We will have duplicates when the effective date is derived from `base_date_N`, especially if we are fetching publications that are a year out from each other; we'll prioritise them on the basis of `source_date`.
      - When the data is partitioned by `property_id`, all possible duplicates end up together in the same partition.
    - Populating `legal_description`:
      - We'll deduplicate this by keeping the oldest version of a consecutively recurring description for a specific property.
      - For that reason, all rows with the same `property_id` should be in the same partition, as the descriptions are tied to the property.
    - Populating `property_area`, same as above.
    - Populating `zone_observation`, same as above.
    - Populating `strata_plan` status, same as above.
- Property Sales
  - Tables should probably be partitioned by a hash over both `property_id` & `strata_lot_no` (see the sketch below). Hopefully `NULL` works as a partition-friendly value, seeing as `SELECT (NULL = NULL)` is neither true nor false, but null (very unhelpful).
    - This allows us to do all of the above for properties.
    - Sales are unique on a dealing ID basis; this should still work within the context of a table partitioned by `property_id` and `strata_lot_no`.
    - In the event `NULL` values for `strata_lot_no` don't make sense, data partitioned by just `property_id` should be fine.
If we can only partition by property ID and not strata lot, some properties' sales may be over-represented in their partition (some properties are apartment buildings with many strata lots), but this should happen fairly randomly throughout the data, so it's probably not that big of an issue and very unlikely to be concentrated in one partition. It just means the data is somewhat arbitrarily distributed.
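Here is a sketch of what that two-column hash partitioning could look like for the raw sales staging table. The table and column layout below are placeholders, since that table isn't spelled out here, and it's worth verifying how `NULL` `strata_lot_no` values are hashed before relying on it.

-- Hypothetical raw PSI staging table, hash partitioned over both columns.
CREATE UNLOGGED TABLE IF NOT EXISTS nsw_vg_raw.ps_row (
    property_id INT,
    strata_lot_no INT
    -- ...remaining PSI columns...
) PARTITION BY HASH (property_id, strata_lot_no);

CREATE TABLE nsw_vg_raw.ps_row_p0 PARTITION OF nsw_vg_raw.ps_row
    FOR VALUES WITH (MODULUS 8, REMAINDER 0);
-- ...repeat for remainders 1 through 7...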
Once partitions have been properly set up, it should be simpler to do this directly than to create temporary tables (I need to fact check this):
INSERT INTO nsw_vg.property_area(source_id, effective_date, property_id, property_area)
SELECT source_id, effective_date, property_id, property_area
FROM nsw_vg_raw.land_value_row a
-- once partitions have been set up these joins should be fast and parallel
LEFT JOIN nsw_vg_raw.land_value_source b USING (property_id, source_date)
LEFT JOIN nsw_vg_raw.land_value_effective_date c USING (property_id, source_date)
WHERE property_area IS NOT NULL;
I have reason to believe the above will be lighter now. Once our tables are partitioned and parallel friendly, synchronously writing everything to a temporary table first becomes the less efficient option (even if the alternative means redoing the join a few times), because that write is an atomic operation and every insertion has to complete before the next step can start.

It's possible a temporary table would still be faster if it were itself partitioned, but then you'd need to repeat your partitioning setup for that temporary table.
Tbh this only makes sense for data that's already on disk:
COPY nsw_lrs.legal_description (source_id, effective_date, property_id, legal_description, legal_description_kind)
FROM '/tmp/data.csv' CSV;
Something like this:
ALTER TABLE target_table DISABLE TRIGGER ALL;
-- Perform inserts
ALTER TABLE target_table ENABLE TRIGGER ALL;
I think setting the replication role to replica does the same thing, but it also disables foreign key enforcement:
SET session_replication_role = 'replica';
-- ...
SET session_replication_role = 'origin';
I think you could get a similar benefit if you were to partition the data; perhaps combining batching and partitioning would have a similar result. I should look further into whether this is really the case, but apparently batching reduces query complexity enough that spreading the same computation over multiple batches lowers the total cost significantly. I can see this working with partitioned tables.
DO $$
DECLARE
  batch_size INT := 10000;
  batch_offset INT := 0;  -- "offset" itself is a reserved word
BEGIN
  LOOP
    -- grab the next slice of ids in a stable order
    WITH cte AS (
      SELECT id FROM large_table
      ORDER BY id
      LIMIT batch_size OFFSET batch_offset
    )
    UPDATE large_table
    SET some_column = some_column + 1
    WHERE id IN (SELECT id FROM cte);
    EXIT WHEN NOT FOUND;
    batch_offset := batch_offset + batch_size;
  END LOOP;
END $$;
For example, processing one hash bucket of `property_id` at a time:

DO $$
DECLARE
  part INT := 0;  -- which hash bucket we're currently processing
  total INT := 8;
BEGIN
  LOOP
    EXIT WHEN part >= total;
    WITH subset AS (
      SELECT property_id, source_id, a.zone_code, c.effective_date
      FROM nsw_vg_raw.land_value_row a
      LEFT JOIN nsw_vg_raw.land_value_source b USING (property_id, source_date)
      LEFT JOIN nsw_vg_raw.land_value_effective_date c USING (property_id, source_date)
      WHERE property_id % total = part),
    lagged AS (
      SELECT property_id, source_id, zone_code, effective_date,
             LAG(zone_code) OVER (PARTITION BY property_id ORDER BY effective_date) AS prev_zone
      FROM subset)
    INSERT INTO nsw_lrs.zone_observation(property_id, source_id, zone_code, effective_date)
    SELECT property_id, source_id, zone_code, effective_date
    FROM lagged
    WHERE prev_zone IS DISTINCT FROM zone_code;
    part := part + 1;
  END LOOP;
END $$;
Vacuuming in Postgres is just cleaning up deleted data; in my case that isn't really happening a ton, but I do wonder if it's something I need to do after creating and deleting a few temporary tables...
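As far as I can tell, dropping a temporary table reclaims its storage on its own, but after the heavy staging inserts it's cheap to vacuum and refresh statistics on the tables that stick around, e.g.:

-- reclaim dead rows and update planner statistics after a big load
VACUUM ANALYZE nsw_vg_raw.land_value_row;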
For the most part this is already done.
- Set up parallel workers.
I just did 32 to match my number of cores, and 8 per gather because I saw someone suggest 1/4 of that (not sure why).

max_parallel_workers = 32
max_parallel_workers_per_gather = 8
parallel_setup_cost = 0.1
parallel_tuple_cost = 0.1
- I saw somewhere a suggestion to set shared_buffers to 1/4 of total memory.

shared_buffers = 32GB  # if you have 128GB of RAM
- Apparently you should scale this relative to your connections, since it's allocated per sort/hash operation per connection. Idk how I arrived at this exact number, maybe I should redo this.
work_mem = 64MB
- This is for maintenance operations (such as building indexes and vacuuming, which removes deleted rows).
maintenance_work_mem = 1GB
- I don't entirely know how this works... it's the per-session memory used for temporary tables, so presumably bigger is better if the pipeline leans on them (I should really look into this).
temp_buffers = 64MB
- I'm told to make this 50-75% of system memory. I have no idea how the cache works tbh, but as far as I can tell this is only a hint to the query planner about how much data is likely to be cached rather than an actual allocation, which matches the fact that I have it set to something obscene and my system's memory pressure stays idle while it's doing nothing.
effective_cache_size = 64GB
What is WAL config?
- Look I don't really get the specifics of WAL, I mean I occasionally look it up, but immediately forget it. Probably isn't important.
wal_buffers = 16MB
- Max WAL size: prevents frequent checkpoints. What's a checkpoint? I've also forgotten this since I looked it up, probably also not important.

max_wal_size = 1GB
min_wal_size = 256MB
- Checkpoint tuning: spread checkpoint writes out over time. Again idk, it was recommended, I set it to this, and nothing bad happened.

checkpoint_timeout = 10min
checkpoint_completion_target = 0.9
I saw a recommendation to disable synchronous commit. Not sure I'm that confident, but maybe it's worth considering once I reduce the need for synchronisation after partitioning and moving to IDs that don't need synchronising. Maybe it's worth doing in ingestions that aren't dependent on one another.
synchronous_commit = off
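If I do try it, it can also be scoped to just the ingestion session rather than set globally, which feels safer; a sketch:

-- only inserts in this session lose the synchronous-commit guarantee
SET synchronous_commit = off;
-- ...run the independent ingestion inserts here...
RESET synchronous_commit;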
For Python code writing to the database, it's also worth adjusting the max connections; the default is 100.
max_connections = 200
- I believe these control how many tables the planner is willing to reorder when searching for a join order, but honestly they came from the same list.

join_collapse_limit = 16
from_collapse_limit = 16
- Partition-wise joins and aggregates

enable_partitionwise_join = on
enable_partitionwise_aggregate = on
- This enables just-in-time compilation of parts of query execution (like expression evaluation) at run time rather than paying that cost the normal interpreted way; it mainly helps long analytical queries.
jit = on
I need to read more into this later, but autovacuum is just the background process that does the vacuuming, reclaiming dead rows and keeping table statistics up to date. Read more here.
- Increase Autovacuum Workers: Handle larger tables in parallel:
autovacuum_max_workers = 6
- Aggressive Vacuuming: Ensure tables are vacuumed frequently:
autovacuum_vacuum_cost_limit = 2000
autovacuum_vacuum_scale_factor = 0.05
autovacuum_analyze_scale_factor = 0.02
- Analyze Dead Tuples Faster
autovacuum_vacuum_threshold = 50
autovacuum_analyze_threshold = 50
- Backend Flush: Optimize write-back performance:
backend_flush_after = 512kB
- Default Statistics Target: Improve query planning:
default_statistics_target = 200
- Log Temp Files: Track inefficient queries:
log_temp_files = 128MB
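After editing postgresql.conf and reloading, a quick way to confirm the values actually took effect (and whether any still need a restart):

SELECT name, setting, unit, pending_restart
FROM pg_settings
WHERE name IN ('max_parallel_workers', 'shared_buffers', 'work_mem', 'autovacuum_max_workers');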
If all of the above is still slow, only then port the queries out of SQL and into Python and parallelise them via multiprocessing and process-level partitioning, because that'll be a lot of work and it makes the queries harder for others to work with.
This should be simple, but we'll want to add the UUID extension to Postgres like so (this just adds functions to generate UUIDs; the uuid column type exists regardless).

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
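And a quick sanity check that the generator used in the rewritten inserts above is available:

SELECT uuid_generate_v4();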
At the moment, during the ingestion we create temporary tables that complement the staging tables. While these could probably be partitioned, if they have a 1:1 relationship with existing data they may as well be folded into the preexisting source tables that exist to track a `source_id` for each row of staging data.
For land values this will look like this.
CREATE UNLOGGED TABLE IF NOT EXISTS nsw_vg_raw.land_value_row_complement(
property_id INT NOT NULL,
source_date DATE NOT NULL,
effective_date DATE NOT NULL,
source_id UUID UNIQUE NOT NULL,
FOREIGN KEY (source_id) REFERENCES meta.source(source_id)
);
For PSI it is yet to be determined, but it should have an `effective_date`, and it's worth considering where the consolidated description should be stored.
This work is complicated and can be deferred until after the earlier and later steps are in place.
Seeing as the ingestion of the land values is already done in a multiprocessing batch, we can probably also create the source metadata there as well. The benefit will be creating a UUID on a per-source-file basis and populating `source`, `file_source`, and `source_file_line` simultaneously in an already parallel process.
Rationale: we can drop temporary tables like `pg_temp.lv_unigested_files` this way.