Add a bunch of test files
This commit is contained in:
90
stream.py
Normal file
90
stream.py
Normal file
@@ -0,0 +1,90 @@
|
||||
import logging
import os
import time
from copy import copy

# Any non-empty DEBUG environment variable switches logging to DEBUG level.
DEBUG = os.environ.get('DEBUG')

# Logging is configured before the third-party imports below, matching the
# original statement order, so anything those packages log while being
# imported already goes through this configuration.
logging.basicConfig(
    format='[%(asctime)s] %(levelname)s %(module)s/%(funcName)s: - %(message)s',
    level=logging.DEBUG if DEBUG else logging.INFO)

import numpy as np
import pymumble_py3
import whisper
from pymumble_py3.callbacks import PYMUMBLE_CLBK_SOUNDRECEIVED as PCS

# The speech-to-text model is loaded once at start-up and shared by the
# whole script.
logging.info('Loading whisper model...')
model = whisper.load_model('medium')
logging.info('Done.')

# Connection details for the Mumble server.  Hardcoded for now; these will
# have to become command line arguments eventually.
pwd = ""  # server password (empty string: none supplied)
server = "protospace.ca"  # server address
nick = "python"  # user name this client connects as
port = 64738  # server port number

# Size, in BYTES, at which the newest buffer is considered full (it is
# compared against len() of a bytearray).  With the 16-bit samples at
# 48000 Hz noted in the handler below, 24000 bytes is 12000 samples, i.e.
# 0.25 s per channel.
# NOTE(review): the original comment read "48000 Hz * 0.5 s"; if half a
# second of 16-bit mono audio is really intended this should be 48000.
CHUNK_LENGTH = 24000

# Rolling window of raw PCM buffers, oldest first.  The last element is the
# buffer currently being filled by sound_received_handler.
audio_chunks = [bytearray()]


def sound_received_handler(user, soundchunk):
    """Append the PCM payload of a received sound chunk to the rolling window.

    Args:
        user: Sender of the audio, as passed by the callback; unused.
        soundchunk: Object whose ``pcm`` attribute holds the raw audio bytes
            (pymumble PCM is 16-bit 48000 Hz).

    When the newest buffer has reached CHUNK_LENGTH bytes, a fresh buffer is
    started and the window is trimmed to its 10 most recent buffers.  The
    incoming payload is always stored; previously the packet that triggered
    a rollover was discarded.
    """
    if len(audio_chunks[-1]) >= CHUNK_LENGTH:
        audio_chunks.append(bytearray())

        # Keep at most 10 buffers; the consumer of audio_chunks waits for
        # the window to hold 10 before using it.
        if len(audio_chunks) > 10:
            audio_chunks.pop(0)

    audio_chunks[-1].extend(soundchunk.pcm)

# Spin up a client for the configured Mumble server.
mumble = pymumble_py3.Mumble(server, nick, password=pwd, port=port)

# Register the handler that is called whenever a PCS (sound received)
# event occurs.
mumble.callbacks.set_callback(PCS, sound_received_handler)

# Enable receiving sound from the Mumble server.
mumble.set_receive_sound(True)

# Start the client, then wait until it is ready.
mumble.start()
mumble.is_ready()

SOURCE_RATE = 48000    # Hz, sample rate of the buffered PCM
DECIMATION = 3         # keep every 3rd sample: 48000 Hz -> 16000 Hz
NUM_TAPS = 101         # length of the FIR low-pass kernel
WINDOW_CHUNKS = 10     # number of buffers that make up a full audio window
POLL_INTERVAL = 0.05   # seconds to sleep when there is nothing new to do


def design_lowpass_taps(fs=SOURCE_RATE, cutoff_freq=SOURCE_RATE / 6,
                        num_taps=NUM_TAPS):
    """Return a Blackman-windowed sinc low-pass FIR kernel with unity DC gain.

    Args:
        fs: Sample rate of the signal to be filtered, in Hz.
        cutoff_freq: Cut-off frequency in Hz.  The default, fs / 6, equals
            the Nyquist frequency of the signal after decimation by 3.
        num_taps: Number of filter coefficients.

    Returns:
        1-D float array of ``num_taps`` coefficients that sum to 1.
    """
    centered = np.arange(num_taps) - (num_taps - 1) / 2
    taps = np.sinc(2 * cutoff_freq / fs * centered)
    taps *= np.blackman(num_taps)
    taps /= np.sum(taps)
    return taps


def pcm_to_float32_16k(pcm_bytes, taps):
    """Convert raw 16-bit PCM bytes to low-passed, decimated float32 audio.

    Args:
        pcm_bytes: Bytes-like object holding int16 samples (its length must
            be a multiple of 2, otherwise numpy raises ValueError).
        taps: Low-pass kernel, e.g. from design_lowpass_taps().

    Returns:
        1-D float32 array holding every DECIMATION-th filtered sample,
        scaled by 1 / 32768 so full-scale int16 maps to roughly [-1, 1).
        Empty input yields an empty array.
    """
    samples = np.frombuffer(pcm_bytes, np.int16)
    if samples.size == 0:
        # np.convolve rejects empty input; there is nothing to filter anyway.
        return np.zeros(0, dtype=np.float32)

    # Low-pass before dropping samples so content above the new Nyquist
    # frequency does not alias into the decimated signal.
    filtered = np.convolve(samples, taps, mode='same')
    return filtered[::DECIMATION].astype(np.float32) / 32768.0


def main():
    """Transcribe the rolling audio window forever, printing each result.

    Waits until the window holds WINDOW_CHUNKS buffers, then converts it to
    16000 Hz float32 audio, runs it through the whisper model and prints the
    recognised text.  Never returns.
    """
    # The kernel never changes, so build it once instead of per iteration.
    taps = design_lowpass_taps()
    last_pcm = None

    while True:
        if len(audio_chunks) < WINDOW_CHUNKS:
            # Sleep instead of spinning so the wait does not burn a CPU core.
            time.sleep(POLL_INTERVAL)
            continue

        # Snapshot the list first: the sound callback may append to or pop
        # from it while the buffers are being joined.
        pcm = b''.join(copy(audio_chunks))

        if pcm == last_pcm:
            # No new audio since the last pass; transcribing the identical
            # window again would only repeat the previous output.
            time.sleep(POLL_INTERVAL)
            continue
        last_pcm = pcm

        audio = pcm_to_float32_16k(pcm, taps)
        result = model.transcribe(whisper.pad_or_trim(audio))

        print(result['text'])


if __name__ == '__main__':
    main()
Reference in New Issue
Block a user