@@ -862,7 +862,14 @@ private boolean createOrReplaceWithRetry(Key key, Document document, boolean isR
862862 PostgresDataType pkType = getPrimaryKeyType (tableName , pkColumn );
863863 parsed .add (quotedPkColumn , key .toString (), pkType , false );
864864
865- String sql = buildUpsertSql (parsed .getColumns (), quotedPkColumn );
865+ List <String > docColumns = parsed .getColumns ();
866+ List <String > allColumns =
867+ schemaRegistry .getSchema (tableName ).values ().stream ()
868+ .map (PostgresColumnMetadata ::getName )
869+ .map (PostgresUtils ::wrapFieldNamesWithDoubleQuotes )
870+ .collect (Collectors .toList ());
871+
872+ String sql = buildCreateOrReplaceSql (allColumns , docColumns , quotedPkColumn );
866873 LOGGER .debug ("Upsert SQL: {}" , sql );
867874
868875 return executeUpsert (sql , parsed );
@@ -882,15 +889,16 @@ private boolean createOrReplaceWithRetry(Key key, Document document, boolean isR
882889 *
883890 * <ul>
884891 * <li>Inserts a new row if no conflict on the primary key
885- * <li>Updates all non-PK columns if a row with the same PK already exists
892+ * <li>If the row with that PK already exists, it is replaced in entirety. Cols not present in
893+ * the latest upsert are set to their default values (as defined in the schema)
886894 * </ul>
887895 *
888896 * <p><b>Generated SQL pattern:</b>
889897 *
890898 * <pre>{@code
891- * INSERT INTO table (col1, col2, pk_col)
899+ * INSERT INTO table (col1, col2,, col3, pk_col)
892900 * VALUES (?, ?, ?)
893- * ON CONFLICT (pk_col) DO UPDATE SET col1 = EXCLUDED.col1, col2 = EXCLUDED.col2
901+ * ON CONFLICT (pk_col) DO UPDATE SET col1 = EXCLUDED.col1, col2 = EXCLUDED.col2, col3 = DEFAULT
894902 * RETURNING (xmax = 0) AS is_insert
895903 * }</pre>
896904 *
@@ -909,19 +917,30 @@ private boolean createOrReplaceWithRetry(Key key, Document document, boolean isR
909917 * <li>Thus, {@code is_insert = true} means INSERT, {@code is_insert = false} means UPDATE
910918 * </ul>
911919 *
912- * @param columns List of quoted column names to include in the upsert (including PK)
920+ * @param allTableColumns all cols present in the table
921+ * @param docColumns cols present in the document
913922 * @param pkColumn The quoted primary key column name used for conflict detection
914923 * @return The complete upsert SQL statement with placeholders for values
915924 */
916- private String buildUpsertSql (List <String > columns , String pkColumn ) {
917- String columnList = String .join (", " , columns );
918- String placeholders = String .join (", " , columns .stream ().map (c -> "?" ).toArray (String []::new ));
919-
920- // Build SET clause for non-PK columns: col = EXCLUDED.col
925+ private String buildCreateOrReplaceSql (
926+ List <String > allTableColumns , List <String > docColumns , String pkColumn ) {
927+ String columnList = String .join (", " , docColumns );
928+ String placeholders =
929+ String .join (", " , docColumns .stream ().map (c -> "?" ).toArray (String []::new ));
930+ Set <String > docColumnsSet = new HashSet <>(docColumns );
931+
932+ // Build SET clause for non-PK columns.
921933 String setClause =
922- columns .stream ()
934+ allTableColumns .stream ()
923935 .filter (col -> !col .equals (pkColumn ))
924- .map (col -> col + " = EXCLUDED." + col )
936+ .map (
937+ col -> {
938+ if (docColumnsSet .contains (col )) {
939+ return col + " = EXCLUDED." + col ;
940+ } else {
941+ return col + " = DEFAULT" ;
942+ }
943+ })
925944 .collect (Collectors .joining (", " ));
926945
927946 return String .format (
0 commit comments