
package org.apache.kyuubi.spark.connector.hive

+import com.google.common.collect.Maps
import scala.concurrent.duration.DurationInt
-
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.concurrent.Futures.timeout
-
import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.util.CaseInsensitiveStringMap

class HiveFileStatusCacheSuite extends KyuubiHiveTest {

@@ -38,9 +39,9 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest {
    val files = (1 to 3).map(_ => new FileStatus())

    HiveFileStatusCache.resetForTesting()
-   val fileStatusCacheTabel1 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table1")
+   val fileStatusCacheTabel1 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.cat1Table")
    fileStatusCacheTabel1.putLeafFiles(path, files.toArray)
-   val fileStatusCacheTabel2 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table1")
+   val fileStatusCacheTabel2 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.cat1Table")
    val fileStatusCacheTabel3 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table2")

    // Exactly 3 files are cached.
@@ -83,4 +84,173 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest {
      SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue)
    }
  }
-}
+
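+  // Builds a standalone HiveTableCatalog backed by an in-memory Derby metastore,
+  // used by the tests below to resolve a table's storage location directly.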
+  private def newCatalog(): HiveTableCatalog = {
+    val catalog = new HiveTableCatalog
+    val properties = Maps.newHashMap[String, String]()
+    properties.put("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:memorydb;create=true")
+    properties.put("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver")
+    catalog.initialize(catalogName, new CaseInsensitiveStringMap(properties))
+    catalog
+  }
+
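+  // Each test below exercises the same contract: a write (insert, overwrite,
+  // alter, rename) must invalidate the cached leaf-file listing for the affected
+  // partition path, and the next table scan repopulates it.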
+  test("expire FileStatusCache when insert into") {
+    val dbName = "default"
+    val tbName = "tbl_partition"
+    val table = s"${catalogName}.${dbName}.${tbName}"
+
+    withTable(table) {
+      spark.sql(s"create table $table (age int) partitioned by (city string) stored as orc").collect()
+      val location = newCatalog()
+        .loadTable(Identifier.of(Array(dbName), tbName))
+        .asInstanceOf[HiveTable]
+        .catalogTable.location.toString
+
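+      // An insert invalidates the entry, so getLeafFiles returns None until the
+      // next scan re-lists the path; each insert adds one data file to the partition.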
+      spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+
+      assert(spark.sql(s"select * from $table").count() === 5)
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1)
+
+      spark.sql(s"insert into $table partition(city='ct') values(11),(21),(31),(41),(51)").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+
+      assert(spark.sql(s"select * from $table").count() === 10)
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).get.length === 2)
+    }
+  }
+
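+  // INSERT OVERWRITE rewrites the partition: the row count stays at 5 and the
+  // next scan lists the rewritten partition as a single file.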
+  test("expire FileStatusCache when insert overwrite") {
+    val dbName = "default"
+    val tbName = "tbl_partition"
+    val table = s"${catalogName}.${dbName}.${tbName}"
+
+    withTable(table) {
+      spark.sql(s"create table $table (age int) partitioned by (city string) stored as orc").collect()
+      val location = newCatalog()
+        .loadTable(Identifier.of(Array(dbName), tbName))
+        .asInstanceOf[HiveTable]
+        .catalogTable.location.toString
+
+      spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+
+      assert(spark.sql(s"select * from $table").count() === 5)
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1)
+
+      spark.sql(s"insert overwrite $table partition(city='ct') values(11),(21),(31),(41),(51)")
+        .collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+
+      assert(spark.sql(s"select * from $table").count() === 5)
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1)
+    }
+  }
+
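+  // A metadata-only change (ADD COLUMNS) must also drop the cached listing,
+  // even though no data files were touched.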
+  test("expire FileStatusCache when alter Table") {
+    val dbName = "default"
+    val tbName = "tbl_partition"
+    val table = s"${catalogName}.${dbName}.${tbName}"
+
+    withTable(table) {
+      spark.sql(s"create table $table (age int) partitioned by (city string) stored as orc").collect()
+      val location = newCatalog()
+        .loadTable(Identifier.of(Array(dbName), tbName))
+        .asInstanceOf[HiveTable]
+        .catalogTable.location.toString
+
+      spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect()
+      spark.sql(s"select * from $table").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1)
+
+      spark.sql(s"ALTER TABLE $table ADD COLUMNS (name string)").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, table)
+        .getLeafFiles(new Path(s"$location/city=ct")).isEmpty)
+    }
+  }
+
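+  // After RENAME, neither the old nor the new qualified name may serve a stale
+  // listing; both cache entries stay empty until the renamed table is scanned again.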
+  test("expire FileStatusCache when rename Table") {
+    val dbName = "default"
+    val oldTbName = "tbl_partition"
+    val newTbName = "tbl_partition_new"
+    val oldTable = s"${catalogName}.${dbName}.${oldTbName}"
+    val newTable = s"${catalogName}.${dbName}.${newTbName}"
+
+    withTable(newTable) {
+      spark.sql(s"create table ${oldTable} (age int) partitioned by (city string) stored as orc")
+        .collect()
+      spark.sql(s"insert into $oldTable partition(city='ct') values(10),(20),(30),(40),(50)")
+        .collect()
+      spark.sql(s"select * from $oldTable").collect()
+
+      val oldLocation = newCatalog()
+        .loadTable(Identifier.of(Array(dbName), oldTbName))
+        .asInstanceOf[HiveTable]
+        .catalogTable.location.toString
+      assert(HiveFileStatusCache.getOrCreate(spark, oldTable)
+        .getLeafFiles(new Path(s"$oldLocation/city=ct")).get.length === 1)
+
+      spark.sql(s"DROP TABLE IF EXISTS ${newTable}").collect()
+      spark.sql(s"use ${catalogName}.${dbName}").collect()
+      spark.sql(s"ALTER TABLE $oldTbName RENAME TO $newTbName").collect()
+      val newLocation = newCatalog()
+        .loadTable(Identifier.of(Array(dbName), newTbName))
+        .asInstanceOf[HiveTable]
+        .catalogTable.location.toString
+
+      assert(HiveFileStatusCache.getOrCreate(spark, oldTable)
+        .getLeafFiles(new Path(s"$oldLocation/city=ct"))
+        .isEmpty)
+
+      assert(HiveFileStatusCache.getOrCreate(spark, newTable)
+        .getLeafFiles(new Path(s"$newLocation/city=ct"))
+        .isEmpty)
+    }
+  }
+
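+  // Cache entries are keyed by the fully qualified catalog.db.table name, so two
+  // catalogs sharing the same db.table must not see each other's entries.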
+  test("FileStatusCache isolated between different catalogs with same database.table") {
+    val catalog1 = catalogName
+    val catalog2 = "hive2"
+    val dbName = "default"
+    val tbName = "tbl_partition"
+    val dbTableShortName = s"${dbName}.${tbName}"
+    val cat1Table = s"${catalog1}.${dbTableShortName}"
+    val cat2Table = s"${catalog2}.${dbTableShortName}"
+
+    withTable(cat1Table, cat2Table) {
+      spark.sql(s"CREATE TABLE IF NOT EXISTS $cat1Table (age int) partitioned by (city string) " +
+        s"stored as orc").collect()
+      val location = newCatalog()
+        .loadTable(Identifier.of(Array(dbName), tbName))
+        .asInstanceOf[HiveTable]
+        .catalogTable.location.toString
+
+      spark.sql(s"use $catalog1").collect()
+      spark.sql(s"insert into $dbTableShortName partition(city='ct1') " +
+        s"values(11),(12),(13),(14),(15)").collect()
+      spark.sql(s"select * from $cat1Table where city='ct1'").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, cat1Table)
+        .getLeafFiles(new Path(s"$location/city=ct1"))
+        .get.length === 1)
+
+      spark.sql(s"use $catalog2").collect()
+      spark.sql(s"insert into $dbTableShortName partition(city='ct2') " +
+        s"values(21),(22),(23),(24),(25)").collect()
+      spark.sql(s"select * from $cat2Table where city='ct2'").collect()
+      assert(HiveFileStatusCache.getOrCreate(spark, cat2Table)
+        .getLeafFiles(new Path(s"$location/city=ct1")).isEmpty)
+      assert(HiveFileStatusCache.getOrCreate(spark, cat2Table)
+        .getLeafFiles(new Path(s"$location/city=ct2"))
+        .get.length === 1)
+    }
+  }
+}