Clémentine committed on
Commit
133c6d8
·
1 Parent(s): ab1227e

added a singleton-like class to manage all managers per session, plus session state management. Also fixes secret passing to the new leaderboard space

Browse files
yourbench_space/app.py CHANGED
@@ -3,6 +3,7 @@ import os
3
  import sys
4
  import time
5
  import gradio as gr
 
6
 
7
  from datasets import load_dataset
8
  from huggingface_hub import whoami
@@ -11,9 +12,7 @@ from pathlib import Path
11
 
12
  from yourbench_space.config import generate_and_save_config
13
  from yourbench_space.utils import (
14
- CONFIG_PATH,
15
- UPLOAD_DIRECTORY,
16
- SubprocessManager,
17
  save_files,
18
  update_dataset,
19
  STAGES,
@@ -30,14 +29,11 @@ Quickly create zero-shot benchmarks from your documents – keeping models accu
30
  - 💻 [GitHub](https://github.com/huggingface/yourbench/tree/v0.2-alpha-space)
31
  """
32
 
33
-
34
- UPLOAD_DIRECTORY.mkdir(parents=True, exist_ok=True)
35
-
36
  logger.remove()
37
  logger.add(sys.stderr, level="INFO")
38
 
39
- command = ["uv", "run", "yourbench", f"--config={CONFIG_PATH}"]
40
- manager = SubprocessManager(command)
41
 
42
  docs_path = Path(__file__).parent / "docs.md"
43
  citation_content = (
@@ -46,30 +42,27 @@ citation_content = (
46
  else "# Citation\n\nDocumentation file not found."
47
  )
48
 
49
-
50
- def generate_and_return(hf_org, hf_prefix):
51
- generate_and_save_config(hf_org, hf_prefix)
52
  for _ in range(5):
53
- if CONFIG_PATH.exists():
54
- break
55
  time.sleep(0.5)
56
-
 
 
 
 
57
  return (
58
- (
59
- "✅ Config saved!",
60
- gr.update(value=str(CONFIG_PATH), visible=True, interactive=True),
61
- )
62
- if CONFIG_PATH.exists()
63
- else (
64
- "❌ Config generation failed.",
65
- gr.update(visible=False, interactive=False),
66
- )
67
  )
68
 
69
  final_dataset = None
70
 
71
- def update_process_status():
72
  """Update process status and include exit details if process has terminated"""
 
 
73
  is_running = manager.is_running()
74
 
75
  if not is_running:
@@ -79,7 +72,8 @@ def update_process_status():
79
 
80
  return gr.update(value=True, label="Process Status: Running")
81
 
82
- def prepare_task(oauth_token: gr.OAuthToken | None, hf_dataset_name: str, _=None):
 
83
  new_env = os.environ.copy()
84
  if oauth_token:
85
  new_env["HF_TOKEN"] = oauth_token.token
@@ -127,17 +121,22 @@ def run_evaluation_pipeline(oauth_token: gr.OAuthToken | None, org_name, eval_na
127
  api = HfApi()
128
 
129
  try:
130
- api.create_repo(repo_id=repo_id, repo_type="space", space_sdk="gradio")
131
- api.upload_folder(repo_id=repo_id, repo_type="space", folder_path="src/")
132
- api.add_space_secret(repo_id=repo_id, key="HF_TOKEN", value=HF_TOKEN)
133
- api.add_space_variable(repo_id=repo_id, key="TASK", value=eval_ds_name)
134
- api.add_space_variable(repo_id=repo_id, key="ORG_NAME", value=org_name)
135
  except Exception as e:
136
  status = "Evaluation" + status + "\nLeaderboard creation:" + e
137
  return status
138
 
139
 
140
  with gr.Blocks(theme=gr.themes.Default()) as app:
 
 
 
 
 
141
  gr.Markdown(project_description)
142
 
143
  with gr.Tabs() as tabs:
@@ -166,7 +165,7 @@ with gr.Blocks(theme=gr.themes.Default()) as app:
166
  )
167
  output = gr.Textbox(label="Log")
168
  file_input.upload(
169
- lambda files: save_files([file.name for file in files]),
170
  file_input,
171
  output,
172
  )
@@ -181,7 +180,7 @@ with gr.Blocks(theme=gr.themes.Default()) as app:
181
 
182
  preview_button.click(
183
  generate_and_return,
184
- inputs=[hf_org_dropdown, hf_dataset_name],
185
  outputs=[log_message, download_button],
186
  )
187
  preview_button.click(
@@ -193,13 +192,13 @@ with gr.Blocks(theme=gr.themes.Default()) as app:
193
  with gr.Tab("Run Generation", id=1):
194
  with gr.Row():
195
  start_button = gr.Button("Start Task")
196
- start_button.click(prepare_task, inputs=[login_btn, hf_dataset_name])
197
 
198
  stop_button = gr.Button("Stop Task")
199
- stop_button.click(manager.stop_process)
200
 
201
  kill_button = gr.Button("Kill Task")
202
- kill_button.click(manager.kill_process)
203
 
204
 
205
  with gr.Row():
@@ -209,7 +208,7 @@ with gr.Blocks(theme=gr.themes.Default()) as app:
209
 
210
  process_status = gr.Checkbox(label="Process Status", interactive=False)
211
  status_timer = gr.Timer(1.0, active=True)
212
- status_timer.tick(update_process_status, outputs=process_status)
213
 
214
  with gr.Column():
215
  with gr.Accordion("Stages", open=True):
@@ -238,7 +237,7 @@ with gr.Blocks(theme=gr.themes.Default()) as app:
238
 
239
  log_timer = gr.Timer(1.0, active=True)
240
  log_timer.tick(
241
- manager.read_and_get_output, outputs=[log_output, stages_table]
242
  )
243
  with gr.Tab("Evaluate", id=2):
244
  with gr.Row():
 
3
  import sys
4
  import time
5
  import gradio as gr
6
+ import uuid
7
 
8
  from datasets import load_dataset
9
  from huggingface_hub import whoami
 
12
 
13
  from yourbench_space.config import generate_and_save_config
14
  from yourbench_space.utils import (
15
+ SubprocessManagerGroup,
 
 
16
  save_files,
17
  update_dataset,
18
  STAGES,
 
29
  - 💻 [GitHub](https://github.com/huggingface/yourbench/tree/v0.2-alpha-space)
30
  """
31
 
 
 
 
32
  logger.remove()
33
  logger.add(sys.stderr, level="INFO")
34
 
35
+ # Global to store all managers per session
36
+ MANAGERS = SubprocessManagerGroup()
37
 
38
  docs_path = Path(__file__).parent / "docs.md"
39
  citation_content = (
 
42
  else "# Citation\n\nDocumentation file not found."
43
  )
44
 
45
+ def generate_and_return(hf_org, hf_dataset_name, session_state: gr.State):
46
+ manager = MANAGERS.get(session_state.value)
47
+ config_path = generate_and_save_config(hf_org, hf_dataset_name, session_state.value, manager.config_path)
48
  for _ in range(5):
 
 
49
  time.sleep(0.5)
50
+ if config_path.exists():
51
+ return (
52
+ "✅ Config saved!",
53
+ gr.update(value=str(config_path), visible=True, interactive=True),
54
+ )
55
  return (
56
+ "❌ Config generation failed.",
57
+ gr.update(visible=False, interactive=False),
 
 
 
 
 
 
 
58
  )
59
 
60
  final_dataset = None
61
 
62
+ def update_process_status(session_state: gr.State):
63
  """Update process status and include exit details if process has terminated"""
64
+ manager = MANAGERS.get(session_state.value)
65
+
66
  is_running = manager.is_running()
67
 
68
  if not is_running:
 
72
 
73
  return gr.update(value=True, label="Process Status: Running")
74
 
75
+ def prepare_task(session_state: gr.State, oauth_token: gr.OAuthToken | None, hf_dataset_name: str, _=None):
76
+ manager = MANAGERS.get(session_state.value)
77
  new_env = os.environ.copy()
78
  if oauth_token:
79
  new_env["HF_TOKEN"] = oauth_token.token
 
121
  api = HfApi()
122
 
123
  try:
124
+ api.create_repo(repo_id=repo_id, repo_type="space", space_sdk="gradio", token=oauth_token.token)
125
+ api.upload_folder(repo_id=repo_id, repo_type="space", folder_path="src/", token=oauth_token.token)
126
+ api.add_space_secret(repo_id=repo_id, key="HF_TOKEN", value=oauth_token.token, token=oauth_token.token)
127
+ api.add_space_variable(repo_id=repo_id, key="TASK", value=eval_ds_name, token=oauth_token.token)
128
+ api.add_space_variable(repo_id=repo_id, key="ORG_NAME", value=org_name, token=oauth_token.token)
129
  except Exception as e:
130
  status = "Evaluation" + status + "\nLeaderboard creation:" + e
131
  return status
132
 
133
 
134
  with gr.Blocks(theme=gr.themes.Default()) as app:
135
+ # We initialize the session state with the user randomly generated uuid
136
+ # Using uuid4 makes collision cases extremely unlikely even for concurrent users
137
+ session_state = gr.State(uuid.uuid4(), delete_callback=lambda uid: MANAGERS.remove(uid))
138
+ MANAGERS.create(session_state.value)
139
+
140
  gr.Markdown(project_description)
141
 
142
  with gr.Tabs() as tabs:
 
165
  )
166
  output = gr.Textbox(label="Log")
167
  file_input.upload(
168
+ lambda files: save_files(session_state, [file.name for file in files]),
169
  file_input,
170
  output,
171
  )
 
180
 
181
  preview_button.click(
182
  generate_and_return,
183
+ inputs=[hf_org_dropdown, hf_dataset_name, session_state],
184
  outputs=[log_message, download_button],
185
  )
186
  preview_button.click(
 
192
  with gr.Tab("Run Generation", id=1):
193
  with gr.Row():
194
  start_button = gr.Button("Start Task")
195
+ start_button.click(prepare_task, inputs=[session_state, login_btn, hf_dataset_name])
196
 
197
  stop_button = gr.Button("Stop Task")
198
+ stop_button.click(MANAGERS.stop_process, inputs=session_state)
199
 
200
  kill_button = gr.Button("Kill Task")
201
+ kill_button.click(MANAGERS.kill_process, inputs=session_state)
202
 
203
 
204
  with gr.Row():
 
208
 
209
  process_status = gr.Checkbox(label="Process Status", interactive=False)
210
  status_timer = gr.Timer(1.0, active=True)
211
+ status_timer.tick(update_process_status, inputs=session_state, outputs=process_status)
212
 
213
  with gr.Column():
214
  with gr.Accordion("Stages", open=True):
 
237
 
238
  log_timer = gr.Timer(1.0, active=True)
239
  log_timer.tick(
240
+ MANAGERS.read_and_get_output, inputs=session_state, outputs=[log_output, stages_table]
241
  )
242
  with gr.Tab("Evaluate", id=2):
243
  with gr.Row():
yourbench_space/config.py CHANGED
@@ -3,14 +3,14 @@ from loguru import logger
3
  from yourbench_space.utils import CONFIG_PATH
4
 
5
 
6
- def generate_base_config(hf_org, hf_prefix):
7
  """Creates the base config dictionary"""
8
  return {
9
  "hf_configuration": {
10
  "token": "$HF_TOKEN",
11
  "private": True,
12
  "hf_organization": hf_org,
13
- "hf_dataset_name": hf_prefix,
14
  },
15
  "model_list": [
16
  {
@@ -34,12 +34,12 @@ def generate_base_config(hf_org, hf_prefix):
34
  },
35
  "pipeline": {
36
  "ingestion": {
37
- "source_documents_dir": "/app/uploaded_files",
38
- "output_dir": "/app/ingested",
39
  "run": True,
40
  },
41
  "upload_ingest_to_hub": {
42
- "source_documents_dir": "/app/ingested",
43
  "run": True,
44
  },
45
  "summarization": {"run": True},
@@ -84,18 +84,18 @@ def generate_base_config(hf_org, hf_prefix):
84
  }
85
 
86
 
87
- def save_yaml_file(config):
88
  """Saves the given config dictionary to a YAML file"""
89
- with open(CONFIG_PATH, "w") as file:
90
  yaml.dump(config, file, default_flow_style=False, sort_keys=False)
91
- return CONFIG_PATH
92
 
93
 
94
- def generate_and_save_config(hf_org, hf_prefix):
95
  """Generates and saves the YAML configuration file"""
96
- logger.debug(f"Generating config with org: {hf_org}, prefix: {hf_prefix}")
97
- config = generate_base_config(hf_org, hf_prefix)
98
- file_path = save_yaml_file(config)
99
  logger.success(f"Config saved at: {file_path}")
100
  return file_path
101
 
 
3
  from yourbench_space.utils import CONFIG_PATH
4
 
5
 
6
+ def generate_base_config(hf_org: str, hf_dataset_name: str, session_uid: str):
7
  """Creates the base config dictionary"""
8
  return {
9
  "hf_configuration": {
10
  "token": "$HF_TOKEN",
11
  "private": True,
12
  "hf_organization": hf_org,
13
+ "hf_dataset_name": hf_dataset_name,
14
  },
15
  "model_list": [
16
  {
 
34
  },
35
  "pipeline": {
36
  "ingestion": {
37
+ "source_documents_dir": f"/app/{session_uid}/uploaded_files/",
38
+ "output_dir": f"/app/{session_uid}/ingested",
39
  "run": True,
40
  },
41
  "upload_ingest_to_hub": {
42
+ "source_documents_dir": f"/app/{session_uid}/ingested",
43
  "run": True,
44
  },
45
  "summarization": {"run": True},
 
84
  }
85
 
86
 
87
+ def save_yaml_file(config: str, path: str):
88
  """Saves the given config dictionary to a YAML file"""
89
+ with open(path, "w") as file:
90
  yaml.dump(config, file, default_flow_style=False, sort_keys=False)
91
+ return path
92
 
93
 
94
+ def generate_and_save_config(hf_org: str, hf_name: str, session_uid: str, config_path: str):
95
  """Generates and saves the YAML configuration file"""
96
+ logger.debug(f"Generating config with org: {hf_org}, dataset name: {hf_name}")
97
+ config = generate_base_config(hf_org, hf_name, session_uid)
98
+ file_path = save_yaml_file(config, config_path)
99
  logger.success(f"Config saved at: {file_path}")
100
  return file_path
101
 
yourbench_space/utils.py CHANGED
@@ -4,16 +4,12 @@ import re
4
  import pathlib
5
  import shutil
6
  import subprocess
 
7
  import pandas as pd
8
- from datasets import load_dataset, get_dataset_config_names
 
9
  from loguru import logger
10
- from typing import List
11
-
12
- UPLOAD_DIRECTORY = pathlib.Path("/app/uploaded_files")
13
- CONFIG_PATH = pathlib.Path("/app/yourbench_config.yml")
14
-
15
- # Ensure the upload directory exists
16
- UPLOAD_DIRECTORY.mkdir(parents=True, exist_ok=True)
17
 
18
  STAGES = [
19
  "ingestion",
@@ -28,14 +24,18 @@ STAGES = [
28
  ]
29
 
30
 
31
- def save_files(files: List[pathlib.Path]) -> str:
32
- """Save uploaded files to the UPLOAD_DIRECTORY safely"""
 
33
  saved_paths = []
34
 
35
  for file in files:
36
  try:
37
  source_path = pathlib.Path(file)
38
- destination_path = UPLOAD_DIRECTORY / source_path.name
 
 
 
39
 
40
  if not source_path.exists():
41
  print(f"File not found: {source_path}")
@@ -65,13 +65,10 @@ def update_dataset(stages, hf_org, hf_prefix):
65
  # Construct dataset name from config
66
  dataset_name = f"{hf_org}/{hf_prefix}"
67
 
68
- # TODO: add cache dir
69
- # Will be able to group everything in one pass once the names get homogeneized
70
- # TODO: make sure the questions are loaded with a set
71
  if "ingestion" in stages:
72
  # TODO: why is the key "ingested" and not "ingestion"? (does not match the other splits)
73
  ingestion_ds = load_dataset(dataset_name, name="ingested", split="train").select_columns("document_text")
74
- ingestion_df = pd.DataFrame([next(iter(ingestion_ds)) for _ in range(5)])
75
  if "summarization" in stages:
76
  summarization_ds = load_dataset(dataset_name, name="summarization", split="train", streaming=True).select_columns(['raw_document_summary', 'document_summary', 'summarization_model'])
77
  summarization_df = pd.DataFrame([next(iter(summarization_ds)) for _ in range(5)])
@@ -84,9 +81,55 @@ def update_dataset(stages, hf_org, hf_prefix):
84
 
85
  return (ingestion_df, summarization_df, single_hop_df, answers_df)
86
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
87
  class SubprocessManager:
88
- def __init__(self, command):
89
- self.command = command
 
 
 
 
90
  self.process = None
91
  self.output_stream = io.StringIO()
92
  self.exit_code = None
 
4
  import pathlib
5
  import shutil
6
  import subprocess
7
+ import gradio as gr
8
  import pandas as pd
9
+ from collections import defaultdict
10
+ from datasets import load_dataset
11
  from loguru import logger
12
+ from typing import List, Union
 
 
 
 
 
 
13
 
14
  STAGES = [
15
  "ingestion",
 
24
  ]
25
 
26
 
27
+ def save_files(session_state: gr.State, files: List[pathlib.Path]) -> str:
28
+ """Save uploaded files to the UPLOAD_DIRECTORY/uuid safely"""
29
+ uuid = session_state.value
30
  saved_paths = []
31
 
32
  for file in files:
33
  try:
34
  source_path = pathlib.Path(file)
35
+ upload_directory_uuid = pathlib.Path(f"/app/{uuid}/uploaded_files")
36
+ # Ensure the upload directory exists
37
+ upload_directory_uuid.mkdir(parents=True, exist_ok=True)
38
+ destination_path = upload_directory_uuid / source_path.name
39
 
40
  if not source_path.exists():
41
  print(f"File not found: {source_path}")
 
65
  # Construct dataset name from config
66
  dataset_name = f"{hf_org}/{hf_prefix}"
67
 
 
 
 
68
  if "ingestion" in stages:
69
  # TODO: why is the key "ingested" and not "ingestion"? (does not match the other splits)
70
  ingestion_ds = load_dataset(dataset_name, name="ingested", split="train").select_columns("document_text")
71
+ ingestion_df = pd.DataFrame(ingestion_ds[0]) # only one row
72
  if "summarization" in stages:
73
  summarization_ds = load_dataset(dataset_name, name="summarization", split="train", streaming=True).select_columns(['raw_document_summary', 'document_summary', 'summarization_model'])
74
  summarization_df = pd.DataFrame([next(iter(summarization_ds)) for _ in range(5)])
 
81
 
82
  return (ingestion_df, summarization_df, single_hop_df, answers_df)
83
 
84
+
85
+ class SubprocessManagerGroup:
86
+ """Instanciates one manager per user (should be used as a singleton class)"""
87
+ def __init__(self):
88
+ self.managers: dict[str, SubprocessManager] = {}
89
+
90
+ @staticmethod
91
+ def grab_uuid(uid: Union[str, gr.State]):
92
+ """If a gradio session state is provided, we pull the uuid from its value - else we assume the str is the uuid"""
93
+ if isinstance(uid, gr.State):
94
+ uid = uid.value
95
+ return uid
96
+
97
+ def create(self, uid: Union[str, gr.State]):
98
+ uid = SubprocessManagerGroup.grab_uuid(uid)
99
+ self.managers[uid] = SubprocessManager(uid)
100
+
101
+ def get(self, uid: Union[str, gr.State]) -> "SubprocessManager":
102
+ uid = SubprocessManagerGroup.grab_uuid(uid)
103
+ return self.managers[uid]
104
+
105
+ def remove(self, uid: Union[str, gr.State]):
106
+ uid = SubprocessManagerGroup.grab_uuid(uid)
107
+ del self.managers[uid]
108
+
109
+ def start_process(self, uid: Union[str, gr.State]):
110
+ uid = SubprocessManagerGroup.grab_uuid(uid)
111
+ self.managers[uid].start_process()
112
+
113
+ def stop_process(self, uid: Union[str, gr.State]):
114
+ uid = SubprocessManagerGroup.grab_uuid(uid)
115
+ self.managers[uid].stop_process()
116
+
117
+ def kill_process(self, uid: Union[str, gr.State]):
118
+ uid = SubprocessManagerGroup.grab_uuid(uid)
119
+ self.managers[uid].kill_process()
120
+
121
+ def read_and_get_output(self, uid: Union[str, gr.State]):
122
+ uid = SubprocessManagerGroup.grab_uuid(uid)
123
+ self.managers[uid].read_and_get_output()
124
+
125
+
126
  class SubprocessManager:
127
+ def __init__(self, session_uid: str):
128
+ self.session_uid = session_uid
129
+ self.path = pathlib.Path(f"/app/{session_uid}")
130
+ self.path.mkdir(parents=True, exist_ok=True)
131
+ self.config_path = pathlib.Path(f"/app/{session_uid}/config.yml")
132
+ self.command = ["uv", "run", "yourbench", f"--config", self.config_path]
133
  self.process = None
134
  self.output_stream = io.StringIO()
135
  self.exit_code = None