Added Digital Compression #414

Open · wants to merge 2 commits into master

Changes from 1 commit
212 changes: 202 additions & 10 deletions Source/Libraries/openXDA.Model/Channels/ChannelData.cs
@@ -47,7 +47,7 @@ private class DataPoint
}

#endregion

#region [ Properties ]

[PrimaryKey(true)]
@@ -219,9 +219,29 @@ private static void MigrateLegacyBlob(AdoDataConnection connection, Event evt)

private static byte[] ToData(List<DataPoint> data, int seriesID)
{
// We can use digital compression if the data changes no more than 10% of the time.
bool useDigitalCompression = true;

int nChanges = 0;
int i = 0;
while (i < data.Count)
{
if (nChanges > 0.1 * data.Count)
{
useDigitalCompression = false;
break;
}
// Count a change whenever a sample differs from the previous sample.
if (i > 0 && data[i].Value != data[i - 1].Value)
nChanges++;
i++;
}

if (useDigitalCompression)
return ToDigitalData(data, seriesID);

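// Otherwise fall back to the standard compression path.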
var timeSeries = data.Select(dataPoint => new { Time = dataPoint.Time.Ticks, Compressed = false }).ToList();

for (i = 1; i < timeSeries.Count; i++)
{
long previousTimestamp = data[i - 1].Time.Ticks;
long timestamp = timeSeries[i].Time;
@@ -234,7 +254,7 @@ private static byte[] ToData(List<DataPoint> data, int seriesID)
int timeSeriesByteLength = timeSeries.Sum(obj => obj.Compressed ? sizeof(ushort) : sizeof(int) + sizeof(long));
int dataSeriesByteLength = sizeof(int) + (2 * sizeof(double)) + (data.Count * sizeof(ushort));
int totalByteLength = sizeof(int) + timeSeriesByteLength + dataSeriesByteLength;

byte[] result = new byte[totalByteLength];
int offset = 0;

@@ -246,7 +266,7 @@ private static byte[] ToData(List<DataPoint> data, int seriesID)
.Select(obj => obj.Index)
.ToList();

for (i = 0; i < uncompressedIndexes.Count; i++)
{
int index = uncompressedIndexes[i];
int nextIndex = (i + 1 < uncompressedIndexes.Count) ? uncompressedIndexes[i + 1] : timeSeries.Count;
@@ -289,6 +309,67 @@ private static byte[] ToData(List<DataPoint> data, int seriesID)
return returnArray;
}

private static byte[] ToDigitalData(List<DataPoint> data, int seriesID)
{
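// Blob layout: [sample count:int][series ID:int][start time:long][sample rate:long]
// [decompression offset:double][decompression scale:double], then one
// [tick delta:int][compressed value:ushort] pair per encoded state change.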
List<DataPoint> changeSeries = data.Where((point, index) => index == 0 || index == data.Count - 1 || point.Value != data[index - 1].Value).ToList();

const ushort NaNValue = ushort.MaxValue;
const ushort MaxCompressedValue = ushort.MaxValue - 1;
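// ushort.MaxValue is reserved as a NaN sentinel, so real values are scaled
// into [0, ushort.MaxValue - 1].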
double minValue = changeSeries.Min(item => item.Value);
double maxValue = changeSeries.Max(item => item.Value);
double range = maxValue - minValue;
double decompressionOffset = minValue;
double decompressionScale = range / MaxCompressedValue;
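// A zero range means the signal is constant; zero the compression scale to avoid dividing by zero.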
double compressionScale = (decompressionScale != 0.0D) ? 1.0D / decompressionScale : 0.0D;

long startTime = data.First().Time.Ticks;
long sampleRate = data[1].Time.Ticks - startTime;
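// The sample rate is inferred from the first two samples; the format assumes evenly sampled data.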

int totalByteLength = 2 * sizeof(int) + 2 * sizeof(long) + 2 * sizeof(double) + changeSeries.Count() * (sizeof(int) + sizeof(ushort));

byte[] result = new byte[totalByteLength];
int offset = 0;

offset += LittleEndian.CopyBytes(data.Count, result, offset);
offset += LittleEndian.CopyBytes(seriesID, result, offset);

offset += LittleEndian.CopyBytes(startTime, result, offset);
offset += LittleEndian.CopyBytes(sampleRate, result, offset);

offset += LittleEndian.CopyBytes(decompressionOffset, result, offset);
offset += LittleEndian.CopyBytes(decompressionScale, result, offset);

foreach (DataPoint dataPoint in changeSeries)
{
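// Each state change is stored as a tick offset from the start time plus a scaled 16-bit value.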
ushort compressedValue = (ushort)Math.Round((dataPoint.Value - decompressionOffset) * compressionScale);

if (compressedValue == NaNValue)
compressedValue--;

if (double.IsNaN(dataPoint.Value))
compressedValue = NaNValue;

int time = (int)(dataPoint.Time.Ticks - startTime);

offset += LittleEndian.CopyBytes(time, result, offset);
offset += LittleEndian.CopyBytes(compressedValue, result, offset);
}

byte[] returnArray = GZipStream.CompressBuffer(result);

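// Overwrite the GZip magic bytes with a format marker so Decompress can identify
// the blob type; the marker is swapped back to the GZip header before uncompressing.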
if (changeSeries.Count() == 2 && changeSeries.First().Value == changeSeries.Last().Value)
{
returnArray[0] = constantDigitalHeader[0];
returnArray[1] = constantDigitalHeader[1];
}
else
{
returnArray[0] = digitalHeader[0];
returnArray[1] = digitalHeader[1];
}

return returnArray;
}

private static List<Tuple<int, List<DataPoint>>> Decompress(byte[] data)
{
List<Tuple<int, List<DataPoint>>> result = new List<Tuple<int, List<DataPoint>>>();
@@ -297,14 +378,19 @@ private static List<Tuple<int, List<DataPoint>>> Decompress(byte[] data)
return result;
// If the blob contains the GZip header,
// use the legacy deserialization algorithm
if (data[0] == legacyHeader[0] && data[1] == legacyHeader[1])
{
return Decompress_Legacy(data);
}

// If the blob contains a digital header, use the digital decompression algorithm
if ((data[0] == digitalHeader[0] && data[1] == digitalHeader[1]) || (data[0] == constantDigitalHeader[0] && data[1] == constantDigitalHeader[1]))
{
return Decompress_Digital(data);
}

// Restore the GZip header before uncompressing
data[0] = legacyHeader[0];
data[1] = legacyHeader[1];

byte[] uncompressedData;
int offset;
@@ -362,9 +448,9 @@ private static List<Tuple<int, List<DataPoint>>> Decompress(byte[] data)
Value = decompressedValue
});
}


result.Add(new Tuple<int, List<DataPoint>>(seriesID, dataSeries));
}

return result;
@@ -415,6 +501,93 @@ private static List<Tuple<int, List<DataPoint>>> Decompress_Legacy(byte[] data)
return result;
}

/// <summary>
/// Decompresses a digital series stored as a compressed series of state changes.
/// </summary>
/// <param name="data">The compressed <see cref="byte"/> array blob.</param>
/// <returns>A list of tuples mapping each series ID to its decompressed <see cref="List{DataPoint}"/>.</returns>
private static List<Tuple<int, List<DataPoint>>> Decompress_Digital(byte[] data)
{
List<Tuple<int, List<DataPoint>>> result = new List<Tuple<int, List<DataPoint>>>();
byte[] uncompressedData;
int offset;
int m_samples;
DateTime startTime;
int seriesID;
long samplingRate;
List<DataPoint> points = new List<DataPoint>();

// Restore the GZip header before uncompressing
data[0] = legacyHeader[0];
data[1] = legacyHeader[1];

uncompressedData = GZipStream.UncompressBuffer(data);
offset = 0;

m_samples = LittleEndian.ToInt32(uncompressedData, offset);
offset += sizeof(int);

if (m_samples < 2)
throw new InvalidOperationException("Digital Data must have the first and last point encoded.");

seriesID = LittleEndian.ToInt32(uncompressedData, offset);
offset += sizeof(int);

startTime = new DateTime(LittleEndian.ToInt64(uncompressedData, offset));
offset += sizeof(long);

samplingRate = LittleEndian.ToInt64(uncompressedData, offset);
offset += sizeof(long);

double decompressionOffset = LittleEndian.ToDouble(uncompressedData, offset);
double decompressionScale = LittleEndian.ToDouble(uncompressedData, offset + sizeof(double));
offset += 2 * sizeof(double);

// Always have the first point available.
int time = LittleEndian.ToInt32(uncompressedData, offset);
offset += sizeof(int);
ushort compressedValue = LittleEndian.ToUInt16(uncompressedData, offset);
offset += sizeof(ushort);

double decompressedValue = decompressionScale * compressedValue + decompressionOffset;

double lastValue = decompressedValue;
long lastTime = time;

for (int i = 1; i < m_samples; i++)
{
time = LittleEndian.ToInt32(uncompressedData, offset);
offset += sizeof(int);
compressedValue = LittleEndian.ToUInt16(uncompressedData, offset);
offset += sizeof(ushort);

decompressedValue = decompressionScale * compressedValue + decompressionOffset;
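// Fill any gap since the last state change with evenly spaced points at the previous value.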
while (time > (lastTime + samplingRate))
{
lastTime += samplingRate;
Member
I wonder if rounding errors could potentially introduce additional data points between state changes. Do you think it might cause problems downstream if points.Count > m_samples?

Member Author
m_samples gets removed from the blob, so I am not sure that even matters.

By definition we assume these "Digitals" are evenly sampled, so the loop will generate an evenly sampled flat line until it hits a state change. In theory it is possible that we lose some points in between or generate extra ones, but since it's a flat line anyway, I am not too concerned about that.

Occasionally we may compress a latched analog this way, which I suppose could be an issue, but I believe we already assume a fixed sampling rate in a few places when doing math, so I am not sure that really matters either.

Member
Off the top of my head, my main concern would be this:

// If the data being added matches the parameters for this data group, add the data to the data group
// Note that it does not have to match Asset
if (startTime == m_startTime && endTime == m_endTime && samples == m_samples)
{
    m_dataSeries.Add(dataSeries);
    return true;
}

return false;

If the actual number of decoded data points doesn't match DataGroup.m_samples, then the DataGroup.Add() method will fail to even include the digital series in the DataGroup. This is called directly by DataGroup.FromData(), so the decoded data would just be silently dropped on the floor, and we would be wondering why it's not getting returned to the visualization.

Also, I suppose I'm assuming that there will be a follow-up PR that includes changes to DataGroup.FromData()?

points.Add(new DataPoint()
{
Time = startTime.AddTicks(lastTime),
Value = lastValue
});

}

points.Add(new DataPoint()
{
Time = startTime.AddTicks(time),
Value = decompressedValue
});

lastTime = time;
lastValue = decompressedValue;
}

result.Add(new Tuple<int, List<DataPoint>>(seriesID, points));

return result;
}

private static HashSet<int> QueryChannelsWithData(AdoDataConnection connection, Event evt)
{
const string FilterQueryFormat =
@@ -450,6 +623,25 @@ private static object ToDateTime2(AdoDataConnection connection, DateTime dateTime
}
}

/// <summary>
/// The header of a data blob compressed as analog data.
/// </summary>
public static readonly byte[] analogHeader = { 0x11, 0x11 };

/// <summary>
/// The header of a data blob compressed as digital state changes.
/// </summary>
public static readonly byte[] digitalHeader = { 0x22, 0x22 };

/// <summary>
/// The header of a data blob compressed as constant digital state changes.
/// </summary>
public static readonly byte[] constantDigitalHeader = { 0x33, 0x33 };

/// <summary>
/// The GZip header of a data blob compressed with the legacy format.
/// </summary>
public static readonly byte[] legacyHeader = { 0x1F, 0x8B };
#endregion
}
}