
Xiaohei takes a big bite of the first baseline: submission code for Score Clinical Patient Notes


Contents

    • Data directory
    • 1. Import the fast tokenizer into the transformers library
    • 2. Define the config class
    • 3. Imports
    • 4. Load the tokenizer
    • 5. Evaluation metric
    • 6. Conversion functions
    • 7. Utils
    • 8. Data loading
    • 9. Threshold selection
    • 10. Merge the datasets
    • 11. Dataset setup
    • 12. Model setup
    • 13. Inference function
    • 14. Model testing
    • 15. Submission

Data directory:

[screenshot of the Kaggle input data directory]

1. Import the fast tokenizer into the transformers library

import shutil
from pathlib import Path

# Location of the installed transformers library
transformers_path = Path('/opt/conda/lib/python3.7/site-packages/transformers')
# Folder holding the fast tokenizer files
input_dir = Path('../input/deberta-v2-3-fast-tokenizer')
# The converter script to patch in
convert_file = input_dir / 'convert_slow_tokenizer.py'
# Destination path for the converter inside the transformers library
conversion_path = transformers_path / convert_file.name
# If a file with the same name already exists, delete it
if conversion_path.exists():
    conversion_path.unlink()
# Copy the converter into the transformers library
print('Copying', convert_file, 'to', transformers_path, '.....')
shutil.copy(convert_file, transformers_path)

# Copy the DeBERTa tokenizer files into transformers.models.deberta_v2
deberta_v2_path = transformers_path / 'models' / 'deberta_v2'
for filename in ['tokenization_deberta_v2.py', 'tokenization_deberta_v2_fast.py']:
    file_path = deberta_v2_path / filename
    if file_path.exists():
        file_path.unlink()
    shutil.copy(input_dir / filename, file_path)
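This manual patch works around the preinstalled transformers version lacking a fast DeBERTa-v2 tokenizer. A quick sanity check (a minimal sketch, assuming the copies above succeeded) is to import the patched-in class directly:

# If the copy worked, this imports without ImportError
from transformers.models.deberta_v2.tokenization_deberta_v2_fast import DebertaV2TokenizerFast
print(DebertaV2TokenizerFast)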

2. Define the config class

# Definition of the config class
class CFG:
    num_workers = 4
    path = '../input/deberta-med-score-1'
    config_path = path + '/config.pth'
    model = 'microsoft/deberta-v3-large'
    batch_size = 32
    fc_dropout = 0.2
    max_len = 354
    seed = 42
    n_fold = 5
    trn_fold = [0, 1, 2, 3, 4]

3. Imports

# Library imports
import os
import gc
import ast
import re
import sys
import copy
import json
import time
import math
import string
import pickle
import random
import joblib
import itertools
import warnings
warnings.filterwarnings('ignore')

import scipy as sp
import numpy as np
import pandas as pd
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
from tqdm.auto import tqdm
from sklearn.metrics import f1_score
from sklearn.model_selection import StratifiedKFold, GroupKFold, KFold

import torch
import torch.nn as nn
from torch.nn import Parameter
import torch.nn.functional as F
from torch.optim import Adam, SGD, AdamW
from torch.utils.data import DataLoader, Dataset

import tokenizers
import transformers
print(f'tokenizers.__version__: {tokenizers.__version__}')
print(f'transformers.__version__: {transformers.__version__}')
from transformers import AutoTokenizer, AutoModel, AutoConfig
from transformers import get_linear_schedule_with_warmup, get_cosine_schedule_with_warmup

%env TOKENIZERS_PARALLELISM=true
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

4. Load the tokenizer

# Load the tokenizer
from transformers.models.deberta_v2.tokenization_deberta_v2_fast import DebertaV2TokenizerFast
tokenizer = DebertaV2TokenizerFast.from_pretrained('../input/deberta-v2-fast-tokenizer')
CFG.tokenizer = tokenizer

%env TOKENIZERS_PARALLELISM=true
text = 'I love you!'
tokenizer(text, text, add_special_tokens=True, return_offsets_mapping=True)

{'input_ids': [1, 273, 472, 274, 300, 2, 273, 472, 274, 300, 2], 'token_type_ids': [0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1], 'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 'offset_mapping': [(0, 0), (0, 1), (1, 6), (6, 10), (10, 11), (0, 0), (0, 1), (1, 6), (6, 10), (10, 11), (0, 0)]}
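Each offset_mapping entry is a (start, end) character span into the source string, with special tokens mapped to (0, 0); this is what later lets token-level predictions be projected back onto characters. A minimal sketch to see the mapping:

enc = tokenizer(text, add_special_tokens=True, return_offsets_mapping=True)
for start, end in enc['offset_mapping']:
    # special tokens come back as (0, 0), i.e. an empty slice
    print((start, end), repr(text[start:end]))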

5. Evaluation metric

# Evaluation metric: micro F1
def micro_f1(preds, truths):
    # micro F1 over the concatenated binary character labels
    preds = np.concatenate(preds)
    truths = np.concatenate(truths)
    return f1_score(truths, preds)

def span_micro_f1(preds, truths):
    bin_preds = []
    bin_truths = []
    for pred, truth in zip(preds, truths):
        if not len(pred) and not len(truth):
            continue
        length = max(np.max(pred) if len(pred) else 0,
                     np.max(truth) if len(truth) else 0)
        bin_preds.append(spans_to_binary(pred, length))
        bin_truths.append(spans_to_binary(truth, length))
        # print(bin_preds)  -> [array([0., 1., 0., 0., 0., 0., 0., 0.])]
        # print(bin_truths) -> [array([0., 1., 0., 1., 1., 1., 1., 1.])]
    return micro_f1(bin_preds, bin_truths)

def spans_to_binary(spans, length=None):
    # Turn [start, end) spans into a binary vector over characters
    length = np.max(spans) if length is None else length
    binary = np.zeros(length)
    for start, end in spans:
        binary[start:end] = 1
    return binary

preds = [[[1, 2], [3, 2]]]
truths = [[[1, 2], [3, 8]]]
span_micro_f1(preds, truths)
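For this toy example the binarized prediction marks one character (index 1) while the truth marks six (indices 1 and 3 through 7), so TP = 1, FP = 0, FN = 5, and the micro F1 is 2·TP / (2·TP + FP + FN) = 2 / 7 ≈ 0.2857.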

6. Conversion functions

# Convert the location lists to the standard form
def create_labels_for_scoring(df):
    # example: ['0 1', '3 4'] -> ['0 1; 3 4']
    df['location_for_create_labels'] = [ast.literal_eval('[]')] * len(df)
    for i in range(len(df)):
        lst = df.loc[i, 'location']
        if lst:
            new_list = ';'.join(lst)
            df.loc[i, 'location_for_create_labels'] = ast.literal_eval(f'[["{new_list}"]]')
    # create labels
    truths = []
    for location_list in df['location_for_create_labels'].values:
        truth = []
        if len(location_list) > 0:
            location = location_list[0]
            for loc in [s.split() for s in location.split(';')]:
                start, end = int(loc[0]), int(loc[1])
                truth.append([start, end])
        truths.append(truth)
    return truths

# Convert token-level predictions to character-level probabilities
def get_char_probs(texts, predictions, tokenizer):
    results = [np.zeros(len(t)) for t in texts]
    for i, (text, prediction) in enumerate(zip(texts, predictions)):
        encoded = tokenizer(text, add_special_tokens=True, return_offsets_mapping=True)
        for idx, (offset_mapping, pred) in enumerate(zip(encoded['offset_mapping'], prediction)):
            start = offset_mapping[0]
            end = offset_mapping[1]
            results[i][start:end] = pred
    return results

# Identify contiguous character spans above the threshold
def get_results(char_probs, th=0.5):
    results = []
    for char_prob in char_probs:
        result = np.where(char_prob >= th)[0] + 1
        # result: [85 86 ... 99 126 ... 138 143 ... 151]
        # -> [[85, ..., 99], [126, ..., 138], [143, ..., 151]]
        result = [list(g) for _, g in itertools.groupby(
            result, key=lambda n, c=itertools.count(): n - next(c))]
        # result: ['85 99', '126 138', '143 151']
        result = [f'{min(r)} {max(r)}' for r in result]
        result = ';'.join(result)
        results.append(result)
    return results

# Split the standard-form strings back into [start, end] pairs
def get_predictions(results):
    predictions = []
    for result in results:
        prediction = []
        if result != '':
            for loc in [s.split() for s in result.split(';')]:
                start, end = int(loc[0]), int(loc[1])
                prediction.append([start, end])
        predictions.append(prediction)
    return predictions
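A quick round-trip check of these helpers (a sketch with a made-up probability vector; note the +1 index shift that get_results applies):

toy_probs = [np.array([0.1, 0.9, 0.8, 0.2, 0.7])]
spans = get_results(toy_probs, th=0.5)
print(spans)                   # ['2 3;5 5']
print(get_predictions(spans))  # [[[2, 3], [5, 5]]]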

7. Utils

# utils
def get_score(y_true, y_pred):
    score = span_micro_f1(y_true, y_pred)
    return score

def get_logger(filename='inference'):
    from logging import getLogger, INFO, StreamHandler, FileHandler, Formatter
    logger = getLogger(__name__)
    logger.setLevel(INFO)
    handler1 = StreamHandler()
    handler1.setFormatter(Formatter('%(message)s'))
    handler2 = FileHandler(filename=f'{filename}.log')
    handler2.setFormatter(Formatter('%(message)s'))
    logger.addHandler(handler1)
    logger.addHandler(handler2)
    return logger

LOGGER = get_logger()

def seed_everything(seed=42):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True

seed_everything(seed=42)
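With the toy spans from section 5 still in scope, get_score reproduces the same value (a minimal check):

print(get_score(preds, truths))  # ≈ 0.2857; micro F1 is unchanged if the arguments are swapped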

8. Data loading

oof_0 = pd.read_pickle('../input/deberta-med-score-1'+'/oof_df_0.pkl')
oof_1 = pd.read_pickle('../input/deberta-med-score-1'+'/oof_df_1.pkl')
oof_2 = pd.read_pickle('../input/deberta-med-score-1'+'/oof_df_2.pkl')
oof_3 = pd.read_pickle('../input/deberta-med-score-1'+'/oof_df_3.pkl')
oof_4 = pd.read_pickle('../input/deberta-med-score-1'+'/oof_df_4.pkl')
oof = pd.concat([oof_0,oof_1,oof_2,oof_3,oof_4]).reset_index()
truths = create_labels_for_scoring(oof)
print('truths[:10]:',truths[:10])
char_probs = get_char_probs(oof['pn_history'].values,
                            oof[[i for i in range(CFG.max_len)]].values,
                            CFG.tokenizer)
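The OOF frames store one probability per token position in columns 0 through CFG.max_len - 1, and get_char_probs projects those onto characters. A quick shape check (a sketch, assuming the pickles above loaded):

print(oof.shape)
print(len(char_probs), len(char_probs[0]))  # one array per note; one probability per character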

9. Threshold selection

best_th = 0.5
best_score = 0
for th in np.arange(0.45, 0.55, 0.01):
    th = np.round(th, 2)
    results = get_results(char_probs, th=th)
    # print(results)
    preds = get_predictions(results)
    score = get_score(preds, truths)
    if best_score < score:
        best_th = th
        best_score = score
    LOGGER.info(f"th: {th} score: {score:.5f}")
LOGGER.info(f"best_th: {best_th} score: {best_score:.5f}")

10. Merge the datasets

test = pd.read_csv('../input/nbme-score-clinical-patient-notes/test.csv')
submission = pd.read_csv('../input/nbme-score-clinical-patient-notes/sample_submission.csv')
features = pd.read_csv('../input/nbme-score-clinical-patient-notes/features.csv')

def preprocess_features(features):
    # Fix one malformed feature text in the raw data
    features.loc[27, 'feature_text'] = "Last-Pap-smear-1-year-ago"
    return features

features = preprocess_features(features)
patient_notes = pd.read_csv('../input/nbme-score-clinical-patient-notes/patient_notes.csv')

print(f"test.shape: {test.shape}")
display(test.head())
print(f"features.shape: {features.shape}")
display(features.head())
print(f"patient_notes.shape: {patient_notes.shape}")
display(patient_notes.head())

test = test.merge(features, on=['feature_num', 'case_num'], how='left')
test = test.merge(patient_notes, on=['pn_num', 'case_num'], how='left')
print('Merged test:')
display(test)


11. Dataset setup

# Dataset setup
def prepare_input(cfg, text, feature_text):
    inputs = cfg.tokenizer(text, feature_text,
                           add_special_tokens=True,
                           max_length=CFG.max_len,
                           padding="max_length",
                           return_offsets_mapping=False)
    for k, v in inputs.items():
        inputs[k] = torch.tensor(v, dtype=torch.long)
    return inputs

class TestDataset(Dataset):
    def __init__(self, cfg, df):
        self.cfg = cfg
        self.feature_texts = df['feature_text'].values
        self.pn_historys = df['pn_history'].values

    def __len__(self):
        return len(self.feature_texts)

    def __getitem__(self, item):
        inputs = prepare_input(self.cfg, self.pn_historys[item], self.feature_texts[item])
        return inputs
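Each item pairs the note text with the feature text, padded to CFG.max_len. A minimal sketch to inspect one batch (assuming the merged test frame from section 10):

sample_loader = DataLoader(TestDataset(CFG, test), batch_size=2)
batch = next(iter(sample_loader))
print({k: v.shape for k, v in batch.items()})  # every tensor: (2, CFG.max_len)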

12. Model setup

class CustomModel(nn.Module):
    def __init__(self, cfg, config_path=None, pretrained=False):
        super().__init__()
        self.cfg = cfg
        if config_path is None:
            self.config = AutoConfig.from_pretrained(cfg.model, output_hidden_states=True)
        else:
            self.config = torch.load(config_path)
        if pretrained:
            self.model = AutoModel.from_pretrained(cfg.model, config=self.config)
        else:
            self.model = AutoModel.from_config(self.config)
        # Multi-sample dropout: five dropout rates share one fc head and the outputs are averaged
        self.fc_dropout_0 = nn.Dropout(0.1)
        self.fc_dropout_1 = nn.Dropout(0.2)
        self.fc_dropout_2 = nn.Dropout(0.3)
        self.fc_dropout_3 = nn.Dropout(0.4)
        self.fc_dropout_4 = nn.Dropout(0.5)
        self.fc = nn.Linear(self.config.hidden_size, 1)
        self._init_weights(self.fc)

    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def feature(self, inputs):
        outputs = self.model(**inputs)
        last_hidden_states = outputs[0]
        return last_hidden_states

    def forward(self, inputs):
        feature = self.feature(inputs)
        output_0 = self.fc(self.fc_dropout_0(feature))
        output_1 = self.fc(self.fc_dropout_1(feature))
        output_2 = self.fc(self.fc_dropout_2(feature))
        output_3 = self.fc(self.fc_dropout_3(feature))
        output_4 = self.fc(self.fc_dropout_4(feature))
        output = (output_0 + output_1 + output_2 + output_3 + output_4) / 5
        return output
# Quick test:
# cfg = CFG()
# inputs = prepare_input(cfg, 'I love you!', 'I hate you')
# for k, v in inputs.items():
#     inputs[k] = v.unsqueeze(0)
# model = CustomModel(cfg)
# print(model(inputs).shape)

13. Inference function

def inference_fn(test_loader, model, device):
    preds = []
    model.eval()
    model.to(device)
    tk0 = tqdm(test_loader, total=len(test_loader))
    for inputs in tk0:
        for k, v in inputs.items():
            inputs[k] = v.to(device)
        with torch.no_grad():
            y_preds = model(inputs)
        # sigmoid turns the per-token logits into probabilities
        preds.append(y_preds.sigmoid().to('cpu').numpy())
    predictions = np.concatenate(preds)
    return predictions

14. Model testing

test_dataset = TestDataset(CFG, test)
test_loader = DataLoader(test_dataset,
                         batch_size=CFG.batch_size,
                         shuffle=False,
                         num_workers=CFG.num_workers,
                         pin_memory=True,
                         drop_last=False)

predictions = []
for fold in CFG.trn_fold:
    model = CustomModel(CFG, config_path='../input/deberta-med-score-1/config.pth', pretrained=False)
    state = torch.load(os.path.join(CFG.path, 'microsoft-deberta-v3-large_fold{}_best.pth'.format(fold)),
                       map_location=torch.device('cpu'))
    model.load_state_dict(state['model'])
    prediction = inference_fn(test_loader, model, device)
    prediction = prediction.reshape((len(test), CFG.max_len))
    char_probs = get_char_probs(test['pn_history'].values, prediction, CFG.tokenizer)
    predictions.append(char_probs)
    del model, state, prediction, char_probs
    gc.collect()
    torch.cuda.empty_cache()

15. Submission

predictions = np.mean(predictions, axis=0)
results = get_results(predictions, th=best_th)
print(results)
submission['location'] = results
display(submission.head())
submission[['id', 'location']].to_csv('submission.csv', index=False)