Update app.py
Browse files
app.py
CHANGED
@@ -1,9 +1,10 @@
|
|
1 |
# -*- coding: utf-8 -*-
|
2 |
import os
|
3 |
import gradio as gr
|
4 |
-
#
|
5 |
import google.generativeai as genai
|
6 |
-
#
|
|
|
7 |
|
8 |
import requests
|
9 |
import markdownify
|
@@ -103,13 +104,12 @@ try:
|
|
103 |
MODEL_NAME = "gemini-2.5-pro-exp-03-25"
|
104 |
print(f"Attempting to use EXPERIMENTAL model: {MODEL_NAME}")
|
105 |
|
106 |
-
#
|
107 |
browse_tool = genai.types.Tool(
|
108 |
function_declarations=[
|
109 |
genai.types.FunctionDeclaration(
|
110 |
name='load_page',
|
111 |
description='Fetches the content of a specific web page URL as Markdown text. Use this when the user asks for information from a specific URL they provide, or when you need to look up live information mentioned alongside a specific source URL.',
|
112 |
-
# Use basic string types for schema definition
|
113 |
parameters={
|
114 |
'type': 'object',
|
115 |
'properties': {
|
@@ -123,16 +123,15 @@ try:
|
|
123 |
)
|
124 |
]
|
125 |
)
|
126 |
-
#
|
127 |
-
# Pass an empty dictionary to enable code execution capability
|
128 |
code_execution_tool = genai.types.Tool(code_execution={})
|
129 |
-
# --- END OF CORRECTION ---
|
130 |
|
131 |
tools = [browse_tool, code_execution_tool]
|
132 |
|
133 |
model = genai.GenerativeModel(
|
134 |
model_name=MODEL_NAME,
|
135 |
tools=tools,
|
|
|
136 |
safety_settings={
|
137 |
genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT: genai.types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
|
138 |
genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: genai.types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
|
@@ -146,7 +145,6 @@ try:
|
|
146 |
except Exception as e:
|
147 |
print(f"CRITICAL ERROR: Error initializing Gemini client: {e}")
|
148 |
traceback.print_exc()
|
149 |
-
# Set model to None so the app can report initialization failure
|
150 |
model = None
|
151 |
tools = []
|
152 |
|
@@ -156,7 +154,7 @@ except Exception as e:
|
|
156 |
def handle_function_call(function_call):
|
157 |
"""Executes the function call requested by the model."""
|
158 |
function_name = function_call.name
|
159 |
-
args = function_call.args
|
160 |
|
161 |
print(f"Executing Function Call: {function_name} with args: {dict(args)}")
|
162 |
|
@@ -165,7 +163,7 @@ def handle_function_call(function_call):
|
|
165 |
url = args.get('url')
|
166 |
if url:
|
167 |
function_response_content = load_page(url=url)
|
168 |
-
MAX_RESPONSE_LEN = 50000
|
169 |
if len(function_response_content) > MAX_RESPONSE_LEN:
|
170 |
print(f"Tool Response truncated from {len(function_response_content)} to {MAX_RESPONSE_LEN} chars.")
|
171 |
function_response_content = function_response_content[:MAX_RESPONSE_LEN] + "\n\n[... Tool Response Truncated Due to Size Limit ...]"
|
@@ -175,11 +173,11 @@ def handle_function_call(function_call):
|
|
175 |
print(f"Error: Received call for unknown function '{function_name}'")
|
176 |
function_response_content = f"Error: Unknown function '{function_name}' called by the model."
|
177 |
|
178 |
-
# Use genai.
|
179 |
-
function_response_part = genai.
|
180 |
function_response=genai.types.FunctionResponse(
|
181 |
name=function_name,
|
182 |
-
response={'content': function_response_content}
|
183 |
)
|
184 |
)
|
185 |
print(f"Function Response generated for {function_name}")
|
@@ -188,8 +186,8 @@ def handle_function_call(function_call):
|
|
188 |
except Exception as e:
|
189 |
print(f"Error during execution of function '{function_name}': {e}")
|
190 |
traceback.print_exc()
|
191 |
-
# Use genai.
|
192 |
-
return genai.
|
193 |
function_response=genai.types.FunctionResponse(
|
194 |
name=function_name,
|
195 |
response={'error': f"Failed to execute function {function_name}: {str(e)}"}
|
@@ -206,7 +204,8 @@ def generate_response_with_tools(user_input, history_state):
|
|
206 |
|
207 |
# --- History Management ---
|
208 |
conversation_history = history_state if isinstance(history_state, list) else []
|
209 |
-
|
|
|
210 |
print(f"\n--- Sending to Gemini (History length: {len(conversation_history)}) ---")
|
211 |
|
212 |
MAX_HISTORY_TURNS = 10
|
@@ -237,15 +236,19 @@ def generate_response_with_tools(user_input, history_state):
|
|
237 |
if not response.candidates:
|
238 |
print("Warning: No candidates received from Gemini.")
|
239 |
final_bot_message = "[No response generated by the model.]"
|
240 |
-
|
|
|
241 |
break
|
242 |
|
243 |
candidate = response.candidates[0]
|
|
|
244 |
finish_reason = candidate.finish_reason
|
245 |
|
|
|
246 |
if candidate.content:
|
247 |
-
current_history_for_api.append(candidate.content)
|
248 |
|
|
|
249 |
if finish_reason not in (genai.types.Candidate.FinishReason.STOP, genai.types.Candidate.FinishReason.TOOL_CALL):
|
250 |
print(f"Warning: Generation stopped unexpectedly. Reason: {finish_reason.name}")
|
251 |
stop_reason_msg = f"[Model stopped generating. Reason: {finish_reason.name}]"
|
@@ -255,10 +258,12 @@ def generate_response_with_tools(user_input, history_state):
|
|
255 |
final_bot_message = (partial_text + "\n" if partial_text else "") + stop_reason_msg
|
256 |
break
|
257 |
|
|
|
258 |
has_tool_call = finish_reason == genai.types.Candidate.FinishReason.TOOL_CALL
|
259 |
|
260 |
if has_tool_call:
|
261 |
print("Tool call requested by model.")
|
|
|
262 |
if not candidate.content or not candidate.content.parts:
|
263 |
print("Error: TOOL_CALL indicated but candidate content/parts is missing.")
|
264 |
final_bot_message = "[Model indicated tool use but provided no details.]"
|
@@ -275,6 +280,7 @@ def generate_response_with_tools(user_input, history_state):
|
|
275 |
|
276 |
tool_responses = []
|
277 |
for func_call in function_calls:
|
|
|
278 |
function_response_part = handle_function_call(func_call)
|
279 |
tool_responses.append(function_response_part)
|
280 |
|
@@ -283,7 +289,8 @@ def generate_response_with_tools(user_input, history_state):
|
|
283 |
final_bot_message = "[Failed to process tool call request.]"
|
284 |
break
|
285 |
|
286 |
-
|
|
|
287 |
print("Added tool response(s) to history. Continuing loop...")
|
288 |
final_bot_message = ""
|
289 |
continue
|
@@ -292,10 +299,11 @@ def generate_response_with_tools(user_input, history_state):
|
|
292 |
print("No tool call requested. Final response received.")
|
293 |
final_bot_message = ""
|
294 |
code_parts_display = []
|
|
|
295 |
if current_history_for_api and current_history_for_api[-1].role == "model":
|
296 |
last_model_content = current_history_for_api[-1]
|
297 |
if last_model_content.parts:
|
298 |
-
for part in last_model_content.parts:
|
299 |
if hasattr(part, 'text') and part.text:
|
300 |
final_bot_message += part.text
|
301 |
if hasattr(part, 'executable_code') and part.executable_code:
|
@@ -303,6 +311,7 @@ def generate_response_with_tools(user_input, history_state):
|
|
303 |
code = getattr(part.executable_code, 'code', '')
|
304 |
code_parts_display.append(f"Suggested Code ({lang}):\n```{'python' if lang == 'unknown_language' else lang}\n{code}\n```")
|
305 |
elif hasattr(part, 'code_execution_result') and part.code_execution_result:
|
|
|
306 |
outcome_enum = getattr(genai.types, 'ExecutableCodeResponse', None)
|
307 |
outcome_ok_val = getattr(outcome_enum.Outcome, 'OK', None) if outcome_enum and hasattr(outcome_enum, 'Outcome') else 1
|
308 |
outcome_val = getattr(part.code_execution_result, 'outcome', None)
|
@@ -315,11 +324,12 @@ def generate_response_with_tools(user_input, history_state):
|
|
315 |
|
316 |
if not final_bot_message.strip():
|
317 |
final_bot_message = "[Assistant completed its turn without generating text output.]"
|
|
|
318 |
if current_history_for_api[-1].role == "model" and not any(hasattr(p,'text') and p.text for p in current_history_for_api[-1].parts):
|
319 |
if not hasattr(current_history_for_api[-1], 'parts') or not current_history_for_api[-1].parts:
|
320 |
-
current_history_for_api[-1].parts = []
|
321 |
-
|
322 |
-
|
323 |
break
|
324 |
|
325 |
# End of while loop
|
@@ -327,22 +337,25 @@ def generate_response_with_tools(user_input, history_state):
|
|
327 |
print(f"Warning: Reached maximum tool execution loops ({MAX_TOOL_LOOPS}).")
|
328 |
warning_msg = f"\n\n[Warning: Reached maximum tool execution loops ({MAX_TOOL_LOOPS}). The final response might be incomplete.]"
|
329 |
final_bot_message += warning_msg
|
|
|
330 |
if current_history_for_api and current_history_for_api[-1].role == "model":
|
331 |
if not hasattr(current_history_for_api[-1], 'parts') or not current_history_for_api[-1].parts:
|
332 |
current_history_for_api[-1].parts = []
|
333 |
-
|
|
|
334 |
|
335 |
print("--- Response Generation Complete ---")
|
336 |
|
337 |
# --- Format final output for Gradio Chatbot ---
|
338 |
chatbot_display_list = []
|
339 |
user_msg_buffer = None
|
340 |
-
for content in current_history_for_api:
|
341 |
if content.role == "system": continue
|
342 |
|
343 |
display_text = ""
|
344 |
if hasattr(content, 'parts') and content.parts:
|
345 |
-
for part in content.parts:
|
|
|
346 |
if hasattr(part, 'text') and part.text:
|
347 |
display_text += part.text + "\n"
|
348 |
elif hasattr(part, 'executable_code') and part.executable_code:
|
@@ -350,6 +363,7 @@ def generate_response_with_tools(user_input, history_state):
|
|
350 |
code = getattr(part.executable_code, 'code', '')
|
351 |
display_text += f"\n```{'python' if lang == 'unknown_language' else lang}\n{code}\n```\n"
|
352 |
elif hasattr(part, 'code_execution_result') and part.code_execution_result:
|
|
|
353 |
outcome_enum = getattr(genai.types, 'ExecutableCodeResponse', None)
|
354 |
outcome_ok_val = getattr(outcome_enum.Outcome, 'OK', None) if outcome_enum and hasattr(outcome_enum, 'Outcome') else 1
|
355 |
outcome_val = getattr(part.code_execution_result, 'outcome', None)
|
@@ -369,6 +383,7 @@ def generate_response_with_tools(user_input, history_state):
|
|
369 |
user_msg_buffer = None
|
370 |
else:
|
371 |
chatbot_display_list.append([None, display_text])
|
|
|
372 |
|
373 |
if user_msg_buffer is not None:
|
374 |
chatbot_display_list.append([user_msg_buffer, None])
|
@@ -380,12 +395,14 @@ def generate_response_with_tools(user_input, history_state):
|
|
380 |
traceback.print_exc()
|
381 |
error_message = f"An error occurred: {str(e)}"
|
382 |
error_display_list = []
|
|
|
383 |
if isinstance(history_state, list):
|
384 |
temp_user_msg = None
|
385 |
-
for content in history_state:
|
386 |
if content.role == "system": continue
|
387 |
text = ""
|
388 |
if hasattr(content, 'parts') and content.parts:
|
|
|
389 |
text = "".join([p.text for p in content.parts if hasattr(p, 'text')])
|
390 |
if content.role == "user": temp_user_msg = text
|
391 |
elif content.role == "model" and temp_user_msg:
|
@@ -396,6 +413,7 @@ def generate_response_with_tools(user_input, history_state):
|
|
396 |
|
397 |
error_display_list.append([None, error_message])
|
398 |
|
|
|
399 |
previous_history = conversation_history[:-1] if isinstance(conversation_history, list) and conversation_history else []
|
400 |
return error_display_list, previous_history
|
401 |
|
@@ -408,7 +426,7 @@ with gr.Blocks(title="Gemini AI Assistant w/ Tools", theme=gr.themes.Soft()) as
|
|
408 |
|
409 |
chatbot_display = gr.Chatbot(
|
410 |
label="Conversation",
|
411 |
-
bubble_full_width=False,
|
412 |
height=600,
|
413 |
show_copy_button=True,
|
414 |
render_markdown=True,
|
@@ -423,8 +441,9 @@ with gr.Blocks(title="Gemini AI Assistant w/ Tools", theme=gr.themes.Soft()) as
|
|
423 |
)
|
424 |
with gr.Column(scale=1, min_width=150):
|
425 |
send_btn = gr.Button("➡️ Send", variant="primary")
|
426 |
-
clear_btn = gr.ClearButton(value="🗑️ Clear Chat")
|
427 |
|
|
|
428 |
chat_history_state = gr.State([])
|
429 |
|
430 |
def user_message_update(user_message, history_display_list):
|
@@ -435,10 +454,9 @@ with gr.Blocks(title="Gemini AI Assistant w/ Tools", theme=gr.themes.Soft()) as
|
|
435 |
|
436 |
def bot_response_update(history_display_list, history_state):
|
437 |
"""Calls backend Gemini function and updates display/state."""
|
438 |
-
# Check if the last item in display list is a user message awaiting response
|
439 |
if not history_display_list or (len(history_display_list[-1]) > 1 and history_display_list[-1][1] is not None):
|
440 |
print("Bot update called without pending user message in display list.")
|
441 |
-
return history_display_list, history_state
|
442 |
|
443 |
user_message = history_display_list[-1][0]
|
444 |
print(f"User message being sent to backend: {user_message}")
|
@@ -455,8 +473,8 @@ with gr.Blocks(title="Gemini AI Assistant w/ Tools", theme=gr.themes.Soft()) as
|
|
455 |
queue=False,
|
456 |
).then(
|
457 |
bot_response_update,
|
458 |
-
[chatbot_display, chat_history_state],
|
459 |
-
[chatbot_display, chat_history_state]
|
460 |
)
|
461 |
|
462 |
send_btn.click(
|
@@ -470,17 +488,14 @@ with gr.Blocks(title="Gemini AI Assistant w/ Tools", theme=gr.themes.Soft()) as
|
|
470 |
[chatbot_display, chat_history_state]
|
471 |
)
|
472 |
|
473 |
-
# Custom clear function
|
474 |
def clear_all():
|
475 |
-
# Returns values for outputs=[msg_input, chatbot_display, chat_history_state]
|
476 |
return ["", None, []]
|
477 |
|
478 |
-
# Wire clear button to the custom function
|
479 |
clear_btn.click(clear_all, [], [msg_input, chatbot_display, chat_history_state], queue=False)
|
480 |
|
481 |
|
482 |
if __name__ == "__main__":
|
483 |
print("Starting Gradio App...")
|
484 |
-
# show_error=True can be helpful for seeing Gradio errors in the browser console
|
485 |
demo.queue().launch(server_name="0.0.0.0", server_port=7860, show_error=True)
|
486 |
print("Gradio App Stopped.")
|
|
|
1 |
# -*- coding: utf-8 -*-
|
2 |
import os
|
3 |
import gradio as gr
|
4 |
+
# Import the main module and use an alias
|
5 |
import google.generativeai as genai
|
6 |
+
# Core types like Content/Part are accessed directly via 'genai'
|
7 |
+
# Other types like Tool, FunctionDeclaration are under genai.types
|
8 |
|
9 |
import requests
|
10 |
import markdownify
|
|
|
104 |
MODEL_NAME = "gemini-2.5-pro-exp-03-25"
|
105 |
print(f"Attempting to use EXPERIMENTAL model: {MODEL_NAME}")
|
106 |
|
107 |
+
# Use genai.types for Tool, FunctionDeclaration etc.
|
108 |
browse_tool = genai.types.Tool(
|
109 |
function_declarations=[
|
110 |
genai.types.FunctionDeclaration(
|
111 |
name='load_page',
|
112 |
description='Fetches the content of a specific web page URL as Markdown text. Use this when the user asks for information from a specific URL they provide, or when you need to look up live information mentioned alongside a specific source URL.',
|
|
|
113 |
parameters={
|
114 |
'type': 'object',
|
115 |
'properties': {
|
|
|
123 |
)
|
124 |
]
|
125 |
)
|
126 |
+
# Use genai.types.Tool, enable code execution with {}
|
|
|
127 |
code_execution_tool = genai.types.Tool(code_execution={})
|
|
|
128 |
|
129 |
tools = [browse_tool, code_execution_tool]
|
130 |
|
131 |
model = genai.GenerativeModel(
|
132 |
model_name=MODEL_NAME,
|
133 |
tools=tools,
|
134 |
+
# Use genai.types for HarmCategory and HarmBlockThreshold
|
135 |
safety_settings={
|
136 |
genai.types.HarmCategory.HARM_CATEGORY_HARASSMENT: genai.types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
|
137 |
genai.types.HarmCategory.HARM_CATEGORY_HATE_SPEECH: genai.types.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
|
|
|
145 |
except Exception as e:
|
146 |
print(f"CRITICAL ERROR: Error initializing Gemini client: {e}")
|
147 |
traceback.print_exc()
|
|
|
148 |
model = None
|
149 |
tools = []
|
150 |
|
|
|
154 |
def handle_function_call(function_call):
|
155 |
"""Executes the function call requested by the model."""
|
156 |
function_name = function_call.name
|
157 |
+
args = function_call.args
|
158 |
|
159 |
print(f"Executing Function Call: {function_name} with args: {dict(args)}")
|
160 |
|
|
|
163 |
url = args.get('url')
|
164 |
if url:
|
165 |
function_response_content = load_page(url=url)
|
166 |
+
MAX_RESPONSE_LEN = 50000
|
167 |
if len(function_response_content) > MAX_RESPONSE_LEN:
|
168 |
print(f"Tool Response truncated from {len(function_response_content)} to {MAX_RESPONSE_LEN} chars.")
|
169 |
function_response_content = function_response_content[:MAX_RESPONSE_LEN] + "\n\n[... Tool Response Truncated Due to Size Limit ...]"
|
|
|
173 |
print(f"Error: Received call for unknown function '{function_name}'")
|
174 |
function_response_content = f"Error: Unknown function '{function_name}' called by the model."
|
175 |
|
176 |
+
# *** CORRECTED: Use genai.Part and genai.types.FunctionResponse ***
|
177 |
+
function_response_part = genai.Part(
|
178 |
function_response=genai.types.FunctionResponse(
|
179 |
name=function_name,
|
180 |
+
response={'content': function_response_content}
|
181 |
)
|
182 |
)
|
183 |
print(f"Function Response generated for {function_name}")
|
|
|
186 |
except Exception as e:
|
187 |
print(f"Error during execution of function '{function_name}': {e}")
|
188 |
traceback.print_exc()
|
189 |
+
# *** CORRECTED: Use genai.Part and genai.types.FunctionResponse ***
|
190 |
+
return genai.Part(
|
191 |
function_response=genai.types.FunctionResponse(
|
192 |
name=function_name,
|
193 |
response={'error': f"Failed to execute function {function_name}: {str(e)}"}
|
|
|
204 |
|
205 |
# --- History Management ---
|
206 |
conversation_history = history_state if isinstance(history_state, list) else []
|
207 |
+
# *** CORRECTED: Use genai.Content and genai.Part ***
|
208 |
+
conversation_history.append(genai.Content(role="user", parts=[genai.Part.from_text(user_input)]))
|
209 |
print(f"\n--- Sending to Gemini (History length: {len(conversation_history)}) ---")
|
210 |
|
211 |
MAX_HISTORY_TURNS = 10
|
|
|
236 |
if not response.candidates:
|
237 |
print("Warning: No candidates received from Gemini.")
|
238 |
final_bot_message = "[No response generated by the model.]"
|
239 |
+
# *** CORRECTED: Use genai.Content and genai.Part ***
|
240 |
+
current_history_for_api.append(genai.Content(role="model", parts=[genai.Part.from_text(final_bot_message)]))
|
241 |
break
|
242 |
|
243 |
candidate = response.candidates[0]
|
244 |
+
# Use genai.types for Candidate fields
|
245 |
finish_reason = candidate.finish_reason
|
246 |
|
247 |
+
# Append model's response (Content object) to history
|
248 |
if candidate.content:
|
249 |
+
current_history_for_api.append(candidate.content) # content should already be a genai.Content object
|
250 |
|
251 |
+
# Check finish reason using genai.types.Candidate
|
252 |
if finish_reason not in (genai.types.Candidate.FinishReason.STOP, genai.types.Candidate.FinishReason.TOOL_CALL):
|
253 |
print(f"Warning: Generation stopped unexpectedly. Reason: {finish_reason.name}")
|
254 |
stop_reason_msg = f"[Model stopped generating. Reason: {finish_reason.name}]"
|
|
|
258 |
final_bot_message = (partial_text + "\n" if partial_text else "") + stop_reason_msg
|
259 |
break
|
260 |
|
261 |
+
# Use genai.types.Candidate for comparison
|
262 |
has_tool_call = finish_reason == genai.types.Candidate.FinishReason.TOOL_CALL
|
263 |
|
264 |
if has_tool_call:
|
265 |
print("Tool call requested by model.")
|
266 |
+
# Model response (genai.Content obj) is already appended above
|
267 |
if not candidate.content or not candidate.content.parts:
|
268 |
print("Error: TOOL_CALL indicated but candidate content/parts is missing.")
|
269 |
final_bot_message = "[Model indicated tool use but provided no details.]"
|
|
|
280 |
|
281 |
tool_responses = []
|
282 |
for func_call in function_calls:
|
283 |
+
# handle_function_call now returns a genai.Part object
|
284 |
function_response_part = handle_function_call(func_call)
|
285 |
tool_responses.append(function_response_part)
|
286 |
|
|
|
289 |
final_bot_message = "[Failed to process tool call request.]"
|
290 |
break
|
291 |
|
292 |
+
# Add tool responses to history using genai.Content
|
293 |
+
current_history_for_api.append(genai.Content(role="tool", parts=tool_responses)) # parts expects list of genai.Part
|
294 |
print("Added tool response(s) to history. Continuing loop...")
|
295 |
final_bot_message = ""
|
296 |
continue
|
|
|
299 |
print("No tool call requested. Final response received.")
|
300 |
final_bot_message = ""
|
301 |
code_parts_display = []
|
302 |
+
# Extract text/code from the last model turn (genai.Content object) in history
|
303 |
if current_history_for_api and current_history_for_api[-1].role == "model":
|
304 |
last_model_content = current_history_for_api[-1]
|
305 |
if last_model_content.parts:
|
306 |
+
for part in last_model_content.parts: # part is already a genai.Part object
|
307 |
if hasattr(part, 'text') and part.text:
|
308 |
final_bot_message += part.text
|
309 |
if hasattr(part, 'executable_code') and part.executable_code:
|
|
|
311 |
code = getattr(part.executable_code, 'code', '')
|
312 |
code_parts_display.append(f"Suggested Code ({lang}):\n```{'python' if lang == 'unknown_language' else lang}\n{code}\n```")
|
313 |
elif hasattr(part, 'code_execution_result') and part.code_execution_result:
|
314 |
+
# outcome check uses genai.types
|
315 |
outcome_enum = getattr(genai.types, 'ExecutableCodeResponse', None)
|
316 |
outcome_ok_val = getattr(outcome_enum.Outcome, 'OK', None) if outcome_enum and hasattr(outcome_enum, 'Outcome') else 1
|
317 |
outcome_val = getattr(part.code_execution_result, 'outcome', None)
|
|
|
324 |
|
325 |
if not final_bot_message.strip():
|
326 |
final_bot_message = "[Assistant completed its turn without generating text output.]"
|
327 |
+
# Add this as a text Part to the last model Content object if it was otherwise empty
|
328 |
if current_history_for_api[-1].role == "model" and not any(hasattr(p,'text') and p.text for p in current_history_for_api[-1].parts):
|
329 |
if not hasattr(current_history_for_api[-1], 'parts') or not current_history_for_api[-1].parts:
|
330 |
+
current_history_for_api[-1].parts = []
|
331 |
+
# *** CORRECTED: Use genai.Part ***
|
332 |
+
current_history_for_api[-1].parts.append(genai.Part.from_text(final_bot_message))
|
333 |
break
|
334 |
|
335 |
# End of while loop
|
|
|
337 |
print(f"Warning: Reached maximum tool execution loops ({MAX_TOOL_LOOPS}).")
|
338 |
warning_msg = f"\n\n[Warning: Reached maximum tool execution loops ({MAX_TOOL_LOOPS}). The final response might be incomplete.]"
|
339 |
final_bot_message += warning_msg
|
340 |
+
# Append warning as a genai.Part to the last model message
|
341 |
if current_history_for_api and current_history_for_api[-1].role == "model":
|
342 |
if not hasattr(current_history_for_api[-1], 'parts') or not current_history_for_api[-1].parts:
|
343 |
current_history_for_api[-1].parts = []
|
344 |
+
# *** CORRECTED: Use genai.Part ***
|
345 |
+
current_history_for_api[-1].parts.append(genai.Part.from_text(warning_msg))
|
346 |
|
347 |
print("--- Response Generation Complete ---")
|
348 |
|
349 |
# --- Format final output for Gradio Chatbot ---
|
350 |
chatbot_display_list = []
|
351 |
user_msg_buffer = None
|
352 |
+
for content in current_history_for_api: # content is genai.Content object
|
353 |
if content.role == "system": continue
|
354 |
|
355 |
display_text = ""
|
356 |
if hasattr(content, 'parts') and content.parts:
|
357 |
+
for part in content.parts: # part is genai.Part object
|
358 |
+
# Extract text for display, format code etc.
|
359 |
if hasattr(part, 'text') and part.text:
|
360 |
display_text += part.text + "\n"
|
361 |
elif hasattr(part, 'executable_code') and part.executable_code:
|
|
|
363 |
code = getattr(part.executable_code, 'code', '')
|
364 |
display_text += f"\n```{'python' if lang == 'unknown_language' else lang}\n{code}\n```\n"
|
365 |
elif hasattr(part, 'code_execution_result') and part.code_execution_result:
|
366 |
+
# outcome check uses genai.types
|
367 |
outcome_enum = getattr(genai.types, 'ExecutableCodeResponse', None)
|
368 |
outcome_ok_val = getattr(outcome_enum.Outcome, 'OK', None) if outcome_enum and hasattr(outcome_enum, 'Outcome') else 1
|
369 |
outcome_val = getattr(part.code_execution_result, 'outcome', None)
|
|
|
383 |
user_msg_buffer = None
|
384 |
else:
|
385 |
chatbot_display_list.append([None, display_text])
|
386 |
+
# Ignore 'tool' role messages for chat display
|
387 |
|
388 |
if user_msg_buffer is not None:
|
389 |
chatbot_display_list.append([user_msg_buffer, None])
|
|
|
395 |
traceback.print_exc()
|
396 |
error_message = f"An error occurred: {str(e)}"
|
397 |
error_display_list = []
|
398 |
+
# Rebuild display from previous state (history_state)
|
399 |
if isinstance(history_state, list):
|
400 |
temp_user_msg = None
|
401 |
+
for content in history_state: # content is genai.Content
|
402 |
if content.role == "system": continue
|
403 |
text = ""
|
404 |
if hasattr(content, 'parts') and content.parts:
|
405 |
+
# parts contains genai.Part objects
|
406 |
text = "".join([p.text for p in content.parts if hasattr(p, 'text')])
|
407 |
if content.role == "user": temp_user_msg = text
|
408 |
elif content.role == "model" and temp_user_msg:
|
|
|
413 |
|
414 |
error_display_list.append([None, error_message])
|
415 |
|
416 |
+
# Revert to history *before* this failed turn
|
417 |
previous_history = conversation_history[:-1] if isinstance(conversation_history, list) and conversation_history else []
|
418 |
return error_display_list, previous_history
|
419 |
|
|
|
426 |
|
427 |
chatbot_display = gr.Chatbot(
|
428 |
label="Conversation",
|
429 |
+
bubble_full_width=False, # Keep param even if deprecated
|
430 |
height=600,
|
431 |
show_copy_button=True,
|
432 |
render_markdown=True,
|
|
|
441 |
)
|
442 |
with gr.Column(scale=1, min_width=150):
|
443 |
send_btn = gr.Button("➡️ Send", variant="primary")
|
444 |
+
clear_btn = gr.ClearButton(value="🗑️ Clear Chat")
|
445 |
|
446 |
+
# State stores list of genai.Content objects
|
447 |
chat_history_state = gr.State([])
|
448 |
|
449 |
def user_message_update(user_message, history_display_list):
|
|
|
454 |
|
455 |
def bot_response_update(history_display_list, history_state):
|
456 |
"""Calls backend Gemini function and updates display/state."""
|
|
|
457 |
if not history_display_list or (len(history_display_list[-1]) > 1 and history_display_list[-1][1] is not None):
|
458 |
print("Bot update called without pending user message in display list.")
|
459 |
+
return history_display_list, history_state
|
460 |
|
461 |
user_message = history_display_list[-1][0]
|
462 |
print(f"User message being sent to backend: {user_message}")
|
|
|
473 |
queue=False,
|
474 |
).then(
|
475 |
bot_response_update,
|
476 |
+
[chatbot_display, chat_history_state], # Pass display list and history state
|
477 |
+
[chatbot_display, chat_history_state] # Receive updated display list and state
|
478 |
)
|
479 |
|
480 |
send_btn.click(
|
|
|
488 |
[chatbot_display, chat_history_state]
|
489 |
)
|
490 |
|
491 |
+
# Custom clear function resets state
|
492 |
def clear_all():
|
|
|
493 |
return ["", None, []]
|
494 |
|
|
|
495 |
clear_btn.click(clear_all, [], [msg_input, chatbot_display, chat_history_state], queue=False)
|
496 |
|
497 |
|
498 |
if __name__ == "__main__":
|
499 |
print("Starting Gradio App...")
|
|
|
500 |
demo.queue().launch(server_name="0.0.0.0", server_port=7860, show_error=True)
|
501 |
print("Gradio App Stopped.")
|