Jai Ansh Bindra
Add Flask app and requirements.
aeb5f89
from flask import Flask, request, send_file, jsonify
import subprocess
import numpy as np
import ffmpeg
import whisper
import re
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from io import BytesIO
import torch
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
import textwrap
import os
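# Rough dependency list inferred from the imports above (assumed PyPI package names;
# pin versions as needed): flask, numpy, ffmpeg-python, openai-whisper, transformers,
# accelerate (needed for device_map="auto"), torch, reportlab. The yt-dlp and ffmpeg
# command-line tools must also be available on PATH.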
app = Flask(__name__)
# -------------------------------
# Global setup
# -------------------------------
model_name = "Qwen/Qwen2.5-7B"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="auto",
    torch_dtype=torch.float16,
)
summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
# -------------------------------
# Pipeline functions
# -------------------------------
def stream_youtube_audio(video_url):
    command = [
        "yt-dlp",
        "-f", "bestaudio",
        "--no-playlist",
        "-o", "-",
        video_url,
    ]
    # Read both streams via communicate() so a full stderr pipe cannot block the download.
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    audio_bytes, err = process.communicate()
    if process.returncode != 0:
        print("yt-dlp error:", err.decode(errors="ignore"))
        return None
    return audio_bytes
def audio_stream_to_numpy(audio_bytes):
    try:
        # Decode the downloaded audio to 16 kHz mono PCM, then scale int16 samples to [-1, 1].
        out, _ = (
            ffmpeg.input("pipe:0")
            .output("pipe:1", format="wav", acodec="pcm_s16le", ac=1, ar="16000")
            .run(input=audio_bytes, capture_stdout=True, capture_stderr=True)
        )
        audio_data = np.frombuffer(out, np.int16).astype(np.float32) / 32768.0
        return audio_data
    except ffmpeg.Error as e:
        print("FFmpeg error:", e.stderr.decode(errors="ignore"))
        return None
def transcribe_audio_numpy(audio_data):
    model_whisper = whisper.load_model("tiny")
    result = model_whisper.transcribe(audio_data)
    print("Transcription completed.")
    return result["text"]
def summarize_text(transcription, max_tokens=512):
    # Short transcripts are returned as-is; longer ones are truncated to the model's input limit.
    if len(transcription.split()) < 100:
        return transcription
    summary = summarizer(transcription, max_length=max_tokens, min_length=100, do_sample=False, truncation=True)
    return summary[0]['summary_text']
def generate_questionnaire(summary):
    prompt = f"""
    You are a professional questionnaire generator reputed for generating diverse questionnaires, given any information sample.
    The questionnaire you generate must contain:
    1. Three simple multiple-choice questions (each with 4 options).
    2. One moderately difficult multiple-choice question (4 options).
    3. Two simple open-ended questions.
    4. Three moderately difficult open-ended questions.
    5. One hard scenario-based open-ended question.
    Make sure to cover each and every type of question mentioned.
    Nothing else, no code. Stick strictly to the provided context.
    Also, provide the questions in a structured, well-formatted, sequential manner.
    Start question sections with ### Multiple-Choice Questions etc.
    Generate a well-structured questionnaire based on the following content:
    "{summary}"
    """
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    outputs = model.generate(
        **inputs,
        # Use max_new_tokens so the budget applies to generated tokens, not prompt + generation.
        max_new_tokens=2000,
        temperature=0.2,
        top_p=0.8,
        repetition_penalty=1.1,
        do_sample=True,
    )
    # The decoded output still contains the prompt; clean_questionnaire() strips everything
    # before the first question header.
    output_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
    print("Questionnaire generation completed.")
    return clean_questionnaire(output_text)
def clean_questionnaire(raw_text):
    # Keep everything from the first "### Multiple-Choice Questions" header onward,
    # dropping the echoed prompt that precedes it.
    match = re.search(r"(### Multiple-Choice Questions.*?)$", raw_text, re.DOTALL)
    cleaned_text = match.group(1) if match else raw_text
    return cleaned_text.strip()
def save_text_as_pdf(text, filename):
    c = canvas.Canvas(filename, pagesize=letter)
    width, height = letter
    margin = 50
    text_object = c.beginText(margin, height - margin)
    text_object.setFont("Helvetica", 12)
    max_chars_per_line = 100
    for paragraph in text.split("\n"):
        wrapped_lines = textwrap.wrap(paragraph, width=max_chars_per_line)
        if not wrapped_lines:
            text_object.textLine("")
        for line in wrapped_lines:
            text_object.textLine(line)
            # Start a new page when the cursor reaches the bottom margin.
            if text_object.getY() < margin:
                c.drawText(text_object)
                c.showPage()
                text_object = c.beginText(margin, height - margin)
                text_object.setFont("Helvetica", 12)
    c.drawText(text_object)
    c.save()
def process_stream(video_url, output_pdf="questionnaire.pdf"):
    print("Streaming audio...")
    audio_bytes = stream_youtube_audio(video_url)
    if not audio_bytes:
        print("Error: Unable to fetch audio.")
        return None
    print("Converting audio stream to NumPy array...")
    audio_data = audio_stream_to_numpy(audio_bytes)
    if audio_data is None:
        print("Error: Unable to process audio data.")
        return None
    print("Transcribing audio...")
    transcription = transcribe_audio_numpy(audio_data)
    if not transcription:
        print("Error: Transcription failed.")
        return None
    print("Summarizing transcription...")
    summary = summarize_text(transcription)
    print("Generating questionnaire...")
    questionnaire = generate_questionnaire(summary)
    print("Converting questionnaire to PDF...")
    save_text_as_pdf(questionnaire, output_pdf)
    print(f"PDF generated: {output_pdf}")
    return output_pdf
# -------------------------------
# API endpoints
# -------------------------------
@app.route('/process', methods=['POST'])
def process_video():
    # silent=True avoids a raw 400 from Flask when the body is not valid JSON.
    data = request.get_json(silent=True) or {}
    video_url = data.get("video_url")
    if not video_url:
        return jsonify({"error": "No video URL provided."}), 400
    pdf_file = process_stream(video_url)
    if not pdf_file:
        return jsonify({"error": "Processing failed. Check logs for details."}), 500
    return send_file(pdf_file, as_attachment=True)
if __name__ == '__main__':
    # When deploying on a cloud service, make sure the port is set appropriately.
    app.run(debug=True, use_reloader=False, host="0.0.0.0", port=5000)
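# Example usage (an assumed invocation, not part of the original commit): with the server
# running, POST a JSON body containing "video_url" and save the returned PDF, e.g.
#
#   curl -X POST http://localhost:5000/process \
#        -H "Content-Type: application/json" \
#        -d '{"video_url": "https://www.youtube.com/watch?v=..."}' \
#        --output questionnaire.pdf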