| """ |
| Stage 3: Concept-Guided Embeddings |
| |
| Project programs into a space where concept dimensions are explicit and interpretable. |
| Two approaches: |
| |
| 1. ConceptBottleneckAE (CB-SAE, 2512.10805): |
| Encoder maps program → concept scores; decoder reconstructs from concept scores. |
| Each bottleneck dimension = a named concept. |
| |
| 2. GCAVEmbedding (GCAV, 2501.05764): |
| For each concept, train a linear classifier on LLM hidden states. |
| The concept activation vector = classifier normal direction. |
| Steering: e' = e + ε·v_concept |
| |
| Verification: |
| - t-SNE/UMAP visualization |
| - DA@K in concept space vs raw embedding space |
| - AlgoSim label prediction from concept-space distances |
| """ |
|
|
| from __future__ import annotations |
|
|
| import logging |
| from dataclasses import dataclass, field |
| from typing import Any, Optional |
|
|
| import numpy as np |
|
|
| from reason_first_program.program_space import Program, ProgramSpace |
| from reason_first_program.concepts import Concept, ConceptSet |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class ConceptBottleneckAE:
    """
    Concept Bottleneck Autoencoder.

    Architecture (from CB-SAE, 2512.10805):
        Encoder: program_features → concept_scores (|C| dimensions)
        Decoder: concept_scores → reconstructed_features

    The bottleneck forces the representation to go through named concept
    dimensions, making each axis interpretable.

    Training:
        L = L_recon + λ_concept * L_concept_supervision + λ_sparse * L_sparsity

    Two backends are implemented: a small PyTorch MLP (used whenever ``torch``
    can be imported) and a numpy least-squares linear fallback.
    """

    def __init__(
        self,
        n_concepts: int,
        input_dim: int,
        hidden_dim: int = 256,
        sparsity_weight: float = 0.01,
        concept_supervision_weight: float = 1.0,
        learning_rate: float = 1e-3,
        n_epochs: int = 100,
    ):
        """
        Store hyper-parameters; no model is built until ``train`` is called.

        Args:
            n_concepts: Width of the concept bottleneck.
            input_dim: Dimensionality of the program feature vectors.
            hidden_dim: Hidden layer width of the torch encoder/decoder.
            sparsity_weight: λ_sparse, weight of the L1 penalty on concept scores.
            concept_supervision_weight: λ_concept, weight of the BCE term.
            learning_rate: Adam learning rate (torch backend only).
            n_epochs: Number of full-batch training steps (torch backend only).
        """
        self.n_concepts = n_concepts
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.sparsity_weight = sparsity_weight
        self.concept_supervision_weight = concept_supervision_weight
        self.learning_rate = learning_rate
        self.n_epochs = n_epochs

        # Weight snapshots keyed by parameter name: torch ``state_dict`` keys
        # for the MLP backend, or the single key "linear" for the fallback.
        self.encoder_weights: Optional[dict[str, np.ndarray]] = None
        self.decoder_weights: Optional[dict[str, np.ndarray]] = None
        self.concept_names: list[str] = []

        # Live torch modules; they stay None until the torch backend has run.
        self._encoder: Any = None
        self._decoder: Any = None

    def train(
        self,
        features: np.ndarray,
        concept_labels: np.ndarray,
        concept_names: list[str],
    ) -> dict[str, Any]:
        """
        Train the concept bottleneck autoencoder.

        Falls back to ``_train_numpy`` when ``torch`` cannot be imported.

        Args:
            features: (n_programs, input_dim) - program feature vectors
            concept_labels: (n_programs, n_concepts) - concept supervision
                labels. The torch backend feeds them to binary cross-entropy
                against sigmoid outputs, so they must lie in [0, 1].
            concept_names: list of concept names for each bottleneck dimension

        Returns:
            Training metrics dict. The torch backend reports ``final_loss``,
            ``final_recon_loss`` and ``final_concept_loss`` as measured in the
            last epoch (before that epoch's optimizer step); all three are NaN
            when ``n_epochs`` is 0.
        """
        try:
            import torch
            import torch.nn as nn
            import torch.optim as optim
        except ImportError:
            return self._train_numpy(features, concept_labels, concept_names)

        self.concept_names = list(concept_names)
        device = "cuda" if torch.cuda.is_available() else "cpu"

        # Sigmoid keeps every concept score in (0, 1), as BCE requires.
        encoder = nn.Sequential(
            nn.Linear(self.input_dim, self.hidden_dim),
            nn.ReLU(),
            nn.Linear(self.hidden_dim, self.n_concepts),
            nn.Sigmoid(),
        ).to(device)

        decoder = nn.Sequential(
            nn.Linear(self.n_concepts, self.hidden_dim),
            nn.ReLU(),
            nn.Linear(self.hidden_dim, self.input_dim),
        ).to(device)

        optimizer = optim.Adam(
            list(encoder.parameters()) + list(decoder.parameters()),
            lr=self.learning_rate,
        )

        X = torch.tensor(features, dtype=torch.float32, device=device)
        Y = torch.tensor(concept_labels, dtype=torch.float32, device=device)

        # Pre-filled so that n_epochs == 0 returns well-formed metrics instead
        # of failing on an empty loss history.
        nan = float("nan")
        metrics: dict[str, Any] = {
            "final_loss": nan,
            "final_recon_loss": nan,
            "final_concept_loss": nan,
        }

        for epoch in range(self.n_epochs):
            concept_scores = encoder(X)
            reconstructed = decoder(concept_scores)

            # Reconstruction, concept supervision and sparsity terms.
            l_recon = ((X - reconstructed) ** 2).mean()
            l_concept = nn.functional.binary_cross_entropy(concept_scores, Y)
            l_sparse = concept_scores.abs().mean()

            loss = (
                l_recon
                + self.concept_supervision_weight * l_concept
                + self.sparsity_weight * l_sparse
            )

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            metrics = {
                "final_loss": loss.item(),
                "final_recon_loss": l_recon.item(),
                "final_concept_loss": l_concept.item(),
            }

            if epoch % 20 == 0:
                logger.info(
                    "CB-AE epoch %d: loss=%.4f recon=%.4f concept=%.4f",
                    epoch,
                    metrics["final_loss"],
                    metrics["final_recon_loss"],
                    metrics["final_concept_loss"],
                )

        # Keep numpy snapshots of the weights and the live modules for inference.
        self.encoder_weights = {
            key: value.detach().cpu().numpy()
            for key, value in encoder.state_dict().items()
        }
        self.decoder_weights = {
            key: value.detach().cpu().numpy()
            for key, value in decoder.state_dict().items()
        }
        self._encoder = encoder
        self._decoder = decoder

        return metrics

    def _train_numpy(
        self,
        features: np.ndarray,
        concept_labels: np.ndarray,
        concept_names: list[str],
    ) -> dict[str, Any]:
        """
        Fallback numpy-only training (simple linear model, no bias term).

        The encoder is the least-squares map features → concept_labels; the
        decoder is the least-squares map from the resulting concept scores
        back to features.

        Returns:
            ``final_loss`` / ``final_recon_loss`` (mean squared reconstruction
            error) and ``method == "numpy_linear"``.
        """
        self.concept_names = list(concept_names)

        # Drop torch modules left over from an earlier fit; encode/decode
        # prefer them and would otherwise ignore the weights computed here.
        self._encoder = None
        self._decoder = None

        W_enc, *_ = np.linalg.lstsq(features, concept_labels, rcond=None)
        self.encoder_weights = {"linear": W_enc.T}

        concept_scores = features @ W_enc
        W_dec, *_ = np.linalg.lstsq(concept_scores, features, rcond=None)
        self.decoder_weights = {"linear": W_dec.T}

        recon = concept_scores @ W_dec
        recon_loss = float(np.mean((features - recon) ** 2))

        return {
            "final_loss": recon_loss,
            "final_recon_loss": recon_loss,
            "method": "numpy_linear",
        }

    def encode(self, features: np.ndarray) -> np.ndarray:
        """
        Encode programs into concept space.
        Returns (n_programs, n_concepts) concept scores.

        Raises:
            RuntimeError: if neither backend has been trained.
        """
        encoder = getattr(self, "_encoder", None)
        if encoder is not None:
            import torch

            device = next(encoder.parameters()).device
            X = torch.tensor(features, dtype=torch.float32, device=device)
            with torch.no_grad():
                return encoder(X).cpu().numpy()

        if self.encoder_weights is not None and "linear" in self.encoder_weights:
            return features @ self.encoder_weights["linear"].T

        raise RuntimeError("Model not trained yet")

    def decode(self, concept_scores: np.ndarray) -> np.ndarray:
        """
        Decode from concept space back to feature space.

        Raises:
            RuntimeError: if neither backend has been trained.
        """
        decoder = getattr(self, "_decoder", None)
        if decoder is not None:
            import torch

            device = next(decoder.parameters()).device
            Z = torch.tensor(concept_scores, dtype=torch.float32, device=device)
            with torch.no_grad():
                return decoder(Z).cpu().numpy()

        if self.decoder_weights is not None and "linear" in self.decoder_weights:
            return concept_scores @ self.decoder_weights["linear"].T

        raise RuntimeError("Model not trained yet")
|
|
|
|
class GCAVEmbedding:
    """
    Concept Activation Vector embedding.
    Based on GCAV (2501.05764): for each concept, the CAV is the normal direction
    of a logistic classifier that separates concept-positive from concept-negative
    activations.

    For concept d at layer l:
        P_d^(l)(e) = sigmoid(w_d^(l)^T · e + b_d^(l))
        v_d^(l) = w_d^(l) / ||w_d^(l)||

    Steering:
        e' = e + ε · v_concept
    """

    def __init__(self):
        # All three dicts are keyed by concept name and filled by
        # ``train_concept_vector``; insertion order defines the column order
        # of ``project``.
        self.concept_vectors: dict[str, np.ndarray] = {}
        self.concept_biases: dict[str, float] = {}
        self.concept_classifiers: dict[str, Any] = {}

    @property
    def concept_names(self) -> list[str]:
        """Names of the trained concepts, in the column order used by ``project``."""
        return list(self.concept_vectors)

    def train_concept_vector(
        self,
        concept_name: str,
        positive_features: np.ndarray,
        negative_features: np.ndarray,
    ) -> dict[str, Any]:
        """
        Train a concept activation vector from contrastive data.

        Fits a logistic regression on the stacked positive/negative features
        and stores its unit-normalised weight vector, intercept and the fitted
        classifier under ``concept_name``.

        Args:
            concept_name: Name of the concept
            positive_features: Features of programs exhibiting the concept
            negative_features: Features of programs NOT exhibiting the concept

        Returns:
            Training metrics: ``accuracy`` (on the training data itself),
            ``concept`` (the name) and ``vector_norm`` (norm of the raw,
            un-normalised classifier weights).
        """
        from sklearn.linear_model import LogisticRegression

        X = np.vstack([positive_features, negative_features])
        y = np.array([1] * len(positive_features) + [0] * len(negative_features))

        clf = LogisticRegression(max_iter=1000, solver="lbfgs")
        clf.fit(X, y)

        # The CAV is the classifier normal; the epsilon guards a zero weight vector.
        w = clf.coef_[0]
        v = w / (np.linalg.norm(w) + 1e-8)

        self.concept_vectors[concept_name] = v
        self.concept_biases[concept_name] = float(clf.intercept_[0])
        self.concept_classifiers[concept_name] = clf

        return {
            "accuracy": float(clf.score(X, y)),
            "concept": concept_name,
            "vector_norm": float(np.linalg.norm(w)),
        }

    def train_all(
        self,
        concept_set: ConceptSet,
        features: np.ndarray,
        programs: list[Program],
    ) -> dict[str, dict[str, Any]]:
        """
        Train CAVs for all concepts in a concept set.

        A program is a positive example for a concept when its ``program_id``
        appears in ``concept.programs`` and a negative example otherwise.
        Concepts with fewer than two positives or two negatives are skipped
        with a warning.

        Args:
            concept_set: Set of discovered concepts
            features: (n_programs, dim) feature vectors
            programs: List of programs (same order as features)

        Returns:
            {concept_name: metrics} for every concept that was trained.
        """
        program_id_to_idx = {p.program_id: i for i, p in enumerate(programs)}
        results: dict[str, dict[str, Any]] = {}

        for concept in concept_set.concepts:
            # Build the membership set once per concept so the negative scan
            # does not re-search ``concept.programs`` for every program.
            members = set(concept.programs)
            pos_idx = [
                program_id_to_idx[pid]
                for pid in concept.programs
                if pid in program_id_to_idx
            ]
            neg_idx = [
                i for i, p in enumerate(programs)
                if p.program_id not in members
            ]

            if len(pos_idx) < 2 or len(neg_idx) < 2:
                logger.warning(
                    "Skipping concept '%s': insufficient samples (pos=%d, neg=%d)",
                    concept.name,
                    len(pos_idx),
                    len(neg_idx),
                )
                continue

            results[concept.name] = self.train_concept_vector(
                concept.name, features[pos_idx], features[neg_idx]
            )

        logger.info("Trained %d concept activation vectors", len(results))
        return results

    def project(self, features: np.ndarray) -> np.ndarray:
        """
        Project features into concept space.
        Each dimension = dot product with concept activation vector.
        Returns (n_samples, n_concepts); columns follow ``concept_names``.

        Raises:
            RuntimeError: if no concept vector has been trained.
        """
        if not self.concept_vectors:
            raise RuntimeError("No concept vectors trained")

        vectors = np.stack(list(self.concept_vectors.values()))
        return features @ vectors.T

    def steer(
        self,
        features: np.ndarray,
        concept_name: str,
        strength: float = 1.0,
    ) -> np.ndarray:
        """
        Steer features toward (or away from) a concept.
        Implements Eq. 3 from GCAV: e' = e + ε · v_concept

        Args:
            features: (n, dim) or (dim,) feature vector(s)
            concept_name: Which concept to steer toward
            strength: ε — positive = toward, negative = away

        Returns:
            Steered features

        Raises:
            ValueError: if ``concept_name`` has no trained vector.
        """
        if concept_name not in self.concept_vectors:
            raise ValueError(f"Unknown concept: {concept_name}")

        return features + strength * self.concept_vectors[concept_name]

    def multi_steer(
        self,
        features: np.ndarray,
        concept_weights: dict[str, float],
    ) -> np.ndarray:
        """
        Steer features along multiple concept dimensions simultaneously.

        Simple additive steering (may cause interference — see MSRS for
        orthogonal approach). Concept names without a trained vector are
        skipped (with a warning) rather than raising, so a partially trained
        embedding can still be used.

        Args:
            features: Feature vector(s)
            concept_weights: {concept_name: strength}

        Returns:
            A new array; ``features`` itself is not modified.
        """
        steered = features.copy()
        for concept_name, weight in concept_weights.items():
            vector = self.concept_vectors.get(concept_name)
            if vector is None:
                logger.warning(
                    "multi_steer: ignoring unknown concept '%s'", concept_name
                )
                continue
            steered = steered + weight * vector
        return steered
|
|
|
|
class MSRSSteering:
    """
    Multi-Subspace Representation Steering (MSRS, 2508.10599).

    Addresses concept interference by assigning orthogonal subspaces to each
    concept. Key components:

    1. Shared subspace B_shared: captures common directions across all concepts
    2. Private subspaces B_i: concept-specific orthogonal directions
    3. Adaptive mask m(h): learns to weight subspace dimensions

    Intervention: Φ(h; R, W, b, m) = h + R^T · diag(m(h)) · (Wh + b - Rh)

    This implementation covers components 1 and 2 only, and each private
    subspace is a single direction. Private directions are orthogonal to the
    shared subspace but are not orthogonalised against each other.
    """

    def __init__(self, energy_threshold: float = 0.6):
        """
        Args:
            energy_threshold: Fraction of the summed singular values of the
                concept-mean matrix that the shared subspace must reach.
        """
        self.energy_threshold = energy_threshold
        # (r_s, dim) orthonormal rows spanning the shared subspace.
        self.B_shared: Optional[np.ndarray] = None
        # {concept_name: (1, dim)} unit direction outside the shared subspace.
        self.B_private: dict[str, np.ndarray] = {}
        # Shared rows followed by the private rows, stacked: (total_dims, dim).
        self.S_align: Optional[np.ndarray] = None

    def fit(
        self,
        concept_features: dict[str, np.ndarray],
    ) -> dict[str, Any]:
        """
        Extract shared and private subspaces for each concept.

        Args:
            concept_features: {concept_name: (n_samples, dim) features}

        Returns:
            ``shared_rank``, ``n_concepts``, ``private_dims`` (per concept) and
            ``total_dims`` (rows of the aligned basis).

        Raises:
            ValueError: if ``concept_features`` is empty.
        """
        if not concept_features:
            raise ValueError("concept_features must contain at least one concept")

        # Mean activation per concept.
        means = {
            name: np.asarray(features, dtype=float).mean(axis=0)
            for name, features in concept_features.items()
        }
        concept_names = list(means)

        # Columns are concept means, so the matrix is (dim, n_concepts).
        tau_c = np.column_stack([means[name] for name in concept_names])

        # The LEFT singular vectors live in feature space (length dim); the
        # right ones live in concept space and cannot be applied to features.
        U, S, _ = np.linalg.svd(tau_c, full_matrices=False)

        total_energy = S.sum()
        if total_energy > 0:
            cumulative_energy = np.cumsum(S) / total_energy
            r_s = int(np.searchsorted(cumulative_energy, self.energy_threshold) + 1)
        else:
            # All concept means are zero: keep a single (arbitrary) direction.
            r_s = 1
        r_s = max(1, min(r_s, len(S)))

        self.B_shared = U[:, :r_s].T

        # Private direction = the part of each mean outside the shared subspace.
        self.B_private = {}
        for name, mean_act in means.items():
            residual = mean_act - self.B_shared.T @ (self.B_shared @ mean_act)
            residual_len = np.linalg.norm(residual)
            if residual_len > 1e-8:
                self.B_private[name] = (residual / residual_len).reshape(1, -1)

        components = [self.B_shared]
        components.extend(
            self.B_private[name] for name in concept_names if name in self.B_private
        )
        self.S_align = np.vstack(components)

        return {
            "shared_rank": r_s,
            "n_concepts": len(concept_names),
            "private_dims": {
                name: B.shape[0] for name, B in self.B_private.items()
            },
            "total_dims": self.S_align.shape[0],
        }

    def steer(
        self,
        features: np.ndarray,
        concept_weights: dict[str, float],
    ) -> np.ndarray:
        """
        Steer features along the private directions of the given concepts.

        Each weighted private direction is added to the features. Concepts
        without a private direction (unknown names, or concepts whose mean
        lies entirely in the shared subspace) contribute nothing. No movement
        is applied inside the shared subspace itself.

        Args:
            features: (n, dim) or (dim,) feature vector(s)
            concept_weights: {concept_name: strength}

        Returns:
            A new array; ``features`` itself is not modified.

        Raises:
            RuntimeError: if ``fit`` has not been called.
        """
        if self.B_shared is None:
            raise RuntimeError("MSRS not fitted yet")

        steered = features.copy()

        total_weight = sum(abs(w) for w in concept_weights.values())
        if total_weight > 0:
            offset = np.zeros(features.shape[-1])
            for name, weight in concept_weights.items():
                private = self.B_private.get(name)
                if private is not None:
                    offset += weight * private[0]
            steered = steered + offset

        return steered

    def project(self, features: np.ndarray) -> np.ndarray:
        """
        Project features into the aligned subspace.

        Returns (n_samples, total_dims): shared coordinates first, then one
        coordinate per concept that has a private direction.

        Raises:
            RuntimeError: if ``fit`` has not been called.
        """
        if self.S_align is None:
            raise RuntimeError("MSRS not fitted yet")
        return features @ self.S_align.T
|
|
|
|
class ConceptEmbeddingSpace:
    """
    Unified embedding space that combines CB-AE and GCAV approaches.

    Provides:
    - Program projection into concept space
    - Visualization (t-SNE / UMAP / PCA)
    - Alignment verification
    - Steering interface (via the attached ``gcav`` / ``msrs`` objects)
    """

    def __init__(
        self,
        concept_set: ConceptSet,
        cbae: Optional[ConceptBottleneckAE] = None,
        gcav: Optional[GCAVEmbedding] = None,
        msrs: Optional[MSRSSteering] = None,
    ):
        """
        Args:
            concept_set: Concepts used for direct scoring of programs.
            cbae: Optional trained concept bottleneck autoencoder.
            gcav: Optional trained concept activation vector embedding.
            msrs: Optional fitted multi-subspace steering model.
        """
        self.concept_set = concept_set
        self.cbae = cbae
        self.gcav = gcav
        self.msrs = msrs

    def project(
        self,
        programs: list[Program],
        method: str = "concept_scores",
    ) -> np.ndarray:
        """
        Project programs into concept space.

        Args:
            programs: Programs to project
            method: 'concept_scores' (direct scoring), 'cbae', 'gcav'

        Returns:
            (n_programs, n_concepts) projection

        Raises:
            NotImplementedError: for 'cbae' / 'gcav' when the corresponding
                model is attached, because feature extraction is not wired in.

        Any other case (an unrecognised ``method``, or 'cbae' / 'gcav' with no
        model attached) falls back to direct concept scoring.
        """
        if method == "cbae" and self.cbae is not None:
            raise NotImplementedError("Need features extraction")
        if method == "gcav" and self.gcav is not None:
            raise NotImplementedError("Need features extraction")
        return self.concept_set.score_matrix(programs)

    def verify_alignment(
        self,
        programs: list[Program],
        ground_truth_clusters: Optional[list[list[int]]] = None,
    ) -> dict[str, float]:
        """
        Verify that concept-space projection aligns with meaningful differences.

        Computed checks:
        1. Redundancy: mean absolute off-diagonal correlation between concept
           dimensions (only when there is more than one dimension).
        2. Coverage: fraction of programs with a positive score per concept.
        3. Effective dimensionality: exp(entropy) of the per-concept variance
           shares.

        Args:
            programs: Programs to project and analyse.
            ground_truth_clusters: Accepted for interface compatibility but
                currently unused; no cluster-discrimination or silhouette
                score is computed.

        Returns:
            Dict of the metrics above; keys are omitted when a metric is not
            defined for the given projection.
        """
        projection = np.asarray(self.project(programs), dtype=float)
        results: dict[str, float] = {}

        if projection.shape[1] > 1:
            # A constant concept column has zero variance, so its correlations
            # are NaN. Average only the defined entries so one degenerate
            # concept does not turn the whole metric into NaN.
            with np.errstate(invalid="ignore", divide="ignore"):
                corr_matrix = np.corrcoef(projection.T)
            n = corr_matrix.shape[0]
            off_diagonal = np.abs(corr_matrix[~np.eye(n, dtype=bool)])
            defined = off_diagonal[np.isfinite(off_diagonal)]
            results["avg_concept_correlation"] = (
                float(defined.mean()) if defined.size else float("nan")
            )
            results["n_concepts"] = n

        coverage = (projection > 0).mean(axis=0)
        if coverage.size:
            results["mean_concept_coverage"] = float(coverage.mean())
            results["min_concept_coverage"] = float(coverage.min())
            results["max_concept_coverage"] = float(coverage.max())

        variance_per_concept = projection.var(axis=0)
        total_var = variance_per_concept.sum()
        if total_var > 0:
            share = variance_per_concept / total_var
            # The small epsilon keeps log() finite for zero-variance concepts.
            entropy = -np.sum(share * np.log(share + 1e-10))
            results["effective_dimensionality"] = float(np.exp(entropy))

        return results

    @staticmethod
    def _make_reducer(method: str, n_samples: int) -> Any:
        """Build the 2D reducer for ``method``; 'umap' falls back to t-SNE if umap is missing."""

        def make_tsne() -> Any:
            from sklearn.manifold import TSNE

            return TSNE(
                n_components=2,
                random_state=42,
                perplexity=min(30, n_samples - 1),
            )

        if method == "tsne":
            return make_tsne()
        if method == "umap":
            try:
                from umap import UMAP
            except ImportError:
                return make_tsne()
            return UMAP(n_components=2, random_state=42)
        if method == "pca":
            from sklearn.decomposition import PCA

            return PCA(n_components=2)
        raise ValueError(f"Unknown method: {method}")

    def visualize_2d(
        self,
        programs: list[Program],
        method: str = "tsne",
        color_by: str = "functional_cluster",
    ) -> dict[str, Any]:
        """
        Generate 2D visualization data for the concept space.

        Args:
            programs: Programs to embed.
            method: 'tsne', 'umap' (falls back to t-SNE when umap is not
                installed) or 'pca'.
            color_by: 'functional_cluster' (group by ``functional_signature``)
                or 'model' (group by ``model_id``). Any other value yields an
                empty ``colors`` list.

        Returns:
            Coordinates and metadata suitable for plotting: ``x``, ``y``,
            ``colors`` (integer group index per program, numbered in order of
            first appearance), ``program_ids``, ``concept_names`` and
            ``method`` (the requested name, even if the umap fallback ran).

        Raises:
            ValueError: if ``method`` is not one of the supported names.
        """
        projection = self.project(programs)
        reducer = self._make_reducer(method, len(programs))
        coords_2d = reducer.fit_transform(projection)

        if color_by == "functional_cluster":
            keys = [p.functional_signature for p in programs]
        elif color_by == "model":
            keys = [p.model_id for p in programs]
        else:
            keys = []

        # Each distinct key gets the next free integer the first time it is seen.
        palette: dict[Any, int] = {}
        colors = [palette.setdefault(key, len(palette)) for key in keys]

        return {
            "x": coords_2d[:, 0].tolist(),
            "y": coords_2d[:, 1].tolist(),
            "colors": colors,
            "program_ids": [p.program_id for p in programs],
            "concept_names": self.concept_set.names,
            "method": method,
        }
|
|