Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a structured pipeline for cepstral coefficients extraction #8

Merged
merged 6 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions plap/core/audio_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

class AudioInfo:
"""
A class used to load audio from files
and hold its properties and extracted features.
Loads audio from files
and holds its properties and extracted features.

...

Expand All @@ -14,12 +14,6 @@ class AudioInfo:
The sample rate of audio
signal : numpy.ndarray
Audio data
blocks: List[numpy.ndarray]
Framed audio data
windowed_blocks: List[numpy.ndarray]
Frames multiplied by a window function
dft_blocks: List[numpy.ndarray]
Fast Fourier Transform (FFT) result for each frame

"""

Expand All @@ -30,8 +24,4 @@ def __init__(self, file: str):
file: str
name of the audio file
"""

self.signal, self.sample_rate = sf.read(file)
self.blocks = []
self.windowed_blocks = []
self.dft_blocks = []
55 changes: 37 additions & 18 deletions plap/core/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ class Preprocessing:
"""

@staticmethod
def framing(audio_info: AudioInfo, block_size: int, overlap: int):
def framing(audio_info: AudioInfo, block_size: int, overlap: int) -> np.ndarray:
"""
Divide an audio signal into overlapping frames.
Divides an audio signal into frames.
Currently does not support overlapping.

Parameters
----------
Expand All @@ -24,26 +25,33 @@ def framing(audio_info: AudioInfo, block_size: int, overlap: int):
overlap : int
The overlapping rate.

Returns
-------
blocks : numpy array
Framed signal.

"""
step = round((100 - overlap) / 100 * block_size)
audio_info.blocks = []
# step = round((100 - overlap) / 100.0 * block_size)
step = block_size
length = audio_info.signal.size
for i in range(0, length - block_size, step):
audio_info.blocks.append(audio_info.signal[i : i + block_size])
# Performs zero-padding on the last block if necessary
if length % block_size != 0:
remaining_samples = length % block_size
nblocks = length // step + 1
blocks = np.zeros((nblocks, block_size))
for i in range(nblocks-1):
blocks[i] = audio_info.signal[i*step : i*step + block_size]
if length % step != 0:
remaining_samples = length % step
last_block = np.pad(
audio_info.signal[-remaining_samples:],
(0, block_size - remaining_samples),
mode="constant",
)
audio_info.blocks.append(last_block)
blocks[-1] = last_block
return blocks

@staticmethod
def windowing(audio_info: AudioInfo, window_type: str):
def windowing(blocks: np.ndarray, window_type: str) -> np.ndarray:
"""
Apply a window function to each frame.
Applies a window function to each frame.
Currently supports window types available in scipy's signal module.

Parameters
Expand All @@ -53,13 +61,18 @@ def windowing(audio_info: AudioInfo, window_type: str):
window_type : str
The window type.

Returns
-------
windowed_blocks : numpy array
Windowed signal frames.

"""
w = get_window(window=window_type, Nx=len(audio_info.blocks[0]))
for block in audio_info.blocks:
audio_info.windowed_blocks.append(block * w)
w = get_window(window=window_type, Nx=len(blocks[0]))
windowed_blocks = np.multiply(blocks[:], w)
return windowed_blocks

@staticmethod
def fft(audio_info: AudioInfo):
def fft(windowed_blocks: np.ndarray) -> np.ndarray:
"""
Compute the Fast Fourier Transform (FFT) for each frame.

Expand All @@ -68,6 +81,12 @@ def fft(audio_info: AudioInfo):
windowed_blocks : numpy array
Windowed signal frames.

Returns
-------
dft_blocks : numpy array
FFT blocks.

"""
for block in audio_info.windowed_blocks:
audio_info.dft_blocks.append(scifft(block))
dft_blocks = np.apply_along_axis(scifft, 1, windowed_blocks)
return dft_blocks

166 changes: 166 additions & 0 deletions plap/parameterization/cepstral.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
from plap.core.audio_info import AudioInfo
from plap.core.preprocessing import Preprocessing
from plap.parameterization.filterbank import Filterbank
import numpy as np


class Cepstral:
    """
    Provides a comprehensive pipeline for the extraction
    of selected cepstral coefficients.
    Currently supports MFCCs.

    """

    def __init__(
        self,
        audio_info: AudioInfo,
        block_size: int,
        window_type: str,
        overlap: int,
        filterbank_name: str,
        ncoeffs: int,
    ):
        """
        Cepstral class instances are created to hold the audio signal
        as well as various parameters for preprocessing, filter bank
        creation and coefficient extraction.

        Parameters
        ----------
        audio_info : AudioInfo
            The input audio_info object.
        block_size : int
            The size of each frame.
        window_type : str
            Name of the window type.
        overlap : int
            The overlapping rate, used as a percentage of block_size.
        filterbank_name : str
            Name of the filter bank passed on to Filterbank.
        ncoeffs : int
            Number of coefficients to be calculated.

        """
        self._audio_info = audio_info
        self._block_size = block_size
        self._window_type = window_type
        self._overlap = overlap
        # Hop size in samples between the starts of consecutive frames.
        self._step = round((100 - overlap) / 100 * block_size)
        self._filterbank_name = filterbank_name
        self._ncoeffs = ncoeffs

    @staticmethod
    def mfcc(
        audio_info: AudioInfo,
        ncoeffs: int,
        nbands: int,
        block_size: int,
        window_type: str,
        overlap: int,
    ) -> np.ndarray:
        """
        Calculates MFCCs for a given audio signal.

        Parameters
        ----------
        audio_info : AudioInfo
            The input audio_info object.
        ncoeffs : int
            Number of coefficients to be calculated.
        nbands : int
            Number of mel bands.
        block_size : int
            The size of each frame.
        window_type : str
            Name of the window type.
        overlap : int
            The overlapping rate.

        Returns
        -------
        Numpy array of shape (ncoeffs, nblocks) with the desired number
        of Mel-Frequency Cepstral Coefficients for each frame.

        """
        extractor = Cepstral(
            audio_info=audio_info,
            block_size=block_size,
            window_type=window_type,
            overlap=overlap,
            filterbank_name="mel",
            ncoeffs=ncoeffs,
        )
        # Perform necessary preprocessing
        dft_blocks = extractor.__preprocess()

        # Create mel filterbank
        mel_filterbank = extractor.__create_filterbank(
            params=[audio_info.sample_rate, block_size, nbands]
        )

        # Number of complete frames implied by the hop size. It is clamped to
        # the number of frames the preprocessing stage actually produced, so a
        # hop size smaller than the framing step can no longer index past the
        # end of dft_blocks.
        expected = (audio_info.signal.size - block_size) // extractor._step + 1
        nblocks = max(0, min(expected, len(dft_blocks)))

        # Filter each frame and sum the energy
        x = Cepstral.__apply_filterbank(dft_blocks[:nblocks], mel_filterbank)

        # Apply log to each coefficient (mel filtered energy sum) for each frame
        xl = Cepstral.__apply_log(x)

        # Get desired num of mfcc coefficients for each frame
        # by applying dct to log mel filtered energy sums
        return Cepstral.__apply_dct(xl, ncoeffs)

    def __preprocess(self) -> np.ndarray:
        """
        Performs necessary preprocessing on signal:
        framing, windowing and FFT, in that order.

        Returns
        -------
        dft_blocks : numpy array
            Result of Preprocessing.fft for the windowed frames.

        """
        blocks = Preprocessing.framing(
            audio_info=self._audio_info,
            block_size=self._block_size,
            overlap=self._overlap,
        )
        windowed_blocks = Preprocessing.windowing(
            blocks=blocks, window_type=self._window_type
        )
        return Preprocessing.fft(windowed_blocks=windowed_blocks)

    def __create_filterbank(self, params: list) -> np.ndarray:
        """
        Creates a filter bank.

        Parameters
        ----------
        params : list
            Parameters forwarded to the filter bank selected by the
            filterbank name held by this instance. The builder defines
            how the list is structured; for "mel" it is
            sample_rate, block_size, nmel_bands.

        Returns
        -------
        Whatever Filterbank returns for the given name and params.

        """
        return Filterbank(name=self._filterbank_name, params=params)

    @staticmethod
    def __apply_filterbank(
        dft_blocks: np.ndarray, filterbank: np.ndarray
    ) -> np.ndarray:
        """
        Weights the magnitude spectrum of each frame with every filter
        and sums the result per filter.

        Parameters
        ----------
        dft_blocks : numpy array
            One spectrum per row.
        filterbank : numpy array
            One filter per row; its number of columns decides how many
            leading bins of each spectrum are used.

        Returns
        -------
        Numpy array of shape (number of filters, number of frames).

        """
        nbins = filterbank.shape[1]
        magnitudes = np.abs(np.asarray(dft_blocks)[:, :nbins])
        return filterbank @ magnitudes.T

    @staticmethod
    def __apply_log(arr: np.ndarray) -> np.ndarray:
        """
        Applies base-10 logarithm element-wise.
        Exact zeros are replaced with 1e-9 first so that the result is finite.

        """
        # Handle zeros entering log function
        arr = np.where(arr == 0, arr + 1e-9, arr)
        return np.log10(arr)

    @staticmethod
    def __apply_dct(log_energies: np.ndarray, ncoeffs: int) -> np.ndarray:
        """
        Applies an unnormalised DCT-II along the band axis and keeps
        the first ncoeffs coefficients.

        Parameters
        ----------
        log_energies : numpy array
            Array of shape (number of bands, number of frames).
        ncoeffs : int
            Number of coefficients to be calculated.

        Returns
        -------
        Numpy array of shape (ncoeffs, number of frames).

        """
        nbands = log_energies.shape[0]
        j = np.arange(ncoeffs)[:, np.newaxis]
        i = np.arange(nbands)[np.newaxis, :]
        # Band indices are 0-based here, so the half-sample offset is +0.5
        # (the textbook form with -0.5 assumes bands numbered from 1).
        basis = np.cos(j * (i + 0.5) * np.pi / nbands)
        return basis @ log_energies
63 changes: 63 additions & 0 deletions plap/parameterization/filterbank.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import numpy as np
from scipy.signal.windows import triang


class Filterbank:
    """
    Factory for filter bank matrices.

    Calling ``Filterbank(name, params)`` does not produce a Filterbank
    instance; it returns whatever the builder registered under ``name``
    returns for ``params``.

    """

    def __new__(cls, name: str, params: list):
        """
        Builds the filter bank registered under the given name.

        Parameters
        ----------
        name : str
            Name of the filter bank: "mel" or "gammatone".
        params : list
            Parameters forwarded to the selected builder.

        Raises
        ------
        KeyError
            If name is not a registered filter bank.

        """
        # Map names to the builders themselves, not to their results,
        # so that only the requested filter bank is computed.
        builders = {
            "mel": cls.__mel_filterbank,
            "gammatone": cls.__gammatone_filterbank,
        }
        return builders[name](params=params)

    @staticmethod
    def __mel_filterbank(params: list) -> np.ndarray:
        """
        Creates a bank of triangular filters spaced evenly on the mel scale.

        Parameters
        ----------
        params : list
            sample_rate, block_size, nmel_bands

        Returns
        -------
        mel_filterbank : numpy array
            Array of shape (nmel_bands, block_size // 2 + 1),
            one filter per row.

        """
        # asserts TODO
        sample_rate, block_size, nmel_bands = params[0], params[1], params[2]
        nbins = block_size // 2 + 1

        # Convert the highest frequency to mel
        max_freq_hz = sample_rate / 2
        max_freq_mel = 2595 * np.log10(1 + max_freq_hz / 700)

        # Create mel_bands equally spaced points (centres of mel bands)
        # mel_centres includes both the centre points as well
        # as the lowest and highest frequency
        mel_centres = np.linspace(0, max_freq_mel, nmel_bands + 2)

        # Convert these points back to Hz
        hz_centres = np.round(700 * (10 ** (mel_centres / 2595) - 1))

        # Find indices of the nearest frequency bins
        freqs = np.linspace(0, sample_rate / 2, nbins)
        distances = np.abs(freqs[np.newaxis, :] - hz_centres[:, np.newaxis])
        hz_centres_indices = distances.argmin(axis=1)

        # Create mel filter bank
        # NOTE(review): each triangle is symmetric over [low, high) and does
        # not use the centre bin hz_centres_indices[i + 1] - confirm intended.
        mel_filterbank = np.zeros((nmel_bands, nbins))
        for i in range(nmel_bands):
            low = hz_centres_indices[i]
            high = hz_centres_indices[i + 2]
            mel_filterbank[i, low:high] = triang(high - low)

        return mel_filterbank

    @staticmethod
    def __gammatone_filterbank(params: list) -> np.ndarray:
        """
        Placeholder for the gammatone filter bank.
        Currently ignores params and returns a zero-dimensional array of zeros.

        """
        return np.zeros(())  # TODO
Loading