离线
|
新冠肺炎CT辅助诊断文献实战-01
https://www.nature.com/articles/s41551-020-00633-5
2020年11月,华中科技大学发表在Nature Biomedical Engineering
如果不想看文献的话可以看我写的文献导读(文献导读001)基于图像识别技术的cov19辅助诊断 - 知乎 (zhihu.com)
建议还是看一下,文章不难,我也只用了30分钟就大概的看了一遍
由于上一个系列——深度学习与单细胞,我自己在写教程运行的过程中发现了一系列的问题导致目前暂时无法更新。然后最近老师推荐看来一篇关于医学图像识别辅助诊断新冠肺炎的文章,看了一下文章发现特别简单,于是就打算利用这篇文章继续我的深度学习系列教程。
这个系列教程与深度学习和生物信息学的应用暂时无关,就是非常基础的pytorch实战和一系列的问题。
网络上其实可以搜索到很多相关的教程,只是我这个是针对医学数据而已哈
数据下载
iCTCF - CT images and clinical features for COVID-19 (biocuckoo.cn)
一共有两套数据需要下载:
- 已经标记好的label数据,用于构建第一个模型
- cohort1 和cohort2
cohort1,cohort2的下载略微有点麻烦得一个个点,这里的话可以使用迅雷自带的批量下载功能
如果是ubuntu系统的话,可以使用如下代码批量下载:
- # 这里本身就是国内源,不要做奇奇怪怪的操作
- for i in {1..1521}
- do
- wget -c http://ictcf.biocuckoo.cn/patient/CT/Patient%20{$i}.zip
- done
复制代码 背景
根据文章的神经网络框架,该框架分为三个部分其中,第一部分和第二部分是用图像数据训练,第三部分是用临床特征数据进行训练。
由于CT图像一次性会扫描出很多张图片,但是并不是每一张图片都是有用的。因此,医生在对CT图像做出判断决策时候通常需要以下几个步骤:
- 从大量的CT图像中挑选出包涵信息的图片
- 从这些图片中挑选出可能与疾病相关的图片
- 从可能与疾病相关的图片中做出决策
上面的三个步骤又可以总结成为两个步骤,即:
那么深度学习技术的辅助诊断也是从这两个部分进行的。那么,在完全不考虑经费和计算资源的情况下那么辅助诊断的实施就非常的简单。直接整合所有的CT图像数据和clinical outcomes 建模即可,但是,显然这种做法并不现实,因为CT图像通常一个样本产生的数据非常多,并且由于噪声越大,所需要的模型也越大,训练时长也越久。因此,文章的做法是将两个模型分开做,即构建一个挑选图片的模型,将挑选后的图片再次构建一个clinical outcomes 相关的模型,这个构思也是目前辅助诊断中最为常用的模型。此外,将步骤分开的话,由于单个任务的复杂度低,因此构建模型也相对简单。
正文
那么,我们先按照文献给的思路一步步的构建模型,先构建第一个模型
文章使用的是kears,我用的是pytorch哈,由于文章中有些部分我感觉并不是很合理,因此我打算稍微改动一下
首先初始化配置
- #### no mean but to fold the code
- import os
- import math
- import itertools
- import numpy as np
- import pandas as pd2
- import matplotlib.pyplot as plt
- from collections import Counter
- from matplotlib import cm
- import cv2
- import torch
- import torchvision
- import torch.utils.data as Data
- import torch.nn as nn
- import torch.nn.functional as F
- import pytorch_lightning as pl
- from sklearn.preprocessing import StandardScaler,MinMaxScaler
- from sklearn.model_selection import train_test_split
- import warnings
- from myexptorch import *
- %matplotlib inline
- %config InlineBackend.figure_format = 'svg'
- warnings.filterwarnings("ignore")
- # if GPU avaliabel, data to
- device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
- def deviceChange(x):
- if (torch.cuda.is_available() & x == 'cuda'):
- device = torch.device('cuda')
- else:
- device = torch.device('cpu')
- return device
- # =============================================================================
- # the following code is common pytorch formula
- # =============================================================================
- # change expr to tensor
- def expToTorch(x):
- return torch.from_numpy(np.array(x)).float()
- # change label to tensor
- def labelTotorch(y):
- return torch.LongTensor(y)
- # data to data iter
- def makeDataiter(x,y,batch_size,shuffle=True):
- return Data.DataLoader(Data.TensorDataset(x, y), batch_size, shuffle=shuffle)
- # scale
- def DataFrame_normalize(my_data,std_method):
- if std_method == 'minmax':
- method = MinMaxScaler()
- if std_method == 'std':
- method = StandardScaler()
- my_data = pd.DataFrame(method.fit(my_data).transform(
- my_data),columns=my_data.columns,index=my_data.index)
- return my_data
- def toOneHot(ylabels,n_class):
- onehot = torch.zeros(ylabels.shape[0],n_class)
- index = torch.LongTensor(ylabels).view(-1,1)
- onehot.scatter_(dim=1, index=index, value=1)
- return onehot
- def toLabel(ylabels):
- return torch.topk(ylabels, 1)[1].squeeze(1)
- # show tenor to picture
- def show_tensor_pictures(x, y):
- plt.figure(figsize=(12,12))
- fig, axes = plt.subplots(1, len(x))
- for ax, image, label in zip(axes, x, y):
- ax.imshow(image.view((28, 28)).numpy())
- ax.set_title(label)
- ax.axes.get_xaxis().set_visible(False)
- ax.axes.get_yaxis().set_visible(False)
- plt.show()
- def show_batch(imgs):
- grid = torchvision.utils.make_grid(imgs,nrow=5)
- plt.imshow(grid.numpy().transpose((1, 2, 0)))
- plt.title('Batch from dataloader')
复制代码 导入数据
pytorch的数据导入方法非常的方便,还会自动添加label,导入的数据仅需要满足如下要求:
- 创建一个用于储存数据的文件夹 (totallabeldata)
- 在该文件夹下将同label的数据存放在相同的文件夹下,并命名
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-x33KBSOd-161********44)(https://tva1.sinaimg.cn/large/008eGmZEly1gnylnfyvh2j310r06ggm9.jpg)]
然后使用如下代码进行数据导入
- # load images to torch
- # plz put same class images to dif-folder
- train = torchvision.datasets.ImageFolder('../totallabeldata/',
- transform=torchvision.transforms.Compose([
- torchvision.transforms.Grayscale(num_output_channels=1),
- torchvision.transforms.Scale(256),
- torchvision.transforms.CenterCrop(200),
- torchvision.transforms.ToTensor()])
- )
- train_loader = torch.utils.data.DataLoader(train, batch_size=128,shuffle=True,num_workers =24)
- len(train_loader.dataset.targets)
- train_loader.dataset.class_to_idx
- """
- 在train_loader.dataset.class_to_idx的对象中记录了每个类别对应的class number
- """
- # [out] :
- # 19685
- # {'NiCT': 0, 'nCT': 1, 'pCT': 2}
复制代码 然后这里的话就面临着一个问题,就是没办法进行data的分割,总所周知,我们在进行建模的时候,通常要将数据分割成3份:training, validation, test,通常情况下比例为8:1:1。 因此要对数据进行如下操作
- # 目前我没有找到更好的方法了,如果有知道的朋友请留个言呗
- # Create empty list
- XRawData = []
- YRawData = []
- # from iteration extract batch
- for i,(x,y) in enumerate(train_loader):
- XRawData.append(x)
- YRawData.append(y)
- # concat all batch
- XRawData = torch.cat(XRawData)
- YRawData = torch.cat(YRawData)
- randoms = 42
- batch_size =150
- # Step.3 data split to train - test
- x_train, x_test, y_train, y_test = train_test_split(
- XRawData, YRawData, stratify = YRawData,
- test_size=0.1, random_state=randoms)
- # Step.4 train data split validation data
- x_train, x_val, y_train, y_val = train_test_split(
- x_train, y_train, stratify = y_train,
- test_size=0.1, random_state=randoms)
- # Step.6 transform to iterate object
- train_iter = makeDataiter(x_train, y_train, batch_size=batch_size,shuffle=True)
- val_iter = makeDataiter(x_val, y_val, batch_size=batch_size,shuffle=True)
- # visualization
- for i, (batch_x, batch_y) in enumerate(train_iter):
- if(i<1):
- show_batch(batch_x)
- plt.axis('off')
- plt.show()
- # class count
- Counter(YRawData.numpy())
- # [out]:
- # Counter({2: 4001, 1: 9979, 0: 5705})
复制代码 构建模型(按文章)
- class VGG_Simple(pl.LightningModule):
- train_epoch_loss = []
- train_epoch_acc = []
- train_epoch_aucroc = []
- val_epoch_loss = []
- val_epoch_acc = []
- val_epoch_aucroc = []
- test_predict = []
- test_sample_label = []
- test_decoder = []
- test_digitcaps = []
- test_matrix = []
- test_conv = []
- test_primary = []
- test_decoder = []
- def __init__(self,myloss = None):
- super(VGG_Simple, self).__init__()
- self.myloss = myloss
- self.cov = nn.Sequential(
- nn.Conv2d(in_channels=1,out_channels=64,kernel_size=3,stride=1),
- nn.ReLU(inplace=True),
- nn.Conv2d(in_channels=64,out_channels=64,kernel_size=3,stride=1),
- nn.ReLU(inplace=True),
- nn.MaxPool2d((2, 2)),
- nn.Conv2d(in_channels=64,out_channels=32,kernel_size=3,stride=1),
- nn.ReLU(inplace=True),
- nn.Conv2d(in_channels=32,out_channels=32,kernel_size=3,stride=1),
- nn.ReLU(inplace=True),
- nn.MaxPool2d((2, 2)),
- nn.Conv2d(in_channels=32,out_channels=16,kernel_size=3,stride=1),
- nn.ReLU(inplace=True),
- nn.Conv2d(in_channels=16,out_channels=16,kernel_size=3,stride=1),
- nn.ReLU(inplace=True),
- nn.MaxPool2d((2, 2))
- )
- self.dnn = nn.Sequential(
- nn.Linear(7056, 64),
- nn.ReLU(inplace=True),
- nn.Dropout(0.5),
- nn.Linear(64, 32),
- nn.ReLU(inplace=True),
- nn.Dropout(0.5),
- nn.Linear(32, 3)
- )
- def forward(self, x):
- # x.size(0)即为batch_size
- in_size = x.size(0)
- out = self.cov(x)
- out = out.view(in_size, -1)
- out = self.dnn(out)
- return out
- # 3. 定义优化器
- def configure_optimizers(self):
- optimizer = torch.optim.Adam(self.parameters(), lr=0.001,weight_decay=0.05)
- return optimizer
- # 4. 训练loop
- def training_step(self, train_batch, batch_ix):
- vector_sample, sample_label = train_batch
- probs = self.forward(vector_sample)
- loss = self.myloss(probs, sample_label)
- acc = pl.metrics.functional.accuracy(probs, sample_label)
- mylogdict = {'loss': loss, 'log': {'train_loss': loss, 'train_acc': acc}}
- return mylogdict
- def validation_step(self, validation_batch, batch_ix):
- vector_sample, sample_label = validation_batch
- probs= self.forward(vector_sample)
- loss = self.myloss(probs, sample_label)
- val_acc = pl.metrics.functional.accuracy(probs, sample_label)
- self.log_dict({'val_loss': loss, 'val_acc': val_acc})
- mylogdict = {'log': {'val_loss': loss, 'val_acc': val_acc}}
- return mylogdict
- def test_step(self, test_batch, batch_ix):
- vector_sample, sample_label = test_batch
- probs= self.forward(vector_sample)
- self.test_predict.append(probs.cpu())
- self.test_sample_label.append(sample_label.cpu())
- return {'test': 0}
- def training_epoch_end(self, output):
- train_loss = sum([out['log']['train_loss'].item() for out in output]) / len(output)
- self.train_epoch_loss.append(train_loss)
- train_acc = sum([out['log']['train_acc'].item() for out in output]) / len(output)
- self.train_epoch_acc.append(train_acc)
- print(train_acc)
- return train_loss
- def validation_epoch_end(self, output):
- val_loss = sum([out['log']['val_loss'].item() for out in output]) / len(output)
- self.val_epoch_loss.append(val_loss)
- val_acc = sum([out['log']['val_acc'].item() for out in output]) / len(output)
- self.val_epoch_acc.append(val_acc)
- print('mean_val_loss: ', val_loss, '\t', 'mean_val_acc: ', val_acc)
- return val_loss
复制代码 然后开始训练模型
- # training
- traning = True
- # set seed
- pl.utilities.seed.seed_everything(seed=42)
- # Defined function
- myloss = nn.CrossEntropyLoss()
- model = VGG_Simple(myloss=myloss)
- if traning:
- # output file
- OUTPUT_DIR = './lightning_logs'
- tb_logger = pl.loggers.TensorBoardLogger(save_dir='./',
- name=f'check_point')
- # set check point to choose best model
- checkpoint_callback = pl.callbacks.ModelCheckpoint(
- dirpath=tb_logger.log_dir,
- filename='{epoch}-{val_acc:.5f}-{val_loss:.5f}',
- save_top_k = 15, #保留最优的15
- monitor='val_acc', # check acc
- mode='auto'
- )
- # train loop
- trainer = pl.Trainer(gpus=-1,
- callbacks=[checkpoint_callback],
- max_epochs = 100,
- gradient_clip_val=0,
- auto_lr_find=False)# 开始训练
- trainer.fit(model, train_iter, val_iter)
- from pathlib import Path
- out = Path(tb_logger.log_dir)
- print(out)
- [ckpt.stem for ckpt in out.iterdir()]
复制代码 但是,如果你这样训练那么会出现如下的问题:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-p21vXGnK-161********46)(https://tva1.sinaimg.cn/large/008eGmZEly1gnylnhvxatj30u00t642p.jpg)]
不管如何训练,loss不会改变,accuracy也不会改变,那么这是什么原因呢?常见的有如下两种原因:
当然,在这次训练中,这两种情况可以完全排除,理由很简单,我用的模型和文章一模一样
那么,我们就要从loss函数或者数据本生出发,这里我们要注意一下小细节,在之前我们进行class counter的时候的输出
- # class count
- Counter(YRawData.numpy())
- # [out]:
- # Counter({2: 4001, 1: 9979, 0: 5705})
复制代码 发现,样本量的差距着实有点大,但是说大把也不是很大,可能在运行三五百次的epoch应该是完全没有什么太大的问题
但是为了节约时间我们要对loss进行加权
- class_weight = torch.FloatTensor([2,5,2]).cuda()
- myloss = nn.CrossEntropyLoss(weight=class_weight)
复制代码 最终修改代码如下
- # training traning = True# set seed pl.utilities.seed.seed_everything(seed=42)# Defined function class_weight = torch.FloatTensor([2,5,2]).cuda()
- myloss = nn.CrossEntropyLoss(weight=class_weight)
- model = VGG_Simple(myloss=myloss)if traning: # output file OUTPUT_DIR = './lightning_logs' tb_logger = pl.loggers.TensorBoardLogger(save_dir='./', name=f'check_point') # set check point to choose best model checkpoint_callback = pl.callbacks.ModelCheckpoint( dirpath=tb_logger.log_dir, filename='{epoch}-{val_acc:.5f}-{val_loss:.5f}', save_top_k = 15, #保留最优的15 monitor='val_acc', # check acc mode='auto' ) # train loop trainer = pl.Trainer(gpus=-1, callbacks=[checkpoint_callback], max_epochs = 100, gradient_clip_val=0, auto_lr_find=False)# 开始训练 trainer.fit(model, train_iter, val_iter) from pathlib import Path out = Path(tb_logger.log_dir) print(out) [ckpt.stem for ckpt in out.iterdir()]
复制代码 输出结果如下
我们可以看到,在第8次epoch就已经达到92%的准确率了,但是,还有一个问题,这个准确率还是太低了。
考虑到,CT数据的图像是灰度图,且图像的大致形状也差不多,感觉各个医院拍出来的图也差不了多少(自己认为的),所以我认为,模型中的dropout层是没有用的
这里是我觉得文章不妥的第一个点
- model.train_epoch_acc[40:44]
- # [out]
- # [ 0.8633503256557143,
- # 0.8609685636012354,
- # 0.862724444576513,
- # 0.8625120386899074]
复制代码 我们发现train的过程准确率也才86%
在删除dropout后,准确率得到显著的提升
- model.train_epoch_acc[40:44]
- # [out]
- #[0.9764486033225728,
- # 0.9797252973663473,
- # 0.979190********92,
- # 0.9742679161446117]
复制代码 验证模型
- # load model parameter
- version = 'version_14' # choose best model path
- path = os.path.join(os.getcwd(),'check_point',version)
- ckpt = str(os.listdir(path)[2]) # choose best model
- print(ckpt)
- # get best model parameter
- ckpt = os.path.join(path,ckpt)
- model = VGG_Simple(myloss=myloss)
- trainer = pl.Trainer(resume_from_checkpoint=ckpt, gpus=-1)
- # create test_iter
- test_iter = makeDataiter(x_test, y_test, batch_size=batch_size,shuffle=False)
- trainer.test(model, test_iter)
- # get test accuracy
- predict = torch.cat(model.test_predict)
- y = torch.cat(model.test_sample_label)
- predict = toLabel(predict).cpu().numpy()
- y = y.cpu().numpy()
- sum( y == predict) / len(y)
- # [out]:
- # 0.9862874555611986
复制代码 测试集准确率98%,然后这里要注意一下,文章用的是AUC去评估,对于像这种的3分类任务,使用AUC评估很不直观,应该使用混淆矩阵或者accuarcy进行评估
这里是我觉得文章不妥的第二个点
- Best Regards,
- Yuan.SH;
- School of Basic Medical Sciences,
- Fujian Medical University,
- Fuzhou, Fujian, China.
- please contact with me via the following ways:
- (a) e-mail :yuansh3354@163.com
复制代码 来源:https://blog.csdn.net/qq_40966210/article/details/114021421
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |
|