|
import gradio as gr |
|
import time |
|
|
|
|
|
try: |
|
from openai import OpenAI |
|
except ImportError: |
|
print("OpenAI package not installed. Installing now...") |
|
import subprocess |
|
import sys |
|
subprocess.check_call([sys.executable, "-m", "pip", "install", "openai"]) |
|
from openai import OpenAI |
|
|
|
def test_api_connection(api_key): |
|
"""Test the API connection and return a status message""" |
|
if not api_key or api_key.strip() == "": |
|
return False, "API key is required" |
|
|
|
try: |
|
client = OpenAI( |
|
base_url="https://openrouter.ai/api/v1", |
|
api_key=api_key.strip(), |
|
) |
|
|
|
client.models.list() |
|
return True, "API connection successful" |
|
except Exception as e: |
|
return False, f"API connection failed: {str(e)}" |
|
|
|
def generate_solution(api_key, problem_statement, progress=gr.Progress()): |
|
"""Generate solution with progress updates""" |
|
progress(0, desc="Starting...") |
|
|
|
|
|
if not api_key or api_key.strip() == "": |
|
return "Error: OpenRouter API key is required" |
|
|
|
if not problem_statement or problem_statement.strip() == "": |
|
return "Error: Please provide a problem statement" |
|
|
|
progress(0.1, desc="Validating API key...") |
|
|
|
success, message = test_api_connection(api_key) |
|
if not success: |
|
return f"Error: {message}" |
|
|
|
progress(0.3, desc="Sending request to AI model...") |
|
try: |
|
client = OpenAI( |
|
base_url="https://openrouter.ai/api/v1", |
|
api_key=api_key.strip(), |
|
) |
|
|
|
progress(0.5, desc="Generating solution...") |
|
completion = client.chat.completions.create( |
|
model="open-r1/olympiccoder-7b:free", |
|
messages=[ |
|
{ |
|
"role": "system", |
|
"content": "You are a competitive programming expert. Provide a correct solution with clear reasoning. First output the code, then explain the approach." |
|
}, |
|
{ |
|
"role": "user", |
|
"content": f"Solve this problem:\n{problem_statement}" |
|
} |
|
], |
|
temperature=0.3, |
|
max_tokens=2048 |
|
) |
|
|
|
progress(0.8, desc="Processing response...") |
|
response = completion.choices[0].message.content |
|
|
|
progress(0.9, desc="Formatting output...") |
|
formatted_response = format_response(response) |
|
|
|
progress(1.0, desc="Done!") |
|
return formatted_response |
|
|
|
except Exception as e: |
|
error_message = str(e) |
|
print(f"Error occurred: {error_message}") |
|
return f"Error: {error_message}" |
|
|
|
def format_response(response): |
|
"""Format the AI response into markdown with code blocks""" |
|
if not response: |
|
return "Error: Received empty response from AI model" |
|
|
|
formatted = [] |
|
lines = response.split('\n') |
|
|
|
|
|
in_code = False |
|
code_blocks = [] |
|
current_block = [] |
|
|
|
for line in lines: |
|
if line.strip().startswith('```'): |
|
if in_code: |
|
code_blocks.append(current_block) |
|
current_block = [] |
|
else: |
|
current_block = [] |
|
in_code = not in_code |
|
continue |
|
|
|
if in_code: |
|
current_block.append(line) |
|
|
|
|
|
if in_code and current_block: |
|
code_blocks.append(current_block) |
|
|
|
|
|
processed_response = response |
|
|
|
|
|
for i, block in enumerate(code_blocks): |
|
|
|
if block and any(block[0].strip().lower() == lang for lang in ["python", "java", "c++", "cpp", "javascript"]): |
|
code_content = '\n'.join(block[1:]) |
|
lang = block[0].strip().lower() |
|
else: |
|
code_content = '\n'.join(block) |
|
lang = "python" |
|
|
|
|
|
if i < len(code_blocks) - 1 or not in_code: |
|
block_text = '```' + '\n' + '\n'.join(block) + '\n' + '```' |
|
else: |
|
block_text = '```' + '\n' + '\n'.join(block) |
|
|
|
|
|
formatted_block = f"```{lang}\n{code_content}\n```" |
|
processed_response = processed_response.replace(block_text, formatted_block) |
|
|
|
return processed_response |
|
|
|
|
|
with gr.Blocks(title="Competitive Programming Assistant", theme=gr.themes.Soft()) as app: |
|
gr.Markdown("# π Competitive Programming Assistant") |
|
gr.Markdown("Powered by OlympicCoder-7B via OpenRouter AI") |
|
|
|
with gr.Row(): |
|
with gr.Column(): |
|
api_key = gr.Textbox( |
|
label="OpenRouter API Key", |
|
type="password", |
|
placeholder="Enter your API key here...", |
|
value="" |
|
) |
|
|
|
test_btn = gr.Button("Test API Connection", variant="secondary") |
|
api_status = gr.Textbox(label="API Status", interactive=False) |
|
|
|
test_btn.click( |
|
fn=test_api_connection, |
|
inputs=[api_key], |
|
outputs=[gr.Checkbox(visible=False), api_status] |
|
) |
|
|
|
with gr.Row(): |
|
problem_input = gr.Textbox( |
|
label="Problem Statement", |
|
lines=5, |
|
placeholder="Paste your programming problem here..." |
|
) |
|
|
|
submit_btn = gr.Button("Generate Solution", variant="primary", size="lg") |
|
|
|
|
|
status = gr.Markdown("Ready to generate solutions") |
|
|
|
|
|
solution_output = gr.Markdown(label="Solution") |
|
|
|
|
|
gr.Examples( |
|
examples=[ |
|
[ |
|
"Given an array of integers, find two numbers such that they add up to a specific target number." |
|
], |
|
[ |
|
"Implement a function to calculate the minimum number of operations required to transform one string into another using only insertion, deletion, and substitution." |
|
] |
|
], |
|
inputs=[problem_input] |
|
) |
|
|
|
|
|
def on_submit_click(api_key, problem): |
|
return "Generating solution... Please wait." |
|
|
|
submit_btn.click( |
|
fn=on_submit_click, |
|
inputs=[api_key, problem_input], |
|
outputs=[status], |
|
queue=False |
|
).then( |
|
fn=generate_solution, |
|
inputs=[api_key, problem_input], |
|
outputs=[solution_output], |
|
queue=True |
|
).success( |
|
fn=lambda: "Solution generated successfully!", |
|
inputs=None, |
|
outputs=[status] |
|
).error( |
|
fn=lambda: "An error occurred. Please check your inputs and try again.", |
|
inputs=None, |
|
outputs=[status] |
|
) |
|
|
|
if __name__ == "__main__": |
|
try: |
|
print("Starting Competitive Programming Assistant...") |
|
app.launch(server_port=7860, share=True, debug=True) |
|
except Exception as e: |
|
print(f"Error launching app: {str(e)}") |