"""
Intelligent Tokenizer v6.2.0 - Byte Tokenizer with 46+2 Configuration
Handles chunking, sliding windows, and boundary adjustments
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Dict, List, Optional, Tuple
def _trim_utf8_boundary(byte_seq: List[int], limit: int) -> int:
"""
Trim byte sequence to valid UTF-8 boundary (GPT suggestion)
"""
end = min(limit, len(byte_seq))
while end > 0:
try:
bytes(byte_seq[:end]).decode('utf-8')
return end
except UnicodeDecodeError:
end -= 1
    # No valid boundary within the limit (e.g. the first character alone needs
    # more than `limit` bytes); fall back to the raw limit.
    return limit
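# Example: the bytes of "안녕" are EC 95 88 EB 85 95; _trim_utf8_boundary on that
# list with limit=4 returns 3, keeping exactly the three bytes of "안" instead of
# cutting into the middle of the second character.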
class ByteTokenizerV62:
"""
Pure byte-level tokenizer
46 content bytes + 2 special tokens (BOS/EOS) = 48 total
"""
def __init__(self, config: Optional[Dict] = None):
        # Configuration (a config dict is accepted for interface compatibility,
        # but the defaults below are always used)
self.content_size = 46 # Actual content bytes
self.max_seq_len = 48 # Total with BOS/EOS
self.chunk_overlap = 8 # Overlap for sliding window
# Special tokens
self.PAD = 256
self.BOS = 257
self.EOS = 258
self.MASK = 259
self.vocab_size = 260 # 256 bytes + 4 special
def encode(self,
text: str,
add_special_tokens: bool = True,
return_chunks: bool = False) -> Dict[str, torch.Tensor]:
"""
Encode text to byte sequences
Args:
text: Input text
add_special_tokens: Whether to add BOS/EOS
return_chunks: Return multiple chunks for long sequences
"""
# Convert to UTF-8 bytes
byte_sequence = list(text.encode('utf-8'))
if return_chunks and len(byte_sequence) > self.content_size:
# Handle long sequences with sliding window
return self._encode_with_chunks(byte_sequence, add_special_tokens)
        # Single-chunk processing: truncate at a valid UTF-8 boundary so that a
        # multi-byte character is never split
        if len(byte_sequence) > self.content_size:
            cut_point = _trim_utf8_boundary(byte_sequence, self.content_size)
            byte_sequence = byte_sequence[:cut_point]
        # Add special tokens before padding: [BOS] content [EOS] [PAD] ...
        if add_special_tokens:
            byte_sequence = [self.BOS] + byte_sequence + [self.EOS]
        # Record the unpadded length (batch_encode uses it to build its mask)
        unpadded_length = len(byte_sequence)
        # Pad to max_seq_len
        if len(byte_sequence) < self.max_seq_len:
            padding_length = self.max_seq_len - len(byte_sequence)
            byte_sequence = byte_sequence + [self.PAD] * padding_length
        input_ids = torch.tensor(byte_sequence, dtype=torch.long)
        attention_mask = (input_ids != self.PAD)  # bool mask, True on real tokens
        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'length': unpadded_length,
            'original_length': len(text.encode('utf-8'))
}
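    # Worked example of the single-chunk layout produced by encode() above
    # (ids shown are the byte values of "Hi" plus the special tokens defined
    # in __init__):
    #   encode("Hi")['input_ids'] -> [257, 72, 105, 258, 256, 256, ...]  # length 48
    #                                 BOS    H    i   EOS  PAD  PAD
    #   'attention_mask' is True on the first 4 positions and False on padding,
    #   and 'length' is 4 (BOS + 2 content bytes + EOS).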
def _encode_with_chunks(self,
byte_sequence: List[int],
add_special_tokens: bool) -> Dict[str, torch.Tensor]:
"""
Encode long sequences with sliding window chunks
"""
chunks = []
positions = []
# Calculate stride (content_size - overlap)
stride = self.content_size - self.chunk_overlap
for i in range(0, len(byte_sequence), stride):
# Extract chunk
chunk = byte_sequence[i:i + self.content_size]
            # If the trailing chunk is very small, merge it into the previous chunk
            if len(chunk) < self.content_size // 2:
                if chunks:
                    last_chunk = chunks[-1]['input_ids'].tolist()
                    # Strip padding and special tokens from the previous chunk
                    last_chunk = [b for b in last_chunk if b not in (self.PAD, self.BOS, self.EOS)]
                    # Append the remaining bytes, then restore the special tokens
                    merged = last_chunk + chunk
                    if add_special_tokens:
                        merged = [self.BOS] + merged + [self.EOS]
                    # Re-pad, then truncate in case the merge exceeds max_seq_len
                    if len(merged) < self.max_seq_len:
                        merged += [self.PAD] * (self.max_seq_len - len(merged))
                    merged_ids = torch.tensor(merged[:self.max_seq_len], dtype=torch.long)
                    merged_mask = (merged_ids != self.PAD)
                    chunks[-1]['input_ids'] = merged_ids
                    chunks[-1]['attention_mask'] = merged_mask
                break
            # Add special tokens first, then pad (same layout as encode())
            if add_special_tokens:
                chunk_with_special = [self.BOS] + chunk + [self.EOS]
            else:
                chunk_with_special = chunk
            if len(chunk_with_special) < self.max_seq_len:
                chunk_with_special = chunk_with_special + [self.PAD] * (self.max_seq_len - len(chunk_with_special))
            # Create tensors
            input_ids = torch.tensor(chunk_with_special, dtype=torch.long)
            attention_mask = (input_ids != self.PAD)
            chunk_span = (i, min(i + self.content_size, len(byte_sequence)))
            chunks.append({
                'input_ids': input_ids,
                'attention_mask': attention_mask,
                'position': chunk_span
            })
            positions.append(chunk_span)
# Stack all chunks
all_input_ids = torch.stack([c['input_ids'] for c in chunks])
all_attention_masks = torch.stack([c['attention_mask'] for c in chunks])
return {
'input_ids': all_input_ids, # [num_chunks, seq_len]
'attention_mask': all_attention_masks,
'num_chunks': len(chunks),
'chunk_positions': positions,
'original_length': len(byte_sequence)
}
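    # Worked example of the sliding window above, using the defaults:
    #   stride = content_size - chunk_overlap = 46 - 8 = 38
    #   a 100-byte input yields chunks covering byte spans (0, 46), (38, 84),
    #   (76, 100); consecutive chunks share 8 bytes, which reconstruct() skips
    #   when stitching the pieces back together. A trailing chunk shorter than
    #   content_size // 2 = 23 bytes is merged into the previous chunk instead.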
def reconstruct(self,
input_ids: torch.Tensor,
positions: List[Tuple[int, int]] = None,
skip_special_tokens: bool = True,
overlap: int = 8) -> str:
"""
Reconstruct text from multiple chunks (GPT suggestion)
Args:
input_ids: [num_chunks, seq_len] for multi-chunk
positions: List of (start, end) positions for each chunk
skip_special_tokens: Whether to skip special tokens
overlap: Overlap size between chunks
"""
if input_ids.dim() == 1:
# Single sequence, use regular decode
return self.decode(input_ids, skip_special_tokens)
# Multi-chunk reconstruction
pieces = []
        for chunk_ids in input_ids:
chunk_ids = chunk_ids.cpu().numpy().tolist()
# Remove special tokens and padding
if skip_special_tokens:
chunk_ids = [
b for b in chunk_ids
if b not in [self.PAD, self.BOS, self.EOS, self.MASK] and b < 256
]
pieces.append(chunk_ids)
# Merge chunks with overlap handling
output = []
for i, chunk in enumerate(pieces):
if i == 0:
output.extend(chunk)
else:
# Skip overlap bytes from current chunk
output.extend(chunk[overlap:] if len(chunk) > overlap else chunk)
        # Convert the assembled byte values back to text
        try:
            text = bytes(output).decode('utf-8', errors='replace')
        except ValueError:
            # Defensive: only reachable if a value outside 0-255 slips through
            text = ""
        return text
def decode(self,
input_ids: torch.Tensor,
skip_special_tokens: bool = True) -> str:
"""
Decode byte sequences back to text
"""
if isinstance(input_ids, torch.Tensor):
input_ids = input_ids.cpu().numpy().tolist()
        # Handle a batch dimension by decoding only the first sequence
if isinstance(input_ids[0], list):
input_ids = input_ids[0]
# Remove special tokens and padding
if skip_special_tokens:
input_ids = [
b for b in input_ids
if b not in [self.PAD, self.BOS, self.EOS, self.MASK] and b < 256
]
        # Convert byte values back to text
        try:
            text = bytes(input_ids).decode('utf-8', errors='replace')
        except ValueError:
            # Defensive: only reachable if a value outside 0-255 slips through
            text = ""
        return text
def batch_encode(self,
texts: List[str],
add_special_tokens: bool = True) -> Dict[str, torch.Tensor]:
"""
Encode multiple texts as a batch
"""
encoded = [self.encode(text, add_special_tokens) for text in texts]
# Find max length
max_len = max(e['length'] for e in encoded)
max_len = min(max_len, self.max_seq_len)
# Create batch tensors
batch_size = len(texts)
input_ids = torch.full((batch_size, max_len), self.PAD, dtype=torch.long)
        attention_mask = torch.zeros((batch_size, max_len), dtype=torch.bool)  # bool mask
for i, enc in enumerate(encoded):
seq_len = min(enc['length'], max_len)
if enc['input_ids'].dim() == 0: # Handle scalar
enc['input_ids'] = enc['input_ids'].unsqueeze(0)
input_ids[i, :seq_len] = enc['input_ids'][:seq_len]
attention_mask[i, :seq_len] = True
return {
'input_ids': input_ids,
'attention_mask': attention_mask,
'lengths': [e['length'] for e in encoded]
}
class ChunkBoundaryAdjuster(nn.Module):
"""
Neural network for adjusting chunk boundaries
Learns optimal splitting points
"""
def __init__(self, hidden_dim: int = 256):
super().__init__()
# Boundary scoring network
self.boundary_scorer = nn.Sequential(
nn.Linear(256, hidden_dim), # Input: byte embeddings
nn.ReLU(),
nn.Dropout(0.1),
nn.Linear(hidden_dim, hidden_dim // 2),
nn.ReLU(),
nn.Linear(hidden_dim // 2, 1), # Output: boundary score
nn.Sigmoid()
)
# UTF-8 boundary detector
self.utf8_detector = nn.Sequential(
nn.Conv1d(1, 16, kernel_size=4, padding=2), # Detect multi-byte patterns
nn.ReLU(),
nn.Conv1d(16, 1, kernel_size=1),
nn.Sigmoid()
)
def forward(self, byte_sequence: torch.Tensor) -> torch.Tensor:
"""
Find optimal chunk boundaries
Args:
byte_sequence: [batch, seq_len, embedding_dim]
Returns:
boundary_scores: [batch, seq_len] - probability of boundary at each position
"""
batch_size, seq_len = byte_sequence.shape[:2]
# Score each position as potential boundary
boundary_scores = self.boundary_scorer(byte_sequence).squeeze(-1)
        # Detect UTF-8 boundaries (avoid splitting multi-byte characters);
        # the first embedding channel is used as a proxy for the raw byte value
        byte_values = byte_sequence[..., 0].unsqueeze(1)  # [batch, 1, seq_len]
        utf8_scores = self.utf8_detector(byte_values).squeeze(1)
        # kernel_size=4 with padding=2 produces seq_len + 1 outputs; trim back to seq_len
        utf8_scores = utf8_scores[:, :seq_len]  # [batch, seq_len]
# Combine scores (prefer boundaries at valid UTF-8 positions)
combined_scores = boundary_scores * utf8_scores
        # Apply constraints: boundaries should be ~46 bytes apart
        # (46 matches content_size in ByteTokenizerV62)
        for i in range(0, seq_len, 46):
            # Boost the score at each expected boundary position
            combined_scores[:, i] = combined_scores[:, i] * 1.5
return combined_scores
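# Shape sketch for ChunkBoundaryAdjuster (illustrative only; the module is not
# wired into ByteTokenizerV62 and assumes 256-dim byte embeddings):
#   adjuster = ChunkBoundaryAdjuster(hidden_dim=256)
#   scores = adjuster(torch.randn(2, 128, 256))   # -> [2, 128] boundary scores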
class SlidingWindowProcessor(nn.Module):
"""
Process sequences with sliding windows at multiple scales
"""
    def __init__(self, window_sizes: Tuple[int, ...] = (8, 16, 32, 46)):
super().__init__()
self.window_sizes = window_sizes
# Multi-scale convolutions for different window sizes
self.convs = nn.ModuleList([
nn.Conv1d(256, 128, kernel_size=ws, stride=ws//2, padding=ws//4)
for ws in window_sizes
])
# Fusion layer
self.fusion = nn.Sequential(
nn.Linear(128 * len(window_sizes), 256),
nn.ReLU(),
nn.Dropout(0.1),
nn.Linear(256, 256)
)
def forward(self, byte_embeddings: torch.Tensor) -> torch.Tensor:
"""
Apply multi-scale sliding windows
Args:
byte_embeddings: [batch, seq_len, embedding_dim]
Returns:
processed: [batch, seq_len, embedding_dim]
"""
# Transpose for conv1d
x = byte_embeddings.transpose(1, 2) # [batch, embed, seq]
# Apply multi-scale convolutions
multi_scale_features = []
for conv in self.convs:
features = conv(x) # Different seq lengths
# Global average pooling to fixed size
pooled = F.adaptive_avg_pool1d(features, byte_embeddings.size(1))
multi_scale_features.append(pooled)
# Concatenate and transpose back
concat = torch.cat(multi_scale_features, dim=1) # [batch, 128*scales, seq]
concat = concat.transpose(1, 2) # [batch, seq, 128*scales]
# Fuse multi-scale features
fused = self.fusion(concat) # [batch, seq, 256]
# Residual connection
output = fused + byte_embeddings
return output
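# Shape sketch for SlidingWindowProcessor (illustrative only; assumes 256-dim
# embeddings, matching the Conv1d input channels and the residual connection):
#   processor = SlidingWindowProcessor()
#   out = processor(torch.randn(2, 48, 256))      # -> [2, 48, 256]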
class AdaptiveChunker:
"""
Adaptive chunking based on content complexity
Simple heuristic-based chunker for inference
"""
def __init__(self):
self.min_chunk = 32
self.max_chunk = 46
self.target_chunk = 46
def determine_chunk_size(self, text: str) -> int:
"""
Determine optimal chunk size based on text characteristics
"""
        # Sample the first 100 characters and detect the dominant script
        sample = text[:100]
        has_cjk = any(
            0x4E00 <= ord(c) <= 0x9FFF      # CJK Unified Ideographs
            or 0x3040 <= ord(c) <= 0x30FF   # Hiragana / Katakana
            or 0xAC00 <= ord(c) <= 0xD7A3   # Hangul syllables
            for c in sample
        )
        has_arabic = any(0x0600 <= ord(c) <= 0x06FF for c in sample)
        # Adjust chunk size based on content
        if has_cjk:
            # CJK characters take 3 UTF-8 bytes each, so use smaller chunks
            return self.min_chunk
        elif has_arabic:
            # Arabic (2 bytes per character) also benefits from smaller chunks
            return 40
        else:
            # ASCII/Latin text can use larger chunks
            return self.target_chunk
def chunk_text(self, text: str) -> List[str]:
"""
Split text into adaptive chunks
"""
chunk_size = self.determine_chunk_size(text)
byte_seq = text.encode('utf-8')
chunks = []
i = 0
while i < len(byte_seq):
# Find chunk boundary (don't split UTF-8 sequences)
end = min(i + chunk_size, len(byte_seq))
            # Backtrack to a valid UTF-8 boundary so a character is never split
            while end > i and end < len(byte_seq):
                try:
                    byte_seq[i:end].decode('utf-8')
                    break
                except UnicodeDecodeError:
                    end -= 1
chunk_bytes = byte_seq[i:end]
chunks.append(chunk_bytes.decode('utf-8', errors='replace'))
i = end
return chunks
if __name__ == "__main__":
# Test the tokenizer
tokenizer = ByteTokenizerV62()
# Test texts
test_texts = [
"Hello, world!",
"안녕하세요, 세계!",
"今天天气很好。",
"مرحبا بالعالم",
"A" * 100 # Long text
]
for text in test_texts:
print(f"\nText: {text[:50]}...")
# Single chunk encoding
encoded = tokenizer.encode(text)
print(f" Encoded shape: {encoded['input_ids'].shape}")
print(f" Original length: {encoded['original_length']} bytes")
# Decode back
decoded = tokenizer.decode(encoded['input_ids'])
print(f" Decoded: {decoded[:50]}...")
# Check multi-chunk for long text
if encoded['original_length'] > 46:
multi_encoded = tokenizer.encode(text, return_chunks=True)
print(f" Chunks: {multi_encoded['num_chunks']}")
# Test batch encoding
batch = tokenizer.batch_encode(test_texts[:3])
print(f"\nBatch shape: {batch['input_ids'].shape}")
# Test adaptive chunker
chunker = AdaptiveChunker()
for text in test_texts[:3]:
chunk_size = chunker.determine_chunk_size(text)
print(f"\n{text[:30]}... → Chunk size: {chunk_size}")
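    # Illustrative round-trip check for multi-chunk encoding; plain ASCII is
    # used so that chunk edges cannot fall inside a multi-byte character.
    long_text = "The quick brown fox jumps over the lazy dog. " * 4
    multi = tokenizer.encode(long_text, return_chunks=True)
    restored = tokenizer.reconstruct(
        multi['input_ids'],
        positions=multi['chunk_positions'],
        overlap=tokenizer.chunk_overlap
    )
    print(f"\nMulti-chunk round-trip matches original: {restored == long_text}")
    # Adaptive chunking of the same text; every piece decodes cleanly because
    # chunk_text() backtracks to valid UTF-8 boundaries.
    pieces = chunker.chunk_text(long_text)
    print(f"Adaptive chunks: {len(pieces)}, byte sizes: {[len(p.encode('utf-8')) for p in pieces]}")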