diff --git a/models/evaluate.py b/models/evaluate.py
index 19019ce28067916e607b8fd13f13318db5a23321..75cd094fc1701535b8541aa03aa989370edbb9a6 100644
--- a/models/evaluate.py
+++ b/models/evaluate.py
@@ -1,9 +1,5 @@
-from os import makedirs, path as osp
 import numpy as np
 from tools.progress import Progbar
-from tools.utils import to_list
-from matplotlib import pyplot as plt
-from aiact.models import metrics
 
 
 class Predictor:
@@ -89,165 +85,3 @@ class Predictor:
 
         """
         return self.predict(model, dataset, batch_size)
-
-
-class ExperimentSettings():
-
-    def __init__(self, **kwargs):
-        pass
-
-
-# SWGO = ExperimentSettings({"Energy": [u.TeV], "Zenith": [], "Azimuth": []})
-
-
-class Plotter():
-    def __init__(self, plt_fn, figsize=(11, 9), log_dir="./", name="", **plt_kwargs):
-        self.figsize = figsize
-        self.log_dir = log_dir
-        self.fig, self.ax = plt.subplots(1, figsize=figsize)
-        self.plt_fn = plt_fn
-        self.name = name
-
-    # def plot(self, metric, dset, xerr=None, yerr=None, statistics=None):
-    #     args = metric(dset)
-    #     self.plt_fn(xdata, ydata, xerr, yerr)
-
-    def plot(self, *args, **plt_kwargs):
-        self.plt_fn(*args, axes=self.ax, **plt_kwargs)
-        self.ax.legend()
-
-    def add_data(self, *args, **plt_kwargs):
-        self.plot(args, **plt_kwargs)
-
-    def save(self, log_dir=None):
-        log_dir = log_dir if log_dir is not None else self.log_dir
-        self.fig.tight_layout()
-        self.fig.savefig(osp.join(log_dir, self.name + ".pdf"), dpi=100)
-        plt.close(self.fig)
-
-    def add_statbox(self, stats):
-        from matplotlib.offsetbox import AnchoredText
-
-        if self.plt_fn == plt.hist:
-            loc = "upper right"
-        elif self.plt_fn == plt.scatter or self.plt_fn == plt.hexbin:
-            loc = "lower right"
-        else:
-            loc = "upper left"
-
-        anc_text = AnchoredText(stats, loc="upper right")
-        self.ax.add_artist(anc_text)
-
-
-class Evaluator():
-    """ Basic class of the evaluation of supervised neural networks.
-
-    Parameters
-    ----------
-    model : trained DNN model
-        trained model (Keras / Torch / or PyG model)
-    data : list
-        list of DataContainers
-    tasks : dict
-        Dictionary of tasks specifying the reconstruction {"primary": "classification", "energy": "regression"}
-    experiment : str
-        Should special
-
-    Returns
-    -------
-    type
-        Description of returned object.
-
-    """
-
-    def __init__(self, model, data, tasks, log_dir="./", class_kwarg=None, experiment=None):
-
-        self.model = model
-        self.data = to_list(data)
-        self.tasks = tasks
-        self.experiment = experiment
-        self.figsize = (11, 9)
-        self.class_kwarg = class_kwarg
-        self.plotter = {}
-        self.log_dir = log_dir
-
-    def evaluate(self, tasks=None):
-        tasks = to_list(tasks) if tasks is not None else self.tasks.keys()
-        tasks = {task: t_type for task, t_type in self.tasks.items() if task in tasks}
-        from IPython import embed
-        embed()
-
-        for task, t_type in tasks.items():
-            assert task in self.tasks, "task %s not in self.tasks %s" % (task, list(self.tasks.keys()))
-
-            self.plotter[task] = {}
-
-            if t_type == "classification":
-                metric_list = [metrics.accuracy, metrics.auroc]
-            elif t_type == "regression":
-                metric_list = [metrics.bias, metrics.resolution, metrics.correlation, metrics.percentile68]
-            else:
-                assert t_type is isinstance(metrics.Metric), "Please add metric of type model.metrics.Metric to evaluator"
-                metric_list = to_list(t_type)
-
-            for metric in metric_list:
-                log_dir = osp.join(self.log_dir, task + "_" + metric.name)
-                makedirs(log_dir, exist_ok=True)
-
-                plotter_all = Plotter(metric.plt_fn, log_dir=log_dir, name="all")
-                self.plotter[task][metric.name] = {"all": plotter_all}
-
-                for dset in self.data:
-
-                    if dset.predictions is None:
-                        dset.predict(self.model, dset)
-
-                    y_pred, y_true = dset.y_pred[task].squeeze(), dset.y_true[task].squeeze()
-                    plotter = Plotter(metric.plt_fn, log_dir=log_dir, name=metric.name)
-                    self.plotter[task][metric.name][dset.name] = plotter
-
-                    result = metric(y_true, y_pred)  # e.g. bias
-                    args = metric.plot_data_fn(y_true, y_pred)
-                    plt_kwargs = {"label": dset.name + "%.2f" % result, **dset.plt_kwargs}
-
-                    if type(args) == tuple:
-                        plotter.plot(*args, **plt_kwargs)  # * converts np array into tuple
-                    else:
-                        plotter.plot(args, **plt_kwargs)
-
-                    plotter.add_statbox(metric.statistics(y_true, y_pred))
-                    plotter.save()
-
-                    if type(args) == tuple:
-                        plotter_all.add_data(*args, **plt_kwargs)  # * converts np array into tuple
-                    else:
-                        plotter_all.add_data(args, **plt_kwargs)
-
-                plotter_all.save()
-
-    def obs_dep(self, metric, obs, obs_bins):
-        for bin in obs_bins:
-            pass
-            # To be implemented: 2D bootstrapping metric(dset.y_pred, dset.y_true)
-
-    def plot(plt_fn, xdata, ydata, xerr=None, yerr=None, **plt_kwargs):
-        pass
-
-    def plot_class_perf(self, data):
-        for dset in to_list(data):
-            pass
-
-    def estimate_performance(self, data, metric, task):
-        for dset in data:
-            data
-            metric
-
-    def plot_regression_perf(self, task):
-        self.energy_dep_bias_and_resolution(task)
-        self.scatter_perfomance(task)
-
-    def energy_dep_bias_and_resolution(self, task):
-        pass
-
-    def scatter_perfomance(self, task):
-        pass
diff --git a/models/metrics.py b/models/metrics.py
deleted file mode 100644
index a562b326f8af961bbf7de5a48633a2f35707cc14..0000000000000000000000000000000000000000
--- a/models/metrics.py
+++ /dev/null
@@ -1,174 +0,0 @@
-import numpy as np
-from sklearn.metrics import roc_curve, roc_auc_score, accuracy_score
-from matplotlib import pyplot as plt
-
-
-def diff(y_true, y_pred):
-    # return {"x": y_pred - y_true}
-    return y_pred - y_true
-
-
-def indentities(y_true, y_pred):
-    # return {"x": y_pred, "y": y_true}
-    return y_true, y_pred
-
-
-def y_pred_id(y_true, y_pred):
-    return y_pred
-
-
-def y_true_id(y_true, y_pred):
-    return y_true
-
-
-class Metric:
-    def __init__(self, metric_fn, plt_fn=plt.hist, xlabel="x", ylabel="y", plot_data_fn=diff, unc=False, dist_stats=False, **plt_kwargs):
-        """ Metrics for the evaluator class.
-
-        Parameters
-        ----------
-        metric_fn : fn
-            Metric fn to estimate performance value of model, applied to (y_true, y_pred). E.g., for bias
-        plt_fn : fn
-            Pyplot plotting function used for plotting the results (default: plt.hist).
-        plot_data_fn : fn
-            Plotting transformation applied to (y_true, y_pred) before calling plt_fn, i.e., plot_data_fn(y_true, y_pred).
-        btrp : bool
-            Does the metric support bootstrapped uncertainties
-        Returns
-        -------
-        type
-            Description of returned object.
-
-        """
-        self.plt_fn = plt_fn
-        self.metric_fn = metric_fn
-        self.unc = unc
-        self.plot_data_fn = plot_data_fn
-        self.name = metric_fn.__name__.split("_fn")[0]
-        self.plt_kwargs = plt_kwargs
-        self.dist_stats = dist_stats
-        self.xlabel = xlabel
-        self.ylabel = ylabel
-
-    def __call__(self, *args):
-        return self.metric_fn(*args)
-
-    def plot(self, ax, *args):
-        y_pred, y_true = self.call(*args)
-
-    def make_labels(self, ax, legend=True):
-        ax.set_xlabel(self.xlabel)
-        ax.set_ylabel(self.ylabel)
-
-        if legend is True:
-            ax.legend()
-
-class RegressionMetric(Metric):
-    # def __init__(self, metric_fn, plt_fn=plt.hist, plot_data_fn=diff, unc=False, dist_stats=False, **plt_kwargs):
-    #     super.__init__(metric_fn, plt_fn=plt.hist, plot_data_fn=diff, unc=False, dist_stats=False, **plt_kwargs)
-
-    def statistics(self, y_true, y_pred):
-        def mse(x):
-            return np.mean(x**2)
-
-        if self.dist_stats is True:
-
-            stats = {}
-
-            for met in [np.mean, np.std, mse]:
-                stats[met.name] = met(y_pred - y_true)
-
-            return "\n".join(["%s = %.2f" % (k, val) for k, val in stats.items()])
-        else:
-            return ''
-
-
-class ClassificationMetric(Metric):
-
-    def statistics(self, y_true, y_pred):
-        def acc(y_true, y_pred):
-            return accuracy_fn(y_true, y_pred)
-
-        if self.dist_stats is True:
-            stats = {}
-
-            for met in [acc]:
-                stats[met.name] = met(y_pred - y_true)
-
-            return "\n".join(["%s = %.2f" % (k, val) for k, val in stats.items()])
-        else:
-            return ''
-
-
-def resolution_fn(y_true, y_pred):
-    return np.std(y_true - y_pred)
-
-
-def bias_fn(y_true, y_pred):
-    return np.mean(y_pred - y_true)
-
-
-def accuracy_fn(y_true, y_pred, threshold=0.5):
-    y_pred_ = prob2pred(y_true, y_pred)
-    return accuracy_score(y_true, y_pred_)
-
-
-def roccurve_fn(y_true, y_pred):
-    return roc_curve(y_true, y_pred)[3]
-
-
-def fpr_fn(y_true, y_pred):
-    fpr, tpr, thresholds = roc_curve(y_true, y_pred)
-    return fpr
-
-
-def tpr_fn(y_true, y_pred):
-    fpr, tpr, thresholds = roc_curve(y_true, y_pred)
-    return tpr
-
-
-def auroc_fn(y_true, y_pred):
-    return roc_auc_score(y_true, y_pred)  # switch aquired by sklearn
-
-
-def correlation_fn(y_true, y_pred):
-    return np.corrcoef(y_true, y_pred)[1, 0]
-
-
-def percentile68_fn(y_true, y_pred):
-    return np.percentile(y_pred - y_true, 68)
-
-
-def prob2pred(y_true, y_pred, threshold=0.5):
-    result = np.zeros_like(y_true)
-
-    if y_pred.ndim == 1:
-        result[y_pred > threshold] = 1.
-
-    return result
-
-
-def confusion_plt_fn(y_true, y_pred, num_classes=2):
-    return np.histogram2d(y_true, y_pred, bins=np.linspace(-0.5, num_classes - 0.5, num_classes + 1))[0]
-
-
-# Regression metrics
-bias = RegressionMetric(bias_fn, plt_fn=plt.hist, plot_data_fn=diff)
-resolution = RegressionMetric(resolution_fn, plt_fn=plt.hist, plot_data_fn=diff)
-correlation = RegressionMetric(correlation_fn, plt_fn=plt.scatter, plot_data_fn=indentities)
-percentile68 = RegressionMetric(percentile68_fn, plt_fn=plt.hist, plot_data_fn=diff)
-
-# Classification metrics
-
-
-def plt_roc_fn(y_true, y_pred):
-    fpr, tpr, _ = roc_curve(y_true, y_pred)
-    return fpr, tpr
-
-
-# Setze diff_fn
-auroc = ClassificationMetric(auroc_fn, plt_fn=plt.plot, plot_data_fn=plt_roc_fn)
-accuracy = ClassificationMetric(accuracy_fn, plt_fn=plt.hist, plot_data_fn=y_pred_id)
-
-confusion = ClassificationMetric(lambda c: plt.imshow(c, ), plt_fn=plt.hist, plot_data_fn=confusion_plt_fn, plt_kwargs={"interpolation": "nearest", "vmin": 0, "vmax": 1, "cmap": plt.cm.YlGnBu})
diff --git a/test_eval.py b/test_eval.py
deleted file mode 100644
index 36f8add99a2296513dbc0aa81704c5b33cdb9766..0000000000000000000000000000000000000000
--- a/test_eval.py
+++ /dev/null
@@ -1,27 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-import numpy as np  # noqa
-from hess.hess_mappings import default_mapping
-from hess.dataset import HESSLoader
-from hess.models import tf_cnn
-from models import training, evaluate
-from tools.utils import config
-
-CONFIG = config()
-BATCHSIZE = 128
-EPOCHS = 100
-TASKS = ["primary", "energy"]  # , "primary", "axis", "impact", "energy"]
-
-path_proton = "/home/woody/caph/mppi067h/gamma_ray_reconstruction_with_ml/gnn/hess_datasets/phase2d3/phase2d3_proton_20deg_0deg_0.0off.h5"
-path_gamma = "/home/woody/caph/mppi067h/gamma_ray_reconstruction_with_ml/gnn/hess_datasets/phase2d3/phase2d3_gamma_20deg_0deg_0.0off_cone5.h5"
-
-hdf_loader = HESSLoader([path_proton, path_gamma])
-train_data, val_data, test_data = hdf_loader.make_image_datasets()
-
-val_data.tf(transform=default_mapping)
-cnn_model = tf_cnn.get_model(train_data.feat, tasks=TASKS, stats=train_data.get_stats(), bn=True, share_ct14=True)
-
-my_aiact = training.Trainer(model=cnn_model, log_dir=CONFIG.log_dir, tasks=TASKS, epochs=EPOCHS, batch_size=BATCHSIZE)
-
-evaluation = evaluate.Evaluator(my_aiact.model, val_data, {"primary": "classification", "energy": "regression"}, log_dir=CONFIG.log_dir)
-evaluation.evaluate()
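
Note: the patch above removes the Evaluator/Metric machinery and the test_eval.py driver wholesale. For reference, the removed regression and classification metrics reduce to a handful of numpy/scikit-learn calls applied per task to (y_true, y_pred). The snippet below is a minimal, hypothetical sketch of that behaviour, assuming per-task arrays are already available; the helper names (evaluate_tasks, bias, resolution) are illustrative and do not exist in the remaining code base.

# Hypothetical standalone sketch of the evaluation removed by this patch.
import numpy as np
from sklearn.metrics import roc_auc_score


def bias(y_true, y_pred):
    # Mean prediction error, as in the removed metrics.bias_fn.
    return np.mean(y_pred - y_true)


def resolution(y_true, y_pred):
    # Standard deviation of the prediction error, as in the removed metrics.resolution_fn.
    return np.std(y_true - y_pred)


def evaluate_tasks(y_true, y_pred, tasks):
    # 'tasks' maps a task name to "classification" or "regression",
    # mirroring the dict the removed Evaluator accepted.
    results = {}
    for task, kind in tasks.items():
        t = np.asarray(y_true[task]).squeeze()
        p = np.asarray(y_pred[task]).squeeze()
        if kind == "regression":
            results[task] = {"bias": bias(t, p), "resolution": resolution(t, p)}
        else:
            results[task] = {"auroc": roc_auc_score(t, p)}
    return results


if __name__ == "__main__":
    # Toy data standing in for dataset predictions.
    rng = np.random.default_rng(0)
    y_true = {"energy": rng.normal(size=100), "primary": rng.integers(0, 2, 100)}
    y_pred = {"energy": y_true["energy"] + rng.normal(scale=0.1, size=100),
              "primary": rng.random(100)}
    print(evaluate_tasks(y_true, y_pred, {"primary": "classification", "energy": "regression"}))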