@@ -41,6 +41,7 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.ParquetSizeOverflowException;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.Version;
 import org.apache.parquet.bytes.ByteBufferAllocator;
@@ -703,7 +704,7 @@ public void writeDataPage(
     columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
     long beforeHeader = out.getPos();
     LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
-    int compressedPageSize = (int) bytes.size();
+    int compressedPageSize = toIntWithCheck(bytes.size(), "page");
     metadataConverter.writeDataPageV1Header(
         uncompressedPageSize, compressedPageSize, valueCount, rlEncoding, dlEncoding, valuesEncoding, out);
     long headerSize = out.getPos() - beforeHeader;
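Editorial aside, not part of the patch: the unchecked `(int) bytes.size()` being replaced here narrows silently, so a page over `Integer.MAX_VALUE` bytes would have been recorded with a wrapped, negative size instead of failing. A minimal, self-contained sketch of the wraparound (illustrative values only):

```java
public class NarrowingDemo {
  public static void main(String[] args) {
    long pageSize = 3_000_000_000L; // ~2.8 GiB, larger than Integer.MAX_VALUE
    int wrapped = (int) pageSize;   // silent two's-complement truncation
    System.out.println(wrapped);    // prints -1294967296: a corrupt, negative "size"
  }
}
```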
@@ -879,7 +880,7 @@ public void writeDataPage(
         pageHeaderAAD,
         sizeStatistics);
     offsetIndexBuilder.add(
-        (int) (out.getPos() - beforeHeader),
+        toIntWithCheck(out.getPos() - beforeHeader, "page"),
         rowCount,
         sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
   }
@@ -979,7 +980,7 @@ public void writeDataPage(
       currentChunkFirstDataPage = beforeHeader;
     }
     LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
-    int compressedPageSize = (int) bytes.size();
+    int compressedPageSize = toIntWithCheck(bytes.size(), "page");
     if (pageWriteChecksumEnabled) {
       crc.reset();
       crcUpdate(bytes);
@@ -1146,12 +1147,14 @@ public void writeDataPageV2(
       SizeStatistics sizeStatistics)
       throws IOException {
     state = state.write();
-    int rlByteLength = toIntWithCheck(repetitionLevels.size());
-    int dlByteLength = toIntWithCheck(definitionLevels.size());
+    int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page repetition levels");
+    int dlByteLength = toIntWithCheck(definitionLevels.size(), "page definition levels");

-    int compressedSize = toIntWithCheck(compressedData.size() + repetitionLevels.size() + definitionLevels.size());
+    int compressedSize =
+        toIntWithCheck(compressedData.size() + repetitionLevels.size() + definitionLevels.size(), "page");

-    int uncompressedSize = toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size());
+    int uncompressedSize =
+        toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size(), "page");

     long beforeHeader = out.getPos();
     if (currentChunkFirstDataPage < 0) {
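A detail worth calling out in this hunk: each of the repetition levels, definition levels, and compressed data may individually fit in an `int` while their sum does not, which is why the check is applied to the full `long` sum rather than to the parts. A tiny illustration (hypothetical sizes):

```java
public class SumOverflowDemo {
  public static void main(String[] args) {
    long rl = 1_500_000_000L;                  // fits in an int on its own
    long dl = 1_500_000_000L;                  // fits in an int on its own
    long total = rl + dl;                      // 3_000_000_000: does not fit
    System.out.println((int) total == total);  // false -> the checked cast would throw
  }
}
```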
@@ -1209,7 +1212,7 @@ public void writeDataPageV2(
     BytesInput.concat(repetitionLevels, definitionLevels, compressedData).writeAllTo(out);

     offsetIndexBuilder.add(
-        (int) (out.getPos() - beforeHeader),
+        toIntWithCheck(out.getPos() - beforeHeader, "page"),
         rowCount,
         sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
   }
@@ -1626,8 +1629,8 @@ private static void copy(SeekableInputStream from, PositionOutputStream to, long
     long bytesCopied = 0;
     byte[] buffer = COPY_BUFFER.get();
     while (bytesCopied < length) {
-      long bytesLeft = length - bytesCopied;
-      int bytesRead = from.read(buffer, 0, (buffer.length < bytesLeft ? buffer.length : (int) bytesLeft));
+      int bytesLeft = Math.toIntExact(length - bytesCopied);
+      int bytesRead = from.read(buffer, 0, Math.min(buffer.length, bytesLeft));
       if (bytesRead < 0) {
         throw new IllegalArgumentException("Unexpected end of input file at " + start + bytesCopied);
       }
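For readers outside the diff context: the loop above copies exactly `length` bytes in buffer-sized chunks. A self-contained sketch of the same pattern over plain `java.io` streams (a hypothetical helper, not this class's API); note that `Math.toIntExact` throws `ArithmeticException` rather than wrapping if the remaining count ever exceeds `Integer.MAX_VALUE`:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

final class BoundedCopy {
  // Hypothetical helper mirroring the loop in the hunk above: copy exactly
  // `length` bytes from `from` to `to`, failing fast on a short read.
  static void copy(InputStream from, OutputStream to, long length) throws IOException {
    byte[] buffer = new byte[8192];
    long bytesCopied = 0;
    while (bytesCopied < length) {
      // toIntExact fails loudly on overflow instead of truncating silently.
      int bytesLeft = Math.toIntExact(length - bytesCopied);
      int bytesRead = from.read(buffer, 0, Math.min(buffer.length, bytesLeft));
      if (bytesRead < 0) {
        throw new IOException("Unexpected end of input after " + bytesCopied + " bytes");
      }
      to.write(buffer, 0, bytesRead);
      bytesCopied += bytesRead;
    }
  }
}
```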
@@ -1707,15 +1710,16 @@ private static void serializeColumnIndexes(
         }
         long offset = out.getPos();
         Util.writeColumnIndex(columnIndex, out, columnIndexEncryptor, columnIndexAAD);
-        column.setColumnIndexReference(new IndexReference(offset, (int) (out.getPos() - offset)));
+        column.setColumnIndexReference(
+            new IndexReference(offset, toIntWithCheck(out.getPos() - offset, "page")));
       }
     }
   }

-  private int toIntWithCheck(long size) {
+  private static int toIntWithCheck(long size, String obj) {
     if ((int) size != size) {
-      throw new ParquetEncodingException(
-          "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " + size);
+      throw new ParquetSizeOverflowException(
+          String.format("Cannot write %s larger than %s bytes: %s", obj, Integer.MAX_VALUE, size));
     }
     return (int) size;
   }
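The `(int) size != size` idiom checks that the value survives a round trip through `int`. A standalone sketch of the checked cast (with a stand-in exception class, since `ParquetSizeOverflowException` is the type this patch introduces):

```java
final class SizeCheckDemo {
  // Stand-in for org.apache.parquet.ParquetSizeOverflowException.
  static class SizeOverflowException extends RuntimeException {
    SizeOverflowException(String message) {
      super(message);
    }
  }

  static int toIntWithCheck(long size, String obj) {
    // Narrow and compare: the two differ iff `size` does not fit in 32 bits.
    if ((int) size != size) {
      throw new SizeOverflowException(
          String.format("Cannot write %s larger than %s bytes: %s", obj, Integer.MAX_VALUE, size));
    }
    return (int) size;
  }

  public static void main(String[] args) {
    System.out.println(toIntWithCheck(4096L, "page")); // prints 4096
    toIntWithCheck(1L << 31, "page");                  // throws: 2147483648 overflows int
  }
}
```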
@@ -1787,7 +1791,8 @@ private static void serializeOffsetIndexes(
             out,
             offsetIndexEncryptor,
             offsetIndexAAD);
-        column.setOffsetIndexReference(new IndexReference(offset, (int) (out.getPos() - offset)));
+        column.setOffsetIndexReference(
+            new IndexReference(offset, toIntWithCheck(out.getPos() - offset, "page")));
       }
     }
   }
@@ -1852,7 +1857,7 @@ private static void serializeBloomFilters(
         }
         out.write(serializedBitset);

-        int length = (int) (out.getPos() - offset);
+        int length = Math.toIntExact(out.getPos() - offset);
         column.setBloomFilterLength(length);
       }
     }
@@ -1872,7 +1877,7 @@ private static void serializeFooter(
         metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
     writeFileMetaData(parquetMetadata, out);
     LOG.debug("{}: footer length = {}", out.getPos(), (out.getPos() - footerIndex));
-    BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
+    BytesUtils.writeIntLittleEndian(out, toIntWithCheck(out.getPos() - footerIndex, "footer"));
     out.write(MAGIC);
     return;
   }
@@ -1910,7 +1915,7 @@ private static void serializeFooter(
     out.write(serializedFooter);
     out.write(signature);
     LOG.debug("{}: footer and signature length = {}", out.getPos(), (out.getPos() - footerIndex));
-    BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
+    BytesUtils.writeIntLittleEndian(out, toIntWithCheck(out.getPos() - footerIndex, "footer"));
     out.write(MAGIC);
     return;
   }
@@ -1920,7 +1925,7 @@ private static void serializeFooter(
     writeFileCryptoMetaData(fileEncryptor.getFileCryptoMetaData(), out);
     byte[] footerAAD = AesCipher.createFooterAAD(fileEncryptor.getFileAAD());
     writeFileMetaData(parquetMetadata, out, fileEncryptor.getFooterEncryptor(), footerAAD);
-    int combinedMetaDataLength = (int) (out.getPos() - cryptoFooterIndex);
+    int combinedMetaDataLength = toIntWithCheck(out.getPos() - cryptoFooterIndex, "footer");
     LOG.debug("{}: crypto metadata and footer length = {}", out.getPos(), combinedMetaDataLength);
     BytesUtils.writeIntLittleEndian(out, combinedMetaDataLength);
     out.write(EFMAGIC);
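One caller-facing consequence of the patch (a hedged sketch; it assumes `ParquetSizeOverflowException` is unchecked, which it must be to replace `ParquetEncodingException` at these call sites without new `throws` clauses): oversized pages and footers become distinguishable from other encoding failures. The `writeBatch` method below is a placeholder standing in for whatever write path produces the oversized structure:

```java
import org.apache.parquet.ParquetSizeOverflowException;

public class OverflowHandlingDemo {
  // Placeholder for a write path that can produce an oversized page or footer;
  // the message mirrors the format string introduced by the patch.
  static void writeBatch() {
    throw new ParquetSizeOverflowException(
        "Cannot write page larger than 2147483647 bytes: 3000000000");
  }

  public static void main(String[] args) {
    try {
      writeBatch();
    } catch (ParquetSizeOverflowException e) {
      // The usual remedy is a smaller page or row-group size,
      // not treating the file as corrupt.
      System.err.println("Oversized structure: " + e.getMessage());
    }
  }
}
```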