Imports¶
In [1]:
# Install the non-stdlib dependencies: PySR (symbolic regression) and kagglehub
# with its pandas dataset adapter.
# NOTE(review): versions are unpinned — pin (e.g. pysr==X.Y.Z) for reproducibility.
%pip install pysr kagglehub[pandas-datasets]
In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from datasets import load_dataset
from sentence_transformers import SentenceTransformer
import numpy as np
from pysr import PySRRegressor
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
import kagglehub
from kagglehub import KaggleDatasetAdapter
from sklearn.preprocessing import StandardScaler
Autoencoder¶
In [3]:
class Autoencoder(nn.Module):
    """Symmetric MLP autoencoder with SiLU activations.

    Encoder: input_dim -> int(input_dim * r1) -> int(input_dim * r2) -> latent_dim,
    with the decoder mirroring the encoder.

    Args:
        input_dim: dimensionality of the input vectors.
        latent_dim: size of the bottleneck representation.
        hidden_ratios: fractions of ``input_dim`` used for the two hidden layers.
            Defaults to ``(0.65, 0.4)``, the previously hard-coded schedule, so
            existing two-argument call sites behave identically.
    """

    def __init__(self, input_dim, latent_dim, hidden_ratios=(0.65, 0.4)):
        super().__init__()
        h1, h2 = (int(input_dim * r) for r in hidden_ratios)
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, h1),
            nn.SiLU(),
            nn.Linear(h1, h2),
            nn.SiLU(),
            nn.Linear(h2, latent_dim),
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, h2),
            nn.SiLU(),
            nn.Linear(h2, h1),
            nn.SiLU(),
            nn.Linear(h1, input_dim),
        )

    def forward(self, x):
        """Return ``(reconstruction, latent_code)`` for a batch ``x``."""
        z = self.encoder(x)
        x_recon = self.decoder(z)
        return x_recon, z
def train_autoencoder(model, train_loader, epochs=64, lr=0.002, weight_decay=6e-3, log_every=10):
    """Train ``model`` to reconstruct its input with MSE loss using AdamW.

    Args:
        model: module whose ``forward`` returns ``(reconstruction, latent)``.
        train_loader: DataLoader yielding 1-tuples of input batches
            (e.g. built from ``TensorDataset(x)``).
        epochs: number of full passes over ``train_loader``.
        lr: AdamW learning rate (default keeps the previous hard-coded 0.002).
        weight_decay: AdamW weight decay (default keeps the previous 6e-3).
        log_every: print the mean epoch loss every this many epochs.

    Returns:
        The same ``model`` instance, trained in place (returned for chaining).
    """
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)
    criterion = nn.MSELoss()
    model.train()
    for epoch in range(epochs):
        total_loss = 0.0
        for (batch,) in train_loader:
            optimizer.zero_grad()
            recon, _ = model(batch)
            loss = criterion(recon, batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        if (epoch + 1) % log_every == 0:
            print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader):.6f}")
    return model
Tabular¶
In [4]:
# Seed both RNGs before any sampling so the subset is reproducible.
torch.manual_seed(24)
np.random.seed(24)

# Pull the UCI Forest Cover Type table into a DataFrame via kagglehub.
df = kagglehub.dataset_load(KaggleDatasetAdapter.PANDAS, "uciml/forest-cover-type-dataset", "covtype.csv")
print(f"Dataset shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")

# Split features / target once; keep the column names for later reporting.
features_df = df.drop('Cover_Type', axis=1)
feature_names = features_df.columns.tolist()
X_tabular = features_df.values
y_tabular = df['Cover_Type'].values

# Work on a random 4096-row subset to keep downstream symbolic regression tractable.
sample_size = 4096
sample_idx = np.random.choice(len(X_tabular), sample_size, replace=False)
X_tabular = X_tabular[sample_idx]
y_tabular = y_tabular[sample_idx]
print(f"Sampled data shape: {X_tabular.shape}")
In [5]:
# Standardize each feature, then take a positional 80/20 train/test split.
scaler = StandardScaler()
X_tabular_normalized = scaler.fit_transform(X_tabular)
X_tabular_tensor = torch.FloatTensor(X_tabular_normalized)

train_size_tab = int(0.8 * len(X_tabular_tensor))
train_data_tab, test_data_tab = (
    X_tabular_tensor[:train_size_tab],
    X_tabular_tensor[train_size_tab:],
)
train_loader_tab = DataLoader(TensorDataset(train_data_tab), batch_size=32, shuffle=True)
test_loader_tab = DataLoader(TensorDataset(test_data_tab), batch_size=32)
In [6]:
# Fit the autoencoder on the tabular training split, then cache the latent codes
# of the training data for symbolic regression.
input_dim_tab = X_tabular_tensor.shape[1]
latent_dim_tab = 32
print(f"\nTraining autoencoder ({input_dim_tab} -> {latent_dim_tab} -> {input_dim_tab})...")
autoencoder_tab = Autoencoder(input_dim_tab, latent_dim_tab)
autoencoder_tab = train_autoencoder(autoencoder_tab, train_loader_tab, epochs=45)

print("Extracting latent representations...")
autoencoder_tab.eval()
with torch.no_grad():
    all_inputs_tab = train_data_tab
    all_recons_tab, all_latents_tab = autoencoder_tab(all_inputs_tab)

X_input_tab = all_inputs_tab.numpy()
Z_latent_tab = all_latents_tab.numpy()
print(f"Input shape: {X_input_tab.shape}, Latent shape: {Z_latent_tab.shape}")
In [7]:
import re  # local import: used to parse feature indices out of formula strings

latent_formulas_tab = {}
input_importance_tab = np.zeros(input_dim_tab)
print("Finding symbolic formulas for latent dimensions (tabular)...")
for i in tqdm(range(min(8, latent_dim_tab))):
    model_sr = PySRRegressor(
        niterations=80,
        populations=45,
        binary_operators=["+", "-", "*", "/", "^", "max", "min", "mod"],
        unary_operators=["neg", "square", "cube", "cbrt", "sqrt", "abs", "sign", "inv", "exp", "log", "log10", "log2", "log1p", "sin", "cos", "tan", "asin", "acos", "atan", "sinh", "cosh", "tanh", "asinh", "acosh", "atanh", "relu", "sinc", "floor", "ceil"],
        parsimony=0.1,
        verbosity=0,
        model_selection="best",
    )
    # Fit on a random subsample to keep the PySR search time bounded.
    fit_idx = np.random.choice(len(X_input_tab), min(1024, len(X_input_tab)), replace=False)
    X_sample = X_input_tab[fit_idx]
    z_sample = Z_latent_tab[fit_idx, i]
    model_sr.fit(X_sample, z_sample)
    best_formula = str(model_sr.sympy())
    latent_formulas_tab[i] = best_formula
    # BUG FIX: the previous substring test (f'x{j}' in best_formula) over-counted,
    # because e.g. 'x1' is a substring of 'x10', 'x12', ... Parse the exact
    # variable indices (PySR names inputs x0, x1, ...) instead.
    for j in {int(m) for m in re.findall(r'\bx(\d+)\b', best_formula)}:
        if j < input_dim_tab:
            input_importance_tab[j] += 1
In [8]:
# Show the first few discovered formulas, truncating very long ones for readability.
print("--- Discovered Latent Formulas ---")
for i in range(min(5, len(latent_formulas_tab))):
    formula = latent_formulas_tab[i]
    shown = formula if len(formula) <= 100 else formula[:97] + "..."
    print(f"z_{i} = {shown}")
In [9]:
# Partition features by how often they appear across the discovered latent formulas:
# never-used features are candidates for removal; frequently used ones matter most.
threshold = 0
discarded_dims_tab = np.sort(np.where(input_importance_tab == threshold)[0], kind="stable")
frequent_idx_tab = np.where(input_importance_tab > 2)[0]
order_tab = np.argsort(-input_importance_tab[input_importance_tab > 2], kind="stable")
important_dims_tab = frequent_idx_tab[order_tab]

print("\n--- Input Feature Usage Analysis (Tabular) ---")
print(f"Highly important features (appear in >2 latents): {len(important_dims_tab)}")
print("Examples:")
for idx in important_dims_tab[:10]:
    print(f" Feature {idx} ({feature_names[idx]}): used {int(input_importance_tab[idx])} times")
print(f"\nPotentially redundant features (never appear): {len(discarded_dims_tab)}")
print("Examples:")
for idx in discarded_dims_tab[:10]:
    print(f" Feature {idx} ({feature_names[idx]})")
In [10]:
# Sweep bottleneck sizes and record per-feature reconstruction error on the test split.
latent_dims_to_test_tab = [4, 8, 16, 24, 32]
reconstruction_errors_tab = {}
print("Training autoencoders with different latent dimensions (tabular)...")
for ld in latent_dims_to_test_tab:
    print(f"\nLatent dim = {ld}")
    ae = Autoencoder(input_dim_tab, ld)
    ae = train_autoencoder(ae, train_loader_tab, epochs=60)
    ae.eval()
    with torch.no_grad():
        recon, _ = ae(test_data_tab)
        # Mean squared error per input feature (averaged over test rows).
        reconstruction_errors_tab[ld] = ((recon - test_data_tab) ** 2).mean(dim=0).numpy()
In [11]:
# Long-format table: one row per (feature, latent_dim) pair.
error_data_tab = [
    {
        'feature_id': feature_j,
        'feature_name': feature_names[feature_j],
        'latent_dim': ld,
        'reconstruction_error': reconstruction_errors_tab[ld][feature_j],
    }
    for feature_j in range(input_dim_tab)
    for ld in latent_dims_to_test_tab
]
df_errors_tab = pd.DataFrame(error_data_tab)
print("--- Reconstruction Error Analysis (Tabular) ---")
print(f"Total data points: {len(df_errors_tab)}")
In [12]:
# Join per-feature summary statistics onto the long error table, then fit a
# symbolic law: error ~ f(latent_dim, feature mean, feature std, importance).
feature_properties_tab = pd.DataFrame({
    'feature_id': range(input_dim_tab),
    'feature_name': feature_names,
    'mean': X_input_tab.mean(axis=0),
    'std': X_input_tab.std(axis=0),
    'importance': input_importance_tab,
})
df_merged_tab = df_errors_tab.merge(feature_properties_tab, on='feature_id')

print("Finding symbolic law for reconstruction error (tabular)...")
X_recon_sr_tab = df_merged_tab[['latent_dim', 'mean', 'std', 'importance']].values
y_recon_sr_tab = df_merged_tab['reconstruction_error'].values

binary_ops_tab = ["+", "-", "*", "/", "^", "max", "min", "mod"]
unary_ops_tab = ["neg", "square", "cube", "cbrt", "sqrt", "abs", "sign", "inv", "exp", "log", "log10", "log2", "log1p", "sin", "cos", "tan", "asin", "acos", "atan", "sinh", "cosh", "tanh", "asinh", "acosh", "atanh", "relu", "sinc", "floor", "ceil"]
model_recon_sr_tab = PySRRegressor(
    niterations=70,
    populations=35,
    binary_operators=binary_ops_tab,
    unary_operators=unary_ops_tab,
    parsimony=0.01,
    verbosity=0,
    model_selection="best",
)
# Fit on a random subsample to bound search time.
sample_idx = np.random.choice(len(X_recon_sr_tab), min(2048, len(X_recon_sr_tab)), replace=False)
model_recon_sr_tab.fit(X_recon_sr_tab[sample_idx], y_recon_sr_tab[sample_idx])
Out[12]:
In [13]:
print("--- Discovered Reconstruction Law (Tabular) ---")
print(f"reconstruction_error = {model_recon_sr_tab.sympy()}")
print("Where:")
print(" x0 = latent_dim")
print(" x1 = feature_mean")
print(" x2 = feature_std")
print(" x3 = feature_importance")

# Classify features by their mean error across all tested latent dims.
# PERF: the 10%/90% quantiles and per-feature means are loop-invariant — the
# original recomputed both quantiles and refiltered the frame on every feature
# (O(features * rows)); compute them once up front instead.
q_low_tab = df_errors_tab['reconstruction_error'].quantile(0.1)
q_high_tab = df_errors_tab['reconstruction_error'].quantile(0.9)
mean_errors_tab = df_errors_tab.groupby('feature_id')['reconstruction_error'].mean()

best_reconstructed_tab = []
worst_reconstructed_tab = []
for feature_j in range(input_dim_tab):
    avg_error = mean_errors_tab[feature_j]
    if avg_error < q_low_tab:
        best_reconstructed_tab.append((feature_j, avg_error))
    elif avg_error > q_high_tab:
        worst_reconstructed_tab.append((feature_j, avg_error))

print("\n--- Best Reconstructed Features (Tabular, top 10) ---")
for feat, err in sorted(best_reconstructed_tab, key=lambda x: x[1])[:10]:
    print(f"Feature {feat} ({feature_names[feat]}): error = {err:.6f}, importance = {input_importance_tab[feat]}")
print("\n--- Worst Reconstructed Features (Tabular, top 10) ---")
for feat, err in sorted(worst_reconstructed_tab, key=lambda x: x[1], reverse=True)[:10]:
    print(f"Feature {feat} ({feature_names[feat]}): error = {err:.6f}, importance = {input_importance_tab[feat]}")
In [14]:
# Two-panel figure: (left) top-20 feature importances, (right) per-feature
# reconstruction error as a function of bottleneck size.
fig_tab, axes_tab = plt.subplots(1, 2, figsize=(16, 5))
ax_imp_tab, ax_err_tab = axes_tab

top_n = 20
top_indices = np.argsort(input_importance_tab)[-top_n:][::-1]
top_names = [feature_names[i][:25] for i in top_indices]
ax_imp_tab.barh(range(top_n), input_importance_tab[top_indices])
ax_imp_tab.set_yticks(range(top_n))
ax_imp_tab.set_yticklabels(top_names, fontsize=9)
ax_imp_tab.set_xlabel('Usage Count in Latent Formulas')
ax_imp_tab.set_title('Top 20 Important Features (Tabular)')
ax_imp_tab.invert_yaxis()

sample_features_tab = np.random.choice(input_dim_tab, 10, replace=False)
for feat in sample_features_tab:
    errors = [reconstruction_errors_tab[ld][feat] for ld in latent_dims_to_test_tab]
    # Slicing already no-ops for short names, so the original ternary collapses.
    label = feature_names[feat][:20]
    ax_err_tab.plot(latent_dims_to_test_tab, errors, marker='o', alpha=0.6, label=label)
ax_err_tab.set_xlabel('Latent Dimension')
ax_err_tab.set_ylabel('Reconstruction Error')
ax_err_tab.set_title('Reconstruction Error vs Latent Dimension (Tabular, 10 sample features)')
ax_err_tab.set_xscale('log')
ax_err_tab.grid(True, alpha=0.3)
ax_err_tab.legend(fontsize=8, loc='best')
plt.tight_layout()
plt.show()
Text¶
In [15]:
# Reseed for the text experiment so it is reproducible independently of the
# tabular section above.
torch.manual_seed(24)
np.random.seed(24)

# Embed the first 4096 IMDB reviews with a small sentence-transformer.
dataset = load_dataset("imdb", split="train[:4096]")
texts = dataset['text']
model_st = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
embeddings = torch.FloatTensor(model_st.encode(texts, show_progress_bar=True, batch_size=32))
print(f"Embeddings shape: {embeddings.shape}")

# Per-dimension standardization; the epsilon guards near-constant dimensions.
embeddings_mean = embeddings.mean(dim=0)
embeddings_std = embeddings.std(dim=0)
embeddings_normalized = (embeddings - embeddings_mean) / (embeddings_std + 1e-8)

# Positional 80/20 train/test split.
train_size_text = int(0.8 * len(embeddings_normalized))
train_data_text = embeddings_normalized[:train_size_text]
test_data_text = embeddings_normalized[train_size_text:]
train_loader = DataLoader(TensorDataset(train_data_text), batch_size=32, shuffle=True)
test_loader = DataLoader(TensorDataset(test_data_text), batch_size=32)
In [16]:
# Fit the autoencoder on the embedding training split and cache train-set
# latents (plus reconstructions) for the symbolic-regression steps.
input_dim = embeddings_normalized.shape[1]
latent_dim = 32
print(f"\nTraining autoencoder ({input_dim} -> {latent_dim} -> {input_dim})...")
autoencoder = Autoencoder(input_dim, latent_dim)
autoencoder = train_autoencoder(autoencoder, train_loader, 80)

print("Extracting latent representations...")
autoencoder.eval()
with torch.no_grad():
    all_inputs = train_data_text
    all_recons, all_latents = autoencoder(all_inputs)

X_input = all_inputs.numpy()
Z_latent = all_latents.numpy()
X_recon = all_recons.numpy()
print(f"Input shape: {X_input.shape}, Latent shape: {Z_latent.shape}")
In [17]:
import re  # local import: used to parse feature indices out of formula strings

latent_formulas = {}
input_importance = np.zeros(input_dim)
print("Finding symbolic formulas for latent dimensions...")
for i in tqdm(range(min(8, latent_dim))):
    model_sr = PySRRegressor(
        niterations=64,
        populations=45,
        binary_operators=["+", "-", "*", "/", "^", "max", "min", "mod"],
        unary_operators=["neg", "square", "cube", "cbrt", "sqrt", "abs", "sign", "inv", "exp", "log", "log10", "log2", "log1p", "sin", "cos", "tan", "asin", "acos", "atan", "sinh", "cosh", "tanh", "asinh", "acosh", "atanh", "relu", "sinc", "floor", "ceil"],
        parsimony=0.01,
        verbosity=0,
        model_selection="best",
    )
    # Fit on a random subsample to keep the PySR search time bounded.
    fit_idx = np.random.choice(len(X_input), min(1024, len(X_input)), replace=False)
    X_sample = X_input[fit_idx]
    z_sample = Z_latent[fit_idx, i]
    model_sr.fit(X_sample, z_sample)
    best_formula = str(model_sr.sympy())
    latent_formulas[i] = best_formula
    # BUG FIX: the previous substring test (f'x{j}' in best_formula) over-counted
    # badly here — with 384 embedding dims, 'x1' matches 'x10' ... 'x383', so
    # nearly every low-index dimension was falsely flagged as used. Parse the
    # exact variable indices (PySR names inputs x0, x1, ...) instead.
    for j in {int(m) for m in re.findall(r'\bx(\d+)\b', best_formula)}:
        if j < input_dim:
            input_importance[j] += 1
In [18]:
# Preview up to five discovered formulas, truncating long ones.
print("--- Discovered Latent Formulas (sample) ---")
for i in range(min(5, len(latent_formulas))):
    formula = latent_formulas[i]
    shown = formula if len(formula) <= 100 else formula[:97] + "..."
    print(f"z_{i} = {shown}")
In [19]:
# Split embedding dimensions into never-used vs frequently-used sets.
threshold = 0
discarded_dims = np.sort(np.where(input_importance == threshold)[0], kind="stable")
frequent_idx = np.where(input_importance > 2)[0]
order = np.argsort(-input_importance[input_importance > 2], kind="stable")
important_dims = frequent_idx[order]

print("\n--- Input Dimension Usage Analysis ---")
print(f"Highly important dimensions (appear in >2 latents): {len(important_dims)}")
print(f"Examples: {important_dims[:10].tolist()}")
print(f"\nPotentially redundant dimensions (never appear): {len(discarded_dims)}")
print(f"Examples: {discarded_dims[:10].tolist()}")
In [20]:
# Sweep bottleneck sizes for the embeddings and tabulate per-dimension errors.
latent_dims_to_test = [4, 8, 16, 32, 64]
reconstruction_errors = {}
print("Training autoencoders with different latent dimensions...")
for ld in latent_dims_to_test:
    print(f"\nLatent dim = {ld}")
    ae = Autoencoder(input_dim, ld)
    ae = train_autoencoder(ae, train_loader, 80)
    ae.eval()
    with torch.no_grad():
        recon, _ = ae(test_data_text)
        # Mean squared error per embedding dimension on the test split.
        reconstruction_errors[ld] = ((recon - test_data_text) ** 2).mean(dim=0).numpy()

# Long-format records: one row per (embedding dimension, latent size).
error_data = [
    {
        'feature_id': feature_j,
        'latent_dim': ld,
        'reconstruction_error': reconstruction_errors[ld][feature_j],
    }
    for feature_j in range(input_dim)
    for ld in latent_dims_to_test
]
df_errors = pd.DataFrame(error_data)
print("--- Reconstruction Error Analysis ---")
print(f"Total data points: {len(df_errors)}")
In [21]:
# Join per-dimension statistics onto the error table, then fit a symbolic law:
# error ~ f(latent_dim, dimension mean, dimension std, importance).
feature_properties = pd.DataFrame({
    'feature_id': range(input_dim),
    'mean': X_input.mean(axis=0),
    'std': X_input.std(axis=0),
    'importance': input_importance,
})
df_merged = df_errors.merge(feature_properties, on='feature_id')

print("Finding symbolic law for reconstruction error...")
X_recon_sr = df_merged[['latent_dim', 'mean', 'std', 'importance']].values
y_recon_sr = df_merged['reconstruction_error'].values

binary_ops = ["+", "-", "*", "/", "^", "max", "min", "mod"]
unary_ops = ["neg", "square", "cube", "cbrt", "sqrt", "abs", "sign", "inv", "exp", "log", "log10", "log2", "log1p", "sin", "cos", "tan", "asin", "acos", "atan", "sinh", "cosh", "tanh", "asinh", "acosh", "atanh", "relu", "sinc", "floor", "ceil"]
model_recon_sr = PySRRegressor(
    niterations=64,
    populations=45,
    binary_operators=binary_ops,
    unary_operators=unary_ops,
    parsimony=0.01,
    verbosity=0,
    model_selection="best",
)
# Fit on a random subsample to bound search time.
sample_idx = np.random.choice(len(X_recon_sr), min(2048, len(X_recon_sr)), replace=False)
model_recon_sr.fit(X_recon_sr[sample_idx], y_recon_sr[sample_idx])
Out[21]:
In [22]:
print("\n--- Discovered Reconstruction Law ---")
print(f"reconstruction_error = {model_recon_sr.sympy()}")
print("Where:")
print(" x0 = latent_dim")
print(" x1 = feature_mean")
print(" x2 = feature_std")
print(" x3 = feature_importance")

# Classify embedding dimensions by their mean error across all latent sizes.
# PERF: the 10%/90% quantiles and per-feature means are loop-invariant — the
# original recomputed both quantiles and refiltered the frame for each of the
# input_dim features; compute them once up front instead.
q_low = df_errors['reconstruction_error'].quantile(0.1)
q_high = df_errors['reconstruction_error'].quantile(0.9)
mean_errors = df_errors.groupby('feature_id')['reconstruction_error'].mean()

best_reconstructed = []
worst_reconstructed = []
for feature_j in range(input_dim):
    avg_error = mean_errors[feature_j]
    if avg_error < q_low:
        best_reconstructed.append((feature_j, avg_error))
    elif avg_error > q_high:
        worst_reconstructed.append((feature_j, avg_error))

print("\n--- Best Reconstructed Features (top 10) ---")
for feat, err in sorted(best_reconstructed, key=lambda x: x[1])[:10]:
    print(f"Feature {feat}: error = {err:.6f}, importance = {input_importance[feat]}")
print("\n--- Worst Reconstructed Features (top 10) ---")
for feat, err in sorted(worst_reconstructed, key=lambda x: x[1], reverse=True)[:10]:
    print(f"Feature {feat}: error = {err:.6f}, importance = {input_importance[feat]}")
In [23]:
# Two-panel figure: (left) importance of the first 50 embedding dims,
# (right) reconstruction error vs bottleneck size for 10 random dims.
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
ax_imp, ax_err = axes

ax_imp.bar(range(min(50, input_dim)), input_importance[:50])
ax_imp.set_xlabel('Input Dimension')
ax_imp.set_ylabel('Usage Count in Latent Formulas')
ax_imp.set_title('Input Dimension Importance')
ax_imp.axhline(y=2, color='r', linestyle='--', label='High importance threshold')
ax_imp.legend()

sample_features = np.random.choice(input_dim, 10, replace=False)
for feat in sample_features:
    errors = [reconstruction_errors[ld][feat] for ld in latent_dims_to_test]
    ax_err.plot(latent_dims_to_test, errors, marker='o', alpha=0.6)
ax_err.set_xlabel('Latent Dimension')
ax_err.set_ylabel('Reconstruction Error')
ax_err.set_title('Reconstruction Error vs Latent Dimension (10 sample features)')
ax_err.set_xscale('log')
ax_err.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()