-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrpm.py
127 lines (111 loc) · 5.52 KB
/
rpm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import argparse
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
from scipy.io import wavfile
import os
def mix_down_to_mono(audio_data, channel_mode='mix'):
    """Reduce audio samples to a single channel.

    Args:
        audio_data: Sample array. Either 1-D (already mono) or 2-D with one
            column per channel, where column 0 is used as the left channel
            and column 1 as the right channel.
        channel_mode: 'left', 'right', or 'mix' (average across channels).

    Returns:
        A 1-D array of samples. Mono input is returned unchanged, whatever
        the channel mode, because there is only one channel to pick from.

    Raises:
        ValueError: If channel_mode is not one of the supported modes.
    """
    # Validate first so a bad mode is reported even for mono input.
    if channel_mode not in ('left', 'right', 'mix'):
        raise ValueError("Invalid channel mode. Choose 'left', 'right', or 'mix'.")

    audio_data = np.asarray(audio_data)
    # A 1-D array has no channel axis; indexing [:, 0] or averaging over
    # axis=1 would raise, so pass it through as-is.
    if audio_data.ndim == 1:
        return audio_data

    if channel_mode == 'left':
        return audio_data[:, 0]  # Left channel only
    if channel_mode == 'right':
        return audio_data[:, 1]  # Right channel only
    return np.mean(audio_data, axis=1)  # Mix both channels
def find_highest_peaks(audio_mono, sr, num_peaks, min_distance_samples):
    """Find the tallest peaks in a mono signal, in chronological order.

    Only peaks with a height of at least 0 are considered (``height=0`` is
    passed to ``find_peaks``).

    Args:
        audio_mono: 1-D array of samples.
        sr: Sample rate; peak sample indices are divided by it to get times.
        num_peaks: Maximum number of peaks to return (the tallest ones).
        min_distance_samples: Minimum spacing between detected peaks, in
            samples. Values below 1 disable the spacing constraint.

    Returns:
        A tuple ``(peak_times, peak_heights)`` of equal-length arrays sorted
        by time. Both are empty when no peak qualifies or when
        ``num_peaks`` is not positive.
    """
    # A slice of [-0:] would select *every* peak, so handle the
    # "nothing requested" case explicitly.
    if num_peaks <= 0:
        return np.array([]), np.array([])

    # find_peaks raises ValueError for distance < 1; None means unconstrained.
    distance = min_distance_samples if min_distance_samples >= 1 else None
    peaks, properties = find_peaks(audio_mono, height=0, distance=distance)
    if len(peaks) == 0:
        print("No peaks found.")
        return np.array([]), np.array([])

    heights = properties['peak_heights']
    # Indices of the tallest peaks. find_peaks reports peaks in ascending
    # sample order, so sorting these indices restores chronological order.
    tallest = np.sort(np.argsort(heights)[-num_peaks:])
    peak_times = peaks[tallest] / sr
    peak_heights = heights[tallest]
    return peak_times, peak_heights
def calculate_intervals(peak_times):
    """Return the gaps between consecutive peak times.

    The result has one element fewer than ``peak_times`` and is empty when
    fewer than two times are supplied.
    """
    gaps = np.diff(peak_times, n=1)
    return gaps
def calculate_intervals_stats(intervals):
    """Summarize intervals and their RPM equivalents (60 / interval).

    Args:
        intervals: Array of time intervals in seconds.

    Returns:
        ``None`` for an empty array; otherwise a dict keyed by 'Min', 'Max',
        'Average' and 'Std Dev', each mapping to a pair
        ``(statistic of intervals, statistic of RPM values)``. Each RPM
        figure is computed over the RPM values themselves, so e.g. 'Min'
        pairs the shortest interval with the lowest RPM.
    """
    if intervals.size == 0:
        return None

    rpm_values = 60 / intervals
    reducers = (
        ('Min', np.min),
        ('Max', np.max),
        ('Average', np.mean),
        ('Std Dev', np.std),
    )
    return {
        label: (reduce(intervals), reduce(rpm_values))
        for label, reduce in reducers
    }
def print_stats(stats, channel_mode, num_peaks):
    """Print the analysis summary as an aligned table.

    Args:
        stats: Mapping of statistic name to an ``(interval, rpm)`` pair.
        channel_mode: Channel mode string echoed in the header.
        num_peaks: Number of peaks analysed; reported as ``num_peaks - 1``
            revolutions.

    Every number is printed with the same count of decimal places: the
    largest count of characters following the first '.' in ``str(value)``
    over all values in ``stats``.
    """
    def _fraction_digits(number):
        # Characters after the first '.' in the default string form.
        text = str(number)
        if '.' not in text:
            return 0
        return len(text.split('.')[1])

    precision = max(
        max(_fraction_digits(number) for number in pair)
        for pair in stats.values()
    )

    header = f"{'Statistic':<20} {'Time Intervals (s)':<35} {'RPM Values':<35}"
    print()
    print("Channel mode: " + channel_mode)
    print(f"Revolutions: {num_peaks - 1}")
    print()
    print(header)
    print("-" * 90)

    for label, (seconds, rpm) in stats.items():
        seconds_text = f"{seconds:.{precision}f}"
        rpm_text = f"{rpm:.{precision}f}"
        print(f"{label:<20} {seconds_text:<35} {rpm_text:<35}")
def parse_arguments():
    """Build the command-line parser and parse the process arguments.

    Positional arguments: ``filename``, ``num_peaks`` (int) and
    ``min_distance_ms`` (int). Options: ``--channel_mode`` (left/right/mix,
    default mix), ``--plot`` (flag) and ``--plot_title``.

    Returns:
        The parsed ``argparse.Namespace``.
    """
    cli = argparse.ArgumentParser(description="Turntable RPM Analysis")

    # Required positionals, in the order they must appear on the command line.
    positionals = (
        ("filename", str, "Path to the audio file"),
        ("num_peaks", int, "Number of highest peaks to find"),
        ("min_distance_ms", int, "Minimum distance between peaks in milliseconds"),
    )
    for name, converter, description in positionals:
        cli.add_argument(name, type=converter, help=description)

    cli.add_argument(
        "--channel_mode",
        type=str,
        choices=['left', 'right', 'mix'],
        default='mix',
        help="Channel mode for analysis",
    )
    cli.add_argument(
        "--plot",
        action="store_true",
        help="Plot the RPM data per revolution",
    )
    cli.add_argument(
        "--plot_title",
        type=str,
        default="RPM Analysis Plot",
        help="Title of the plot",
    )

    namespace = cli.parse_args()
    return namespace
def plot_out(intervals, title, output_file_path):
    """Plot the time per revolution with a matching RPM axis and save it.

    Args:
        intervals: Sequence of time intervals in seconds, one per revolution.
        title: Plot title; nothing is drawn when it is empty.
        output_file_path: Destination path handed to ``savefig``.

    The left axis shows seconds per revolution. The right axis shows the
    same data as RPM through the reciprocal mapping ``rpm = 60 / seconds``,
    so every tick on it is exact rather than a linear interpolation between
    the two end values. The figure is shown interactively when a display
    appears to be available, and is closed afterwards.
    """
    def _seconds_rpm(values):
        # 60 / x is its own inverse, so this single function converts
        # seconds -> RPM and RPM -> seconds. Division warnings are silenced
        # because matplotlib may probe the mapping at 0.
        values = np.asarray(values, dtype=float)
        with np.errstate(divide="ignore", invalid="ignore"):
            return 60.0 / values

    fig, ax = plt.subplots(1, 1, figsize=(16, 8))
    ax.plot(np.arange(len(intervals)), intervals)

    # Auto-scaling pads the data range and can push the lower limit to or
    # below zero, where 60 / seconds is undefined. Keep the time axis
    # strictly positive whenever the data allows it.
    lower, upper = ax.get_ylim()
    if lower <= 0:
        positive = np.asarray(intervals, dtype=float)
        positive = positive[positive > 0]
        if positive.size:
            ax.set_ylim(positive.min() / 2, upper)

    ax2 = ax.secondary_yaxis("right", functions=(_seconds_rpm, _seconds_rpm))

    ax.set_ylabel("Time (s)")
    ax2.set_ylabel("RPM")
    ax.set_xlabel("Revolutions")

    # Add an alternating white/light grey background for each revolution
    for i in range(0, len(intervals), 2):
        ax.axvspan(i - 0.5, i + 0.5, facecolor='gainsboro', alpha=0.5)

    ax.grid(True, which="major", axis="both", ls="-", color="gainsboro")
    ax.grid(True, which="minor", axis="both", ls="-", color="gainsboro")

    if title:
        ax.set_title(title + "\n", fontsize=16)

    fig.savefig(output_file_path)

    # Show the plot if DISPLAY is set or if a non-headless backend is in use
    if os.environ.get('DISPLAY') or 'agg' not in plt.get_backend().lower():
        plt.show()

    # Release the figure so repeated calls do not accumulate open figures.
    plt.close(fig)
def main():
    """Run the RPM analysis for the file named on the command line.

    Reads the WAV file, reduces it to one channel, finds the requested
    number of highest peaks, prints interval/RPM statistics, and optionally
    saves a plot next to the input file as
    ``<input name>_RPM_Analysis_Plot.png``.

    When the number of peaks found differs from the number requested, a
    message is printed and nothing further is done (no statistics, no plot).
    """
    args = parse_arguments()

    sr, audio = wavfile.read(args.filename)  # Load the audio file
    audio_mono = mix_down_to_mono(audio, args.channel_mode)  # Process audio based on channel mode

    # Peak detection needs a spacing of at least one sample; a very small
    # min_distance_ms would otherwise truncate to 0.
    min_distance_samples = max(1, int((args.min_distance_ms / 1000.0) * sr))
    peak_times, _ = find_highest_peaks(audio_mono, sr, args.num_peaks, min_distance_samples)

    if len(peak_times) != args.num_peaks:
        print(f"Expected {args.num_peaks} peaks, but found {len(peak_times)}. Unable to calculate statistics.")
        # Without the expected peaks there are no intervals to report or plot.
        return

    intervals = calculate_intervals(peak_times)
    stats = calculate_intervals_stats(intervals)
    if stats:
        print_stats(stats, args.channel_mode, args.num_peaks)
    else:
        print("Could not calculate statistics.")

    if args.plot:
        if len(intervals) == 0:
            print("No intervals to plot.")
            return
        output_dir = os.path.dirname(args.filename)
        output_file_name = os.path.splitext(os.path.basename(args.filename))[0] + "_RPM_Analysis_Plot.png"
        output_file_path = os.path.join(output_dir, output_file_name)
        plot_out(intervals, args.plot_title, output_file_path)
        print(f"Plot saved to {output_file_path}")


if __name__ == "__main__":
    main()