Alina Lozovskaya commited on
Commit
50871c5
·
1 Parent(s): 9e36858

Improve session management and require auth

Browse files
Files changed (2) hide show
  1. yourbench_space/app.py +32 -3
  2. yourbench_space/utils.py +26 -4
yourbench_space/app.py CHANGED
@@ -16,6 +16,7 @@ from yourbench_space.utils import (
16
  save_files,
17
  update_dataset,
18
  STAGES,
 
19
  )
20
  from yourbench_space.evaluation import create_eval_file, run_evaluations
21
  from yourbench_space.leaderboard_space.env import HF_TOKEN
@@ -34,6 +35,8 @@ logger.add(sys.stderr, level="INFO")
34
 
35
  # Global to store all managers per session
36
  MANAGERS = SubprocessManagerGroup()
 
 
37
 
38
  docs_path = Path(__file__).parent / "docs.md"
39
  citation_content = (
@@ -44,6 +47,12 @@ citation_content = (
44
 
45
  def generate_and_return(hf_org, hf_dataset_name, session_state: gr.State):
46
  manager = MANAGERS.get(session_state)
 
 
 
 
 
 
47
  session_uid = session_state.value
48
  config_path = generate_and_save_config(hf_org, hf_dataset_name, session_uid, manager.config_path)
49
  for _ in range(5):
@@ -64,7 +73,10 @@ def update_process_status(session_state: gr.State):
64
  """Update process status and include exit details if process has terminated"""
65
  if session_state is None:
66
  return gr.update(value=False, label="Not running")
 
67
  manager = MANAGERS.get(session_state.value)
 
 
68
 
69
  is_running = manager.is_running()
70
 
@@ -76,9 +88,14 @@ def update_process_status(session_state: gr.State):
76
  return gr.update(value=True, label="Process Status: Running")
77
 
78
  def prepare_task(session_uid: str, oauth_token: gr.OAuthToken | None, hf_dataset_name: str, _=None):
 
 
 
79
  new_env = os.environ.copy()
 
80
  if oauth_token:
81
  new_env["HF_TOKEN"] = oauth_token.token
 
82
  new_env["DATASET_PREFIX"] = hf_dataset_name
83
  MANAGERS.start_process(session_uid, custom_env=new_env)
84
 
@@ -133,9 +150,21 @@ def run_evaluation_pipeline(oauth_token: gr.OAuthToken | None, org_name, eval_na
133
  return status
134
 
135
 
136
- def init_session():
137
  """Update session on load"""
138
- local_uuid = str(uuid.uuid4())
 
 
 
 
 
 
 
 
 
 
 
 
139
  MANAGERS.create(local_uuid)
140
  logger.info(f"Started session for {local_uuid}")
141
  return gr.State(local_uuid, delete_callback=lambda uid: MANAGERS.remove(uid))
@@ -249,7 +278,7 @@ with gr.Blocks(theme=gr.themes.Default()) as app:
249
  log_timer.tick(
250
  MANAGERS.read_and_get_output, inputs=session_state, outputs=[log_output, stages_table]
251
  )
252
- with gr.Tab("Evaluate", id=2):
253
  with gr.Row():
254
  btn_launch_evals = gr.Button("Launch evaluations")
255
  status = gr.Textbox(label="Status")
 
16
  save_files,
17
  update_dataset,
18
  STAGES,
19
+ is_running_locally
20
  )
21
  from yourbench_space.evaluation import create_eval_file, run_evaluations
22
  from yourbench_space.leaderboard_space.env import HF_TOKEN
 
35
 
36
  # Global to store all managers per session
37
  MANAGERS = SubprocessManagerGroup()
38
+ USER_ID_SESSION_MAP: dict[str, str] = dict()
39
+
40
 
41
  docs_path = Path(__file__).parent / "docs.md"
42
  citation_content = (
 
47
 
48
  def generate_and_return(hf_org, hf_dataset_name, session_state: gr.State):
49
  manager = MANAGERS.get(session_state)
50
+ if manager is None: # should not be possible
51
+ return (
52
+ "❌ Config generation failed.",
53
+ gr.update(visible=False, interactive=False),
54
+ )
55
+
56
  session_uid = session_state.value
57
  config_path = generate_and_save_config(hf_org, hf_dataset_name, session_uid, manager.config_path)
58
  for _ in range(5):
 
73
  """Update process status and include exit details if process has terminated"""
74
  if session_state is None:
75
  return gr.update(value=False, label="Not running")
76
+
77
  manager = MANAGERS.get(session_state.value)
78
+ if manager is None:
79
+ return gr.update(value=False, label="Not running")
80
 
81
  is_running = manager.is_running()
82
 
 
88
  return gr.update(value=True, label="Process Status: Running")
89
 
90
  def prepare_task(session_uid: str, oauth_token: gr.OAuthToken | None, hf_dataset_name: str, _=None):
91
+ if oauth_token is None and not is_running_locally():
92
+ gr.Warning('You need to log in to use this Space')
93
+ return
94
  new_env = os.environ.copy()
95
+
96
  if oauth_token:
97
  new_env["HF_TOKEN"] = oauth_token.token
98
+
99
  new_env["DATASET_PREFIX"] = hf_dataset_name
100
  MANAGERS.start_process(session_uid, custom_env=new_env)
101
 
 
150
  return status
151
 
152
 
153
+ def init_session(profile: gr.OAuthProfile | None):
154
  """Update session on load"""
155
+ if profile is None: # if we running localy or user is not logged in
156
+ local_uuid = "local" if is_running_locally() else str(uuid.uuid4())
157
+ else:
158
+ local_uuid = USER_ID_SESSION_MAP.get(profile.username, str(uuid.uuid4()))
159
+
160
+ manager = MANAGERS.get(local_uuid)
161
+ if manager and manager.is_running():
162
+ logger.info(f"Found existing, running session for {local_uuid}")
163
+ return gr.State(local_uuid, delete_callback=lambda uid: MANAGERS.remove(uid))
164
+
165
+ if profile:
166
+ USER_ID_SESSION_MAP[profile.username] = local_uuid
167
+
168
  MANAGERS.create(local_uuid)
169
  logger.info(f"Started session for {local_uuid}")
170
  return gr.State(local_uuid, delete_callback=lambda uid: MANAGERS.remove(uid))
 
278
  log_timer.tick(
279
  MANAGERS.read_and_get_output, inputs=session_state, outputs=[log_output, stages_table]
280
  )
281
+ with gr.Tab("Evaluate", id=2, visible=False):
282
  with gr.Row():
283
  btn_launch_evals = gr.Button("Launch evaluations")
284
  status = gr.Textbox(label="Status")
yourbench_space/utils.py CHANGED
@@ -9,7 +9,7 @@ import pandas as pd
9
  from collections import defaultdict
10
  from datasets import load_dataset
11
  from loguru import logger
12
- from typing import List, Union
13
 
14
  STAGES = [
15
  "ingestion",
@@ -23,9 +23,19 @@ STAGES = [
23
  # "judge_answers", # to uncomment when fixed
24
  ]
25
 
 
 
 
 
 
26
 
27
- def save_files(session_state: gr.State, files: List[pathlib.Path]) -> str:
 
28
  """Save uploaded files to the UPLOAD_DIRECTORY/uuid safely"""
 
 
 
 
29
  saved_paths = []
30
 
31
  for file in [file.name for file in files]:
@@ -97,12 +107,14 @@ class SubprocessManagerGroup:
97
  uid = SubprocessManagerGroup.grab_uuid(uid)
98
  self.managers[uid] = SubprocessManager(uid)
99
 
100
- def get(self, uid: Union[str, gr.State]) -> "SubprocessManager":
101
  uid = SubprocessManagerGroup.grab_uuid(uid)
102
- return self.managers[uid]
103
 
104
  def remove(self, uid: Union[str, gr.State]):
105
  uid = SubprocessManagerGroup.grab_uuid(uid)
 
 
106
  del self.managers[uid]
107
 
108
  def start_process(self, uid: Union[str, gr.State], custom_env: dict | None):
@@ -123,6 +135,11 @@ class SubprocessManagerGroup:
123
  uid = SubprocessManagerGroup.grab_uuid(uid)
124
  return self.managers[uid].read_and_get_output()
125
 
 
 
 
 
 
126
 
127
  class SubprocessManager:
128
  def __init__(self, session_uid: str):
@@ -228,3 +245,8 @@ class SubprocessManager:
228
  return self.exit_code, "Process exited abnormaly"
229
 
230
  return self.exit_code, "Process exited normaly"
 
 
 
 
 
 
9
  from collections import defaultdict
10
  from datasets import load_dataset
11
  from loguru import logger
12
+ from typing import List, Union, Optional
13
 
14
  STAGES = [
15
  "ingestion",
 
23
  # "judge_answers", # to uncomment when fixed
24
  ]
25
 
26
+ def is_running_locally() -> bool:
27
+ """
28
+ Returns True if Gradio is running locally, False if it's running in a Hugging Face Space.
29
+ """
30
+ return os.getenv("SPACE_ID") is None # SPACE_ID is set in Hugging Face Spaces
31
 
32
+
33
+ def save_files(oauth_token: gr.OAuthToken | None, session_state: gr.State, files: List[pathlib.Path]) -> str:
34
  """Save uploaded files to the UPLOAD_DIRECTORY/uuid safely"""
35
+ if oauth_token is None and not is_running_locally():
36
+ gr.Warning('You need to log in to use this Space')
37
+ return
38
+
39
  saved_paths = []
40
 
41
  for file in [file.name for file in files]:
 
107
  uid = SubprocessManagerGroup.grab_uuid(uid)
108
  self.managers[uid] = SubprocessManager(uid)
109
 
110
+ def get(self, uid: Union[str, "gr.State"]) -> Optional["SubprocessManager"]:
111
  uid = SubprocessManagerGroup.grab_uuid(uid)
112
+ return self.managers.get(uid)
113
 
114
  def remove(self, uid: Union[str, gr.State]):
115
  uid = SubprocessManagerGroup.grab_uuid(uid)
116
+ if manager := self.managers.get(uid):
117
+ manager.stop_process()
118
  del self.managers[uid]
119
 
120
  def start_process(self, uid: Union[str, gr.State], custom_env: dict | None):
 
135
  uid = SubprocessManagerGroup.grab_uuid(uid)
136
  return self.managers[uid].read_and_get_output()
137
 
138
+ def is_running(self, uid: Union[str, gr.State]) -> bool:
139
+ uid = SubprocessManagerGroup.grab_uuid(uid)
140
+ if manager := self.managers.get(uid):
141
+ return manager.is_running()
142
+ return False
143
 
144
  class SubprocessManager:
145
  def __init__(self, session_uid: str):
 
245
  return self.exit_code, "Process exited abnormaly"
246
 
247
  return self.exit_code, "Process exited normaly"
248
+
249
+ def __del__(self):
250
+ """Stop the process when object is deleted"""
251
+ if self.process:
252
+ self.process.kill()