Spaces:
Runtime error
Runtime error
| # Copyright 2024 The HuggingFace Team. All rights reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| import ast | |
| import json | |
| import os | |
| import sys | |
| import time | |
| from typing import Dict | |
| from get_ci_error_statistics import get_jobs | |
| from notification_service import ( | |
| Message, | |
| handle_stacktraces, | |
| handle_test_results, | |
| prepare_reports, | |
| retrieve_artifact, | |
| retrieve_available_artifacts, | |
| ) | |
| from slack_sdk import WebClient | |
# Module-level Slack client shared by the message class below; the bot token is
# read from the environment when this module is loaded.
client = WebClient(
    token=os.environ["CI_SLACK_BOT_TOKEN"],
)
class QuantizationMessage(Message):
    """Slack report for the quantization CI test suites.

    Aggregates per-suite results into a main Slack message (`post`) and one
    threaded reply per suite/device with failures (`post_reply`).

    `header`, `no_failures`, `get_device_report` and `get_reply_blocks` are
    referenced on `self` but not defined here; they are expected to be
    provided by `Message` (imported from `notification_service`).
    """

    def __init__(
        self,
        title: str,
        results: Dict,
    ):
        """Store the results and pre-compute the global counters.

        Args:
            title: Title of the report.
            results: Mapping from a suite name to a dict holding at least
                `"success"` (int), `"failed"` (`{"single": int, "multi": int}`),
                `"time_spent"` (comma-separated string) and `"failures"`
                (device -> list of failures).
        """
        self.title = title

        # Failures and success of the quantization tests
        self.n_success = sum(r["success"] for r in results.values())
        self.single_gpu_failures = sum(r["failed"]["single"] for r in results.values())
        self.multi_gpu_failures = sum(r["failed"]["multi"] for r in results.values())
        self.n_failures = self.single_gpu_failures + self.multi_gpu_failures

        self.n_tests = self.n_failures + self.n_success
        self.results = results
        # Set by `post()`; `post_reply()` needs it to thread its replies.
        self.thread_ts = None

    # NOTE: `payload`, `time`, `failures_overwiew` and `failures_detailed` are
    # read as plain attributes everywhere in this class (e.g. `self.payload`
    # is handed to `json.loads` in `post`), so they must be properties.

    @property
    def payload(self) -> str:
        """Return the JSON-encoded list of Slack blocks for the main message."""
        blocks = [self.header]

        if self.n_failures > 0:
            blocks.append(self.failures_overwiew)
            blocks.append(self.failures_detailed)

        if self.n_failures == 0:
            blocks.append(self.no_failures)

        return json.dumps(blocks)

    @property
    def time(self) -> str:
        """Return the summed duration of all suites, formatted as `XhYmZs`."""
        time_spent = []
        for r in self.results.values():
            if r["time_spent"]:
                time_spent.extend(x for x in r["time_spent"].split(", ") if x.strip())

        total_secs = 0
        # `entry` (not `time`) so the `time` module is not shadowed in here.
        for entry in time_spent:
            time_parts = entry.split(":")

            # Time can be formatted as xx:xx:xx, as .xx, or as x.xx if the time spent was less than a minute.
            if len(time_parts) == 1:
                time_parts = [0, 0, time_parts[0]]

            hours, minutes, seconds = int(time_parts[0]), int(time_parts[1]), float(time_parts[2])
            total_secs += hours * 3600 + minutes * 60 + seconds

        hours, minutes, seconds = total_secs // 3600, (total_secs % 3600) // 60, total_secs % 60
        return f"{int(hours)}h{int(minutes)}m{int(seconds)}s"

    @property
    def failures_overwiew(self) -> Dict:
        """Return the Slack section summarizing failure count and run time."""
        return {
            "type": "section",
            "text": {
                "type": "plain_text",
                "text": (
                    f"There were {self.n_failures} failures, out of {self.n_tests} tests.\n"
                    f"The suite ran in {self.time}."
                ),
                "emoji": True,
            },
            "accessory": {
                "type": "button",
                "text": {"type": "plain_text", "text": "Check Action results", "emoji": True},
                "url": f"https://github.com/huggingface/transformers/actions/runs/{os.environ['GITHUB_RUN_ID']}",
            },
        }

    @property
    def failures_detailed(self) -> Dict:
        """Return the Slack section listing every suite that had failures."""
        failures = {k: v["failed"] for k, v in self.results.items()}

        individual_reports = []
        for key, value in failures.items():
            device_report = self.get_device_report(value)
            # Only suites with at least one failure (on any device) are listed.
            if sum(value.values()):
                individual_reports.append(f"{device_report}{key}")

        header = "Single |  Multi | Category\n"
        failures_report = prepare_reports(
            title="The following quantization tests had failures", header=header, reports=individual_reports
        )

        return {"type": "section", "text": {"type": "mrkdwn", "text": failures_report}}

    def post(self):
        """Send the main report to Slack and remember the response in `thread_ts`."""
        payload = self.payload
        print("Sending the following payload")
        print(json.dumps({"blocks": json.loads(payload)}))

        text = f"{self.n_failures} failures out of {self.n_tests} tests," if self.n_failures else "All tests passed."

        self.thread_ts = client.chat_postMessage(
            channel=SLACK_REPORT_CHANNEL_ID,
            blocks=payload,
            text=text,
        )

    def post_reply(self):
        """Post one threaded reply per (suite, device) pair that has failures.

        Raises:
            ValueError: If `post()` has not been called yet.
        """
        if self.thread_ts is None:
            raise ValueError("Can only post reply if a post has been made.")

        for job, job_result in self.results.items():
            if job_result["failures"]:
                for device, failures in job_result["failures"].items():
                    blocks = self.get_reply_blocks(
                        job,
                        job_result,
                        failures,
                        device,
                        text=f'Number of failures: {job_result["failed"][device]}',
                    )

                    print("Sending the following reply")
                    print(json.dumps({"blocks": blocks}))

                    client.chat_postMessage(
                        # Must be the channel `post()` used: the thread
                        # timestamp below identifies a message in that channel.
                        channel=SLACK_REPORT_CHANNEL_ID,
                        text=f"Results for {job}",
                        blocks=blocks,
                        thread_ts=self.thread_ts["ts"],
                    )

                    # Pause between consecutive Slack posts.
                    time.sleep(1)
if __name__ == "__main__":
    # Script entry point: collect the quantization test artifacts of the
    # current workflow run, aggregate them per suite, then post the report and
    # its threaded replies to Slack.
    setup_status = os.environ.get("SETUP_STATUS")
    # Module-level on purpose: `QuantizationMessage` reads this global.
    SLACK_REPORT_CHANNEL_ID = os.environ["SLACK_REPORT_CHANNEL"]
    setup_failed = setup_status is not None and setup_status != "success"

    # This env. variable is set in workflow file (under the job `send_results`).
    ci_event = os.environ["CI_EVENT"]

    title = f"🤗 Results of the {ci_event} tests."

    if setup_failed:
        Message.error_out(
            title, ci_title="", runner_not_available=False, runner_failed=False, setup_failed=setup_failed
        )
        # `sys.exit` rather than the `site`-provided `exit` helper.
        sys.exit(0)

    arguments = sys.argv[1]
    try:
        quantization_matrix = ast.literal_eval(arguments)
    except (SyntaxError, ValueError) as err:
        # `ast.literal_eval` raises `ValueError` (not only `SyntaxError`) on
        # malformed input; report both to Slack before failing.
        Message.error_out(title, ci_title="")
        raise ValueError("Errored out.") from err

    # Need to change from elements like `quantization/bnb` to `quantization_bnb` (the ones used as artifact names).
    quantization_matrix = [x.replace("quantization/", "quantization_") for x in quantization_matrix]

    available_artifacts = retrieve_available_artifacts()

    # Only suites whose report artifact is actually available get an entry.
    quantization_results = {
        quant: {
            "failed": {"single": 0, "multi": 0},
            "success": 0,
            "time_spent": "",
            "failures": {},
            "job_link": {},
        }
        for quant in quantization_matrix
        if f"run_quantization_torch_gpu_{quant}_test_reports" in available_artifacts
    }

    github_actions_jobs = get_jobs(
        workflow_run_id=os.environ["GITHUB_RUN_ID"], token=os.environ["ACCESS_REPO_INFO_TOKEN"]
    )
    # NOTE(review): not read anywhere in the visible part of this script.
    github_actions_job_links = {job["name"]: job["html_url"] for job in github_actions_jobs}

    # Map each artifact name to its job, using the first step named
    # "Test suite reports artifacts: <artifact name>".
    step_prefix = "Test suite reports artifacts: "
    artifact_name_to_job_map = {}
    for job in github_actions_jobs:
        for step in job["steps"]:
            if step["name"].startswith(step_prefix):
                artifact_name = step["name"][len(step_prefix) :]
                artifact_name_to_job_map[artifact_name] = job
                break

    for quant, quant_result in quantization_results.items():
        for artifact_path in available_artifacts[f"run_quantization_torch_gpu_{quant}_test_reports"].paths:
            gpu = artifact_path["gpu"]
            artifact = retrieve_artifact(artifact_path["path"], gpu)
            if "stats" not in artifact:
                continue

            # Link to the GitHub Action job
            job = artifact_name_to_job_map[artifact_path["path"]]
            quant_result["job_link"][gpu] = job["html_url"]

            failed, success, time_spent = handle_test_results(artifact["stats"])
            quant_result["failed"][gpu] += failed
            quant_result["success"] += success
            # Drop the first and last character of the reported time and keep
            # a ", "-separated list, which `QuantizationMessage.time` splits.
            quant_result["time_spent"] += time_spent[1:-1] + ", "

            stacktraces = handle_stacktraces(artifact["failures_line"])

            for line in artifact["summary_short"].split("\n"):
                if line.startswith("FAILED "):
                    # Keep only the first whitespace-delimited token after the prefix.
                    line = line[len("FAILED ") :].split()[0]

                    # Pairs the i-th FAILED line with the i-th stacktrace.
                    quant_result["failures"].setdefault(gpu, []).append(
                        {"line": line, "trace": stacktraces.pop(0)}
                    )

    message = QuantizationMessage(
        title,
        results=quantization_results,
    )

    message.post()
    message.post_reply()