Spaces:
Runtime error
Runtime error
meg-huggingface
Addressing the need to update the requests dataset to COMPLETED, FAILED, etc. based on model *and task*.
7de9b42
| import argparse | |
| import os | |
| from datasets import load_dataset, Dataset | |
| from huggingface_hub import HfApi | |
# Hub access token, read from the DEBUG environment variable (None when unset).
TOKEN = os.getenv("DEBUG")
# Hub client used by the script body below to upload the run folder.
api = HfApi(token=TOKEN)

# Dataset repositories: the request queue that is read and updated, and the
# destination for uploaded run output.
REQUESTS_DSET = "AIEnergyScore/requests_debug"
RESULTS_DSET = "AIEnergyScore/results_debug"

# Values of the "status" column in the requests dataset.
PENDING = "PENDING"
COMPLETED = "COMPLETED"
FAILED = "FAILED"
def parse_args():
    """Parse the command-line options of this script.

    Recognised options (all optional strings):
      --run_dir          directory holding the run output (default "/runs")
      --attempts         file listing one attempted run directory per line
      --failed_attempts  file listing one failed run directory per line

    :return: the argparse namespace with run_dir, attempts, failed_attempts.
    """
    cli = argparse.ArgumentParser()
    # (flag, default value, help text) for every supported option.
    option_table = (
        (
            "--run_dir",
            "/runs",
            "Path to the run directory.",
        ),
        (
            "--attempts",
            "/attempts.txt",
            "File with per-line run attempt directories. Assumes format '/runs/{task}/{model}/{timestamp}'",
        ),
        (
            "--failed_attempts",
            "/failed_attempts.txt",
            "File with per-line failed run directories. Assumes format '/runs/{task}/{model}/{timestamp}'",
        ),
    )
    for flag, fallback, description in option_table:
        cli.add_argument(
            flag,
            default=fallback,
            type=str,
            required=False,
            help=description,
        )
    return cli.parse_args()
def check_for_traceback(run_dir):
    """Return the Python traceback recorded in a run's error log, if any.

    Reads ``{run_dir}/error.log`` and returns everything from the first line
    containing 'Traceback (most recent call last):' through the end of the
    file. Lines before that marker are ignored.

    :param run_dir: run directory, e.g. "./runs/${experiment_name}/${backend_model}/${now}"
    :return: the traceback text, or "" when the log is missing or contains
        no traceback marker.
    """
    marker = 'Traceback (most recent call last):'
    captured = []
    try:
        # Decode leniently: a stray undecodable byte in the log should not
        # abort the whole status update, so bad bytes become U+FFFD.
        with open(f"{run_dir}/error.log", 'r', encoding="utf-8", errors="replace") as f:
            for line in f:
                # Once the marker has been seen, keep every remaining line.
                if captured or marker in line:
                    captured.append(line)
    except FileNotFoundError:
        # No error log in this run directory; report it and return "".
        print(f"Could not find {run_dir}/error.log")
    return "".join(captured)
def update_requests(requests, all_attempts, failed_attempts):
    """
    Sets all PENDING requests with the given model & task to 'COMPLETED' or 'FAILED.'

    Each entry of all_attempts / failed_attempts is a run directory of the form
    run_dir="/runs/${experiment_name}/${backend_model}/${now}", where
    ${backend_model} is ${organization}/${model_name}. For every attempted run:
      * a traceback in the run's error.log -> FAILED, traceback stored in
        the 'error_message' column;
      * no traceback, but the run is listed in failed_attempts -> FAILED;
      * otherwise -> COMPLETED.
    Only rows whose status is currently PENDING are touched.

    :param requests: requests Dataset (must provide to_pandas() with
        'status', 'model' and 'task' columns)
    :param all_attempts: iterable of run-directory lines (task/model/timestamp)
    :param failed_attempts: iterable of run-directory lines that failed
    :return: a new Dataset built from the updated requests table
    """
    requests_df = requests.to_pandas()
    # Lines read with readlines() keep their trailing newline; normalise them
    # once so membership tests against the stripped run directory match.
    failed_run_dirs = {entry.strip() for entry in failed_attempts}
    for line in all_attempts:
        line = line.strip()
        if not line:
            # Blank lines (e.g. a trailing newline in the file) carry no run.
            continue
        print(f"Checking {line}")
        split_run_dir = line.strip("/").split("/")
        print(f"Processing run directory {split_run_dir}")
        if len(split_run_dir) < 4:
            # Need at least runs/task/organization/model to identify a request.
            print(f"Skipping malformed run directory {line}")
            continue
        task = split_run_dir[1]
        print(f"Task is {task}")
        # The naming of the optimum benchmark configs uses an underscore.
        # The naming of the HF Api list models function uses a hyphen.
        # We therefore need to adapt this task string name depending on
        # which part of our pipeline we're talking to.
        hyphenated_task_name = task.replace("_", "-")
        model = "/".join(split_run_dir[2:4])
        print(f"Model is {model}")
        # Select the rows ONCE, before any status change. Re-evaluating
        # `status == PENDING` after flipping the status would match nothing.
        pending_mask = (
            (requests_df["status"] == PENDING)
            & (requests_df["model"] == model)
            & (requests_df["task"] == hyphenated_task_name)
        )
        traceback_error = check_for_traceback(line)
        if traceback_error != "":
            print("Found a traceback error!")
            print(traceback_error)
            requests_df.loc[pending_mask, "status"] = FAILED
            # Guarded so an empty selection never creates or alters the
            # column. NOTE(review): presumably 'error_message' already exists
            # in the requests dataset -- confirm against its schema.
            if pending_mask.any():
                requests_df.loc[pending_mask, "error_message"] = traceback_error
        elif line in failed_run_dirs:
            print(f"Job failed, but not sure why -- didn't find a traceback in {line}.")
            print(f"Setting {model}, {hyphenated_task_name}, status {PENDING} to {FAILED}.")
            print(requests_df[pending_mask])
            requests_df.loc[pending_mask, "status"] = FAILED
        else:
            requests_df.loc[pending_mask, "status"] = COMPLETED
    updated_dset = Dataset.from_pandas(requests_df)
    return updated_dset
if __name__ == '__main__':
    # Script entry point: upload the run output, then reconcile the requests
    # dataset with the outcome of each attempted run.
    args = parse_args()
    # Uploads all run output to the results dataset.
    print(f"Uploading {args.run_dir} to {RESULTS_DSET}")
    api.upload_folder(
        folder_path=args.run_dir,
        repo_id=RESULTS_DSET,
        repo_type="dataset",
    )
    # Update requests dataset based on whether things have failed or not.
    print(f"Examining the run directory for each model & task to determine if it {FAILED} or {COMPLETED}.")
    requests = load_dataset(REQUESTS_DSET, split="test", token=TOKEN)
    # The attempt files are only read, so open them read-only and close them
    # deterministically instead of leaking the handles.
    with open(args.attempts, "r") as attempts_file:
        all_attempts = attempts_file.readlines()
    with open(args.failed_attempts, "r") as failed_attempts_file:
        failed_attempts = failed_attempts_file.readlines()
    updated_requests = update_requests(requests, all_attempts, failed_attempts)
    print(f"Uploading updated {REQUESTS_DSET}.")
    updated_requests.push_to_hub(REQUESTS_DSET, split="test", token=TOKEN)
    print("Done.")