Skip to content

Commit 79f725e

Browse files
committed
Add new kafk acolumns from mz_kafka_source_tables
1 parent ae709ec commit 79f725e

File tree

2 files changed

+43
-35
lines changed

2 files changed

+43
-35
lines changed

pkg/materialize/source_table_kafka.go

Lines changed: 41 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -9,44 +9,50 @@ import (
99

1010
type SourceTableKafkaParams struct {
1111
SourceTableParams
12+
EnvelopeType string `db:"envelope_type"`
13+
KeyFormat string `db:"key_format"`
14+
ValueFormat string `db:"value_format"`
1215
}
1316

1417
var sourceTableKafkaQuery = `
15-
SELECT
16-
mz_tables.id,
17-
mz_tables.name,
18-
mz_schemas.name AS schema_name,
19-
mz_databases.name AS database_name,
20-
mz_sources.name AS source_name,
21-
source_schemas.name AS source_schema_name,
22-
source_databases.name AS source_database_name,
23-
mz_kafka_source_tables.topic AS upstream_table_name,
24-
mz_sources.type AS source_type,
25-
comments.comment AS comment,
26-
mz_roles.name AS owner_name,
27-
mz_tables.privileges
28-
FROM mz_tables
29-
JOIN mz_schemas
30-
ON mz_tables.schema_id = mz_schemas.id
31-
JOIN mz_databases
32-
ON mz_schemas.database_id = mz_databases.id
33-
JOIN mz_sources
34-
ON mz_tables.source_id = mz_sources.id
35-
JOIN mz_schemas AS source_schemas
36-
ON mz_sources.schema_id = source_schemas.id
37-
JOIN mz_databases AS source_databases
38-
ON source_schemas.database_id = source_databases.id
39-
LEFT JOIN mz_internal.mz_kafka_source_tables
40-
ON mz_tables.id = mz_kafka_source_tables.id
41-
JOIN mz_roles
42-
ON mz_tables.owner_id = mz_roles.id
43-
LEFT JOIN (
44-
SELECT id, comment
45-
FROM mz_internal.mz_comments
46-
WHERE object_type = 'table'
47-
AND object_sub_id IS NULL
48-
) comments
49-
ON mz_tables.id = comments.id
18+
SELECT
19+
mz_tables.id,
20+
mz_tables.name,
21+
mz_schemas.name AS schema_name,
22+
mz_databases.name AS database_name,
23+
mz_sources.name AS source_name,
24+
source_schemas.name AS source_schema_name,
25+
source_databases.name AS source_database_name,
26+
mz_kafka_source_tables.topic AS upstream_table_name,
27+
mz_kafka_source_tables.envelope_type,
28+
mz_kafka_source_tables.key_format,
29+
mz_kafka_source_tables.value_format,
30+
mz_sources.type AS source_type,
31+
comments.comment AS comment,
32+
mz_roles.name AS owner_name,
33+
mz_tables.privileges
34+
FROM mz_tables
35+
JOIN mz_schemas
36+
ON mz_tables.schema_id = mz_schemas.id
37+
JOIN mz_databases
38+
ON mz_schemas.database_id = mz_databases.id
39+
JOIN mz_sources
40+
ON mz_tables.source_id = mz_sources.id
41+
JOIN mz_schemas AS source_schemas
42+
ON mz_sources.schema_id = source_schemas.id
43+
JOIN mz_databases AS source_databases
44+
ON source_schemas.database_id = source_databases.id
45+
LEFT JOIN mz_internal.mz_kafka_source_tables
46+
ON mz_tables.id = mz_kafka_source_tables.id
47+
JOIN mz_roles
48+
ON mz_tables.owner_id = mz_roles.id
49+
LEFT JOIN (
50+
SELECT id, comment
51+
FROM mz_internal.mz_comments
52+
WHERE object_type = 'table'
53+
AND object_sub_id IS NULL
54+
) comments
55+
ON mz_tables.id = comments.id
5056
`
5157

5258
func SourceTableKafkaId(conn *sqlx.DB, obj MaterializeObject) (string, error) {

pkg/resources/resource_source_table_kafka.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,8 @@ func sourceTableKafkaRead(ctx context.Context, d *schema.ResourceData, meta inte
361361
return diag.FromErr(err)
362362
}
363363

364+
// TODO: include envelope_type, key_format and value_format from mz_internal.mz_kafka_source_tables
365+
364366
if err := d.Set("ownership_role", t.OwnerName.String); err != nil {
365367
return diag.FromErr(err)
366368
}

0 commit comments

Comments
 (0)