Spaces:
Runtime error
Runtime error
File size: 5,949 Bytes
aeb5f89 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
from flask import Flask, request, send_file, jsonify
import subprocess
import numpy as np
import ffmpeg
import whisper
import re
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from io import BytesIO
import torch
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
import textwrap
import os
# Flask application object; the route decorator further down registers the
# /process handler on it.
app = Flask(__name__)
# -------------------------------
# Global setup
# -------------------------------
# Everything below runs once, at import time, so the models are shared by all
# requests instead of being reloaded per call.
# Causal language model used by generate_questionnaire().
model_name = "Qwen/Qwen2.5-7B"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Weights are loaded in half precision (float16); device placement is
# delegated to the library via device_map="auto".
model = AutoModelForCausalLM.from_pretrained(
model_name,
device_map="auto",
torch_dtype=torch.float16
)
# Summarization pipeline used by summarize_text().
summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
# -------------------------------
# Pipeline functions
# -------------------------------
def stream_youtube_audio(video_url):
    """Download the best available audio track of a video into memory.

    Runs ``yt-dlp`` as a child process and captures what it writes to stdout.

    Args:
        video_url: URL of the video to fetch.

    Returns:
        The raw audio container bytes, or ``b""`` when the URL is empty,
        ``yt-dlp`` cannot be started, or it exits with a non-zero status.
        Callers treat an empty result as "unable to fetch audio".
    """
    if not video_url:
        return b""
    command = [
        "yt-dlp",
        "-f", "bestaudio",
        "--no-playlist",
        "-o", "-",
        # "--" ends option parsing, so a caller-supplied URL that starts with
        # "-" cannot be interpreted as a yt-dlp option.
        "--",
        str(video_url),
    ]
    try:
        # run() drains stdout and stderr concurrently and waits for the child.
        # Reading only stdout from a Popen with a piped stderr can deadlock
        # once the stderr pipe buffer fills, and never reaps the process.
        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
        )
    except OSError as exc:
        # Typically: the yt-dlp executable is not installed / not on PATH.
        print("yt-dlp error:", exc)
        return b""
    if completed.returncode != 0:
        # A failed download may leave partial data on stdout; do not pass it on.
        print("yt-dlp error:", completed.stderr.decode(errors="replace").strip())
        return b""
    return completed.stdout
def audio_stream_to_numpy(audio_bytes):
    """Decode an in-memory audio container into a mono 16 kHz float array.

    The bytes are piped through ffmpeg, which resamples to 16 kHz, downmixes
    to one channel and emits signed 16-bit little-endian PCM.

    Args:
        audio_bytes: Encoded audio as produced by ``stream_youtube_audio``.

    Returns:
        A ``float32`` NumPy array scaled to [-1.0, 1.0), or ``None`` when the
        input is empty or ffmpeg fails.
    """
    if not audio_bytes:
        return None
    try:
        out, _ = (
            ffmpeg.input("pipe:0")
            # Raw "s16le" rather than "wav": a WAV container would prepend a
            # RIFF header that np.frombuffer would misread as audio samples.
            .output("pipe:1", format="s16le", acodec="pcm_s16le", ac=1, ar="16000")
            .run(input=audio_bytes, capture_stdout=True, capture_stderr=True)
        )
    except ffmpeg.Error as e:
        # The exception text is generic; ffmpeg's own stderr has the details.
        stderr = getattr(e, "stderr", None)
        detail = stderr.decode(errors="replace") if stderr else e
        print("FFmpeg error:", detail)
        return None
    # int16 full scale is 32768, so this maps samples into [-1.0, 1.0).
    return np.frombuffer(out, np.int16).astype(np.float32) / 32768.0
# Whisper models already loaded by this process, keyed by model name.
_WHISPER_MODEL_CACHE = {}


def _get_whisper_model(name):
    """Return the Whisper model called *name*, loading it on first use only."""
    if name not in _WHISPER_MODEL_CACHE:
        _WHISPER_MODEL_CACHE[name] = whisper.load_model(name)
    return _WHISPER_MODEL_CACHE[name]


def transcribe_audio_numpy(audio_data):
    """Transcribe decoded audio samples to text with the Whisper "tiny" model.

    Args:
        audio_data: Audio samples as returned by ``audio_stream_to_numpy``.

    Returns:
        The transcribed text, or ``""`` when there are no samples to
        transcribe (callers treat an empty result as a failed transcription).
    """
    if audio_data is None or len(audio_data) == 0:
        return ""
    # Reuse the model across calls; loading it on every request is slow.
    model_whisper = _get_whisper_model("tiny")
    result = model_whisper.transcribe(audio_data)
    print("Transcription completed.")
    return result["text"]
def summarize_text(transcription, max_tokens=512):
    """Condense a transcription with the module-level ``summarizer`` pipeline.

    Args:
        transcription: Full transcript text.
        max_tokens: Upper bound passed to the pipeline as ``max_length``.

    Returns:
        The transcription unchanged when it has fewer than 100 words,
        otherwise the generated summary text.
    """
    if len(transcription.split()) < 100:
        return transcription
    summary = summarizer(
        transcription,
        max_length=max_tokens,
        # Keep min_length <= max_length when a caller asks for a short summary.
        min_length=min(100, max_tokens),
        do_sample=False,
        # Transcripts of longer videos exceed the summarization model's
        # maximum input length; truncate instead of failing inside the model.
        truncation=True,
    )
    return summary[0]["summary_text"]
def generate_questionnaire(summary):
    """Generate a structured questionnaire from a summary with the causal LM.

    Builds an instruction prompt around *summary*, samples a completion from
    the module-level ``model`` and post-processes it with
    ``clean_questionnaire``.

    Args:
        summary: Text the questions must be based on.

    Returns:
        The cleaned questionnaire text.
    """
    prompt = f"""
You are a professional questionnaire generator reputed for generating diverse questionnaires, given any
information sample.
The questionnaire you generate must contain:
1. Three simple multiple-choice questions (each with 4 options).
2. One moderately difficult multiple-choice question (4 options).
3. Two simple open-ended questions.
4. Three moderately difficult open-ended questions.
5. One hard scenario-based open-ended question.
Make sure to cover each and every type of question mentioned.
Nothing else, no code. Stick strictly to the provided context.
Also, provide the questions in a structured, well-formatted, sequential manner.
Start question sections with ### Multiple-Choice Questions etc.
Generate a well-structured questionnaire based on the following content:
"{summary}"
"""
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    outputs = model.generate(
        **inputs,
        max_length=2000,
        temperature=0.2,
        top_p=0.8,
        repetition_penalty=1.1,
        do_sample=True,
    )
    # The returned sequence starts with the prompt tokens. The prompt itself
    # contains "### Multiple-Choice Questions", so decoding the whole sequence
    # would make clean_questionnaire() anchor on the instructions and leak the
    # prompt and summary into the result. Decode only the new tokens.
    prompt_length = inputs["input_ids"].shape[1]
    generated_ids = outputs[0][prompt_length:]
    output_text = tokenizer.decode(generated_ids, skip_special_tokens=True)
    print("Questionnaire generation completed.")
    return clean_questionnaire(output_text)
def clean_questionnaire(raw_text):
    """Drop everything before the first multiple-choice section heading.

    Args:
        raw_text: Text produced by the language model.

    Returns:
        The text from the first "### Multiple-Choice Questions" heading to the
        end, stripped of surrounding whitespace; the whole stripped text when
        the heading does not occur.
    """
    heading = "### Multiple-Choice Questions"
    start = raw_text.find(heading)
    kept = raw_text if start < 0 else raw_text[start:]
    return kept.strip()
def save_text_as_pdf(text, filename):
    """Write *text* to a letter-sized PDF, wrapping lines and paginating.

    Lines are wrapped to the printable width between the left and right
    margins using the real glyph widths of the font, rather than a fixed
    character count (100 characters of 12pt Helvetica is wider than the
    space between the margins). Words longer than a full line are not split.

    Args:
        text: Text to render; "\\n" separates paragraphs.
        filename: Path of the PDF file to create.
    """
    # Imported here so the block does not depend on a new file-level import.
    from reportlab.lib.utils import simpleSplit

    font_name, font_size = "Helvetica", 12
    margin = 50
    page_width, page_height = letter
    usable_width = page_width - 2 * margin

    pdf = canvas.Canvas(filename, pagesize=letter)

    def new_text_object():
        # Every page starts a fresh text object at the top-left margin.
        text_object = pdf.beginText(margin, page_height - margin)
        text_object.setFont(font_name, font_size)
        return text_object

    text_object = new_text_object()
    for paragraph in text.split("\n"):
        # An empty paragraph still produces one blank output line.
        lines = simpleSplit(paragraph, font_name, font_size, usable_width) or [""]
        for line in lines:
            # Break the page *before* writing, so blank lines are paginated
            # too and no empty page is emitted after the final line.
            if text_object.getY() < margin:
                pdf.drawText(text_object)
                pdf.showPage()
                text_object = new_text_object()
            text_object.textLine(line)
    pdf.drawText(text_object)
    pdf.save()
def process_stream(video_url, output_pdf="questionnaire.pdf"):
    """Run the full pipeline: audio -> transcript -> summary -> PDF.

    Progress and error messages are printed to stdout at every stage.

    Args:
        video_url: URL of the video to process.
        output_pdf: Path the questionnaire PDF is written to.

    Returns:
        ``output_pdf`` on success, ``None`` as soon as a stage yields nothing.
    """
    def abort(reason):
        # Report why the pipeline stopped; the caller sees None.
        print(reason)
        return None

    print("Streaming audio...")
    raw_audio = stream_youtube_audio(video_url)
    if not raw_audio:
        return abort("Error: Unable to fetch audio.")

    print("Converting audio stream to NumPy array...")
    samples = audio_stream_to_numpy(raw_audio)
    if samples is None:
        return abort("Error: Unable to process audio data.")

    print("Transcribing audio...")
    transcript = transcribe_audio_numpy(samples)
    if not transcript:
        return abort("Error: Transcription failed.")

    print("Summarizing transcription...")
    condensed = summarize_text(transcript)

    print("Generating questionnaire...")
    questions = generate_questionnaire(condensed)

    print("Converting questionnaire to PDF...")
    save_text_as_pdf(questions, output_pdf)
    print(f"PDF generated: {output_pdf}")
    return output_pdf
# -------------------------------
# API endpoints
# -------------------------------
@app.route('/process', methods=['POST'])
def process_video():
    """Handle POST /process: build a questionnaire PDF for a video URL.

    Expects a JSON object body with a ``video_url`` string.

    Returns:
        The generated PDF as an attachment; a 400 JSON error when no usable
        URL is supplied; a 500 JSON error when the pipeline fails.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body, so bad requests get the 400 below rather than a 500.
    data = request.get_json(silent=True)
    video_url = data.get("video_url") if isinstance(data, dict) else None
    if not video_url or not isinstance(video_url, str):
        return jsonify({"error": "No video URL provided."}), 400
    pdf_file = process_stream(video_url)
    if not pdf_file:
        return jsonify({"error": "Processing failed. Check logs for details."}), 500
    # process_stream() writes relative to the current working directory; pass
    # an absolute path so the lookup does not depend on how Flask resolves
    # relative paths.
    # NOTE(review): every request writes the same default file name, so
    # concurrent requests can overwrite each other's PDF.
    return send_file(os.path.abspath(pdf_file), as_attachment=True)
if __name__ == '__main__':
    # The server listens on all interfaces, so the interactive debugger must
    # not be on by default: it allows arbitrary code execution from the
    # browser. Opt in explicitly with FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    # When deploying on a cloud service, set PORT to the port it expects.
    port = int(os.environ.get("PORT", "5000"))
    app.run(debug=debug_enabled, use_reloader=False, host="0.0.0.0", port=port)
|