Spaces:
Runtime error
Runtime error
from flask import Flask, request, send_file, jsonify | |
import subprocess | |
import numpy as np | |
import ffmpeg | |
import whisper | |
import re | |
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline | |
from io import BytesIO | |
import torch | |
from reportlab.lib.pagesizes import letter | |
from reportlab.pdfgen import canvas | |
import textwrap | |
import os | |
app = Flask(__name__) | |
# ------------------------------- | |
# Global setup | |
# ------------------------------- | |
model_name = "Qwen/Qwen2.5-7B" | |
tokenizer = AutoTokenizer.from_pretrained(model_name) | |
model = AutoModelForCausalLM.from_pretrained( | |
model_name, | |
device_map="auto", | |
torch_dtype=torch.float16 | |
) | |
summarizer = pipeline("summarization", model="facebook/bart-large-cnn") | |
# ------------------------------- | |
# Pipeline functions | |
# ------------------------------- | |
def stream_youtube_audio(video_url): | |
command = [ | |
"yt-dlp", | |
"-f", "bestaudio", | |
"--no-playlist", | |
"-o", "-", | |
video_url | |
] | |
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
return process.stdout.read() | |
def audio_stream_to_numpy(audio_bytes): | |
try: | |
out, _ = ( | |
ffmpeg.input("pipe:0") | |
.output("pipe:1", format="wav", acodec="pcm_s16le", ac=1, ar="16000") | |
.run(input=audio_bytes, capture_stdout=True, capture_stderr=True) | |
) | |
audio_data = np.frombuffer(out, np.int16).astype(np.float32) / 32768.0 | |
return audio_data | |
except ffmpeg.Error as e: | |
print("FFmpeg error:", e) | |
return None | |
def transcribe_audio_numpy(audio_data): | |
model_whisper = whisper.load_model("tiny") | |
result = model_whisper.transcribe(audio_data) | |
print("Transcription completed.") | |
return result["text"] | |
def summarize_text(transcription, max_tokens=512): | |
if len(transcription.split()) < 100: | |
return transcription | |
summary = summarizer(transcription, max_length=max_tokens, min_length=100, do_sample=False) | |
return summary[0]['summary_text'] | |
def generate_questionnaire(summary): | |
prompt = f""" | |
You are a professional questionnaire generator reputed for generating diverse questionnaires, given any | |
information sample. | |
The questionnaire you generate must contain: | |
1. Three simple multiple-choice questions (each with 4 options). | |
2. One moderately difficult multiple-choice question (4 options). | |
3. Two simple open-ended questions. | |
4. Three moderately difficult open-ended questions. | |
5. One hard scenario-based open-ended question. | |
Make sure to cover each and every type of question mentioned. | |
Nothing else, no code. Stick strictly to the provided context. | |
Also, provide the questions in a structured, well-formatted, sequential manner. | |
Start question sections with ### Multiple-Choice Questions etc. | |
Generate a well-structured questionnaire based on the following content: | |
"{summary}" | |
""" | |
inputs = tokenizer(prompt, return_tensors="pt").to(model.device) | |
outputs = model.generate( | |
**inputs, | |
max_length=2000, | |
temperature=0.2, | |
top_p=0.8, | |
repetition_penalty=1.1, | |
do_sample=True | |
) | |
output_text = tokenizer.decode(outputs[0], skip_special_tokens=True) | |
print("Questionnaire generation completed.") | |
return clean_questionnaire(output_text) | |
def clean_questionnaire(raw_text): | |
match = re.search(r"(### Multiple-Choice Questions.*?)$", raw_text, re.DOTALL) | |
cleaned_text = match.group(1) if match else raw_text | |
return cleaned_text.strip() | |
def save_text_as_pdf(text, filename): | |
c = canvas.Canvas(filename, pagesize=letter) | |
width, height = letter | |
margin = 50 | |
text_object = c.beginText(margin, height - margin) | |
text_object.setFont("Helvetica", 12) | |
max_chars_per_line = 100 | |
for paragraph in text.split("\n"): | |
wrapped_lines = textwrap.wrap(paragraph, width=max_chars_per_line) | |
if not wrapped_lines: | |
text_object.textLine("") | |
for line in wrapped_lines: | |
text_object.textLine(line) | |
if text_object.getY() < margin: | |
c.drawText(text_object) | |
c.showPage() | |
text_object = c.beginText(margin, height - margin) | |
text_object.setFont("Helvetica", 12) | |
c.drawText(text_object) | |
c.save() | |
def process_stream(video_url, output_pdf="questionnaire.pdf"): | |
print("Streaming audio...") | |
audio_bytes = stream_youtube_audio(video_url) | |
if not audio_bytes: | |
print("Error: Unable to fetch audio.") | |
return None | |
print("Converting audio stream to NumPy array...") | |
audio_data = audio_stream_to_numpy(audio_bytes) | |
if audio_data is None: | |
print("Error: Unable to process audio data.") | |
return None | |
print("Transcribing audio...") | |
transcription = transcribe_audio_numpy(audio_data) | |
if not transcription: | |
print("Error: Transcription failed.") | |
return None | |
print("Summarizing transcription...") | |
summary = summarize_text(transcription) | |
print("Generating questionnaire...") | |
questionnaire = generate_questionnaire(summary) | |
print("Converting questionnaire to PDF...") | |
save_text_as_pdf(questionnaire, output_pdf) | |
print(f"PDF generated: {output_pdf}") | |
return output_pdf | |
# ------------------------------- | |
# API endpoints | |
# ------------------------------- | |
def process_video(): | |
data = request.get_json() | |
video_url = data.get("video_url") | |
if not video_url: | |
return jsonify({"error": "No video URL provided."}), 400 | |
pdf_file = process_stream(video_url) | |
if not pdf_file: | |
return jsonify({"error": "Processing failed. Check logs for details."}), 500 | |
return send_file(pdf_file, as_attachment=True) | |
if __name__ == '__main__': | |
# When deploying on a cloud service, make sure the port is set appropriately. | |
app.run(debug=True, use_reloader=False, host="0.0.0.0", port=5000) | |