Shazam音乐检索算法原理及实现
  98g5RqHTVrNQ 2023年11月02日 52 0


Shazam音乐检索算法原理及实现_音乐识别

 

算法基本流程如下: 

1. 采集音乐库

Shazam音乐检索算法原理及实现_音乐识别_02

 2. 音乐指纹采集

Shazam音乐检索算法原理及实现_Shazam算法_03

3. 采用局部最大值作为特征点

Shazam音乐检索算法原理及实现_音乐检索_04

 4. 将临近的特征点进行组合形成特征点对

Shazam音乐检索算法原理及实现_Shazam算法_05

5. 对每个特征点对进行hash编码

Shazam音乐检索算法原理及实现_Shazam算法_06

编码过程:将f1和f2分别进行10bit量化,其余bit用来存储两点间的时间偏移量,组合形成32bit的hash码

Hash = f1|f2<<10|diff_t<<20,存储信息(t1,Hash)

Shazam音乐检索算法原理及实现_特征点_07

实现:

import numpy as np
import librosa
from scipy import signal
import pickle
import os
fix_rate = 16000 # target sample rate (Hz) applied to every audio file on load
win_length_seconds = 0.5  # STFT window length in seconds (hop is half a window)
frequency_bits = 10  # bits used to quantize each frequency inside a hash code
num_peaks = 15  # maximum number of spectral peaks kept per STFT frame

# Build the mapping between integer song ids and audio file paths.
def song_collect(base_path):
    """Recursively index every .mp3/.wav file under *base_path*.

    Returns a dict {song_id: file_path} where ids are assigned in
    discovery order starting from 0.
    """
    dic_idx2song = {}
    next_id = 0
    for root, _dirs, filenames in os.walk(base_path):
        for filename in filenames:
            if not filename.endswith(('.mp3', '.wav')):
                continue
            dic_idx2song[next_id] = os.path.join(root, filename)
            next_id += 1
    return dic_idx2song

# Extract local-maximum points of the spectrogram (the "constellation map").
def collect_map(y, fs, win_length_seconds=0.5, num_peaks=15):
    """Return spectral peak points of *y* as [frame_index, frequency_hz] pairs.

    Parameters
    ----------
    y : np.ndarray
        Mono audio signal.
    fs : int
        Sample rate of *y* in Hz.
    win_length_seconds : float
        STFT window length in seconds; hop length is half a window.
    num_peaks : int
        Maximum number of peaks kept per frame, ranked by prominence.
    """
    win_length = int(win_length_seconds * fs)
    hop_length = int(win_length // 2)
    S = librosa.stft(y, n_fft=win_length, win_length=win_length, hop_length=hop_length)
    S = np.abs(S)  # magnitude spectrogram, shape (n_bins, n_frames)
    D, T = np.shape(S)

    constellation_map = []
    for i in range(T):
        spectrum = S[:, i]
        # distance=200 forces wide spacing (in bins) between candidate peaks.
        peaks_index, props = signal.find_peaks(spectrum, prominence=0, distance=200)
        # Keep only the most prominent peaks of this frame.
        n_peaks = min(num_peaks, len(peaks_index))
        if n_peaks == 0:
            # BUG FIX: np.argpartition raises on an empty array, so the
            # original crashed on frames with no detected peaks (e.g. silence).
            continue
        largest_peaks_index = np.argpartition(props['prominences'], -n_peaks)[-n_peaks:]
        for peak_index in peaks_index[largest_peaks_index]:
            frequency = fs / win_length * peak_index  # FFT bin index -> Hz
            # Save the time-frequency location of the local maximum.
            constellation_map.append([i, frequency])
    return constellation_map
        

# Hash-encode pairs of nearby constellation points.
def create_hash(constellation_map, fs, frequency_bits=10, song_id=None):
    """Build 32-bit hashes from pairs of nearby peaks.

    Layout (for frequency_bits=10): hash = f1 | f2 << 10 | dt << 20, where
    f1/f2 are the quantized anchor/target frequencies and dt is the frame
    offset between them.

    Returns a dict {hash_code: (anchor_time, song_id)}.
    """
    upper_frequency = fs / 2  # Nyquist frequency bounds the quantization range
    hashes = {}
    for idx, (time, freq) in enumerate(constellation_map):
        # Pair each anchor with up to the next 100 points (the "target zone").
        for other_time, other_freq in constellation_map[idx: idx + 100]:
            diff = int(other_time - time)
            if diff <= 1 or diff > 10:  # only pair points (1, 10] frames apart
                continue
            freq_binned = int(freq / upper_frequency * (2 ** frequency_bits))
            other_freq_binned = int(other_freq / upper_frequency * (2 ** frequency_bits))
            # BUG FIX: the original packed the raw int(other_freq) instead of
            # its quantized value, overflowing the 10-bit field and leaving
            # other_freq_binned unused.
            hash_code = (freq_binned
                         | (other_freq_binned << frequency_bits)
                         | (diff << (2 * frequency_bits)))
            hashes[hash_code] = (time, song_id)
    return hashes

特征提取:feature_collect.py

# Index every song under ./data of the current working directory.
path_music = 'data'
current_path = os.getcwd()
path_songs = os.path.join(current_path, path_music)
dic_idx2song = song_collect(path_songs)

# Fingerprint each song and accumulate all hashes into one database.
database = {}
for song_id in dic_idx2song.keys():
    file = dic_idx2song[song_id]
    print("collect info of file", file)

    # Load the audio, resampled to the common fingerprint rate.
    y, fs = librosa.load(file, sr=fix_rate)

    # Extract the constellation of spectral peaks.
    constellation_map = collect_map(y, fs, win_length_seconds=win_length_seconds, num_peaks=num_peaks)

    # Hash the peak pairs for this song.
    hashes = create_hash(constellation_map, fs, frequency_bits=frequency_bits, song_id=song_id)

    # database maps hash -> list of (source_time, song_id) occurrences.
    for hash_code, time_index_pair in hashes.items():
        database.setdefault(hash_code, []).append(time_index_pair)

# Persist the hash database and the song index.
with open('database.pickle', 'wb') as db:
    pickle.dump(database, db, pickle.HIGHEST_PROTOCOL)
with open('song_index.pickle', 'wb') as songs:
    pickle.dump(dic_idx2song, songs, pickle.HIGHEST_PROTOCOL)

# Reload both.  BUG FIX: the original used pickle.load(open(...)), which
# leaks the file handles; context managers close them deterministically.
with open('database.pickle', 'rb') as db:
    database = pickle.load(db)
with open('song_index.pickle', 'rb') as songs:
    dic_idx2song = pickle.load(songs)
print(len(database))

# Retrieval: score the query audio against every song in the database.
def getscores(y, fs, database):
    """Match query signal *y* (sampled at *fs*) against *database*.

    Returns a list of (song_index, (best_offset, match_count)) pairs sorted
    by match_count descending, where best_offset is the query/source time
    shift at which the most hashes align.
    """
    # Fingerprint the query exactly the way database songs were fingerprinted.
    constellation_map = collect_map(y, fs)
    hashes = create_hash(constellation_map, fs, frequency_bits=10, song_id=None)

    # Collect, per song, every (hash, query_time, source_time) co-occurrence.
    matches_per_song = {}
    for hash_code, (sample_time, _) in hashes.items():
        # Avoid the double lookup of `if hash in database: database[hash]`.
        for source_time, song_index in database.get(hash_code, ()):
            matches_per_song.setdefault(song_index, []).append(
                (hash_code, sample_time, source_time))

    scores = {}
    # For each candidate song, histogram the query/source time offsets;
    # a true match piles matches up at a single consistent offset.
    for song_index, matches in matches_per_song.items():
        song_scores_by_offset = {}
        for _hash_code, sample_time, source_time in matches:
            delta = source_time - sample_time
            song_scores_by_offset[delta] = song_scores_by_offset.get(delta, 0) + 1

        # Pick the (offset, count) with the largest count.
        # (Renamed from `max`, which shadowed the builtin.)
        best = (0, 0)
        for offset, score in song_scores_by_offset.items():
            if score > best[1]:
                best = (offset, score)
        scores[song_index] = best

    return sorted(scores.items(), key=lambda item: item[1][1], reverse=True)

音乐检索:music_research.py

import threading
from playsound import playsound

def cycle(path):
    """Play the audio file at *path* in an endless loop (never returns)."""
    while True:
        playsound(path)
def play(path, cyc=False):
    """Play *path* once; when *cyc* is truthy, loop it forever (blocking)."""
    if not cyc:
        playsound(path)
        return
    cycle(path)

# Identify a pre-recorded query clip.
path = 'test_music/record4.wav'
y, fs = librosa.load(path, sr=fix_rate)

# Play the query on a background thread while the search runs.
music = threading.Thread(target=play, args=(path,))
music.start()

# Rank every database song against the query.
scores = getscores(y, fs, database)

# Print each candidate: file name, best offset, aligned-match count.
for song_idx, (offset, count) in scores:
    song_file = dic_idx2song[song_idx]
    song_name = os.path.split(song_file)[-1]
    print("%s: %d: %d"%(song_name, offset, count))

# Accept the top hit only when it aligns on more than 50 hashes.
if len(scores) > 0 and scores[0][1][1] > 50:
    print("检索结果为:", os.path.split(dic_idx2song[scores[0][0]])[-1])
else:
    print("没有搜索到该音乐")

麦克风录音识别音乐:

import pyaudio
import wave

# Recording parameters.
RATE = 48000           # sample rate (Hz)
CHUNK = 1024           # frames per buffer read
record_seconds = 10    # capture length in seconds
CHANNWLS = 2           # number of input channels

# Open a 16-bit PCM input stream on the default microphone.
audio = pyaudio.PyAudio()

stream = audio.open(format=pyaudio.paInt16,
                    channels=CHANNWLS,
                    rate=RATE,
                    input=True,
                    frames_per_buffer=CHUNK)

# Pull CHUNK-sized buffers until record_seconds of audio is captured.
print('录音中。。。')
frames = [stream.read(CHUNK)
          for _ in range(int(RATE / CHUNK * record_seconds))]

# Stop recording and release the audio device.
stream.stop_stream()
stream.close()
audio.terminate()

# Dump the raw buffers to a PCM wav file.
with wave.open('test_music/test.wav', 'wb') as wf:
    wf.setnchannels(CHANNWLS)
    wf.setsampwidth(audio.get_sample_size(pyaudio.paInt16))
    wf.setframerate(RATE)
    wf.writeframes(b''.join(frames))

# Reload the recording, resampled to the fingerprint rate.
path = 'test_music/test.wav'
y, fs = librosa.load(path, sr=fix_rate)

# Optionally monitor the clip on a background thread while searching:
# music = threading.Thread(target=play, args=(path,))
# music.start()

# Score the recording against the database.
print('检索中。。。')
scores = getscores(y, fix_rate, database)

# Report the best match only when it clears the 50-aligned-hash threshold.
if len(scores) > 0 and scores[0][1][1] > 50:
    print("检索结果为:", os.path.split(dic_idx2song[scores[0][0]])[-1])
else:
    print("没有搜索到该音乐")

Shazam音乐检索算法原理及实现_Shazam算法_08

参考:音乐检索-Shazam算法原理_哔哩哔哩_bilibili

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读