我正在尝试在PyTorch中原生实现宏观F1分数(F-measure),而不是使用已经广泛使用的sklearn.metrics.f1_score,以便直接在GPU上计算该度量。
据我所知,为了计算宏观F1分数,我需要计算所有标签的敏感性和精确性的F1分数,然后取所有这些的平均值。
我的尝试
我当前的实现看起来像这样:
def confusion_matrix(y_pred: torch.Tensor, y_true: torch.Tensor, n_classes: int): conf_matrix = torch.zeros([n_classes, n_classes], dtype=torch.int) y_pred = torch.argmax(y_pred, 1) for t, p in zip(y_true.view(-1), y_pred.view(-1)): conf_matrix[t.long(), p.long()] += 1 return conf_matrixdef forward(self, y_pred: torch.Tensor, y_true: torch.Tensor) -> torch.Tensor: conf_matrix = confusion_matrix(y_pred, y_true, self.classes) TP = conf_matrix.diag() f1_scores = torch.zeros(self.classes, dtype=torch.float) for c in range(self.classes): idx = torch.ones(self.classes, dtype=torch.long) idx[c] = 0 FP = conf_matrix[c, idx].sum() FN = conf_matrix[idx, c].sum() sensitivity = TP[c] / (TP[c] + FN + self.epsilon) precision = TP[c] / (TP[c] + FP + self.epsilon) f1_scores[c] += 2.0 * ((precision * sensitivity) / (precision + sensitivity + self.epsilon)) return f1_scores.mean()
self.classes
是标签的数量,self.epsilon
是一个非常小的值,设置为 10-e12
,以防止 DivisionByZeroError
。
在训练时,我为每个批次计算该度量,并将所有度量的平均值作为最终得分。
问题
问题是当我将我自定义的F1分数与sklearn的宏观F1分数进行比较时,它们很少相等。
# 示例1eval_cce 0.5203, eval_f1 0.8068, eval_acc 81.5455, eval_f1_sci 0.8023,test_cce 0.4784, test_f1 0.7975, test_acc 82.6732, test_f1_sci 0.8097# 示例2eval_cce 0.3304, eval_f1 0.8211, eval_acc 87.4955, eval_f1_sci 0.8626,test_cce 0.3734, test_f1 0.8183, test_acc 85.4996, test_f1_sci 0.8424# 示例3eval_cce 0.4792, eval_f1 0.7982, eval_acc 81.8482, eval_f1_sci 0.8001,test_cce 0.4722, test_f1 0.7905, test_acc 82.6533, test_f1_sci 0.8139
虽然我已经尝试在网上搜索,大多数情况都涵盖了二元分类。我还没有找到一个例子尝试做我正在尝试做的事情。
我的问题
我的尝试中有没有明显的问题?
更新(2020年6月10日)
我还没有找出我的错误。由于时间限制,我决定使用sklearn提供的F1宏观分数。虽然它不能直接与GPU张量一起工作,但对于我的情况来说已经足够快了。
然而,如果有人能解决这个问题,那将是非常棒的,这样其他可能遇到此问题的人就可以解决他们的问题了。
回答:
我在不久前用PyTorch编写了自己的实现:
from typing import Tupleimport torchclass F1Score: """ 用于在PyTorch中计算F1的类。 """ def __init__(self, average: str = 'weighted'): """ 初始化。 参数: average: 平均方法 """ self.average = average if average not in [None, 'micro', 'macro', 'weighted']: raise ValueError('平均参数值错误') @staticmethod def calc_f1_micro(predictions: torch.Tensor, labels: torch.Tensor) -> torch.Tensor: """ 计算微观F1。 参数: predictions: 预测张量 labels: 原始标签张量 返回: F1分数 """ true_positive = torch.eq(labels, predictions).sum().float() f1_score = torch.div(true_positive, len(labels)) return f1_score @staticmethod def calc_f1_count_for_label(predictions: torch.Tensor, labels: torch.Tensor, label_id: int) -> Tuple[torch.Tensor, torch.Tensor]: """ 计算标签的F1和真实计数 参数: predictions: 预测张量 labels: 原始标签张量 label_id: 当前标签的ID 返回: 标签的F1分数和真实计数 """ # 标签计数 true_count = torch.eq(labels, label_id).sum() # 真阳性:标签等于预测并且等于label_id true_positive = torch.logical_and(torch.eq(labels, predictions), torch.eq(labels, label_id)).sum().float() # 标签的精确度 precision = torch.div(true_positive, torch.eq(predictions, label_id).sum().float()) # 将NaN值替换为0 precision = torch.where(torch.isnan(precision), torch.zeros_like(precision).type_as(true_positive), precision) # 标签的召回率 recall = torch.div(true_positive, true_count) # F1 f1 = 2 * precision * recall / (precision + recall) # 将NaN值替换为0 f1 = torch.where(torch.isnan(f1), torch.zeros_like(f1).type_as(true_positive), f1) return f1, true_count def __call__(self, predictions: torch.Tensor, labels: torch.Tensor) -> torch.Tensor: """ 根据初始化时定义的平均方法计算F1分数。 参数: predictions: 预测张量 labels: 原始标签张量 返回: F1分数 """ # 微观计算更简单 if self.average == 'micro': return self.calc_f1_micro(predictions, labels) f1_score = 0 for label_id in range(1, len(labels.unique()) + 1): f1, true_count = self.calc_f1_count_for_label(predictions, labels, label_id) if self.average == 'weighted': f1_score += f1 * true_count elif self.average == 'macro': f1_score += f1 if self.average == 'weighted': f1_score = torch.div(f1_score, len(labels)) elif self.average == 'macro': f1_score = torch.div(f1_score, len(labels.unique())) return f1_score
你可以按以下方式进行测试:
from sklearn.metrics import f1_scoreimport numpy as nperrors = 0for _ in range(10): labels = torch.randint(1, 10, (4096, 100)).flatten() predictions = torch.randint(1, 10, (4096, 100)).flatten() labels1 = labels.numpy() predictions1 = predictions.numpy() for av in ['micro', 'macro', 'weighted']: f1_metric = F1Score(av) my_pred = f1_metric(predictions, labels) f1_pred = f1_score(labels1, predictions1, average=av) if not np.isclose(my_pred.item(), f1_pred.item()): print('!' * 50) print(f1_pred, my_pred, av) errors += 1if errors == 0: print('没有错误!')