|
|
|
|
|
|
|
|
"""
|
|
|
Intelligent Tokenizer v6.0 - Working Demo for Hugging Face Spaces
|
|
|
A real, working demo - no simulation.
|
|
|
"""
|
|
|
|
|
|
import gradio as gr
|
|
|
import torch
|
|
|
import sys
|
|
|
import io
|
|
|
from pathlib import Path
|
|
|
import json
|
|
|
import time
|
|
|
|
|
|
|
|
|
def ensure_utf8_stream(stream):
    """Return *stream* configured to encode text as UTF-8.

    Each stream is judged on its own encoding. A stream that already uses
    UTF-8 (any spelling/case) is returned untouched. Otherwise the stream is
    switched in place with ``reconfigure`` when it offers that method (this
    keeps its buffering settings), or re-wrapped around its ``buffer`` when it
    has one. Objects offering neither (``None``, ``io.StringIO``, custom
    writers) are returned unchanged instead of raising ``AttributeError``.

    Args:
        stream: a text stream such as ``sys.stdout``; may be ``None``.

    Returns:
        The same stream, or a new ``io.TextIOWrapper`` over its buffer.
    """
    encoding = (getattr(stream, "encoding", None) or "").lower().replace("_", "-")
    if encoding in ("utf-8", "utf8"):
        return stream

    reconfigure = getattr(stream, "reconfigure", None)
    if callable(reconfigure):
        reconfigure(encoding="utf-8")
        return stream

    buffer = getattr(stream, "buffer", None)
    if buffer is not None:
        return io.TextIOWrapper(buffer, encoding="utf-8")

    return stream


# The demo prints and returns non-ASCII text, so both standard streams must
# be able to encode it.
sys.stdout = ensure_utf8_stream(sys.stdout)
sys.stderr = ensure_utf8_stream(sys.stderr)

# Make the directory containing this file importable for the project imports
# that follow.
sys.path.append(str(Path(__file__).parent))
|
|
|
|
|
|
|
|
|
from core.boundary_aware_model import BoundaryAwareTokenizerModel
|
|
|
from src.core.byte_tokenizer_v6 import ByteTokenizerV6
|
|
|
|
|
|
|
|
|
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
|
|
|
|
|
|
class IntelligentTokenizerDemo:
    """Inference helper behind the Gradio demo.

    Holds a ``ByteTokenizerV6`` and a ``BoundaryAwareTokenizerModel`` and
    exposes three text-in / Markdown-out operations used by the UI tabs:
    ``embed_text``, ``restore_text`` and ``compress_stats``.
    """

    # Longest id sequence (special tokens included) passed to the model.
    # Longer inputs are cut to this length and re-terminated with EOS.
    MAX_SEQ_LEN = 256

    def __init__(self):
        """Bind the module-level device, create the tokenizer, load the model."""
        self.device = device
        self.tokenizer = ByteTokenizerV6()
        self.model = None
        self.load_model()

    def load_model(self):
        """Build the model and, when a checkpoint exists, load its weights.

        ``pytorch_model.bin`` is tried first, then
        ``checkpoints/latest_checkpoint.pt`` (both relative to the current
        working directory). The architecture comes from the checkpoint's
        ``model_config`` entry when present, otherwise from ``config.json``.
        When no checkpoint file exists, a warning is printed and an untrained
        model with built-in default hyper-parameters is created.

        The model is moved to ``self.device`` and put in eval mode.

        Raises:
            Exception: any failure is printed and re-raised unchanged.
        """
        try:
            model_path = Path("pytorch_model.bin")
            if not model_path.exists():
                model_path = Path("checkpoints/latest_checkpoint.pt")

            checkpoint = None
            if model_path.exists():
                print(f"Loading model from {model_path}...")
                # SECURITY: weights_only=False lets torch.load unpickle
                # arbitrary Python objects, so only checkpoints from a trusted
                # source must ever be placed at these paths.
                checkpoint = torch.load(model_path, map_location=self.device, weights_only=False)

                if 'model_config' in checkpoint:
                    model_config = checkpoint['model_config']
                else:
                    with open("config.json", "r", encoding="utf-8") as f:
                        config = json.load(f)
                    model_config = {
                        'vocab_size': config['vocab_size'],
                        'hidden_dim': config.get('decoder_hidden', 768),
                        'num_heads': config['num_heads'],
                        'num_encoder_layers': 5,
                        'num_decoder_layers': config['num_decoder_layers'],
                        'dropout': config['dropout'],
                    }
            else:
                print("Warning: No model checkpoint found, using untrained model")
                model_config = {
                    'vocab_size': 260,
                    'hidden_dim': 768,
                    'num_heads': 8,
                    'num_encoder_layers': 5,
                    'num_decoder_layers': 6,
                    'dropout': 0.1,
                }

            self.model = BoundaryAwareTokenizerModel(**model_config)

            if checkpoint is not None:
                # A checkpoint is either a dict that carries the weights under
                # 'model_state_dict' or the state dict itself.
                if 'model_state_dict' in checkpoint:
                    self.model.load_state_dict(checkpoint['model_state_dict'])
                else:
                    self.model.load_state_dict(checkpoint)

            self.model = self.model.to(self.device)
            self.model.eval()

            if checkpoint is not None:
                print("Model loaded successfully!")

        except Exception as e:
            print(f"Error loading model: {e}")
            raise

    def _prepare_inputs(self, text):
        """Tokenize *text* and build batch-of-one tensors on ``self.device``.

        Sequences longer than ``MAX_SEQ_LEN`` are cut to that length and their
        last id is replaced by the tokenizer's EOS id.

        Returns:
            tuple: ``(byte_ids, input_ids, attention_mask, truncated)`` where
            ``byte_ids`` is the (possibly truncated) id sequence, the two
            tensors have a leading batch dimension of 1, and ``truncated``
            tells whether the input had to be cut.
        """
        encoded = self.tokenizer.encode(text)
        byte_ids = encoded['input_ids']

        truncated = len(byte_ids) > self.MAX_SEQ_LEN
        if truncated:
            byte_ids = byte_ids[:self.MAX_SEQ_LEN]
            byte_ids[-1] = self.tokenizer.EOS

        input_ids = torch.tensor([byte_ids], device=self.device)
        attention_mask = torch.tensor(
            [encoded['attention_mask'][:len(byte_ids)]], device=self.device
        )
        return byte_ids, input_ids, attention_mask, truncated

    @staticmethod
    def _payload_bytes(byte_ids):
        """Count the ids that are raw byte values (0-255).

        This is the same convention ``restore_text`` relies on when it turns
        predicted ids back into text with ``bytes(...)``; ids outside that
        range are treated as special tokens.
        """
        return sum(1 for b in byte_ids if 0 <= b < 256)

    def embed_text(self, text):
        """Run the encoder on *text* and describe the resulting embedding.

        Args:
            text: input string; an empty value short-circuits.

        Returns:
            tuple: ``(embeddings, message)``. ``embeddings`` is the encoder's
            ``last_hidden_state`` tensor, or ``None`` for empty input or on
            error; ``message`` is a Markdown report or an error string.

        The compression ratio is computed over the bytes that were actually
        encoded, so an input that had to be truncated does not report an
        inflated ratio.
        """
        if not text:
            return None, "Please enter text"

        try:
            byte_ids, input_ids, attention_mask, truncated = self._prepare_inputs(text)

            with torch.no_grad():
                encoder_outputs = self.model.encoder(input_ids, attention_mask)
                embeddings = encoder_outputs['last_hidden_state']

            original_bytes = len(text.encode('utf-8'))
            processed_bytes = self._payload_bytes(byte_ids) if truncated else original_bytes
            compressed_tokens = embeddings.shape[1]
            compression_ratio = processed_bytes / compressed_tokens if compressed_tokens > 0 else 0

            report = [
                "\u2705 **Embedding Generated Successfully**",
                "",
                f"**Input Text:** {text[:100]}{'...' if len(text) > 100 else ''}",
                f"**Original Size:** {original_bytes} bytes",
                f"**Compressed Size:** {compressed_tokens} tokens",
                f"**Compression Ratio:** {compression_ratio:.2f}x",
            ]
            if truncated:
                report.append(
                    f"**Note:** Input was truncated to {self.MAX_SEQ_LEN} tokens; "
                    f"the ratio covers the {processed_bytes} bytes actually encoded"
                )
            report += [
                f"**Embedding Shape:** {list(embeddings.shape)}",
                f"**Device:** {self.device}",
                "",
                f"**First 10 values:** {embeddings[0, 0, :10].cpu().tolist()}",
                "",
            ]
            return embeddings, "\n".join(report)

        except Exception as e:
            # UI boundary: report the failure in the output panel.
            return None, f"Error: {e}"

    def restore_text(self, text):
        """Run the full model on *text* and report how well it is reproduced.

        The decoder is fed the input ids shifted by one position and its
        arg-max predictions are compared with the next-id labels. The
        predicted ids that are byte values are decoded as UTF-8 (invalid
        sequences are dropped) to form the restored text.

        Args:
            text: input string; an empty value short-circuits.

        Returns:
            str: a Markdown report, a short notice, or an error string.
        """
        if not text:
            return "Please enter text"

        try:
            byte_ids, input_ids, attention_mask, truncated = self._prepare_inputs(text)

            # With a single id there is no next-id target to predict.
            if len(byte_ids) <= 1:
                return "Text too short for restoration test"

            with torch.no_grad():
                decoder_input = input_ids[:, :-1]
                labels = input_ids[:, 1:]

                outputs = self.model(
                    input_ids=input_ids,
                    attention_mask=attention_mask,
                    decoder_input_ids=decoder_input,
                    labels=labels,
                    use_cross_attention=True,
                )

                predictions = torch.argmax(outputs['logits'], dim=-1)
                accuracy = (predictions == labels).float().mean().item()

                pred_list = predictions[0].cpu().tolist()
                full_sequence = [self.tokenizer.BOS] + pred_list

                # Keep only byte-valued ids; everything else cannot be decoded.
                filtered = [b for b in full_sequence if 0 <= b < 256]
                if filtered:
                    restored_text = bytes(filtered).decode('utf-8', errors='ignore')
                else:
                    restored_text = "[Unable to restore]"

            if accuracy > 0.95:
                status = 'Perfect Match! \u2728'
            elif accuracy > 0.8:
                status = 'Good Match'
            else:
                status = 'Partial Match'

            report = [
                "\u2705 **Restoration Test Complete**",
                "",
                f"**Original Text:** {text[:100]}{'...' if len(text) > 100 else ''}",
                f"**Restored Text:** {restored_text[:100]}{'...' if len(restored_text) > 100 else ''}",
                f"**Accuracy:** {accuracy:.1%}",
                f"**Bytes Processed:** {len(byte_ids)}",
                '**Note:** Text was truncated to 256 bytes' if truncated else '',
                "",
                f"**Status:** {status}",
                "",
            ]
            return "\n".join(report)

        except Exception as e:
            # UI boundary: report the failure in the output panel.
            return f"Error: {e}"

    def compress_stats(self, text):
        """Build a Markdown table of per-line compression figures.

        Only the first 10 lines of *text* are considered and blank lines are
        skipped. Each line is encoded on its own; the "compressed" size is the
        sequence length of the encoder's ``last_hidden_state``. For a line
        that had to be truncated, the "original" size is the number of bytes
        actually encoded, so every figure in a row describes the same data.

        Args:
            text: newline-separated input; an empty value short-circuits.

        Returns:
            str: the Markdown table plus a summary, or an error string.
        """
        if not text:
            return "Please enter text"

        try:
            lines = text.strip().split('\n')
            results = []
            any_truncated = False

            for line in lines[:10]:
                if not line.strip():
                    continue

                byte_ids, input_ids, attention_mask, truncated = self._prepare_inputs(line)
                any_truncated = any_truncated or truncated

                with torch.no_grad():
                    encoder_outputs = self.model.encoder(input_ids, attention_mask)
                    compressed_size = encoder_outputs['last_hidden_state'].shape[1]

                if truncated:
                    original_size = self._payload_bytes(byte_ids)
                else:
                    original_size = len(line.encode('utf-8'))
                ratio = original_size / compressed_size if compressed_size > 0 else 0

                results.append({
                    'text': line[:50] + '...' if len(line) > 50 else line,
                    'original': original_size,
                    'compressed': compressed_size,
                    'ratio': ratio,
                })

            parts = [
                "**Compression Analysis Results**\n\n",
                "| Text | Original | Compressed | Ratio |\n",
                "|------|----------|------------|-------|\n",
            ]

            for r in results:
                # A literal pipe in user text would otherwise start a new
                # table cell and break the row.
                cell = r['text'].replace('|', '\\|')
                parts.append(
                    f"| {cell} | {r['original']} bytes | {r['compressed']} tokens | {r['ratio']:.2f}x |\n"
                )

            if results:
                avg_ratio = sum(r['ratio'] for r in results) / len(results)
                total_original = sum(r['original'] for r in results)
                total_compressed = sum(r['compressed'] for r in results)

                parts.append("\n**Summary:**\n")
                parts.append(f"- Average Compression: {avg_ratio:.2f}x\n")
                parts.append(f"- Total Original: {total_original} bytes\n")
                parts.append(f"- Total Compressed: {total_compressed} tokens\n")
                parts.append(
                    f"- Overall Ratio: {total_original/total_compressed if total_compressed > 0 else 0:.2f}x\n"
                )

            if any_truncated:
                parts.append(
                    f"\n**Note:** Lines longer than {self.MAX_SEQ_LEN} tokens were truncated; "
                    "their sizes cover the processed part only.\n"
                )

            return "".join(parts)

        except Exception as e:
            # UI boundary: report the failure in the output panel.
            return f"Error: {e}"
|
|
|
|
|
|
|
|
|
# Build the demo object at import time: its constructor creates the tokenizer
# and loads (or creates) the model before any UI component is defined.
print("Initializing Intelligent Tokenizer Demo...")
demo = IntelligentTokenizerDemo()

# Gradio layout: an intro banner followed by four tabs. Each of the three
# interactive tabs pairs an input column (textbox + button) with a Markdown
# output column and wires the button to one method of ``demo``.
# NOTE(review): the emoji in the heading and tab labels, and the arrow in
# "512...768 dims", appear mis-encoded in this file - restore them from the
# original source before publishing.
with gr.Blocks(title="Intelligent Tokenizer v6.0", theme=gr.themes.Base()) as app:
    gr.Markdown("""
    # ๐ Intelligent Tokenizer v6.0 - Live Demo

    **World's First Pure Learning-Based Byte-Level Tokenizer**
    - No vocabulary files, no language rules - just intelligence!
    - 260 fixed vocab (256 bytes + 4 special tokens)
    - Works with ANY language/script/emoji
    """)

    with gr.Tab("๐ค Embedding"):
        with gr.Row():
            with gr.Column():
                embed_input = gr.Textbox(
                    label="Input Text",
                    placeholder="Enter any text in any language...",
                    lines=3
                )
                embed_btn = gr.Button("Generate Embedding", variant="primary")

            with gr.Column():
                embed_output = gr.Markdown(label="Result")

        # embed_text returns (embeddings, message); only the message is shown.
        embed_btn.click(
            lambda x: demo.embed_text(x)[1],
            inputs=embed_input,
            outputs=embed_output
        )

    with gr.Tab("๐ Restoration"):
        with gr.Row():
            with gr.Column():
                restore_input = gr.Textbox(
                    label="Input Text",
                    placeholder="Enter text to test restoration...",
                    lines=3
                )
                restore_btn = gr.Button("Test Restoration", variant="primary")

            with gr.Column():
                restore_output = gr.Markdown(label="Result")

        restore_btn.click(
            demo.restore_text,
            inputs=restore_input,
            outputs=restore_output
        )

    with gr.Tab("๐ Compression Analysis"):
        with gr.Row():
            with gr.Column():
                compress_input = gr.Textbox(
                    label="Input Text (one item per line)",
                    placeholder="Enter multiple texts, one per line...",
                    lines=5
                )
                compress_btn = gr.Button("Analyze Compression", variant="primary")

            with gr.Column():
                compress_output = gr.Markdown(label="Analysis")

        compress_btn.click(
            demo.compress_stats,
            inputs=compress_input,
            outputs=compress_output
        )

    # Static project description; no callbacks are attached to this tab.
    with gr.Tab("โน๏ธ About"):
        gr.Markdown("""
        ## About Intelligent Tokenizer v6.0

        ### Key Features:
        - **Pure Learning-Based**: No predefined rules or vocabularies
        - **Universal Coverage**: Works with all 204+ languages equally
        - **Compression**: 2-3x currently, targeting 5-10x
        - **Real Model**: This demo uses the actual trained model (1.2GB)

        ### Architecture:
        - Encoder: 5-layer transformer (512โ768 dims)
        - Decoder: 6-layer transformer (768 hidden)
        - Total: ~274M parameters
        - Training: 23 epochs on multilingual data

        ### Development:
        - Solo developer, 4 months development
        - Trained on personal RTX 3060
        - No prior AI experience

        ### Links:
        - [GitHub Repository](https://github.com/ggunio/intelligent-tokenizer)
        - [Hugging Face Model](https://huggingface.co/ggunio/intelligent-tokenizer-v6)
        """)
|
|
|
|
|
|
# Script entry point: announce the device in use, then start the Gradio server.
if __name__ == "__main__":
    for startup_message in (f"Running on device: {device}", "Launching Gradio app..."):
        print(startup_message)
    app.launch()