Skip to content

Commit 98dff65

Browse files
committed
Add more logging for cloud run debugging
1 parent 4ce3667 commit 98dff65

File tree

2 files changed

+177
-94
lines changed

2 files changed

+177
-94
lines changed

spark-bigquery-connector-common/src/test/java/com/google/cloud/spark/bigquery/integration/CatalogIntegrationTestBase.java

Lines changed: 112 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class CatalogIntegrationTestBase {
4545

4646
protected static SparkSession spark;
4747
private String testTable;
48-
// 2. Initialize the SparkSession ONCE before all tests
48+
4949
@BeforeClass
5050
public static void setupSparkSession() {
5151
spark =
@@ -60,8 +60,6 @@ public static void setupSparkSession() {
6060
.getOrCreate();
6161
}
6262

63-
// 4. Stop the SparkSession ONCE after all tests are done
64-
// This fixes the local IllegalStateException (race condition)
6563
@AfterClass
6664
public static void teardownSparkSession() {
6765
if (spark != null) {
@@ -253,66 +251,130 @@ public void testDropDatabase() {
253251

254252
@Test
255253
public void testCatalogInitializationWithProject() {
256-
spark
257-
.conf()
258-
.set("spark.sql.catalog.public_catalog", "com.google.cloud.spark.bigquery.BigQueryCatalog");
259-
spark.conf().set("spark.sql.catalog.public_catalog.project", "bigquery-public-data");
260-
261-
List<Row> rows = spark.sql("SHOW DATABASES IN public_catalog").collectAsList();
262-
List<String> databaseNames =
263-
rows.stream().map(row -> row.getString(0)).collect(Collectors.toList());
264-
assertThat(databaseNames).contains("samples");
265-
266-
List<Row> data =
267-
spark.sql("SELECT * FROM public_catalog.samples.shakespeare LIMIT 10").collectAsList();
268-
assertThat(data).hasSize(10);
254+
try {
255+
spark
256+
.conf()
257+
.set(
258+
"spark.sql.catalog.public_catalog",
259+
"com.google.cloud.spark.bigquery.BigQueryCatalog");
260+
// Use 'projectId' instead of 'project' - this is the correct property name
261+
spark.conf().set("spark.sql.catalog.public_catalog.projectId", "bigquery-public-data");
262+
263+
// Add a small delay to ensure catalog is fully initialized
264+
Thread.sleep(2000);
265+
266+
// Verify catalog is accessible before querying
267+
try {
268+
spark.sql("USE public_catalog");
269+
} catch (Exception e) {
270+
// Catalog might not support USE, that's okay
271+
}
272+
273+
List<Row> rows = spark.sql("SHOW DATABASES IN public_catalog").collectAsList();
274+
List<String> databaseNames =
275+
rows.stream().map(row -> row.getString(0)).collect(Collectors.toList());
276+
assertThat(databaseNames).contains("samples");
277+
278+
List<Row> data =
279+
spark.sql("SELECT * FROM public_catalog.samples.shakespeare LIMIT 10").collectAsList();
280+
assertThat(data).hasSize(10);
281+
} catch (Exception e) {
282+
// Log the full stack trace to help debug cloud build failures
283+
e.printStackTrace();
284+
throw new RuntimeException("Test failed with detailed error", e);
285+
} finally {
286+
// Clean up catalog configuration to avoid interference with other tests
287+
try {
288+
spark.conf().unset("spark.sql.catalog.public_catalog");
289+
spark.conf().unset("spark.sql.catalog.public_catalog.projectId");
290+
} catch (Exception ignored) {
291+
}
292+
}
269293
}
270294

271295
@Test
272296
public void testCreateCatalogWithLocation() throws Exception {
273297
String database = String.format("create_db_with_location_%s", System.nanoTime());
274298
DatasetId datasetId = DatasetId.of(database);
275-
spark
276-
.conf()
277-
.set(
278-
"spark.sql.catalog.test_location_catalog",
279-
"com.google.cloud.spark.bigquery.BigQueryCatalog");
280-
spark.conf().set("spark.sql.catalog.test_location_catalog.bigquery_location", "EU");
281-
spark.sql("CREATE DATABASE test_location_catalog." + database);
282-
Dataset dataset = bigquery.getDataset(datasetId);
283-
assertThat(dataset).isNotNull();
284-
assertThat(dataset.getLocation()).isEqualTo("EU");
285-
bigquery.delete(datasetId, BigQuery.DatasetDeleteOption.deleteContents());
299+
try {
300+
spark
301+
.conf()
302+
.set(
303+
"spark.sql.catalog.test_location_catalog",
304+
"com.google.cloud.spark.bigquery.BigQueryCatalog");
305+
spark.conf().set("spark.sql.catalog.test_location_catalog.bigquery_location", "EU");
306+
307+
// Add delay for catalog initialization
308+
Thread.sleep(2000);
309+
310+
spark.sql("CREATE DATABASE test_location_catalog." + database);
311+
Dataset dataset = bigquery.getDataset(datasetId);
312+
assertThat(dataset).isNotNull();
313+
assertThat(dataset.getLocation()).isEqualTo("EU");
314+
} finally {
315+
bigquery.delete(datasetId, BigQuery.DatasetDeleteOption.deleteContents());
316+
// Clean up catalog configuration
317+
try {
318+
spark.conf().unset("spark.sql.catalog.test_location_catalog");
319+
spark.conf().unset("spark.sql.catalog.test_location_catalog.bigquery_location");
320+
} catch (Exception ignored) {
321+
}
322+
}
286323
}
287324

288325
@Test
289326
public void testCreateTableAsSelectWithProjectAndLocation() {
290327
String database = String.format("ctas_db_with_location_%s", System.nanoTime());
291328
String newTable = "ctas_table_from_public";
292329
DatasetId datasetId = DatasetId.of(database);
293-
spark
294-
.conf()
295-
.set("spark.sql.catalog.public_catalog", "com.google.cloud.spark.bigquery.BigQueryCatalog");
296-
spark.conf().set("spark.sql.catalog.public_catalog.projectId", "bigquery-public-data");
297-
spark
298-
.conf()
299-
.set(
300-
"spark.sql.catalog.test_catalog_as_select",
301-
"com.google.cloud.spark.bigquery.BigQueryCatalog");
302-
spark.conf().set("spark.sql.catalog.test_catalog_as_select.bigquery_location", "EU");
303-
spark.sql("CREATE DATABASE test_catalog_as_select." + database);
304-
spark.sql(
305-
"CREATE TABLE test_catalog_as_select."
306-
+ database
307-
+ "."
308-
+ newTable
309-
+ " AS SELECT * FROM public_catalog.samples.shakespeare LIMIT 10");
310-
Dataset dataset = bigquery.getDataset(datasetId);
311-
assertThat(dataset).isNotNull();
312-
assertThat(dataset.getLocation()).isEqualTo("EU");
313-
Table table = bigquery.getTable(TableId.of(datasetId.getDataset(), newTable));
314-
assertThat(table).isNotNull();
315-
bigquery.delete(datasetId, BigQuery.DatasetDeleteOption.deleteContents());
330+
try {
331+
spark
332+
.conf()
333+
.set(
334+
"spark.sql.catalog.public_catalog",
335+
"com.google.cloud.spark.bigquery.BigQueryCatalog");
336+
// Use 'projectId' instead of 'project'
337+
spark.conf().set("spark.sql.catalog.public_catalog.projectId", "bigquery-public-data");
338+
spark
339+
.conf()
340+
.set(
341+
"spark.sql.catalog.test_catalog_as_select",
342+
"com.google.cloud.spark.bigquery.BigQueryCatalog");
343+
spark.conf().set("spark.sql.catalog.test_catalog_as_select.bigquery_location", "EU");
344+
345+
// Add delay for catalog initialization
346+
Thread.sleep(2000);
347+
348+
spark.sql("CREATE DATABASE test_catalog_as_select." + database);
349+
350+
// Add another small delay after database creation
351+
Thread.sleep(1000);
352+
353+
spark.sql(
354+
"CREATE TABLE test_catalog_as_select."
355+
+ database
356+
+ "."
357+
+ newTable
358+
+ " AS SELECT * FROM public_catalog.samples.shakespeare LIMIT 10");
359+
Dataset dataset = bigquery.getDataset(datasetId);
360+
assertThat(dataset).isNotNull();
361+
assertThat(dataset.getLocation()).isEqualTo("EU");
362+
Table table = bigquery.getTable(TableId.of(datasetId.getDataset(), newTable));
363+
assertThat(table).isNotNull();
364+
} catch (Exception e) {
365+
e.printStackTrace();
366+
throw new RuntimeException("Test failed with detailed error", e);
367+
} finally {
368+
bigquery.delete(datasetId, BigQuery.DatasetDeleteOption.deleteContents());
369+
// Clean up catalog configurations
370+
try {
371+
spark.conf().unset("spark.sql.catalog.public_catalog");
372+
spark.conf().unset("spark.sql.catalog.public_catalog.projectId");
373+
spark.conf().unset("spark.sql.catalog.test_catalog_as_select");
374+
spark.conf().unset("spark.sql.catalog.test_catalog_as_select.bigquery_location");
375+
} catch (Exception ignored) {
376+
}
377+
}
316378
}
317379

318380
private static SparkSession createSparkSession() {

spark-bigquery-dsv2/spark-3.5-bigquery-lib/src/main/java/com/google/cloud/spark/bigquery/BigQueryCatalog.java

Lines changed: 65 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -15,35 +15,14 @@
1515
*/
1616
package com.google.cloud.spark.bigquery;
1717

18-
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
19-
import com.google.cloud.bigquery.BigQueryException;
20-
import com.google.cloud.bigquery.Dataset;
21-
import com.google.cloud.bigquery.DatasetId;
22-
import com.google.cloud.bigquery.DatasetInfo;
23-
import com.google.cloud.bigquery.Schema;
24-
import com.google.cloud.bigquery.TableDefinition;
25-
import com.google.cloud.bigquery.TableId;
26-
import com.google.cloud.bigquery.TableInfo;
27-
import com.google.cloud.bigquery.connector.common.BigQueryClient;
28-
import com.google.cloud.bigquery.connector.common.BigQueryConnectorException;
29-
import com.google.cloud.spark.bigquery.v2.BigQueryIdentifier;
30-
import com.google.cloud.spark.bigquery.v2.Spark35BigQueryTable;
31-
import com.google.cloud.spark.bigquery.v2.Spark3Util;
32-
import com.google.common.base.Preconditions;
33-
import com.google.common.cache.Cache;
34-
import com.google.common.cache.CacheBuilder;
35-
import com.google.common.cache.CacheLoader;
36-
import com.google.common.cache.LoadingCache;
37-
import com.google.common.collect.ImmutableMap;
38-
import com.google.common.collect.Streams;
39-
import com.google.inject.Injector;
4018
import java.util.Arrays;
4119
import java.util.Map;
4220
import java.util.Optional;
4321
import java.util.ServiceLoader;
4422
import java.util.concurrent.ExecutionException;
4523
import java.util.concurrent.TimeUnit;
4624
import java.util.stream.StreamSupport;
25+
4726
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
4827
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
4928
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -63,6 +42,29 @@
6342
import org.slf4j.Logger;
6443
import org.slf4j.LoggerFactory;
6544

45+
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
46+
import com.google.cloud.bigquery.BigQueryException;
47+
import com.google.cloud.bigquery.Dataset;
48+
import com.google.cloud.bigquery.DatasetId;
49+
import com.google.cloud.bigquery.DatasetInfo;
50+
import com.google.cloud.bigquery.Schema;
51+
import com.google.cloud.bigquery.TableDefinition;
52+
import com.google.cloud.bigquery.TableId;
53+
import com.google.cloud.bigquery.TableInfo;
54+
import com.google.cloud.bigquery.connector.common.BigQueryClient;
55+
import com.google.cloud.bigquery.connector.common.BigQueryConnectorException;
56+
import com.google.cloud.spark.bigquery.v2.BigQueryIdentifier;
57+
import com.google.cloud.spark.bigquery.v2.Spark35BigQueryTable;
58+
import com.google.cloud.spark.bigquery.v2.Spark3Util;
59+
import com.google.common.base.Preconditions;
60+
import com.google.common.cache.Cache;
61+
import com.google.common.cache.CacheBuilder;
62+
import com.google.common.cache.CacheLoader;
63+
import com.google.common.cache.LoadingCache;
64+
import com.google.common.collect.ImmutableMap;
65+
import com.google.common.collect.Streams;
66+
import com.google.inject.Injector;
67+
6668
public class BigQueryCatalog implements TableCatalog, SupportsNamespaces {
6769

6870
private static final Logger logger = LoggerFactory.getLogger(BigQueryCatalog.class);
@@ -81,23 +83,33 @@ public class BigQueryCatalog implements TableCatalog, SupportsNamespaces {
8183

8284
@Override
8385
public void initialize(String name, CaseInsensitiveStringMap caseInsensitiveStringMap) {
84-
logger.info("Initializing BigQuery table catalog [{}])", name);
85-
Injector injector =
86-
new InjectorBuilder()
87-
.withOptions(caseInsensitiveStringMap.asCaseSensitiveMap())
88-
.withTableIsMandatory(false)
89-
.build();
90-
tableProvider =
91-
StreamSupport.stream(ServiceLoader.load(DataSourceRegister.class).spliterator(), false)
92-
.filter(candidate -> candidate.shortName().equals("bigquery"))
93-
.map(candidate -> (TableProvider) candidate)
94-
.findFirst()
95-
.orElseThrow(
96-
() -> new IllegalStateException("Could not find a BigQuery TableProvider"));
97-
bigQueryClient = injector.getInstance(BigQueryClient.class);
98-
schemaConverters =
99-
SchemaConverters.from(
100-
SchemaConvertersConfiguration.from(injector.getInstance(SparkBigQueryConfig.class)));
86+
logger.info(
87+
"Initializing BigQuery table catalog [{}] with options: {}",
88+
name,
89+
caseInsensitiveStringMap);
90+
91+
try {
92+
Injector injector =
93+
new InjectorBuilder()
94+
.withOptions(caseInsensitiveStringMap.asCaseSensitiveMap())
95+
.withTableIsMandatory(false)
96+
.build();
97+
tableProvider =
98+
StreamSupport.stream(ServiceLoader.load(DataSourceRegister.class).spliterator(), false)
99+
.filter(candidate -> candidate.shortName().equals("bigquery"))
100+
.map(candidate -> (TableProvider) candidate)
101+
.findFirst()
102+
.orElseThrow(
103+
() -> new IllegalStateException("Could not find a BigQuery TableProvider"));
104+
bigQueryClient = injector.getInstance(BigQueryClient.class);
105+
schemaConverters =
106+
SchemaConverters.from(
107+
SchemaConvertersConfiguration.from(injector.getInstance(SparkBigQueryConfig.class)));
108+
logger.info("BigQuery table catalog [{}] initialized successfully", name);
109+
} catch (Exception e) {
110+
logger.error("Failed to initialize BigQuery catalog [{}]", name, e);
111+
throw new BigQueryConnectorException("Failed to initialize BigQuery catalog: " + name, e);
112+
}
101113
}
102114

103115
@Override
@@ -161,7 +173,8 @@ Map<String, String> toLoadProperties(Identifier identifier) {
161173
result.put("dataset", identifier.namespace()[0]);
162174
break;
163175
case 2:
164-
result.put("project", identifier.namespace()[0]);
176+
// Use 'projectId' instead of 'project' to match the connector's configuration
177+
result.put("projectId", identifier.namespace()[0]);
165178
result.put("dataset", identifier.namespace()[1]);
166179
break;
167180
default:
@@ -289,10 +302,18 @@ static TableId toTableId(Identifier identifier) {
289302

290303
@Override
291304
public String[][] listNamespaces() throws NoSuchNamespaceException {
292-
return Streams.stream(bigQueryClient.listDatasets())
293-
.map(Dataset::getDatasetId)
294-
.map(this::toNamespace)
295-
.toArray(String[][]::new);
305+
if (bigQueryClient == null) {
306+
throw new IllegalStateException("BigQuery catalog not properly initialized");
307+
}
308+
try {
309+
return Streams.stream(bigQueryClient.listDatasets())
310+
.map(Dataset::getDatasetId)
311+
.map(this::toNamespace)
312+
.toArray(String[][]::new);
313+
} catch (Exception e) {
314+
logger.error("Error listing namespaces", e);
315+
throw new BigQueryConnectorException("Failed to list namespaces", e);
316+
}
296317
}
297318

298319
private String[] toNamespace(DatasetId datasetId) {

0 commit comments

Comments
 (0)