diff --git a/expected/time_interval.out b/expected/time_interval.out index 753dd97..d5ab332 100644 --- a/expected/time_interval.out +++ b/expected/time_interval.out @@ -1,6 +1,7 @@ create extension pg_incremental cascade; create schema time_range; set search_path to time_range; +set client_min_messages to warning; -- create a source table create table events ( event_id bigint generated always as identity, @@ -29,7 +30,6 @@ select incremental.create_time_interval_pipeline('event-aggregation', '1 day', where event_time >= $1 and event_time < $2 group by 1 $$); -NOTICE: pipeline event-aggregation: processing time range from Fri Dec 31 16:00:00 1999 PST to Fri Dec 06 00:00:00 2024 PST create_time_interval_pipeline ------------------------------- @@ -50,7 +50,6 @@ select sum(event_count) from events_agg; insert into events (client_id, path, response_time) select s, '/page-' || (s % 3), random() from generate_series(1,100) s; call incremental.execute_pipeline('event-aggregation'); -NOTICE: pipeline event-aggregation: no rows to process select count(*) from events; count ------- @@ -65,10 +64,4 @@ select sum(event_count) from events_agg; (1 row) drop schema time_range cascade; -NOTICE: drop cascades to 2 other objects -DETAIL: drop cascades to table events -drop cascades to table events_agg drop extension pg_incremental; -NOTICE: drop cascades to 2 other objects -DETAIL: drop cascades to function incremental._drop_extension_trigger() -drop cascades to event trigger incremental_drop_extension_trigger diff --git a/pg_incremental--1.0.sql b/pg_incremental--1.0.sql index 1c9802e..65e94ec 100644 --- a/pg_incremental--1.0.sql +++ b/pg_incremental--1.0.sql @@ -52,7 +52,7 @@ GRANT SELECT ON incremental.processed_files TO public; CREATE FUNCTION incremental.create_sequence_pipeline( pipeline_name text, - sequence_name regclass, + source_table_name regclass, command text, schedule text default '* * * * *', execute_immediately bool default true) diff --git a/sql/time_interval.sql b/sql/time_interval.sql index e888a76..3e6cc55 100644 --- a/sql/time_interval.sql +++ b/sql/time_interval.sql @@ -1,6 +1,7 @@ create extension pg_incremental cascade; create schema time_range; set search_path to time_range; +set client_min_messages to warning; -- create a source table create table events ( diff --git a/src/pipeline.c b/src/pipeline.c index 20186e0..6707fd5 100644 --- a/src/pipeline.c +++ b/src/pipeline.c @@ -50,7 +50,7 @@ incremental_create_sequence_pipeline(PG_FUNCTION_ARGS) if (PG_ARGISNULL(0)) ereport(ERROR, (errmsg("pipeline_name cannot be NULL"))); if (PG_ARGISNULL(1)) - ereport(ERROR, (errmsg("sequence_name cannot be NULL"))); + ereport(ERROR, (errmsg("source_table_name cannot be NULL"))); if (PG_ARGISNULL(2)) ereport(ERROR, (errmsg("command cannot be NULL"))); @@ -64,6 +64,10 @@ incremental_create_sequence_pipeline(PG_FUNCTION_ARGS) switch (get_rel_relkind(sequenceId)) { + /* + * We allow source_table_name to be a sequence, and this is + * necessary if there are multiple sequences. + */ case RELKIND_SEQUENCE: { int32 columnNumber = 0; @@ -71,7 +75,7 @@ incremental_create_sequence_pipeline(PG_FUNCTION_ARGS) if (!sequenceIsOwned(sequenceId, DEPENDENCY_AUTO, &sourceRelationId, &columnNumber)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("enly sequences that are owned by a table are supported"))); + errmsg("only sequences that are owned by a table are supported"))); } break; diff --git a/src/sequence.c b/src/sequence.c index e9704d9..8b39435 100644 --- a/src/sequence.c +++ b/src/sequence.c @@ -319,17 +319,15 @@ FindSequenceForRelation(Oid relationId) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("relation \"%s\" does not have any sequences associated " "with it", - get_rel_name(relationId)), - errhint("Specify the name of the sequence to use for the " - "pipeline as the argument"))); + get_rel_name(relationId)))); if (list_length(sequences) > 1) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("relation \"%s\" has multiple sequences associated " "with it", get_rel_name(relationId)), - errhint("Specify the name of the sequence to use for the " - "pipeline as the argument"))); + errhint("Specify the name of the sequence to use instead of " + "the table name"))); return linitial_oid(sequences); }