Skip to content

Commit

Permalink
support columns with forward slashes
Browse files Browse the repository at this point in the history
  • Loading branch information
mukunku committed Nov 29, 2023
1 parent a9ce3b3 commit e84f6c6
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 21 deletions.
18 changes: 10 additions & 8 deletions src/ParquetViewer.Engine/ParquetEngine.Processor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public partial class ParquetEngine
public async Task<DataTable> ReadRowsAsync(List<string> selectedFields, int offset, int recordCount, CancellationToken cancellationToken, IProgress<int>? progress = null)
{
long recordsLeftToRead = recordCount;
DataTable result = BuildDataTable(selectedFields);
DataTable result = BuildDataTable(null, selectedFields);
result.BeginLoadData(); //might speed things up

foreach (var reader in this.GetReaders(offset))
Expand Down Expand Up @@ -86,7 +86,7 @@ private async Task ProcessRowGroup(DataTable dataTable, ParquetRowGroupReader gr
{
cancellationToken.ThrowIfCancellationRequested();

var field = ParquetSchemaTree.GetChild(column.ColumnName);
var field = ParquetSchemaTree.GetChild(column.ExtendedProperties["Parent"] as string, column.ColumnName);
if (field.SchemaElement.LogicalType?.LIST is not null || field.SchemaElement.ConvertedType == Parquet.Meta.ConvertedType.LIST)
{
await ReadListField(dataTable, groupReader, rowBeginIndex, field, skipRecords,
Expand Down Expand Up @@ -120,7 +120,7 @@ private async Task ReadPrimitiveField(DataTable dataTable, ParquetRowGroupReader
int skippedRecords = 0;
var dataColumn = await groupReader.ReadColumnAsync(field.DataField ?? throw new Exception($"Pritimive field `{field.Path}` is missing its data field"), cancellationToken);

var fieldIndex = dataTable.Columns[field.DataField.Path.ToString()]?.Ordinal ?? throw new Exception($"Column `{field.Path}` is missing");
var fieldIndex = dataTable.Columns[field.Path]?.Ordinal ?? throw new Exception($"Column `{field.Path}` is missing");
foreach (var value in dataColumn.Data)
{
cancellationToken.ThrowIfCancellationRequested();
Expand Down Expand Up @@ -285,7 +285,7 @@ private async Task ReadStructField(DataTable dataTable, ParquetRowGroupReader gr
long skipRecords, long readRecords, bool isFirstColumn, CancellationToken cancellationToken, IProgress<int>? progress)
{
//Read struct data as a new datatable
DataTable structFieldDataTable = BuildDataTable(field.Children.Select(f => $"{field.Path}/{f.Path}").ToList());
DataTable structFieldDataTable = BuildDataTable(field.Path, field.Children.Select(f => f.Path).ToList());

//Need to calculate progress differently for structs
var structFieldReadProgress = new SimpleProgress();
Expand Down Expand Up @@ -350,12 +350,12 @@ private DataRow GetRow(DataTable dataTable, int rowIndex)
}
}

private DataTable BuildDataTable(List<string> fields)
private DataTable BuildDataTable(string? parent, List<string> fields)
{
DataTable dataTable = new();
foreach (var field in fields)
{
var schema = ParquetSchemaTree.GetChild(field);
var schema = ParquetSchemaTree.GetChild(parent, field);

DataColumn newColumn;
if (schema.SchemaElement.ConvertedType == ConvertedType.LIST)
Expand All @@ -379,14 +379,16 @@ private DataTable BuildDataTable(List<string> fields)
}
else
{
var clrType = schema.DataField?.ClrType ?? throw new Exception($"{field} has no data field");
var clrType = schema.DataField?.ClrType ?? throw new Exception($"{(parent is not null ? parent + "/" : string.Empty)}/{field} has no data field");
newColumn = new DataColumn(field, clrType);
}

newColumn.ExtendedProperties.Add("Parent", parent);

//We don't support case sensitive field names unfortunately
if (dataTable.Columns.Contains(newColumn.ColumnName))
{
throw new NotSupportedException($"Duplicate column '{field}' detected. Column names are case insensitive and must be unique.");
throw new NotSupportedException($"Duplicate column '{(parent is not null ? parent + "/" : string.Empty)}{field}' detected. Column names are case insensitive and must be unique.");
}

dataTable.Columns.Add(newColumn);
Expand Down
20 changes: 9 additions & 11 deletions src/ParquetViewer.Engine/ParquetSchemaElement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,22 @@ public ParquetSchemaElement(SchemaElement schemaElement)
this.SchemaElement = schemaElement;
}

public ParquetSchemaElement GetChild(string name)
public ParquetSchemaElement GetChild(string name) => GetChildImpl(name);

public ParquetSchemaElement GetChild(string? parent, string name)
{
if (name?.Contains('/') == true)
{
string currentPath = name.Substring(0, name.IndexOf('/'));
string remainingPath = name.Substring(name.IndexOf('/') + 1);
var child = GetChildImpl(currentPath);
return child.GetChild(remainingPath);
}
else
if (parent is null)
{
return GetChildImpl(name);
}

ParquetSchemaElement GetChildImpl(string? name) => name is not null && _children.TryGetValue(name, out var result)
? result : throw new Exception($"Field schema path not found: `{Path}/{name}`");
var child = GetChildImpl(parent);
return child.GetChild(name);
}

private ParquetSchemaElement GetChildImpl(string? name) => name is not null && _children.TryGetValue(name, out var result)
? result : throw new Exception($"Field schema path not found: `{Path}/{name}`");

public ParquetSchemaElement GetImmediateChildOrSingle(string name)
{
if (_children.TryGetValue(name, out var result))
Expand Down
Binary file not shown.
Binary file not shown.
6 changes: 6 additions & 0 deletions src/ParquetViewer.Tests/ParquetViewer.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
<None Update="Data\COLUMN_ENDING_IN_PERIOD_TEST1.parquet">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Data\COLUMN_NAME_WITH_FORWARD_SLASH1.parquet">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Data\DATETIME_TEST1.parquet">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
Expand Down Expand Up @@ -77,6 +80,9 @@
<None Update="Data\NULLABLE_GUID_TEST1.parquet">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Data\ORACLE_MALFORMED_INT64_TEST1.parquet">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Data\PARTITIONED_PARQUET_FILE_TEST1\bldgtype=B\bd8c129da60e412db4b21800b9e0b983.parquet">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
Expand Down
28 changes: 27 additions & 1 deletion src/ParquetViewer.Tests/SanityTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,32 @@ public async Task MALFORMED_DATETIME_TEST1()
Assert.Fail("Looks like the Malformed DateTime Fix is no longer needed! Remove that part of the code.");
}
Assert.Equal(typeof(long), dataTable.Rows[0]["ds"]?.GetType()); //If it's not a datetime, then it should be a long.
}
}

[Fact]
public async Task COLUMN_NAME_WITH_FORWARD_SLASH_TEST1()
{
//TODO: need to make this file smaller
using var parquetEngine = await ParquetEngine.OpenFileOrFolderAsync("Data/COLUMN_NAME_WITH_FORWARD_SLASH1.parquet", default);

Assert.Equal(181966, parquetEngine.RecordCount);
Assert.Equal(320, parquetEngine.Fields.Count);

var dataTable = await parquetEngine.ReadRowsAsync(parquetEngine.Fields, 0, 1, default);
Assert.Equal((byte)0, dataTable.Rows[0]["FLC K/L"]);
}

[Fact]
public async Task ORACLE_MALFORMED_INT64_TEST1()
{
using var parquetEngine = await ParquetEngine.OpenFileOrFolderAsync("Data/ORACLE_MALFORMED_INT64_TEST1.parquet", default);

Assert.Equal(126, parquetEngine.RecordCount);
Assert.Equal(2, parquetEngine.Fields.Count);

var dataTable = await parquetEngine.ReadRowsAsync(parquetEngine.Fields, 0, int.MaxValue, default);
Assert.Equal("DEPOSIT", dataTable.Rows[0][0]);
Assert.Equal((long)1, dataTable.Rows[0][1]);
}
}
}
2 changes: 1 addition & 1 deletion src/ParquetViewer/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("2.8.1.1")]
[assembly: AssemblyVersion("2.8.1.2")]

0 comments on commit e84f6c6

Please sign in to comment.