Update app.py
app.py
CHANGED
@@ -109,23 +109,24 @@ try:
             genai.types.FunctionDeclaration(
                 name='load_page',
                 description='Fetches the content of a specific web page URL as Markdown text. Use this when the user asks for information from a specific URL they provide, or when you need to look up live information mentioned alongside a specific source URL.',
-                #
+                # Use basic string types for schema definition
                 parameters={
-                    'type': 'object',
+                    'type': 'object',
                     'properties': {
                         'url': {
-                            'type': 'string',
+                            'type': 'string',
                             'description': "The *full* URL of the webpage to load (must start with http:// or https://)."
                         }
                     },
                     'required': ['url']
                 }
-                # --- END OF CORRECTION ---
             )
         ]
     )
-    #
-
+    # --- CORRECTED Code Execution Tool Definition ---
+    # Pass an empty dictionary to enable code execution capability
+    code_execution_tool = genai.types.Tool(code_execution={})
+    # --- END OF CORRECTION ---
 
     tools = [browse_tool, code_execution_tool]
 
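Note on the hunk above: `genai.types.Tool(code_execution={})` enables Gemini's built-in code execution alongside the declared `load_page` function. The `genai.GenerativeModel` call that consumes `tools` sits outside this diff; a minimal sketch of how the list is presumably wired up (the model name and env-var name here are assumptions, not lines from app.py):

```python
# Sketch only -- how `tools` is presumably attached to the model elsewhere in app.py.
import os
import google.generativeai as genai

genai.configure(api_key=os.environ["GOOGLE_API_KEY"])  # assumed env var name
model = genai.GenerativeModel(
    "gemini-1.5-pro",  # placeholder; app.py uses MODEL_NAME, whose value is not shown here
    tools=tools,       # [browse_tool, code_execution_tool] from the hunk above
)
```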
@@ -197,22 +198,17 @@ def handle_function_call(function_call):
 
 def generate_response_with_tools(user_input, history_state):
     """Handles user input, interacts with Gemini (incl. tools), and manages history."""
-    # Check if model initialization failed earlier
     if not model:
-        # Return list format suitable for Gradio Chatbot display
         return [[None, "Error: The AI model (Gemini) could not be initialized. Please check the logs or API key configuration."]], history_state or []
 
     if not user_input.strip():
-        # Return tuple suitable for Gradio Chatbot display
         return [[None, "Please enter a valid query."]], history_state or []
 
     # --- History Management ---
     conversation_history = history_state if isinstance(history_state, list) else []
-    # Use genai.types for Content and Part
     conversation_history.append(genai.types.Content(role="user", parts=[genai.types.Part.from_text(user_input)]))
     print(f"\n--- Sending to Gemini (History length: {len(conversation_history)}) ---")
 
-    # Simple history trimming (can be made more sophisticated)
     MAX_HISTORY_TURNS = 10
     max_history_items = MAX_HISTORY_TURNS * 2 + (1 if conversation_history and conversation_history[0].role == "system" else 0)
     if len(conversation_history) > max_history_items:
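The body of the trimming branch falls outside this hunk. For context, a hypothetical implementation consistent with `max_history_items` as computed above (not the author's actual code):

```python
# Hypothetical trimming body: keep an optional leading system message
# plus the most recent MAX_HISTORY_TURNS user/model exchanges.
if len(conversation_history) > max_history_items:
    head = conversation_history[:1] if conversation_history[0].role == "system" else []
    conversation_history = head + conversation_history[-(MAX_HISTORY_TURNS * 2):]
```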
@@ -225,9 +221,8 @@ def generate_response_with_tools(user_input, history_state):
     # --- Interaction Loop ---
     MAX_TOOL_LOOPS = 5
     loop_count = 0
-    # Work with a copy of history in the loop to easily revert on error
     current_history_for_api = list(conversation_history)
-    final_bot_message = ""
+    final_bot_message = ""
 
     try:
         while loop_count < MAX_TOOL_LOOPS:
@@ -236,7 +231,7 @@ def generate_response_with_tools(user_input, history_state):
 
             response = model.generate_content(
                 current_history_for_api,
-                request_options={"timeout": 120},
+                request_options={"timeout": 120},
             )
 
             if not response.candidates:
@@ -248,23 +243,18 @@ def generate_response_with_tools(user_input, history_state):
             candidate = response.candidates[0]
             finish_reason = candidate.finish_reason
 
-            # Append model's (potential) response to history *before* handling tools
-            # This ensures the model's thought process (including asking for tools) is recorded
             if candidate.content:
                 current_history_for_api.append(candidate.content)
 
-            # Check for non-stop, non-tool reasons first
             if finish_reason not in (genai.types.Candidate.FinishReason.STOP, genai.types.Candidate.FinishReason.TOOL_CALL):
                 print(f"Warning: Generation stopped unexpectedly. Reason: {finish_reason.name}")
                 stop_reason_msg = f"[Model stopped generating. Reason: {finish_reason.name}]"
                 partial_text = ""
-                if candidate.content and candidate.content.parts:
+                if candidate.content and candidate.content.parts:
                     partial_text = "".join([p.text for p in candidate.content.parts if hasattr(p, 'text') and p.text])
                 final_bot_message = (partial_text + "\n" if partial_text else "") + stop_reason_msg
-
-                break # Exit loop
+                break
 
-            # Check for Tool Call
             has_tool_call = finish_reason == genai.types.Candidate.FinishReason.TOOL_CALL
 
             if has_tool_call:
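A caveat on the `TOOL_CALL` check above: depending on the SDK version, function calls can arrive with `finish_reason == STOP` plus a `function_call` part rather than a dedicated tool-call finish reason. A more defensive detection (a sketch, not what this diff does) inspects the parts directly, mirroring the truthiness pattern the code already uses:

```python
# Sketch: detect tool use from the returned parts instead of the finish reason.
function_calls = [
    part.function_call
    for part in (candidate.content.parts if candidate.content else [])
    if getattr(part, "function_call", None)
]
has_tool_call = bool(function_calls)
```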
@@ -274,18 +264,15 @@ def generate_response_with_tools(user_input, history_state):
                     final_bot_message = "[Model indicated tool use but provided no details.]"
                     break
 
-                # Extract valid function calls
                 function_calls = [part.function_call for part in candidate.content.parts if hasattr(part, 'function_call') and part.function_call]
 
                 if not function_calls:
                     print("Warning: TOOL_CALL finish reason but no valid function_call part found.")
-                    # Extract any text response that might have come along
                     final_bot_message = "".join([p.text for p in candidate.content.parts if hasattr(p, 'text') and p.text])
                     if not final_bot_message:
                         final_bot_message = "[Model indicated tool use but provided no callable function or text.]"
                     break
 
-                # Execute tools and collect responses
                 tool_responses = []
                 for func_call in function_calls:
                     function_response_part = handle_function_call(func_call)
@@ -296,18 +283,15 @@ def generate_response_with_tools(user_input, history_state):
                         final_bot_message = "[Failed to process tool call request.]"
                         break
 
-                # Add the tool execution results to history for the next API call
                 current_history_for_api.append(genai.types.Content(role="tool", parts=tool_responses))
                 print("Added tool response(s) to history. Continuing loop...")
-                final_bot_message = ""
-                continue
+                final_bot_message = ""
+                continue
 
             else: # FinishReason == STOP
                 print("No tool call requested. Final response received.")
-                # Extract final text and any code suggestions/results from the *last* model turn
                 final_bot_message = ""
                 code_parts_display = []
-                # Ensure we look at the last content which should be the final model response
                 if current_history_for_api and current_history_for_api[-1].role == "model":
                     last_model_content = current_history_for_api[-1]
                     if last_model_content.parts:
@@ -315,13 +299,12 @@ def generate_response_with_tools(user_input, history_state):
                             if hasattr(part, 'text') and part.text:
                                 final_bot_message += part.text
                             if hasattr(part, 'executable_code') and part.executable_code:
-                                lang = getattr(getattr(part.executable_code, 'language', None), 'name', 'python').lower()
-                                code = getattr(part.executable_code, 'code', '')
+                                lang = getattr(getattr(part.executable_code, 'language', None), 'name', 'python').lower()
+                                code = getattr(part.executable_code, 'code', '')
                                 code_parts_display.append(f"Suggested Code ({lang}):\n```{'python' if lang == 'unknown_language' else lang}\n{code}\n```")
                             elif hasattr(part, 'code_execution_result') and part.code_execution_result:
-                                # Safer access to outcome comparison value
                                 outcome_enum = getattr(genai.types, 'ExecutableCodeResponse', None)
-                                outcome_ok_val = getattr(outcome_enum.Outcome, 'OK', None) if outcome_enum and hasattr(outcome_enum, 'Outcome') else 1
+                                outcome_ok_val = getattr(outcome_enum.Outcome, 'OK', None) if outcome_enum and hasattr(outcome_enum, 'Outcome') else 1
                                 outcome_val = getattr(part.code_execution_result, 'outcome', None)
                                 outcome_str = "Success" if outcome_val == outcome_ok_val else "Failure"
                                 output = getattr(part.code_execution_result, 'output', '')
@@ -332,22 +315,21 @@ def generate_response_with_tools(user_input, history_state):
 
                 if not final_bot_message.strip():
                     final_bot_message = "[Assistant completed its turn without generating text output.]"
-                    # Ensure this message is associated with the last model turn in history if needed
                     if current_history_for_api[-1].role == "model" and not any(hasattr(p,'text') and p.text for p in current_history_for_api[-1].parts):
+                        if not hasattr(current_history_for_api[-1], 'parts') or not current_history_for_api[-1].parts:
+                            current_history_for_api[-1].parts = [] # Initialize if needed
                         current_history_for_api[-1].parts.append(genai.types.Part.from_text(final_bot_message))
 
-                break
+                break
 
         # End of while loop
        if loop_count >= MAX_TOOL_LOOPS:
             print(f"Warning: Reached maximum tool execution loops ({MAX_TOOL_LOOPS}).")
             warning_msg = f"\n\n[Warning: Reached maximum tool execution loops ({MAX_TOOL_LOOPS}). The final response might be incomplete.]"
             final_bot_message += warning_msg
-            # Append warning to the last model message in history
             if current_history_for_api and current_history_for_api[-1].role == "model":
-                # Check if parts list exists before appending
                 if not hasattr(current_history_for_api[-1], 'parts') or not current_history_for_api[-1].parts:
-
+                    current_history_for_api[-1].parts = []
                 current_history_for_api[-1].parts.append(genai.types.Part.from_text(warning_msg))
 
         print("--- Response Generation Complete ---")
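The two `parts = []` guards added in this hunk repeat the same pattern. If the repetition grows, it could be factored into a helper along these lines (hypothetical name, same logic as the diff):

```python
# Hypothetical helper mirroring the guard added above.
def append_text_to_last_model_turn(history, text):
    last = history[-1]
    if last.role != "model":
        return
    if not getattr(last, "parts", None):
        last.parts = []  # initialize if the SDK returned no parts
    last.parts.append(genai.types.Part.from_text(text))
```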
@@ -355,11 +337,11 @@ def generate_response_with_tools(user_input, history_state):
         # --- Format final output for Gradio Chatbot ---
         chatbot_display_list = []
         user_msg_buffer = None
-        for content in current_history_for_api:
+        for content in current_history_for_api:
             if content.role == "system": continue
 
             display_text = ""
-            if hasattr(content, 'parts') and content.parts:
+            if hasattr(content, 'parts') and content.parts:
                 for part in content.parts:
                     if hasattr(part, 'text') and part.text:
                         display_text += part.text + "\n"
@@ -368,9 +350,9 @@ def generate_response_with_tools(user_input, history_state):
                         code = getattr(part.executable_code, 'code', '')
                         display_text += f"\n```{'python' if lang == 'unknown_language' else lang}\n{code}\n```\n"
                     elif hasattr(part, 'code_execution_result') and part.code_execution_result:
-
+                        outcome_enum = getattr(genai.types, 'ExecutableCodeResponse', None)
+                        outcome_ok_val = getattr(outcome_enum.Outcome, 'OK', None) if outcome_enum and hasattr(outcome_enum, 'Outcome') else 1
                         outcome_val = getattr(part.code_execution_result, 'outcome', None)
-                        outcome_ok_val = getattr(getattr(genai.types, 'ExecutableCodeResponse', {}).get('Outcome',{}), 'OK', 1)
                         outcome_str = "Success" if outcome_val == outcome_ok_val else "Failure"
                         output = getattr(part.code_execution_result, 'output', '')
                         display_text += f"\nCode Execution Result ({outcome_str}):\n```\n{output}\n```\n"
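With this hunk, the guarded `Outcome.OK` lookup now appears in both display paths (here and in the STOP branch above). Hoisting it to module level once would avoid the duplication; a sketch using the same defensive `getattr` logic as the diff:

```python
# Sketch: compute the OK-outcome sentinel once at module level.
_OUTCOME_ENUM = getattr(genai.types, 'ExecutableCodeResponse', None)
OUTCOME_OK = (
    getattr(_OUTCOME_ENUM.Outcome, 'OK', None)
    if _OUTCOME_ENUM and hasattr(_OUTCOME_ENUM, 'Outcome')
    else 1
)
```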
@@ -387,27 +369,23 @@ def generate_response_with_tools(user_input, history_state):
                     user_msg_buffer = None
                 else:
                     chatbot_display_list.append([None, display_text])
-            # Ignore 'tool' role messages for chat display
 
-        # If loop ended with user msg pending
         if user_msg_buffer is not None:
             chatbot_display_list.append([user_msg_buffer, None])
 
-        # Return the final display list and the complete history state
         return chatbot_display_list, current_history_for_api
 
     except Exception as e:
         print(f"ERROR during Gemini generation or tool processing: {str(e)}")
         traceback.print_exc()
         error_message = f"An error occurred: {str(e)}"
-        # Rebuild display history from the state *before* this turn failed + error message
         error_display_list = []
         if isinstance(history_state, list):
             temp_user_msg = None
-            for content in history_state:
+            for content in history_state:
                 if content.role == "system": continue
                 text = ""
-                if hasattr(content, 'parts') and content.parts:
+                if hasattr(content, 'parts') and content.parts:
                     text = "".join([p.text for p in content.parts if hasattr(p, 'text')])
                 if content.role == "user": temp_user_msg = text
                 elif content.role == "model" and temp_user_msg:
@@ -416,10 +394,8 @@ def generate_response_with_tools(user_input, history_state):
                 elif content.role == "model": error_display_list.append([None, text])
             if temp_user_msg: error_display_list.append([temp_user_msg, None])
 
-        error_display_list.append([None, error_message])
+        error_display_list.append([None, error_message])
 
-        # Return the history state *before* the failed turn started
-        # Make sure conversation_history exists and is a list before slicing
         previous_history = conversation_history[:-1] if isinstance(conversation_history, list) and conversation_history else []
         return error_display_list, previous_history
 
@@ -430,10 +406,9 @@ with gr.Blocks(title="Gemini AI Assistant w/ Tools", theme=gr.themes.Soft()) as
     gr.Markdown(f"# 🚀 Gemini AI Assistant ({MODEL_NAME})")
     gr.Markdown("Ask questions, request info from specific URLs, or ask for code/calculations. Uses function calling and code execution.")
 
-    # Acknowledge Gradio warnings but keep tuple format for now
     chatbot_display = gr.Chatbot(
         label="Conversation",
-        bubble_full_width=False,
+        bubble_full_width=False,
         height=600,
         show_copy_button=True,
         render_markdown=True,
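The removed comment acknowledged Gradio's deprecation warnings for the tuple-based chatbot format, which this commit keeps. Recent Gradio releases also deprecate `bubble_full_width`; migrating would mean the `messages` format, roughly as below (a sketch, and it would require reshaping the `[user, bot]` lists built in the backend):

```python
# Sketch of the messages-format alternative (not what this commit does).
chatbot_display = gr.Chatbot(label="Conversation", type="messages", height=600)
# Each entry then becomes {"role": "user" | "assistant", "content": "..."}.
```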
@@ -448,27 +423,26 @@ with gr.Blocks(title="Gemini AI Assistant w/ Tools", theme=gr.themes.Soft()) as
         )
         with gr.Column(scale=1, min_width=150):
             send_btn = gr.Button("➡️ Send", variant="primary")
-            clear_btn = gr.ClearButton(value="🗑️ Clear Chat") #
+            clear_btn = gr.ClearButton(value="🗑️ Clear Chat") # Uses custom clear_all below
 
     chat_history_state = gr.State([])
 
     def user_message_update(user_message, history_display_list):
-        """Appends
+        """Appends user message to display list and clears input."""
         if not user_message.strip():
             return gr.update(value=""), history_display_list
-        # Append as [user_msg, None] pair
         return gr.update(value=""), history_display_list + [[user_message, None]]
 
     def bot_response_update(history_display_list, history_state):
-        """Calls
+        """Calls backend Gemini function and updates display/state."""
+        # Check if the last item in display list is a user message awaiting response
         if not history_display_list or (len(history_display_list[-1]) > 1 and history_display_list[-1][1] is not None):
             print("Bot update called without pending user message in display list.")
-            return history_display_list, history_state
+            return history_display_list, history_state # Return current state
 
         user_message = history_display_list[-1][0]
         print(f"User message being sent to backend: {user_message}")
 
-        # Backend now returns the *full* display list and the updated state
         updated_display_list, updated_history_state = generate_response_with_tools(user_message, history_state)
 
         return updated_display_list, updated_history_state
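The event wiring that chains these two callbacks is outside the hunk. It presumably looks like the following, with `msg_input` being the textbox closed by the `)` at the top of this hunk (names assumed from the surrounding code):

```python
# Presumed wiring (not shown in this diff): submit and click both run the
# user-message step, then the bot-response step.
send_btn.click(
    user_message_update, [msg_input, chatbot_display], [msg_input, chatbot_display]
).then(
    bot_response_update, [chatbot_display, chat_history_state],
    [chatbot_display, chat_history_state]
)
```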
@@ -498,7 +472,7 @@ with gr.Blocks(title="Gemini AI Assistant w/ Tools", theme=gr.themes.Soft()) as
 
     # Custom clear function to reset state as well
     def clear_all():
-        #
+        # Returns values for outputs=[msg_input, chatbot_display, chat_history_state]
         return ["", None, []]
 
     # Wire clear button to the custom function
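The wiring itself sits outside this hunk; per the comment added on the new line 475, it presumably looks like:

```python
# Presumed wiring for the clear button (outside this diff).
clear_btn.click(clear_all, outputs=[msg_input, chatbot_display, chat_history_state])
```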
@@ -507,6 +481,6 @@ with gr.Blocks(title="Gemini AI Assistant w/ Tools", theme=gr.themes.Soft()) as
 
 if __name__ == "__main__":
     print("Starting Gradio App...")
-    #
-    demo.queue().launch(server_name="0.0.0.0", server_port=7860, show_error=True)
+    # show_error=True can be helpful for seeing Gradio errors in the browser console
+    demo.queue().launch(server_name="0.0.0.0", server_port=7860, show_error=True)
     print("Gradio App Stopped.")