import os
import pandas as pd
import gradio as gr
import datetime
from pathlib import Path
import json
from risk_atlas_nexus.blocks.inference import WMLInferenceEngine
from risk_atlas_nexus.blocks.inference.params import WMLInferenceEngineParams
from risk_atlas_nexus.library import RiskAtlasNexus
from functools import lru_cache, wraps
from dotenv import load_dotenv
# Load environment variables from a local .env file, overriding any values
# already present in the process environment (the WML_* keys read below).
load_dotenv(override=True)
# Load the taxonomies
# Shared Risk Atlas Nexus client used by every handler in this module.
ran = RiskAtlasNexus() # type: ignore
def clear_previous_risks():
    """Reset every risk-related output component to its initial blank state.

    Returns the same 10-tuple of components the risk handlers populate:
    section header, risk state, risk dataset, download button, description,
    related-risks dataset, two result tables, status text, and diagram.
    """
    header = gr.Markdown("""
Potential Risks
""")
    risk_dataset = gr.Dataset(
        samples=[],
        sample_labels=[],
        samples_per_page=50,
        visible=False,
    )
    hidden_table = gr.DataFrame(
        [], wrap=True, show_copy_button=True, show_search="search", visible=False
    )
    hidden_table_2 = gr.DataFrame(
        [], wrap=True, show_copy_button=True, show_search="search", visible=False
    )
    return (
        header,
        [],
        risk_dataset,
        gr.DownloadButton("Download JSON", visible=False),
        "",
        gr.Dataset(samples=[], sample_labels=[], visible=False),
        hidden_table,
        hidden_table_2,
        gr.Markdown(" "),
        gr.Markdown(" "),
    )
def clear_previous_mitigations():
    """Reset every mitigation-related output component to a blank state.

    Returns the 6-tuple the mitigation handlers populate: description,
    related-risks dataset, mitigation table, AI-evaluation table,
    status text, and diagram.
    """
    table_opts = dict(wrap=True, show_copy_button=True, show_search="search", visible=False)
    return (
        "",
        gr.Dataset(samples=[], sample_labels=[], visible=False),
        gr.DataFrame([], **table_opts),
        gr.DataFrame([], **table_opts),
        gr.Markdown(" "),
        gr.Markdown(" "),
    )
def generate_subgraph(risk):
    """Build a Mermaid mindmap for *risk* and return it as a Markdown component.

    The mindmap roots at the risk name and branches into the risk's origins
    (risk group and taxonomy), plus — when present — its related risks,
    related AI evaluations, and related mitigations.
    """
    parts = [
        '```mermaid\n',
        '---\n'
        'config:\n'
        ' layout: "tidy-tree"\n'
        '---\n',
        'mindmap\n',
        f' root(("{risk.name}"))\n',
        # origin info
        ' Origins\n',
        f' Riskgroup: {risk.isPartOf}\n',
        f' Taxonomy: {risk.isDefinedByTaxonomy}\n',
    ]
    # add related risks
    related = ran.get_related_risks(id=risk.id)
    if related:
        parts.append(' Related Risks\n')
        parts.extend(f' {item.name}\n' for item in related)
    # add related evals
    evaluations = ran.get_related_evaluations(risk_id=risk.id)
    if evaluations:
        parts.append(' Related AI evaluations\n')
        parts.extend(f' {item.name}\n' for item in evaluations)
    # add related mitigations
    mitigation_names = get_controls_and_actions(risk.id, risk.isDefinedByTaxonomy)
    if mitigation_names:
        parts.append(' Related mitigations\n')
        parts.extend(f' {name}\n' for name in mitigation_names)
    parts.append("```")
    return gr.Markdown(value="".join(parts))
def custom_lru_cache(maxsize=128, exclude_values=(None, [], [[]])):
    """
    LRU-cache decorator that never stores an "empty" result.

    The wrapped function must return a tuple whose third element is a
    component built with a ``samples`` constructor argument (e.g.
    ``gr.Dataset``); the element's ``constructor_args["samples"]`` value is
    inspected.  When it matches one of *exclude_values* the call is treated
    as an empty lookup: the result is still returned to the caller but is
    deliberately kept out of the cache, so a later identical call re-runs
    the function and can cache a non-empty answer.

    The previous implementation left the empty result inside ``lru_cache``,
    which forced a second, uncached call of ``func`` on every subsequent
    cache hit and prevented a later non-empty result from ever being cached.

    Args:
        maxsize: forwarded to ``functools.lru_cache``.
        exclude_values: ``samples`` values considered empty.
    """
    def decorator(func):
        class _EmptyResult(Exception):
            """Carries an uncacheable (empty) result out of the cached layer."""
            def __init__(self, result):
                self.result = result

        @lru_cache(maxsize=maxsize)
        def cached(*args, **kwargs):
            result = func(*args, **kwargs)
            # Raising prevents lru_cache from storing this entry at all.
            if result[2].constructor_args["samples"] in exclude_values:
                raise _EmptyResult(result)
            return result

        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return cached(*args, **kwargs)
            except _EmptyResult as exc:
                return exc.result
        return wrapper
    return decorator
@custom_lru_cache(exclude_values=(None, []))
def risk_identifier(usecase: str,
                    model_name_or_path: str = "meta-llama/llama-3-3-70b-instruct",
                    taxonomy: str = "ibm-risk-atlas"):
    """Identify potential risks for a use case with an LLM hosted on WML.

    Args:
        usecase: free-text description of the AI use case.
        model_name_or_path: WML model id used for zero-shot risk identification.
        taxonomy: risk taxonomy the candidate risks are drawn from.

    Returns:
        Tuple of (section-header Markdown, gr.State holding the risk objects,
        gr.Dataset of risk ids/labels, DownloadButton for the JSON dump).

    Raises:
        KeyError: if the WML_* credentials are missing from the environment.
    """
    inference_engine = WMLInferenceEngine(
        model_name_or_path=model_name_or_path,
        credentials={
            "api_key": os.environ["WML_API_KEY"],
            "api_url": os.environ["WML_API_URL"],
            "project_id": os.environ["WML_PROJECT_ID"],
        },
        parameters=WMLInferenceEngineParams(
            max_new_tokens=1000, decoding_method="greedy", repetition_penalty=1
        ),  # type: ignore
    )
    risks_a = ran.identify_risks_from_usecases(  # type: ignore
        usecases=[usecase],
        inference_engine=inference_engine,
        taxonomy=taxonomy,
        zero_shot_only=True,
        max_risk=5,
    )
    risks = risks_a[0]
    # Fall back to the risk id when a risk has no human-readable name.
    # (The original tested the truthiness of the risk object itself, so the
    # fallback branch could only be reached by a falsy entry and would then
    # crash on `None.id`.)
    sample_labels = [r.name if r.name else r.id for r in risks]
    out_sec = gr.Markdown(""" Potential Risks
""")
    # write out a JSON snapshot of this identification run for download
    data = {
        'time': str(datetime.datetime.now(datetime.timezone.utc)),
        'intent': usecase,
        'model': model_name_or_path,
        'taxonomy': taxonomy,
        'risks': [json.loads(r.json()) for r in risks],
    }
    file_path = Path("static/download.json")
    file_path.parent.mkdir(parents=True, exist_ok=True)  # static/ may not exist yet
    file_path.write_text(json.dumps(data, indent=4))
    # Reaching this point means the JSON was written, so the button is
    # downloadable whenever at least one risk was found.
    return (out_sec,
            gr.State(risks),
            gr.Dataset(samples=[r.id for r in risks],
                       sample_labels=sample_labels,
                       samples_per_page=50, visible=True,
                       label="Estimated by an LLM."),
            gr.DownloadButton("Download JSON", str(file_path),
                              visible=len(risks) > 0))
def get_controls_and_actions(riskid, taxonomy):
    """Collect the names of all mitigations (actions, controls, intrinsics)
    associated with a risk.

    For the IBM Risk Atlas taxonomy, mitigations are gathered from the risks
    *related* to ``riskid`` (the atlas links mitigations to the mapped risks,
    not to the atlas entry itself); for any other taxonomy the mitigations
    attached directly to ``riskid`` are used.

    Args:
        riskid: id of the risk to look up.
        taxonomy: taxonomy the risk belongs to.

    Returns:
        list[str]: mitigation names (possibly empty).
    """
    action_ids = []
    control_ids = []
    intrinsic_ids = []
    if taxonomy == "ibm-risk-atlas":
        # look for mitigations associated with related risks
        for related in ran.get_related_risks(id=riskid):
            action_ids += ran.get_related_actions(id=related.id) or []
            control_ids += ran.get_related_risk_controls(id=related.id) or []
            intrinsic_ids += ran.get_related_intrinsics(risk_id=related.id) or []
    else:
        # Use only mitigations related to the primary risk
        action_ids = ran.get_related_actions(id=riskid)
        control_ids = ran.get_related_risk_controls(id=riskid)
        intrinsic_ids = ran.get_related_intrinsics(risk_id=riskid)
    # NOTE(review): the API appears asymmetric — actions come back as ids,
    # while controls and intrinsics come back as objects with an ``.id``.
    return ([ran.get_action_by_id(i).name for i in action_ids]
            + [ran.get_risk_control(i.id).name for i in control_ids]
            + [ran.get_intrinsic(i.id).name for i in intrinsic_ids])  # type: ignore
@lru_cache
def mitigations(riskid: str, taxonomy: str) -> tuple[gr.Markdown, gr.Dataset, gr.DataFrame, gr.DataFrame, gr.Markdown, gr.Markdown]:
    """
    For a specific risk (riskid), returns
    (a) a risk description (Markdown)
    (b) related risks - as a dataset
    (c) mitigations - as a DataFrame
    (d) related AI evaluations - as a DataFrame
    (e) a status Markdown
    (f) a subgraph of risk to mitigations (Markdown/Mermaid)
    """
    # Initialized up front so the subgraph check below cannot hit an
    # unbound name when the lookup raises.
    selected_risk = None
    try:
        selected_risk = ran.get_risk(id=riskid)
        risk_desc = selected_risk.description  # type: ignore
        # Fixed: this f-string previously spanned two lines without triple
        # quotes, which is a syntax error.
        risk_sec = f"Description:\n{risk_desc}"
    except AttributeError:
        risk_sec = ""
    # Fetch related risks once and reuse for both ids and labels.
    related_risks = ran.get_related_risks(id=riskid)
    related_risk_ids = [r.id for r in related_risks]
    related_ai_eval_ids = [ai_eval.id for ai_eval in ran.get_related_evaluations(risk_id=riskid)]
    action_ids = []
    control_ids = []
    intrinsic_ids = []
    if taxonomy == "ibm-risk-atlas":
        # look for mitigations associated with related risks
        for rid in related_risk_ids:
            action_ids += ran.get_related_actions(id=rid) or []
            control_ids += ran.get_related_risk_controls(id=rid) or []
            intrinsic_ids += ran.get_related_intrinsics(risk_id=rid) or []
    else:
        # Use only mitigations related to the primary risk
        action_ids = ran.get_related_actions(id=riskid)
        control_ids = ran.get_related_risk_controls(id=riskid)
        intrinsic_ids = ran.get_related_intrinsics(risk_id=riskid)
    # Sanitize outputs
    if not related_risk_ids:
        label = "No related risks found."
        samples = None
        sample_labels = None
    else:
        label = f"Risks from other taxonomies related to {riskid}"
        samples = related_risk_ids
        sample_labels = [r.name for r in related_risks]  # type: ignore
    if not action_ids and not control_ids and not intrinsic_ids:
        alabel = "No mitigations found."
        mitdf = pd.DataFrame()
    else:
        alabel = f"Mitigation actions and controls related to risk {riskid}."
        # Actions are returned as ids; controls/intrinsics as objects with .id.
        # NOTE(review): controls contribute their .name in the Description
        # column as well — presumably they have no description; confirm.
        names = ([ran.get_action_by_id(i).name for i in action_ids]
                 + [ran.get_risk_control(i.id).name for i in control_ids]
                 + [ran.get_intrinsic(i.id).name for i in intrinsic_ids])  # type: ignore
        descriptions = ([ran.get_action_by_id(i).description for i in action_ids]
                        + [ran.get_risk_control(i.id).name for i in control_ids]
                        + [ran.get_intrinsic(i.id).description for i in intrinsic_ids])  # type: ignore
        types = (["Action"] * len(action_ids)
                 + ["Control"] * len(control_ids)
                 + ["Intrinsic"] * len(intrinsic_ids))
        mitdf = pd.DataFrame({"Type": types, "Mitigation": names, "Description": descriptions})
    if not related_ai_eval_ids:
        blabel = "No related AI evaluations found."
        aievalsdf = pd.DataFrame()
    else:
        blabel = f"AI Evaluations related to {riskid}"
        evals = [ran.get_evaluation(i) for i in related_ai_eval_ids]  # type: ignore
        aievalsdf = pd.DataFrame({"AI Evaluation": [e.name for e in evals],
                                  "Description": [e.description for e in evals]})
    status = gr.Markdown(" ") if len(mitdf) > 0 else gr.Markdown("No mitigations found.")
    fig = gr.Markdown(" ") if not selected_risk else generate_subgraph(selected_risk)
    return (gr.Markdown(risk_sec),
            gr.Dataset(samples=samples, label=label, sample_labels=sample_labels, visible=True),
            gr.DataFrame(mitdf, wrap=True, show_copy_button=True, show_search="search", label=alabel, visible=True),
            gr.DataFrame(aievalsdf, wrap=True, show_copy_button=True, show_search="search", label=blabel, visible=True),
            status, fig)