-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbpm_detector.py
158 lines (124 loc) · 4.36 KB
/
bpm_detector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import argparse
import array
import math
import wave
import matplotlib.pyplot as plt
import numpy
import pywt
from scipy import signal
def read_wav(filename):
    """Read every frame of a .wav file and return its samples and sample rate.

    Args:
        filename: Path of the .wav file to open.

    Returns:
        A ``(samps, fs)`` tuple, where ``samps`` is a list of integers decoded
        from the raw frame bytes and ``fs`` is the frame rate reported by the
        file. If the file cannot be opened, the error is printed and ``None``
        (not a tuple) is returned, so callers must check before unpacking.

    Raises:
        AssertionError: If the file reports no frames or a non-positive
            frame rate.
    """
    # open file, get metadata for audio
    try:
        wf = wave.open(filename, "rb")
    except IOError as e:
        print(e)
        return None

    # Wave_read objects are context managers; using one guarantees the file
    # is closed even when the validation or decoding below raises.
    with wf:
        nsamps = wf.getnframes()
        # Raised explicitly (rather than via ``assert``) so the checks are
        # still enforced when Python runs with -O.
        if nsamps <= 0:
            raise AssertionError("no frames found in " + str(filename))
        fs = wf.getframerate()
        if fs <= 0:
            raise AssertionError("invalid frame rate in " + str(filename))

        # Read entire file and make into an array.
        # TODO: choose the typecode from wf.getsampwidth(). "i" decodes the
        # bytes as native signed ints, so the number of decoded values only
        # matches the frame count when one frame is exactly one such int;
        # any mismatch is reported below.
        samps = list(array.array("i", wf.readframes(nsamps)))

    if nsamps != len(samps):
        print(nsamps, "not equal to", len(samps))
    return samps, fs
# print an error when no data can be found
def no_audio_data():
    """Report a window without usable audio and return the "no result" pair.

    Returns:
        ``(None, None)``, so the result can be unpacked like a normal
        two-value return.
    """
    message = "No audio data for sample, skipping..."
    print(message)
    return (None, None)
# simple peak detection
def peak_detect(data):
    """Locate the element(s) of ``data`` with the largest magnitude.

    Args:
        data: NumPy array to search.

    Returns:
        The tuple of index arrays produced by ``numpy.where`` for every
        position whose value equals the largest absolute value, or its
        negation when no element equals the positive value.
    """
    largest = numpy.amax(abs(data))
    hits = numpy.where(data == largest)
    if hits[0].size:
        return hits
    # Nothing equals +largest, so the extreme value must be negative.
    return numpy.where(data == -largest)
def bpm_detector(data, fs):
    """Estimate the tempo of one window of audio samples.

    The window is decomposed with a 4-level "db4" discrete wavelet transform.
    The detail coefficients of each level, and the final approximation
    coefficients, are rectified, mean-centred, truncated to a common length
    and summed. The largest autocorrelation value of that sum, searched over
    the lags that correspond to 40-220 BPM, gives the tempo.

    Args:
        data: Sequence of audio samples for one analysis window.
        fs: Sample rate of ``data`` in Hz.

    Returns:
        ``(bpm, correl)``, where ``bpm`` is a one-element NumPy array holding
        the estimate and ``correl`` is the full autocorrelation of the summed
        envelope. ``(None, None)`` (from ``no_audio_data``) is returned when
        all approximation coefficients are zero, when the lag search range is
        empty, or when the search does not yield exactly one peak.
    """
    levels = 4
    max_decimation = 2 ** (levels - 1)
    # Lag bounds, in samples at the most-decimated rate, for 220 and 40 BPM.
    min_ndx = math.floor(60.0 / 220 * (fs / max_decimation))
    max_ndx = math.floor(60.0 / 40 * (fs / max_decimation))

    cA = data
    cD_sum = None
    sum_len = 0
    for loop in range(levels):
        # 1) DWT: split the current approximation into approximation/detail.
        cA, cD = pywt.dwt(cA, "db4")
        if loop == 0:
            # Common length every level is truncated to before summing.
            sum_len = math.floor(len(cD) / max_decimation + 1)
            cD_sum = numpy.zeros(sum_len)

        # 2) Filter
        # NOTE(review): with b=[0.01] and a=[1 - 0.99] this is a
        # single-coefficient filter, i.e. a constant gain of about 1. A
        # one-pole smoother would use a=[1, -0.99]; left unchanged here
        # because altering it would change every estimate - confirm intent.
        cD = signal.lfilter([0.01], [1 - 0.99], cD)

        # 3) Decimate so every level ends up at the same rate, and rectify.
        cD = abs(cD[:: (2 ** (levels - loop - 1))])
        # 4) Subtract out the mean.
        cD = cD - numpy.mean(cD)

        # 5) Recombine the signal before ACF: accumulate this level's
        #    envelope into the running sum.
        cD_sum = cD[0:sum_len] + cD_sum

    # All-zero approximation coefficients: nothing to analyse.
    if not numpy.any(cA):
        return no_audio_data()

    # Adding in the approximate data as well...
    cA = signal.lfilter([0.01], [1 - 0.99], cA)
    cA = abs(cA)
    cA = cA - numpy.mean(cA)
    cD_sum = cA[0:sum_len] + cD_sum

    # ACF: keep the non-negative lags (second half of the "full" output).
    correl = numpy.correlate(cD_sum, cD_sum, "full")
    midpoint = math.floor(len(correl) / 2)
    search_region = correl[midpoint:][min_ndx:max_ndx]
    if search_region.size == 0:
        # Window too short to contain any lag in the searched tempo range;
        # numpy.amax would raise on the empty slice.
        return no_audio_data()

    # peak_detect returns numpy.where's tuple; the peak positions are its
    # first element, so that is what must be counted.
    peaks = peak_detect(search_region)[0]
    if len(peaks) != 1:
        return no_audio_data()

    peak_ndx_adjusted = peaks + min_ndx
    bpm = 60.0 / peak_ndx_adjusted * (fs / max_decimation)
    print(bpm)
    return bpm, correl
if __name__ == "__main__":
    # Command-line entry point: split the file into fixed-size windows,
    # estimate the tempo of each one, report the median and plot the
    # autocorrelation of the last window that produced an estimate.
    parser = argparse.ArgumentParser(description="Process .wav file to determine the Beats Per Minute.")
    parser.add_argument("--filename", required=True, help=".wav file for processing")
    parser.add_argument(
        "--window",
        type=float,
        default=3,
        help="Size of the the window (seconds) that will be scanned to determine the bpm. Typically less than 10 seconds. [3]",
    )
    args = parser.parse_args()

    # read_wav prints the error itself and yields no samples on failure;
    # stop here instead of failing on the tuple unpacking below.
    wav = read_wav(args.filename)
    if wav is None or wav[0] is None:
        raise SystemExit(1)
    samps, fs = wav

    nsamps = len(samps)
    window_samps = int(args.window * fs)
    if window_samps <= 0:
        # Avoids a ZeroDivisionError when computing the window count.
        parser.error("--window must span at least one sample")
    max_window_ndx = nsamps // window_samps

    # Only windows that produced an estimate are kept, so skipped windows
    # do not pull the median towards zero.
    bpms = []
    correl = []

    # Iterate through all windows
    for window_ndx in range(max_window_ndx):
        # Derive the offset from the window index so that a skipped window
        # can never cause the same samples to be analysed again.
        samps_ndx = window_ndx * window_samps
        data = samps[samps_ndx : samps_ndx + window_samps]
        if len(data) % window_samps != 0:
            raise AssertionError(str(len(data)))

        bpm, correl_temp = bpm_detector(data, fs)
        if bpm is None:
            continue
        # bpm_detector returns the estimate as an array; accept it only
        # when it holds exactly one value.
        estimate = numpy.ravel(bpm)
        if estimate.size != 1:
            continue
        bpms.append(float(estimate[0]))
        correl = correl_temp

    if not bpms:
        print("No tempo could be estimated from", args.filename)
        raise SystemExit(1)

    bpm = numpy.median(bpms)
    print("Completed! Estimated Beats Per Minute:", bpm)

    plt.plot(range(len(correl)), abs(correl))
    plt.show(block=True)