|
|
|
|
|
import torch |
|
|
import torch.nn.functional as F |
|
|
from transformers import AutoTokenizer, AutoModelForSequenceClassification, GPT2TokenizerFast, GPT2LMHeadModel |
|
|
import math |
|
|
import gradio as gr |
|
|
from sentence_transformers import SentenceTransformer, util |
|
|
from googlesearch import search |
|
|
from ddgs import DDGS |
|
|
from bs4 import BeautifulSoup |
|
|
import httpx |
|
|
import re, os |
|
|
import numpy as np |
|
|
import asyncio |
|
|
import logging |
|
|
import nltk |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Application-wide logging at INFO level.
logging.basicConfig(level=logging.INFO)

# The model libraries are restricted to ERROR so their INFO/WARNING output
# does not drown out this application's own messages.
logging.getLogger("sentence_transformers").setLevel(logging.ERROR)
logging.getLogger("transformers").setLevel(logging.ERROR)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Make sure the NLTK "punkt" tokenizer data is available: look it up locally
# first and download it only when the lookup fails.
try:
    nltk.data.find("tokenizers/punkt")
except LookupError:
    nltk.download("punkt")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# GPT-2 tokenizer and language model used for perplexity scoring.
gpt2_tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
gpt2_model = GPT2LMHeadModel.from_pretrained("gpt2").to(device)

# Inference only: put the model in evaluation mode.
gpt2_model.eval()
|
|
|
|
|
|
|
|
def ai_score_perplexity(text):
    """Score ``text`` by its GPT-2 perplexity, mapped to ``1 / (1 + ppl)``.

    The text is tokenized with ``gpt2_tokenizer`` and fed to ``gpt2_model``
    with the input ids as labels; the returned loss is exponentiated to get
    the perplexity. Lower perplexity gives a higher score.

    Args:
        text: The text to score.

    Returns:
        A float in ``[0, 1)``. ``0.0`` is returned when no perplexity can be
        computed: empty/whitespace-only text, fewer than two tokens, or a
        non-finite loss.
    """
    # Nothing to score; avoid handing an empty sequence to the model.
    if not text or not text.strip():
        return 0.0

    # GPT-2 has a fixed number of position embeddings; truncate so long
    # inputs do not index past them.
    encodings = gpt2_tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        max_length=gpt2_model.config.n_positions,
    ).to(device)
    input_ids = encodings["input_ids"]

    # The language-model loss predicts token i+1 from token i, so a sequence
    # of fewer than two tokens has no prediction targets (the loss is NaN).
    if input_ids.size(1) < 2:
        return 0.0

    with torch.no_grad():
        outputs = gpt2_model(**encodings, labels=input_ids)
    loss = outputs.loss.item()

    try:
        ppl = math.exp(loss)
    except OverflowError:
        # An enormous loss means an effectively infinite perplexity.
        return 0.0
    if math.isnan(ppl):
        return 0.0

    return 1.0 / (1.0 + ppl)
|
|
|
|
|
def robust_ai_score(text):
    """Classify ``text`` as human- or AI-written from its perplexity score.

    The value returned by ``ai_score_perplexity`` is multiplied by 1000 and
    bucketed:

    * below 35  -> ``"Human"``
    * 35 to <40 -> ``"Mixed Content (mostly human)"``
    * 40 to <45 -> ``"Mixed Content (mostly AI)"``
    * 45 and up -> ``"AI"``; the reported score is raised by 20 and capped
      at 90

    Args:
        text: The text to analyze.

    Returns:
        A dict with ``"final_score"`` (scaled score rounded to two decimals),
        ``"label"`` and ``"components"`` (holding the raw
        ``"perplexity_score"``). Empty or whitespace-only input yields a
        score of 0.0 labelled ``"Human"`` without running the model.
    """
    # An empty textbox submits an empty string; there is nothing to score.
    if not text or not text.strip():
        return {
            "final_score": 0.0,
            "label": "Human",
            "components": {"perplexity_score": 0.0},
        }

    base_score = ai_score_perplexity(text)
    # NaN compares False against every threshold, which would fall through
    # to the "AI" branch and put NaN into the JSON result.
    if math.isnan(base_score):
        base_score = 0.0

    # Scale to a more readable range before bucketing.
    scaled_score = base_score * 1000

    if scaled_score < 35:
        label = "Human"
    elif scaled_score < 40:
        label = "Mixed Content (mostly human)"
    elif scaled_score < 45:
        label = "Mixed Content (mostly AI)"
    else:
        # Boost confident AI detections, but never report more than 90.
        scaled_score = min(scaled_score + 20, 90)
        label = "AI"

    return {
        "final_score": round(scaled_score, 2),
        "label": label,
        "components": {
            "perplexity_score": base_score,
        },
    }
|
|
|
|
|
|
|
|
""" |
|
|
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
|
detectors = { |
|
|
"roberta-large": AutoModelForSequenceClassification.from_pretrained("roberta-large-openai-detector"), |
|
|
"roberta-base": AutoModelForSequenceClassification.from_pretrained("roberta-base-openai-detector") |
|
|
} |
|
|
tokenizers = { |
|
|
"roberta-large": AutoTokenizer.from_pretrained("roberta-large-openai-detector"), |
|
|
"roberta-base": AutoTokenizer.from_pretrained("roberta-base-openai-detector") |
|
|
} |
|
|
for model in detectors.values(): |
|
|
model.eval() |
|
|
|
|
|
gpt2_model = GPT2LMHeadModel.from_pretrained("gpt2") |
|
|
gpt2_tokenizer = GPT2TokenizerFast.from_pretrained("gpt2") |
|
|
gpt2_model.eval() |
|
|
|
|
|
# Scoring functions |
|
|
def ai_score_roberta(text, model_name): |
|
|
tokenizer = tokenizers[model_name] |
|
|
model = detectors[model_name] |
|
|
inputs = tokenizer(text, return_tensors="pt", truncation=True) |
|
|
with torch.no_grad(): |
|
|
outputs = model(**inputs) |
|
|
probs = F.softmax(outputs.logits, dim=1) |
|
|
return probs[0][1].item() |
|
|
|
|
|
def ai_score_perplexity(text): |
|
|
encodings = gpt2_tokenizer(text, return_tensors="pt") |
|
|
with torch.no_grad(): |
|
|
outputs = gpt2_model(**encodings, labels=encodings["input_ids"]) |
|
|
loss = outputs.loss |
|
|
ppl = math.exp(loss.item()) |
|
|
return 1.0 / (1.0 + ppl) |
|
|
|
|
|
def robust_ai_score(text, weights={"large":0.4, "base":0.4, "ppl":0.2}, threshold_adjust=0.95): |
|
|
score_large = ai_score_roberta(text, "roberta-large") |
|
|
score_base = ai_score_roberta(text, "roberta-base") |
|
|
score_ppl = ai_score_perplexity(text) |
|
|
|
|
|
final_score = (weights["large"]*score_large + |
|
|
weights["base"]*score_base + |
|
|
weights["ppl"]*score_ppl) |
|
|
|
|
|
label = "Likely AI" if final_score > threshold_adjust else "Possibly Human" |
|
|
return { |
|
|
"final_score": round(final_score, 4), |
|
|
"label": label, |
|
|
"components": { |
|
|
"roberta-large": score_large, |
|
|
"roberta-base": score_base, |
|
|
"perplexity": score_ppl |
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
def calculate_perplexity(text): |
|
|
|
|
|
encodings = gpt2_tokenizer(text, return_tensors="pt", truncation=True, max_length=512).to(device) |
|
|
seq_len = encodings.input_ids.size(1) |
|
|
nlls = [] |
|
|
stride = 512 // 2 |
|
|
|
|
|
for begin_loc in range(0, seq_len, stride): |
|
|
end_loc = min(begin_loc + stride, seq_len) |
|
|
trg_len = end_loc - begin_loc |
|
|
input_ids = encodings.input_ids[:, begin_loc:end_loc] |
|
|
target_ids = input_ids.clone() |
|
|
with torch.no_grad(): |
|
|
outputs = gpt2_model(input_ids, labels=target_ids) |
|
|
neg_log_likelihood = outputs.loss * trg_len |
|
|
nlls.append(neg_log_likelihood) |
|
|
|
|
|
ppl = torch.exp(torch.stack(nlls).sum() / seq_len).item() |
|
|
# Normalize: Perplexity < 25 = AI-like (1), > 100 = human-like (0) |
|
|
return max(0, min(1, 1 - (ppl - 25) / 75)) |
|
|
|
|
|
def calculate_burstiness(text): |
|
|
= |
|
|
sentences = nltk.sent_tokenize(text) |
|
|
if len(sentences) < 2: |
|
|
return 0.0 |
|
|
lengths = [len(sent.split()) for sent in sentences] |
|
|
mean_len = statistics.mean(lengths) |
|
|
return statistics.stdev(lengths) / mean_len if mean_len > 0 else 0.0 |
|
|
|
|
|
def ai_score_roberta(text, model_name, max_length=512): |
|
|
|
|
|
tokenizer = tokenizers[model_name] |
|
|
model = detectors[model_name] |
|
|
chunks = [text[i:i+max_length] for i in range(0, len(text), max_length)] |
|
|
scores = [] |
|
|
|
|
|
for chunk in chunks: |
|
|
inputs = tokenizer(chunk, return_tensors="pt", truncation=True, max_length=max_length, padding=True).to(device) |
|
|
with torch.no_grad(): |
|
|
outputs = model(**inputs) |
|
|
probs = F.softmax(outputs.logits, dim=1) |
|
|
scores.append(probs[0][1].item()) # AI probability |
|
|
return sum(scores) / len(scores) if scores else 0.0 |
|
|
|
|
|
def robust_ai_score(text, weights={"large": 0.4, "base": 0.3, "ppl": 0.2, "burst": 0.1}, threshold=0.7): |
|
|
|
|
|
if len(text.strip()) < 50: |
|
|
return { |
|
|
"final_score": 0.0, |
|
|
"label": "Possibly Human", |
|
|
"explanation": "Text too short (<50 chars) for reliable detection." |
|
|
} |
|
|
|
|
|
# Calculate component scores |
|
|
score_large = ai_score_roberta(text, "roberta-large") |
|
|
score_base = ai_score_roberta(text, "roberta-base") |
|
|
score_ppl = calculate_perplexity(text) |
|
|
score_burst = 1 - min(1, calculate_burstiness(text) / 0.5) # Low burstiness = AI-like |
|
|
|
|
|
# Ensemble score |
|
|
final_score = ( |
|
|
weights["large"] * score_large + |
|
|
weights["base"] * score_base + |
|
|
weights["ppl"] * score_ppl + |
|
|
weights["burst"] * score_burst |
|
|
) |
|
|
|
|
|
# Dynamic threshold: Adjust based on text length |
|
|
text_len = len(text.split()) |
|
|
adjusted_threshold = threshold * (1 - 0.1 * (text_len < 200)) # Lower for short texts |
|
|
|
|
|
label = "Likely AI" if final_score >= adjusted_threshold else "Possibly Human" |
|
|
explanation = ( |
|
|
f"RoBERTa-Large: {score_large:.3f}, RoBERTa-Base: {score_base:.3f}, " |
|
|
f"Perplexity Score: {score_ppl:.3f} (lower=AI), Burstiness Score: {score_burst:.3f} (lower=AI). " |
|
|
f"Text length: {text_len} words, Threshold: {adjusted_threshold:.3f}" |
|
|
) |
|
|
|
|
|
return { |
|
|
"final_score": round(final_score, 4), |
|
|
"label": label, |
|
|
"explanation": explanation |
|
|
} |
|
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Sentence-embedding model used to compare the input text with web paragraphs.
plag_model = SentenceTransformer("all-MiniLM-L6-v2")
|
|
|
|
|
|
|
|
def clean_text(text):
    """Collapse every run of whitespace in ``text`` to one space and trim the ends."""
    collapsed = re.sub(r'\s+', ' ', text)
    return collapsed.strip()
|
|
|
|
|
async def fetch_web_paragraphs(url):
    """Download ``url`` and return the cleaned text of its ``<p>`` elements.

    Args:
        url: Address of the page to fetch.

    Returns:
        A list of non-empty paragraph strings, each normalized with
        ``clean_text``. An empty list is returned when the final response
        status is not 200 or when any error occurs; failures are logged
        rather than raised.
    """
    headers = {'User-Agent': 'Mozilla/5.0'}
    try:
        # httpx does not follow redirects unless asked to; without this a
        # page that answers 301/302 would be discarded as a failed fetch.
        async with httpx.AsyncClient(follow_redirects=True) as client:
            r = await client.get(url, headers=headers, timeout=10)

        if r.status_code != 200:
            logging.warning(f"Failed to fetch URL: {url}, status: {r.status_code}")
            return []

        soup = BeautifulSoup(r.text, 'html.parser')
        # Clean each paragraph once and keep only those with visible text.
        cleaned = (clean_text(p.get_text()) for p in soup.find_all('p'))
        return [paragraph for paragraph in cleaned if paragraph]
    except Exception as e:
        # Best effort: one unreachable or malformed page must not abort the
        # whole plagiarism check.
        logging.error(f"Error fetching {url}: {str(e)}")
        return []
|
|
|
|
|
async def get_search_urls(text, num_results=10):
    """Collect up to ``num_results`` result URLs for ``text``.

    Google (``googlesearch.search``) is queried first; DuckDuckGo (``DDGS``)
    is used to fill whatever is still missing. Both calls are made
    synchronously inside this coroutine. A failure of either engine is
    logged and otherwise ignored.

    Args:
        text: The search query.
        num_results: Maximum number of URLs to return.

    Returns:
        A list of unique URLs in the order they were found; possibly empty.
    """
    urls = []

    try:
        # Only `num_results` is passed: `stop` belongs to a different
        # googlesearch API and combining the two raises TypeError.
        # Appending one by one keeps results gathered before a mid-stream
        # failure.
        for found in search(text, num_results=num_results):
            if found:
                urls.append(found)
    except Exception as e:
        logging.warning(f"Google search failed: {str(e)}")

    if len(urls) < num_results:
        try:
            with DDGS() as ddgs:
                results = ddgs.text(text, max_results=num_results - len(urls))
                # Skip results that carry no link instead of raising KeyError.
                urls += [r['href'] for r in results if r.get('href')]
        except Exception as e:
            logging.warning(f"DuckDuckGo search failed: {str(e)}")

    # Both engines can return the same page; drop duplicates while keeping
    # the original order, and never return more than requested.
    return list(dict.fromkeys(urls))[:num_results]
|
|
|
|
|
|
|
|
|
|
|
def hybrid_similarity(text1, text2):
    """Return the cosine similarity between the embeddings of two texts.

    Both texts are encoded with ``plag_model`` and compared with
    ``util.pytorch_cos_sim``; the result is returned as a Python float.
    """
    first_embedding, second_embedding = (
        plag_model.encode(candidate, convert_to_tensor=True)
        for candidate in (text1, text2)
    )
    similarity = util.pytorch_cos_sim(first_embedding, second_embedding)
    return similarity.item()
|
|
|
|
|
async def internet_plagiarism_score(input_text, num_results=10):
    """Estimate how closely ``input_text`` matches text found on the web.

    URLs are obtained from ``get_search_urls``. For every page, the highest
    cosine similarity between the input and any of its paragraphs is kept.
    The final score is the mean of the five best pages, as a percentage.

    Args:
        input_text: The text to check.
        num_results: Number of search results to inspect.

    Returns:
        A dict with the same keys in every case:

        * ``"score"``: mean similarity of the top matches, times 100,
          rounded to two decimals (``0`` when nothing matched).
        * ``"urls"``: URLs of the top matches, best first.
        * ``"matches"``: one ``{"url", "similarity"}`` dict per top match,
          with the similarity as a percentage.
    """
    # Blank input: skip the web searches entirely.
    if not input_text or not input_text.strip():
        return {"score": 0, "matches": [], "urls": []}

    urls = await get_search_urls(input_text, num_results=num_results)
    if not urls:
        return {"score": 0, "matches": [], "urls": []}

    # Encode the input once; it is the same for every page and paragraph.
    input_embedding = plag_model.encode(input_text, convert_to_tensor=True)

    all_matches = []
    for url in urls:
        paragraphs = await fetch_web_paragraphs(url)
        if not paragraphs:
            continue
        # Encode all paragraphs of the page in one batch and take the best
        # match against the input.
        paragraph_embeddings = plag_model.encode(paragraphs, convert_to_tensor=True)
        similarities = util.pytorch_cos_sim(input_embedding, paragraph_embeddings)
        all_matches.append((url, similarities.max().item()))
        # Short pause between successful fetches.
        await asyncio.sleep(0.5)

    if not all_matches:
        return {"score": 0, "matches": [], "urls": []}

    top_matches = sorted(all_matches, key=lambda match: match[1], reverse=True)[:5]
    # Convert the numpy scalar to a plain float.
    avg_score = float(np.mean([sim for _, sim in top_matches]))

    return {
        "score": round(avg_score * 100, 2),
        "urls": [u for u, _ in top_matches],
        "matches": [
            {"url": u, "similarity": round(sim * 100, 2)}
            for u, sim in top_matches
        ],
    }
|
|
|
|
|
def check_plagiarism_sync(text):
    """Run ``internet_plagiarism_score`` to completion and return its result.

    Synchronous entry point: the coroutine is executed with ``asyncio.run``.
    """
    pending_check = internet_plagiarism_score(text)
    return asyncio.run(pending_check)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Gradio interface: one tab per tool, each with a text input, a JSON result
# panel and a button that runs the corresponding function.
with gr.Blocks() as demo:
    with gr.Tab("AI Detection"):
        ai_input = gr.Textbox(label="Enter text to analyze", lines=5)
        ai_output = gr.JSON(label="AI Detection Result")
        ai_button = gr.Button("Analyze")
        ai_button.click(
            fn=robust_ai_score,
            inputs=ai_input,
            outputs=ai_output,
        )

    with gr.Tab("Plagiarism Checker"):
        plg_input = gr.Textbox(label="Enter text to check plagiarism", lines=5)
        plg_output = gr.JSON(label="Plagiarism Result")
        plg_button = gr.Button("Check Plagiarism")
        plg_button.click(
            fn=check_plagiarism_sync,
            inputs=plg_input,
            outputs=plg_output,
        )

# Start the web server only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch(server_port=7860, server_name="0.0.0.0")
|
|
|