You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
142 lines
5.3 KiB
142 lines
5.3 KiB
#! python3.7 |
|
|
|
import argparse |
|
import io |
|
import os |
|
import speech_recognition as sr |
|
import whisper |
|
import torch |
|
|
|
from datetime import datetime, timedelta |
|
from queue import Queue |
|
from tempfile import NamedTemporaryFile |
|
from time import sleep |
|
from sys import platform |
|
|
|
|
|
def main(): |
|
parser = argparse.ArgumentParser() |
|
if 'linux' in platform: |
|
parser.add_argument("--default_microphone", default='pulse', |
|
help="Default microphone name for SpeechRecognition. " |
|
"Run this with 'list' to view available Microphones.", type=str) |
|
args = parser.parse_args() |
|
|
|
# The last time a recording was retreived from the queue. |
|
phrase_time = None |
|
# Current raw audio bytes. |
|
last_sample = bytes() |
|
# Thread safe Queue for passing data from the threaded recording callback. |
|
data_queue = Queue() |
|
# We use SpeechRecognizer to record our audio because it has a nice feauture where it can detect when speech ends. |
|
recorder = sr.Recognizer() |
|
recorder.energy_threshold = 1000 |
|
# Definitely do this, dynamic energy compensation lowers the energy threshold dramtically to a point where the SpeechRecognizer never stops recording. |
|
recorder.dynamic_energy_threshold = False |
|
|
|
# Important for linux users. |
|
# Prevents permanent application hang and crash by using the wrong Microphone |
|
if 'linux' in platform: |
|
mic_name = args.default_microphone |
|
if not mic_name or mic_name == 'list': |
|
print("Available microphone devices are: ") |
|
for index, name in enumerate(sr.Microphone.list_microphone_names()): |
|
print(f"Microphone with name \"{name}\" found") |
|
return |
|
else: |
|
for index, name in enumerate(sr.Microphone.list_microphone_names()): |
|
if mic_name in name: |
|
source = sr.Microphone(sample_rate=16000, device_index=index) |
|
break |
|
else: |
|
source = sr.Microphone(sample_rate=16000) |
|
|
|
# Load / Download model |
|
model = 'medium' |
|
non_english = False |
|
if args.model != "large" and not non_english: |
|
model = model + ".en" |
|
audio_model = whisper.load_model(model) |
|
|
|
record_timeout = 2 # seconds |
|
phrase_timeout = 3 # seconds between new lines |
|
|
|
temp_file = NamedTemporaryFile().name |
|
transcription = [''] |
|
|
|
with source: |
|
recorder.adjust_for_ambient_noise(source) |
|
|
|
def record_callback(_, audio:sr.AudioData) -> None: |
|
""" |
|
Threaded callback function to recieve audio data when recordings finish. |
|
audio: An AudioData containing the recorded bytes. |
|
""" |
|
# Grab the raw bytes and push it into the thread safe queue. |
|
data = audio.get_raw_data() |
|
data_queue.put(data) |
|
|
|
# Create a background thread that will pass us raw audio bytes. |
|
# We could do this manually but SpeechRecognizer provides a nice helper. |
|
recorder.listen_in_background(source, record_callback, phrase_time_limit=record_timeout) |
|
|
|
# Cue the user that we're ready to go. |
|
print("Model loaded.\n") |
|
|
|
while True: |
|
try: |
|
now = datetime.utcnow() |
|
# Pull raw recorded audio from the queue. |
|
if not data_queue.empty(): |
|
phrase_complete = False |
|
# If enough time has passed between recordings, consider the phrase complete. |
|
# Clear the current working audio buffer to start over with the new data. |
|
if phrase_time and now - phrase_time > timedelta(seconds=phrase_timeout): |
|
last_sample = bytes() |
|
phrase_complete = True |
|
# This is the last time we received new audio data from the queue. |
|
phrase_time = now |
|
|
|
# Concatenate our current audio data with the latest audio data. |
|
while not data_queue.empty(): |
|
data = data_queue.get() |
|
last_sample += data |
|
|
|
# Use AudioData to convert the raw data to wav data. |
|
audio_data = sr.AudioData(last_sample, source.SAMPLE_RATE, source.SAMPLE_WIDTH) |
|
wav_data = io.BytesIO(audio_data.get_wav_data()) |
|
|
|
# Write wav data to the temporary file as bytes. |
|
with open(temp_file, 'w+b') as f: |
|
f.write(wav_data.read()) |
|
|
|
# Read the transcription. |
|
result = audio_model.transcribe(temp_file, fp16=torch.cuda.is_available()) |
|
text = result['text'].strip() |
|
|
|
# If we detected a pause between recordings, add a new item to our transcripion. |
|
# Otherwise edit the existing one. |
|
if phrase_complete: |
|
transcription.append(text) |
|
else: |
|
transcription[-1] = text |
|
|
|
# Clear the console to reprint the updated transcription. |
|
os.system('cls' if os.name=='nt' else 'clear') |
|
for line in transcription: |
|
print(line) |
|
# Flush stdout. |
|
print('', end='', flush=True) |
|
|
|
# Infinite loops are bad for processors, must sleep. |
|
sleep(0.25) |
|
except KeyboardInterrupt: |
|
break |
|
|
|
print("\n\nTranscription:") |
|
for line in transcription: |
|
print(line) |
|
|
|
|
|
if __name__ == "__main__": |
|
main()
|
|
|