Source code for kmcpy.models.fitting.fitter

#!/usr/bin/env python
"""Model fitting implementations."""

from abc import ABC, abstractmethod
import logging
import os

from monty.json import MSONable
from monty.serialization import dumpfn, loadfn
from kmcpy.models.parameters import LCEModelParamHistory, LCEModelParameters

logger = logging.getLogger(__name__)


[docs] class BaseFitter(MSONable, ABC): """Main class for model fitting""" def __init__(self) -> None: pass
[docs] @abstractmethod def as_dict(self): """Convert the fitting object to a dictionary""" raise NotImplementedError( "This method should be implemented in the subclass to convert the fitting object to a dictionary." )
[docs] @abstractmethod def fit(self, *args, **kwargs): """Fit the model to the data""" raise NotImplementedError( "This method should be implemented in the subclass to fit the model to the data." )
[docs] class LCEFitter(BaseFitter): """Fitting class for Local Cluster Expansion (LCE) model""" def __init__(self) -> None: """ Initialize the LCEFitter object. """ super().__init__() self.model_parameters = LCEModelParameters( keci=[], empty_cluster=0.0, cluster_site_indices=[], weight=[], alpha=0.0, time_stamp="", time="", rmse=0.0, loocv=0.0, normalize=True, ) @staticmethod def _extract_serialized_record(payload: dict) -> dict: """Extract one fitting record from supported serialized payloads.""" if "keci" in payload: return payload # Legacy "orient=index" payload used by historical fitting outputs. numeric_records = [] for key, value in payload.items(): if not isinstance(value, dict): continue try: numeric_records.append((int(key), value)) except (TypeError, ValueError): continue if numeric_records: return max(numeric_records, key=lambda item: item[0])[1] raise ValueError("Unsupported serialized fitter schema.") @staticmethod def _save_fit_results_history( model_parameters: LCEModelParameters, fit_results_fname: str ) -> None: import pandas as pd required_columns = [ "time_stamp", "time", "keci", "empty_cluster", "weight", "alpha", "rmse", "loocv", ] columns = required_columns + [ "normalize", "orbit_fingerprints", "local_environment_hash", ] row = [ model_parameters.time_stamp, model_parameters.time, model_parameters.keci, model_parameters.empty_cluster, model_parameters.weight, model_parameters.alpha, model_parameters.rmse, model_parameters.loocv, model_parameters.normalize, model_parameters.orbit_fingerprints, model_parameters.local_environment_hash, ] try: logger.info("Try loading %s ...", fit_results_fname) df = pd.read_json(fit_results_fname, orient="index") if not set(required_columns).issubset(df.columns): raise ValueError("Unexpected file schema") if "normalize" not in df.columns: df["normalize"] = True if "orbit_fingerprints" not in df.columns: df["orbit_fingerprints"] = None if "local_environment_hash" not in df.columns: df["local_environment_hash"] = None new_data = pd.DataFrame([row], columns=columns) df2 = pd.concat([df[columns], new_data], ignore_index=True) df2.to_json(fit_results_fname, orient="index", indent=4) logger.info("Updated latest results:\n%s", df2.iloc[-1]) except Exception: logger.info( "%s is not found or incompatible, create a new file...", fit_results_fname, ) df = pd.DataFrame([row], columns=columns) df.to_json(fit_results_fname, orient="index", indent=4) logger.info("Updated latest results:\n%s", df.iloc[-1])
[docs] def as_dict(self): d = { "@module": self.__class__.__module__, "@class": self.__class__.__name__, "time_stamp": self.model_parameters.time_stamp, "time": self.model_parameters.time, "weight": self.model_parameters.weight, "alpha": self.model_parameters.alpha, "keci": self.model_parameters.keci, "empty_cluster": self.model_parameters.empty_cluster, "rmse": self.model_parameters.rmse, "loocv": self.model_parameters.loocv, "normalize": self.model_parameters.normalize, } if self.model_parameters.orbit_fingerprints is not None: d["orbit_fingerprints"] = self.model_parameters.orbit_fingerprints if self.model_parameters.local_environment_hash is not None: d["local_environment_hash"] = self.model_parameters.local_environment_hash return d
def to(self, filename): logger.info("Saving: %s", filename) dumpfn(self.as_dict(), filename, indent=4)
[docs] @classmethod def from_dict(cls, payload): if not isinstance(payload, dict): raise ValueError("Serialized fitter payload must be a JSON object.") record = cls._extract_serialized_record(payload) obj = cls() obj.model_parameters = LCEModelParameters.from_dict(record) return obj
@classmethod def from_file(cls, filename): logger.info("Loading: %s", filename) return cls.from_dict(loadfn(filename, cls=None)) @staticmethod def _fit_lasso_values( correlation_matrix, e_kra, weight, alpha, max_iter, normalize, ): """Fit Lasso and return unscaled coefficients, intercept, and predictions.""" from sklearn.linear_model import Lasso import numpy as np x = np.asarray(correlation_matrix, dtype=float) y = np.asarray(e_kra, dtype=float) sample_weight = None if weight is None else np.asarray(weight, dtype=float) if not normalize: estimator = Lasso(alpha=alpha, max_iter=max_iter, fit_intercept=True) fit_kwargs = ( {} if sample_weight is None else {"sample_weight": sample_weight} ) estimator.fit(x, y, **fit_kwargs) keci = estimator.coef_ empty_cluster = estimator.intercept_ return keci, empty_cluster, estimator.predict(x) if sample_weight is None: x_offset = np.mean(x, axis=0) y_offset = float(np.mean(y)) else: x_offset = np.average(x, axis=0, weights=sample_weight) y_offset = float(np.average(y, weights=sample_weight)) x_centered = x - x_offset y_centered = y - y_offset # Mirror the deprecated sklearn normalize=True path used by the # historical fitting workflow: center first, then divide each feature # by its unweighted L2 norm. x_scale = np.sqrt(np.sum(x_centered ** 2, axis=0)) x_scale[x_scale == 0.0] = 1.0 x_normalized = x_centered / x_scale estimator = Lasso(alpha=alpha, max_iter=max_iter, fit_intercept=False) fit_kwargs = ( {} if sample_weight is None else {"sample_weight": sample_weight} ) estimator.fit(x_normalized, y_centered, **fit_kwargs) keci = estimator.coef_ / x_scale empty_cluster = y_offset - float(np.dot(x_offset, keci)) y_pred = x @ keci + empty_cluster return keci, empty_cluster, y_pred @classmethod def _leave_one_out_rmse( cls, correlation_matrix, e_kra, alpha, max_iter, normalize, ) -> float: """Compute the historical unweighted leave-one-out RMSE.""" import numpy as np x = np.asarray(correlation_matrix, dtype=float) y = np.asarray(e_kra, dtype=float) squared_errors = [] for excluded_index in range(len(y)): train_mask = np.ones(len(y), dtype=bool) train_mask[excluded_index] = False keci, empty_cluster, _ = cls._fit_lasso_values( x[train_mask], y[train_mask], weight=None, alpha=alpha, max_iter=max_iter, normalize=normalize, ) y_pred = float(x[excluded_index] @ keci + empty_cluster) squared_errors.append((float(y[excluded_index]) - y_pred) ** 2) return float(np.sqrt(np.mean(squared_errors)))
[docs] def fit( self, alpha, max_iter=1000000, ekra_fname="e_kra.txt", keci_fname="keci.txt", weight_fname="weight.txt", corr_fname="correlation_matrix.txt", lce_params_fname="lce_params.json", lce_params_history_fname="lce_params_history.json", fit_results_fname=None, normalize=True, orbit_fingerprints=None, local_environment_hash=None, ) -> tuple[LCEModelParameters, object, object]: """Main fitting function Args: alpha (float): Alpha value for Lasso regression max_iter (int, optional): Maximum number of iterations. Defaults to 1000000. ekra_fname (str, optional): File name for E_KRA storage. Defaults to 'e_kra.txt'. keci_fname (str, optional): File name for KECI storage. Defaults to 'keci.txt'. weight_fname (str, optional): File name for weight storage. Defaults to 'weight.txt'. corr_fname (str, optional): File name for correlation matrix storage. Defaults to 'correlation_matrix.txt'. lce_params_fname (str, optional): File name for LCE parameters storage. If None, skip saving. lce_params_history_fname (str, optional): File name for LCE parameters history storage. Defaults to 'lce_params_history.json'. fit_results_fname (str | None, optional): Legacy fitting history file in orient=index JSON format. If None, skip writing. orbit_fingerprints (list[str] | None, optional): Orbit fingerprints associated with the fitted coefficient order. local_environment_hash (str | None, optional): Hash of the ordered local environment used to validate fitted parameters later. Returns: tuple[LCEModelParameters, numpy.ndarray, numpy.ndarray]: Fitted model parameters, predicted E_KRA, and DFT-computed E_KRA. """ """ E_KRA[m x 1] = diagonal(Weight)[m x m] * Corr[m x n] * ECI[n x 1] + V_0[m x 1] E_KRA is a m x 1 vector ECI is a n x 1 vector Corr is a m x n matrix V_0 is a m x 1 vector Weight is a n x n diagonal matrix m is the number of E_KRA n is the number of clusers """ from sklearn.metrics import root_mean_squared_error from copy import copy from datetime import datetime import numpy as np logger.info("Loading E_KRA from %s ...", ekra_fname) e_kra = np.loadtxt(ekra_fname) weight = np.loadtxt(weight_fname) weight_copy = copy(weight) correlation_matrix = np.loadtxt(corr_fname) keci, empty_cluster, y_pred = self._fit_lasso_values( correlation_matrix=correlation_matrix, e_kra=e_kra, weight=weight, alpha=alpha, max_iter=max_iter, normalize=normalize, ) if orbit_fingerprints is not None and len(keci) != len(orbit_fingerprints): raise ValueError( "orbit_fingerprints length does not match fitted keci length: " f"{len(orbit_fingerprints)} != {len(keci)}" ) logger.info("Lasso Results:") logger.info("KECI = \n%s", np.round(keci, 2)) logger.info( "There are %s Non Zero KECI", np.count_nonzero(abs(keci) > 1e-2) ) logger.info("Empty Cluster = %s", empty_cluster) y_true = e_kra logger.info("index\tNEB\tLCE\tNEB-LCE") index = np.linspace(1, len(y_true), num=len(y_true), dtype="int") logger.info( "\n%s", np.round(np.array([index, y_true, y_pred, y_true - y_pred]).T, decimals=2), ) # cv = sqrt(mean(scores)) + N_nonzero_eci*penalty, penalty = 0 here loocv = self._leave_one_out_rmse( correlation_matrix=correlation_matrix, e_kra=e_kra, alpha=alpha, max_iter=max_iter, normalize=normalize, ) logger.info("LOOCV = %s meV", np.round(loocv, 2)) # compute RMS error rmse = root_mean_squared_error(y_true, y_pred) logger.info("RMSE = %s meV", np.round(rmse, 2)) np.savetxt(fname=keci_fname, X=keci, fmt="%.8f") now = datetime.now() time_stamp = now.timestamp() time = now.strftime("%m/%d/%Y, %H:%M:%S") lce_model_params = LCEModelParameters( keci=keci.tolist(), empty_cluster=empty_cluster, cluster_site_indices=[], weight=weight_copy.tolist(), alpha=alpha, time_stamp=time_stamp, time=time, rmse=rmse, loocv=loocv, normalize=normalize, orbit_fingerprints=orbit_fingerprints, local_environment_hash=local_environment_hash, ) self.model_parameters = lce_model_params if fit_results_fname: self._save_fit_results_history(lce_model_params, fit_results_fname) if lce_params_fname: logger.info("Saving LCE model parameters to %s ...", lce_params_fname) lce_model_params.to(lce_params_fname) if lce_params_history_fname: try: logger.info( "Saving LCE model parameters history to %s ...", lce_params_history_fname, ) if os.path.exists(lce_params_history_fname): logger.info("Loading LCE model parameters history ...") lce_model_param_history = LCEModelParamHistory.from_file( lce_params_history_fname ) lce_model_param_history.append(lce_model_params) lce_model_param_history.to(lce_params_history_fname) else: logger.info("Creating new LCE model parameters history ...") lce_model_param_history = LCEModelParamHistory() lce_model_param_history.append(lce_model_params) lce_model_param_history.to(lce_params_history_fname) except Exception as e: logger.error("Error saving LCE model parameters history: %s", e) raise e return lce_model_params, y_pred, y_true