# Lasso feature screening
result_method = LassoCV(cv=5, random_state=42).fit(df_temp[features], df_temp[group])
coef = pd.Series(result_method.coef_, index=features)
dropFeature = list(coef[coef == 0].index)
result_fea = list(coef[coef != 0].index)
# if not searchFeature:
#     if len(coef)-len(dropFeature) > searchNum:
#         imp_coef_fea = list(abs(coef[coef != 0]).sort_values().head(len(coef)-len(dropFeature)-searchNum).index)
#         dropFeature.extend(imp_coef_fea)
imp_coef = coef.drop(dropFeature)
str_result = (
    "A total of " + str(sum(coef != 0)) + " factors were selected by the " + method
    + " method: " + str(result_fea)
    + ". The optimal regularization parameter was: "
    + str(round_dec(float(result_method.alpha_), decimal_num)) + "."
)
if dropFeature != []:
    df_tab = df.drop(dropFeature, axis=1)
else:
    df_tab = df
fig = plt.figure(figsize=(6, 6), dpi=dpi)
imp_coef.plot(kind="barh")
plt.title("Coefficients in the Lasso Model")
plt.grid(False)
plot_name_dict["Coefficients"] = save_fig(savePath, "Lasso", "png", fig, str_time=str_time)
plot_name_dict_save["Coefficients"] = save_fig(savePath, "Lasso", picFormat, fig, str_time=str_time)
plt.close()
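
# --- Illustrative sketch (not used by the pipeline above) -----------------------
# A minimal, self-contained example of the LassoCV screening idea used above,
# assuming only scikit-learn, numpy and pandas are available. The synthetic data
# and the helper name `_lasso_screen_sketch` are illustrative only and are not
# part of the original module.
def _lasso_screen_sketch():
    import pandas as pd
    from sklearn.datasets import make_regression
    from sklearn.linear_model import LassoCV

    X_demo, y_demo = make_regression(n_samples=200, n_features=10, n_informative=4,
                                     noise=5.0, random_state=42)
    feat_names = ["x%d" % i for i in range(X_demo.shape[1])]
    lasso = LassoCV(cv=5, random_state=42).fit(X_demo, y_demo)
    coefs = pd.Series(lasso.coef_, index=feat_names)
    kept = list(coefs[coefs != 0].index)     # features retained (non-zero coefficients)
    dropped = list(coefs[coefs == 0].index)  # features shrunk to exactly zero
    return kept, dropped, float(lasso.alpha_)
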
# Model merging
def ML_ModelFusion(
    df,
    group,
    features,
    validation_ratio=0.2,
    test_ratio=0.2,
    fusionMethod="Blending",
    methods=[],
    method=["LogisticRegression"],
    n_splits=5,
    testComp=False,
    searching=False,
    savePath=None,
    dpi=600,
    picFormat="jpeg",
    style='lancet',
    testSet=False,
    label="LABEL",
    modelSave=False,
    testLabel=0,
    resultType=0,
    randomState=1,
    decimal_num=3,
    **kwargs,
):
    """
    :param df: DataFrame, data set
    :param group: str, dependent variable
    :param features: list, independent variables
    :param validation_ratio: float, proportion of the validation set
    :param test_ratio: float, proportion of the test set
    :param fusionMethod: str, "Blending" or "Stacking"
    :param methods: list, first-layer models
    :param method: list, second-layer (meta) model
    :param n_splits: int, number of cross-validation folds
    :param testComp: True/False, whether to also compare the first-layer models on the test set
    :param searching: True for automatic parameter search, False for default parameters, "Handle" for manual tuning
    :param savePath: image save path
    :param dpi: image resolution
    :param picFormat: picture format
    :param style: color palette name
    :param testSet: True/False, whether the test set is fixed by a label column
    :param label: name of the column that marks the fixed test set
    :param modelSave: whether to save the fitted models
    :param testLabel: value of the label column that marks the test set
    :param resultType: 0 for SD, 1 for 95% confidence interval (CI)
    :param randomState: random seed
    :param decimal_num: number of decimal places
    :param kwargs: hyperparameters passed to the individual models
    :return: dict with the result text, tables, figure paths and saved-model information
    """
    name_dict = {
        "LogisticRegression": "logistic",
        "XGBClassifier": "XGBoost",
        "RandomForestClassifier": "RandomForest",
        "LGBMClassifier": "LightGBM",
        "SVC": "SVM",
        "MLPClassifier": "MLP",
        "GaussianNB": "GNB",
        "ComplementNB": "CNB",
        "AdaBoostClassifier": "AdaBoost",
        "KNeighborsClassifier": "KNN",
        "DecisionTreeClassifier": "DecisionTree",
        "BaggingClassifier": "Bagging",
        "GradientBoostingClassifier": "GBDT",
    }
    palette_dict = {
        'lancet': ["#00468BFF", "#ED0000FF", "#42B540FF", "#0099B4FF", "#925E9FFF", "#FDAF91FF", "#AD002AFF", "#ADB6B6FF", "#1B1919FF"],
        'nejm': ["#BC3C29FF", "#0072B5FF", "#E18727FF", "#20854EFF", "#7876B1FF", "#6F99ADFF", "#FFDC91FF", "#EE4C97FF", "#BC3C29FF"],
        'jama': ["#374E55FF", "#DF8F44FF", "#00A1D5FF", "#B24745FF", "#79AF97FF", "#6A6599FF", "#80796BFF", "#374E55FF", "#DF8F44FF"],
        'npg': ["#E64B35FF", "#4DBBD5FF", "#00A087FF", "#3C5488FF", "#F39B7FFF", "#8491B4FF", "#91D1C2FF", "#DC0000FF", "#7E6148FF", "#B09C85FF"],
    }
    if method[0] in methods:
        return {"error": "For the second-layer model, please do not choose a model already selected in the first layer; prefer a simple model such as logistic regression!" + "false-error"}
    str_time = str(datetime.datetime.now().hour) + str(datetime.datetime.now().minute) + str(datetime.datetime.now().second)
    str_time = str_time + str(random.randint(1, 100))
    list_name = [group]
    plot_name_dict, plot_name_dict_save = {}, {}  ## store figure paths
    result_model_save, result_model_save_files = {}, {}  ## store fitted models
    resThreshold = 0  ## stores the final threshold
    conf_dic_train, conf_dic_valid, conf_dic_test = {}, {}, {}
    sdorci = " SD " if resultType == 0 else " 95%CI "
    if testSet:
        df = df[features + [group] + [label]].dropna()
        for fea in features:
            if fea == label or label == group:
                return {"error": "The label column cannot be used as a model feature; please select the data-partition label column again!" + "false-error"}
    else:
        df = df[features + [group]].dropna()
    binary = True
    u = np.sort(np.unique(np.array(df[group])))
    str_result = "The model fusion ({}) method was used to classify the samples; the first-layer models included: {}.\n".format(fusionMethod, methods)
    if len(u) == 2 and set(u) != set([0, 1]):
        y_result = label_binarize(df[group], classes=[ii for ii in u])  # binarize the labels
        y_result_pd = pd.DataFrame(y_result, columns=[group])
        df = pd.concat([df.drop(group, axis=1), y_result_pd], axis=1)
        str_result += "Outcomes were binarized, recoding " + str(u) + " as 0 and 1."
    elif len(u) > 2:
        if len(u) > 10:
            return {"error": "More than 10 categories are not allowed. Please check the values of the dependent variable." + "false-error"}
        binary = False
        return {"error": "Only binary classification is supported for now. Please check the values of the dependent variable." + "false-error"}
    if testSet:
        if isinstance(df[label][0], str):
            testLabel = str(testLabel)
        df = df[features + [group] + [label]].dropna()
        test_a = df[df[label] == testLabel]
        train_a = df[df[label] != testLabel]
        train_all = train_a.drop(label, axis=1)
        test_all = test_a.drop(label, axis=1)
        # features.remove(fea)
        df = df.drop(label, axis=1)
        Xtrain = train_all.drop(group, axis=1)
        Ytrain = train_all.loc[:, list_name].squeeze(axis=1)
        Xtest = test_all.drop(group, axis=1)
        Ytest = test_all.loc[:, list_name].squeeze(axis=1)
    else:
        df = df[features + [group]].dropna()
        X = df.drop(group, axis=1)
        Y = df.loc[:, list_name].squeeze(axis=1)
        Xtrain, Xtest, Ytrain, Ytest = TTS(X, Y, test_size=test_ratio, random_state=randomState)
    str_result += "\nThe data set contained " + str(df.shape[0]) + " samples in total. Among them, "
    for cls in [0, 1]:
        n = sum(df[group] == cls)
        str_result += "\tthere were " + str(n) + " cases in category " + str(cls) + "\n"
    str_result += "\nThe training set contained " + str(Ytrain.shape[0]) + " samples in total. Among them, "
    for cls in [0, 1]:
        n = len(Ytrain) - sum(Ytrain) if cls == 0 else sum(Ytrain)
        str_result += "\tthere were " + str(n) + " cases in category " + str(cls) + "\n"
    df_dict = {}
    fpr_trains, tpr_trains, metric_dic_trains = [], [], []
    fpr_valids, tpr_valids, metric_dic_valids = [], [], []
    fpr_tests, tpr_tests, metric_dic_tests = [], [], []
    mean_fpr = np.linspace(0, 1, 100)
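    # How the two fusion strategies below build their second-layer (meta) features:
    # - Blending: the training set is split once into d1/d2; each first-layer model is
    #   fitted on d1, and its predicted probabilities on d2 form one column of
    #   X_train_d2_blending (its test-set probabilities form one column of X_test_blending).
    # - Stacking: each first-layer model is fitted n_splits times via StratifiedKFold;
    #   the out-of-fold predicted probabilities fill X_train_stacking, and the test-set
    #   probabilities of the n_splits fold models are averaged into X_test_stacking.
    # The second-layer model (method[0]) is then trained on these meta-features.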
    if fusionMethod == "Blending":
        X_train_d1, X_train_d2, y_train_d1, y_train_d2 = TTS(Xtrain, Ytrain, test_size=validation_ratio, random_state=randomState)
        X_train_d2_blending = np.zeros((X_train_d2.shape[0], len(methods)))
        X_test_blending = np.zeros((Xtest.shape[0], len(methods)))
        X_trains = [X_train_d1] * len(methods)
        Y_trains = [y_train_d1] * len(methods)
        fitall, models = modelfit(methods, searching, X_trains, Y_trains, **kwargs)
        if fitall == "error":
            return {"error": fusionMethod + " method: the parameter settings were invalid while training the first-layer models. Error message: " + models + "false-error"}
        for j, clf in enumerate(fitall):
            if modelSave:
                modelfile = open(savePath + fusionMethod + "&" + methods[j] + "&" + str(j) + "&" + str_time + ".pkl", "wb")
                pickle.dump(clf, modelfile)
                modelfile.close()
                result_model_save_files.update({methods[j] + str(j): fusionMethod + "&" + methods[j] + "&" + str(j) + "&" + str_time + ".pkl"})
            y_test_value = clf.predict_proba(X_train_d2)[:, 1]
            X_train_d2_blending[:, j] = y_test_value
            X_test_blending[:, j] = clf.predict_proba(Xtest)[:, 1]
            fpr_train, tpr_train, metric_dic_train, _ = classification_metric_evaluate(clf, X_train_d1, y_train_d1, binary)
            fpr_valid, tpr_valid, metric_dic_valid, _ = classification_metric_evaluate(clf, X_train_d2, y_train_d2, binary, Threshold=metric_dic_train["cutoff"])
            str_result += (
                "The parameter values selected for each model were as follows:\n"
                + methods[j] + ": AUC=" + str(round_dec(metric_dic_train["AUC"], decimal_num))
                + "; model parameters:\n" + dic2str(models[j].get_params(), methods[j]) + "\n"
            )
            if testComp:
                fpr_test, tpr_test, metric_dic_test, _ = classification_metric_evaluate(clf, Xtest, Ytest, binary, Threshold=metric_dic_train["cutoff"])
            metric_dic_valid.update({"cutoff": metric_dic_train["cutoff"]})
            # tprs_train.append(np.interp(mean_fpr, fpr_train, tpr_train))
            # tprs_test.append(np.interp(mean_fpr, fpr_test, tpr_test))
            fpr_trains.append(fpr_train)
            tpr_trains.append(np.interp(mean_fpr, fpr_train, tpr_train))
            tpr_trains[-1][0] = 0.0
            metric_dic_trains.append(metric_dic_train)
            # fpr_valids.append(fpr_valid)
            tpr_valids.append(np.interp(mean_fpr, fpr_valid, tpr_valid))
            tpr_valids[-1][0] = 0.0
            metric_dic_valids.append(metric_dic_valid)
            if testComp:
                tpr_tests.append(np.interp(mean_fpr, fpr_test, tpr_test))
                tpr_tests[-1][0] = 0.0
                metric_dic_tests.append(metric_dic_test)
        fit_L, model_L = modelfit(method, searching, [X_train_d2_blending], [y_train_d2], **kwargs)
        if fit_L == "error":
            return {"error": fusionMethod + " method: the parameter settings were invalid while training the second-layer model. Error message: " + model_L + "false-error"}
        fpr_test, tpr_test, metric_dic_test, _ = classification_metric_evaluate(fit_L[0], X_test_blending, Ytest, binary)
        if binary:
            ### Draw the DCA curve
            DCA_dict = {}
            prob_pos, p_serie, net_benefit_serie, net_benefit_serie_All = calculate_net_benefit(fit_L[0], X_test_blending, Ytest)
            DCA_dict[name_dict[method[0]]] = {"p_serie": p_serie, "net_b_s": net_benefit_serie, "net_b_s_A": net_benefit_serie_All}
            decision_curve_p = plot_decision_curves(DCA_dict, colors=palette_dict['lancet'], name="Test", savePath=savePath, dpi=dpi, picFormat=picFormat)
            plot_name_dict["DCA curve of the test set"] = decision_curve_p[0]
            plot_name_dict_save["DCA curve of the test set"] = decision_curve_p[1]
        if modelSave:
            modelfile = open(savePath + fusionMethod + "&" + method[0] + "&" + str(0) + "&" + str_time + ".pkl", "wb")
            pickle.dump(fit_L[0], modelfile)
            modelfile.close()
            result_model_save_files.update({method[0] + str(0): fusionMethod + "&" + method[0] + "&" + str(0) + "&" + str_time + ".pkl"})
        result_model_save["modelFile"] = result_model_save_files
        result_model_save["modelFeature"] = features
        result_model_save["modelShapValue"] = methods
        result_model_save["modelName"] = method[0]
        result_model_save["modelClass"] = fusionMethod
        result_model_save["Threshold"] = metric_dic_test["cutoff"]
        str_result += (
            "In the second layer, the " + name_dict[method[0]]
            + " model was fitted on the first-layer predictions for the validation set; its performance on the test set was"
            + ": AUC=" + str(round_dec(metric_dic_test["AUC"], decimal_num))
            + "; model parameters:\n" + dic2str(model_L[0].get_params(), method[0]) + "\n"
        )
        method_list = []
        for met in range(len(methods)):
            method_list.append(name_dict[methods[met]])
        df_train = pd.DataFrame(metric_dic_trains, index=method_list)
        df_valid = pd.DataFrame(metric_dic_valids, index=method_list)
        if testComp:
            metric_dic_tests.append(metric_dic_test)
            method_list.append(name_dict[method[0]] + "(" + fusionMethod + ")")
            df_test = pd.DataFrame(metric_dic_tests, index=method_list)
        else:
            df_test = pd.DataFrame(metric_dic_test, index=[name_dict[method[0]] + "(" + fusionMethod + ")"])
        df_train = df_train.applymap(lambda x: round_dec(x, d=decimal_num))
        df_valid = df_valid.applymap(lambda x: round_dec(x, d=decimal_num))
        df_test = df_test.applymap(lambda x: round_dec(x, d=decimal_num))
        if testComp:
            test_value = list(df_test["AUC"])
            test_value_name = list(df_test.index)
            str_result += (
                "The model with the best performance on the test set was: "
                + test_value_name[test_value.index(max(test_value))]
                + "; its AUC was: " + str(round_dec(max(test_value), decimal_num)) + "."
            )
        else:
            str_result += "The AUC of the final fusion model on the test set was: " + str(list(df_test["AUC"])[0]) + "."
        ## Draw the ROC of the training set
        if binary:
            fig = plt.figure(figsize=(4, 4), dpi=dpi)
            # Draw the diagonal
            plt.plot([0, 1], [0, 1], linestyle="--", lw=1, color="r", alpha=0.8)
            plt.grid(which="major", axis="both", linestyle="-.", alpha=0.08, color="grey")
            for i in range(len(tpr_trains)):
                plt.plot(
                    mean_fpr, tpr_trains[i], lw=1, alpha=0.4, c=palette_dict[style][i],
                    label=name_dict[methods[i]] + "(AUC=%0.3f 95%%CI (%0.3f-%0.3f)) "
                    % (metric_dic_trains[i]["AUC"], metric_dic_trains[i]["AUC_L"], metric_dic_trains[i]["AUC_U"]),
                )
            plt.xlim([-0.02, 1.02])
            plt.ylim([-0.02, 1.02])
            plt.xlabel("1-Specificity")
            plt.ylabel("Sensitivity")
            plt.title("ROC curve(Training)")
            plt.legend(loc="lower right", fontsize=5)
            if savePath is not None:
                plot_name_dict["ROC curve of the training set"] = save_fig(savePath, "ROC_curve_train", "png", fig, str_time=str_time)
                plot_name_dict_save["ROC curve of the training set"] = save_fig(savePath, "ROC_curve_train", picFormat, fig, str_time=str_time)
            plt.close()
        ## Draw the ROC of the validation set
        if binary:
            fig = plt.figure(figsize=(4, 4), dpi=dpi)
            # Draw the diagonal
            plt.plot([0, 1], [0, 1], linestyle="--", lw=1, color="r", alpha=0.8)
            plt.grid(which="major", axis="both", linestyle="-.", alpha=0.08, color="grey")
            for i in range(len(tpr_valids)):
                plt.plot(
                    mean_fpr, tpr_valids[i], lw=1, alpha=0.4, c=palette_dict[style][i],
                    label=name_dict[methods[i]] + "(AUC=%0.3f 95%%CI (%0.3f-%0.3f)) "
                    % (metric_dic_valids[i]["AUC"], metric_dic_valids[i]["AUC_L"], metric_dic_valids[i]["AUC_U"]),
                )
            plt.xlim([-0.02, 1.02])
            plt.ylim([-0.02, 1.02])
            plt.xlabel("1-Specificity")
            plt.ylabel("Sensitivity")
            plt.title("ROC curve(Validation)")
            plt.legend(loc="lower right", fontsize=5)
            if savePath is not None:
                plot_name_dict["ROC curve of validation set"] = save_fig(savePath, "ROC_curve_valid", "png", fig, str_time=str_time)
                plot_name_dict_save["ROC curve of validation set"] = save_fig(savePath, "ROC_curve_valid", picFormat, fig, str_time=str_time)
            plt.close()
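        # The test-set ROC below is drawn from the raw (fpr_test, tpr_test) pairs of the
        # fused model: duplicate FPR values are removed with np.unique and the curve is
        # re-sampled on an evenly spaced FPR grid with scipy's interp1d (linear), so the
        # fused-model curve is plotted on a grid comparable to the interpolated
        # first-layer curves.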
        ### Draw the test set ROC
        if binary:
            fig = plt.figure(figsize=(4, 4), dpi=dpi)
            # Draw the diagonal
            plt.plot([0, 1], [0, 1], linestyle="--", lw=1, color="r", alpha=0.8)
            plt.grid(which="major", axis="both", linestyle="-.", alpha=0.08, color="grey")
            from scipy.interpolate import interp1d

            tpr_test_unique, tpr_test_index = np.unique(fpr_test, return_index=True)
            fpr_test_new = np.linspace(min(fpr_test), max(fpr_test), len(fpr_test))
            f = interp1d(tpr_test_unique, tpr_test[tpr_test_index], kind="linear")  # alternative: kind="cubic"
            tpr_test_new = f(fpr_test_new)
            plt.plot(
                fpr_test_new, tpr_test_new, lw=1.5, alpha=0.6, color=palette_dict[style][len(methods)],
                label="Test ROC(" + fusionMethod + ") (AUC=%0.3f 95%%CI (%0.3f-%0.3f)) "
                % (metric_dic_test["AUC"], metric_dic_test["AUC_L"], metric_dic_test["AUC_U"]),
            )
            if testComp:
                for i in range(len(methods)):
                    plt.plot(
                        mean_fpr, tpr_tests[i], lw=1, alpha=0.4, c=palette_dict[style][i],
                        label=name_dict[methods[i]] + "(AUC=%0.3f 95%%CI (%0.3f-%0.3f)) "
                        % (metric_dic_tests[i]["AUC"], metric_dic_tests[i]["AUC_L"], metric_dic_tests[i]["AUC_U"]),
                    )
            plt.xlim([-0.02, 1.02])
            plt.ylim([-0.02, 1.02])
            plt.xlabel("1-Specificity")
            plt.ylabel("Sensitivity")
            plt.title("ROC curve(Test)")
            plt.legend(loc="lower right", fontsize=5)
            if savePath is not None:
                plot_name_dict["ROC curve of the test set"] = save_fig(savePath, "ROC_curve_test", "png", fig, str_time=str_time)
                plot_name_dict_save["ROC curve of the test set"] = save_fig(savePath, "ROC_curve_test", picFormat, fig, str_time=str_time)
            plt.close()
    elif fusionMethod == "Stacking":
        X_train = np.array(Xtrain)
        Y_train = np.array(Ytrain)
        X_train_stacking = np.zeros((len(Xtrain), len(methods)))
        X_test_stacking = np.zeros((len(Xtest), len(methods)))
        skf = list(StratifiedKFold(n_splits=n_splits).split(X_train, Y_train))
        X_train_folds, Y_train_folds, X_valid_folds, Y_valid_folds, fold2s = [], [], [], [], []
        for i, (fold1, fold2) in enumerate(skf):
            X_train_fold, Y_train_fold, X_valid_fold, Y_valid_fold = (
                X_train[fold1], Y_train[fold1], X_train[fold2], Y_train[fold2],
            )
            X_train_folds.append(X_train_fold)
            Y_train_folds.append(Y_train_fold)
            X_valid_folds.append(X_valid_fold)
            Y_valid_folds.append(Y_valid_fold)
            fold2s.append(fold2)
        for j, met in enumerate(methods):
            X_test_stacking_j = np.zeros((Xtest.shape[0], len(skf)))
            fitall, models = modelfit([met] * len(X_train_folds), searching, X_train_folds, Y_train_folds, **kwargs)
            if fitall == "error":
                return {"error": fusionMethod + " method: the parameter settings were invalid while training the first-layer models. Error message: " + models + "false-error"}
            for ii in range(len(fitall)):
                if modelSave:
                    modelfile = open(savePath + fusionMethod + "&" + met + "&" + str(ii) + "&" + str_time + ".pkl", "wb")
                    pickle.dump(fitall[ii], modelfile)
                    modelfile.close()
                    result_model_save_files.update({met + str(ii): fusionMethod + "&" + met + "&" + str(ii) + "&" + str_time + ".pkl"})
                    # result_model_save['modelFile'] = fusionMethod + '&' + met + '&' + str(ii)+'&'+str_time + '.pkl'
                    # result_model_save['modelFeature'] = features
                    # result_model_save['modelName'] = met
                    # result_model_save['modelClass'] = fusionMethod
                y_test_value = fitall[ii].predict_proba(X_valid_folds[ii])[:, 1]
                X_train_stacking[fold2s[ii], j] = y_test_value
                X_test_stacking_j[:, ii] = fitall[ii].predict_proba(Xtest)[:, 1]
                fpr_train, tpr_train, metric_dic_train, _ = classification_metric_evaluate(fitall[ii], X_train_folds[ii], Y_train_folds[ii], binary)
                fpr_valid, tpr_valid, metric_dic_valid, _ = classification_metric_evaluate(fitall[ii], X_valid_folds[ii], Y_valid_folds[ii], binary, Threshold=metric_dic_train["cutoff"])
                if testComp:
                    fpr_test, tpr_test, metric_dic_test, _ = classification_metric_evaluate(fitall[ii], Xtest, Ytest, binary, Threshold=metric_dic_train["cutoff"])
                metric_dic_valid.update({"cutoff": metric_dic_train["cutoff"]})
                # tprs_train.append(np.interp(mean_fpr, fpr_train, tpr_train))
                # tprs_test.append(np.interp(mean_fpr, fpr_test, tpr_test))
                # fpr_trains.append(fpr_train)
                tpr_trains.append(np.interp(mean_fpr, fpr_train, tpr_train))
                tpr_trains[-1][0] = 0.0
                metric_dic_trains.append(metric_dic_train)
                # fpr_valids.append(fpr_valid)
                tpr_valids.append(np.interp(mean_fpr, fpr_valid, tpr_valid))
                tpr_valids[-1][0] = 0.0
                metric_dic_valids.append(metric_dic_valid)
                if testComp:
                    tpr_tests.append(np.interp(mean_fpr, fpr_test, tpr_test))
                    tpr_tests[-1][0] = 0.0
                    metric_dic_tests.append(metric_dic_test)
            X_test_stacking[:, j] = X_test_stacking_j.mean(axis=1)
        fit_L, model_L = modelfit(method, searching, [X_train_stacking], [Y_train], **kwargs)
        if fit_L == "error":
            return {"error": fusionMethod + " method: the parameter settings were invalid while training the second-layer model. Error message: " + model_L + "false-error"}
        fpr_test, tpr_test, metric_dic_test, _ = classification_metric_evaluate(fit_L[0], X_test_stacking, Ytest, binary)
        if binary:
            ### Draw the DCA curve
            DCA_dict = {}
            prob_pos, p_serie, net_benefit_serie, net_benefit_serie_All = calculate_net_benefit(fit_L[0], X_test_stacking, Ytest)
            DCA_dict[name_dict[method[0]]] = {"p_serie": p_serie, "net_b_s": net_benefit_serie, "net_b_s_A": net_benefit_serie_All}
            decision_curve_p = plot_decision_curves(DCA_dict, colors=palette_dict['lancet'], name="Test", savePath=savePath, dpi=dpi, picFormat=picFormat)
            plot_name_dict["DCA curve of the test set"] = decision_curve_p[0]
            plot_name_dict_save["DCA curve of the test set"] = decision_curve_p[1]
        if modelSave:
            modelfile = open(savePath + fusionMethod + "&" + method[0] + "&" + str(0) + "&" + str_time + ".pkl", "wb")
            pickle.dump(fit_L[0], modelfile)
            modelfile.close()
            result_model_save_files.update({method[0] + str(0): fusionMethod + "&" + method[0] + "&" + str(0) + "&" + str_time + ".pkl"})
        result_model_save["modelFile"] = result_model_save_files
        result_model_save["modelFeature"] = features
        result_model_save["modelShapValue"] = methods
        result_model_save["modelName"] = method[0]
        result_model_save["modelClass"] = fusionMethod
        result_model_save["Threshold"] = metric_dic_test["cutoff"]
        str_result += (
            "In the first layer, " + str(n_splits) + "-fold cross-validation was used to train " + str(n_splits)
            + " weak classifiers per method; their out-of-fold predictions were then modeled by the "
            + name_dict[method[0]] + " model in the second layer. On the test set it achieved"
            + ": AUC=" + str(round_dec(metric_dic_test["AUC"], decimal_num))
            + "; model parameters:\n" + dic2str(model_L[0].get_params(), method[0]) + "\n"
        )
        method_list = []
        for met in range(len(methods)):
            method_list.append(name_dict[methods[met]])
        list_evaluate_dic_train, list_evaluate_dic_valid, list_evaluate_dic_test = [], [], []
        df_dic_trains = pd.DataFrame(metric_dic_trains)
        df_dic_valids = pd.DataFrame(metric_dic_valids)
        if testComp:
            df_dic_tests = pd.DataFrame(metric_dic_tests)
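        # Aggregation step (see the code below): metric_dic_trains/valids(/tests) hold one
        # row per fold and per method, ordered method by method, so rows
        # i*n_splits:(i+1)*n_splits belong to method i. They are averaged per method, and
        # the spread is reported either as the standard deviation (resultType == 0) or as
        # a confidence interval from the module-level `ci` helper (resultType == 1).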
        for i in range(len(methods)):
            list_evaluate_dic_train.append(np.mean(df_dic_trains.iloc[i * n_splits : (i + 1) * n_splits], axis=0))
            list_evaluate_dic_valid.append(np.mean(df_dic_valids.iloc[i * n_splits : (i + 1) * n_splits], axis=0))
            if testComp:
                list_evaluate_dic_test.append(np.mean(df_dic_tests.iloc[i * n_splits : (i + 1) * n_splits], axis=0))
        df_dic_trains_mean = pd.DataFrame(list_evaluate_dic_train)
        df_dic_valids_mean = pd.DataFrame(list_evaluate_dic_valid)
        if testComp:
            df_dic_tests_mean = pd.DataFrame(list_evaluate_dic_test)
        resultType_train, resultType_valid, resultType_test = [], [], []
        for i in range(len(methods)):
            if resultType == 0:  ## SD
                resultType_train.append(np.std(df_dic_trains.iloc[i * n_splits : (i + 1) * n_splits], axis=0))
                resultType_valid.append(np.std(df_dic_valids.iloc[i * n_splits : (i + 1) * n_splits], axis=0))
                if testComp:
                    resultType_test.append(np.std(df_dic_tests.iloc[i * n_splits : (i + 1) * n_splits], axis=0))
            elif resultType == 1:  ## CI
                resultType_train.append(list(ci(df_dic_trains.iloc[i * n_splits : (i + 1) * n_splits])))
                resultType_valid.append(list(ci(df_dic_valids.iloc[i * n_splits : (i + 1) * n_splits])))
                if testComp:
                    resultType_test.append(list(ci(df_dic_tests.iloc[i * n_splits : (i + 1) * n_splits])))
        list_df_trains, list_df_valids, list_df_tests = [], [], []
        if resultType == 0:  ## SD: "mean(SD)" strings
            for i in range(len(methods)):
                list_df_train, list_df_valid, list_df_test = [], [], []
                for key in metric_dic_trains[0].keys():
                    list_df_train.append(
                        str(round_dec(float(df_dic_trains_mean.loc[i, key]), d=decimal_num))
                        + "(" + str(round_dec(float(resultType_train[i][key]), d=decimal_num)) + ")"
                    )
                    list_df_valid.append(
                        str(round_dec(float(df_dic_valids_mean.loc[i, key]), d=decimal_num))
                        + "(" + str(round_dec(float(resultType_valid[i][key]), d=decimal_num)) + ")"
                    )
                    if testComp:
                        list_df_test.append(
                            str(round_dec(float(df_dic_tests_mean.loc[i, key]), d=decimal_num))
                            + "(" + str(round_dec(float(resultType_test[i][key]), d=decimal_num)) + ")"
                        )
                list_df_trains.append(list_df_train)
                list_df_valids.append(list_df_valid)
                if testComp:
                    list_df_tests.append(list_df_test)
        elif resultType == 1:  ## CI: "mean (lower-upper)" strings
            for i in range(len(methods)):
                list_df_train, list_df_valid, list_df_test = [], [], []
                for key in metric_dic_trains[0].keys():
                    if key == "AUC":
                        list_df_train.append(
                            str(round_dec(float(df_dic_trains_mean.loc[i, key]), decimal_num))
                            + " (" + str(round_dec(float(df_dic_trains_mean.loc[i, "AUC_L"]), decimal_num))
                            + "-" + str(round_dec(float(df_dic_trains_mean.loc[i, "AUC_U"]), decimal_num)) + ")"
                        )
                        list_df_valid.append(
                            str(round_dec(float(df_dic_valids_mean.loc[i, key]), decimal_num))
                            + " (" + str(round_dec(float(df_dic_valids_mean.loc[i, "AUC_L"]), decimal_num))
                            + "-" + str(round_dec(float(df_dic_valids_mean.loc[i, "AUC_U"]), decimal_num)) + ")"
                        )
                        if testComp:
                            list_df_test.append(
                                str(round_dec(float(df_dic_tests_mean.loc[i, key]), decimal_num))
                                + " (" + str(round_dec(float(df_dic_tests_mean.loc[i, "AUC_L"]), decimal_num))
                                + "-" + str(round_dec(float(df_dic_tests_mean.loc[i, "AUC_U"]), decimal_num)) + ")"
                            )
                    else:
                        list_df_train.append(
                            str(round_dec(float(df_dic_trains_mean.loc[i, key]), decimal_num))
                            + " (" + str(round_dec(float(resultType_train[i][0][key]), decimal_num))
                            + "-" + str(round_dec(float(resultType_train[i][1][key]), decimal_num)) + ")"
                        )
                        list_df_valid.append(
                            str(round_dec(float(df_dic_valids_mean.loc[i, key]), decimal_num))
                            + " (" + str(round_dec(float(resultType_valid[i][0][key]), decimal_num))
                            + "-" + str(round_dec(float(resultType_valid[i][1][key]), decimal_num)) + ")"
                        )
                        if testComp:
                            list_df_test.append(
                                str(round_dec(float(df_dic_tests_mean.loc[i, key]), decimal_num))
                                + " (" + str(round_dec(float(resultType_test[i][0][key]), decimal_num))
                                + "-" + str(round_dec(float(resultType_test[i][1][key]), decimal_num)) + ")"  # upper bound now uses index [1] instead of repeating [0]
                            )
                list_df_trains.append(list_df_train)
                list_df_valids.append(list_df_valid)
                if testComp:
                    list_df_tests.append(list_df_test)
        df_train = pd.DataFrame(list_df_trains, index=method_list, columns=df_dic_trains_mean.columns)
        df_valid = pd.DataFrame(list_df_valids, index=method_list, columns=df_dic_valids_mean.columns)
        if testComp:
            # list_df_tests.append(metric_dic_test)
            # method_list.append(name_dict[method[0]] + '(' + fusionMethod + ')')
            df_test1 = pd.DataFrame(list_df_tests, index=method_list, columns=df_dic_tests_mean.columns)
            df_metric_dic_test = pd.DataFrame(metric_dic_test, index=[name_dict[method[0]] + "(" + fusionMethod + ")"])
            df_metric_dic_test = df_metric_dic_test.applymap(lambda x: round_dec(x, d=decimal_num))
            df_test = pd.concat([df_test1, df_metric_dic_test], axis=0)
        else:
            df_test = pd.DataFrame(metric_dic_test, index=[name_dict[method[0]] + "(" + fusionMethod + ")"])
            df_test = df_test.applymap(lambda x: round_dec(x, d=decimal_num))
        if testComp:
            test_value = list(df_dic_tests_mean["AUC"])
            test_value.append(metric_dic_test["AUC"])
            test_value_name = methods[:]
            test_value_name.append(method[0] + "(" + fusionMethod + ")")
            str_result += (
                "The best performing model on the test set was: "
                + test_value_name[test_value.index(max(test_value))]
                + ", AUC=" + str(round(max(test_value), decimal_num)) + "."
            )
        else:
            str_result += "The AUC of the final fusion model on the test set was: " + str(list(df_test["AUC"])[0]) + "."
        ## Draw the ROC of the training set
        if binary:
            fig = plt.figure(figsize=(4, 4), dpi=dpi)
            # Draw the diagonal
            plt.plot([0, 1], [0, 1], linestyle="--", lw=1, color="r", alpha=0.8)
            plt.grid(which="major", axis="both", linestyle="-.", alpha=0.08, color="grey")
            for i in range(len(methods)):
                mean_AUC, mean_AUC_L, mean_AUC_U = 0, 0, 0
                for j in range(n_splits):
                    mean_AUC += metric_dic_trains[i * n_splits + j]["AUC"]
                    mean_AUC_L += metric_dic_trains[i * n_splits + j]["AUC_L"]
                    mean_AUC_U += metric_dic_trains[i * n_splits + j]["AUC_U"]
                plt.plot(
                    mean_fpr,
                    np.mean(tpr_trains[i * n_splits : (i + 1) * n_splits], axis=0),
                    lw=1, alpha=0.4, c=palette_dict[style][i],
                    label=name_dict[methods[i]] + "(AUC = "
                    + df_train['AUC'][i][:df_train['AUC'][i].find('(')].strip()
                    + sdorci
                    + df_train['AUC'][i][df_train['AUC'][i].find('('):].strip() + ")",
                )
            plt.xlim([-0.02, 1.02])
            plt.ylim([-0.02, 1.02])
            plt.xlabel("1-Specificity")
            plt.ylabel("Sensitivity")
            plt.title("ROC curve(Training)")
            plt.legend(loc="lower right", fontsize=5)
            if savePath is not None:
                plot_name_dict["ROC curve of the training set"] = save_fig(savePath, "ROC_curve_train", "png", fig, str_time=str_time)
                plot_name_dict_save["ROC curve of the training set"] = save_fig(savePath, "ROC_curve_train", picFormat, fig, str_time=str_time)
            plt.close()
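        # Each curve below (and in the training-set plot above) is the pointwise mean of
        # the n_splits per-fold TPR curves of one first-layer method, evaluated on the
        # shared mean_fpr grid; the legend reuses the already formatted AUC cell
        # ("mean(SD)" or "mean (lower-upper)") from the corresponding results table.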
        ## Draw the ROC of the validation set
        if binary:
            fig = plt.figure(figsize=(4, 4), dpi=dpi)
            # Draw the diagonal
            plt.plot([0, 1], [0, 1], linestyle="--", lw=1, color="r", alpha=0.8)
            plt.grid(which="major", axis="both", linestyle="-.", alpha=0.08, color="grey")
            for i in range(len(methods)):
                mean_AUC, mean_AUC_L, mean_AUC_U = 0, 0, 0
                for j in range(n_splits):
                    mean_AUC += metric_dic_valids[i * n_splits + j]["AUC"]
                    mean_AUC_L += metric_dic_valids[i * n_splits + j]["AUC_L"]
                    mean_AUC_U += metric_dic_valids[i * n_splits + j]["AUC_U"]
                plt.plot(
                    mean_fpr,
                    np.mean(tpr_valids[i * n_splits : (i + 1) * n_splits], axis=0),
                    lw=1, alpha=0.4, c=palette_dict[style][i],
                    label=name_dict[methods[i]] + "(AUC = "
                    + df_valid['AUC'][i][:df_valid['AUC'][i].find('(')].strip()
                    + sdorci
                    + df_valid['AUC'][i][df_valid['AUC'][i].find('('):].strip() + ")",
                )
            plt.xlim([-0.02, 1.02])
            plt.ylim([-0.02, 1.02])
            plt.xlabel("1-Specificity")
            plt.ylabel("Sensitivity")
            plt.title("ROC curve(Validation)")
            plt.legend(loc="lower right", fontsize=5)
            if savePath is not None:
                plot_name_dict["ROC curve of validation set"] = save_fig(savePath, "ROC_curve_valid", "png", fig, str_time=str_time)
                plot_name_dict_save["ROC curve of validation set"] = save_fig(savePath, "ROC_curve_valid", picFormat, fig, str_time=str_time)
            plt.close()
        ### Draw the test set ROC
        if binary:
            fig = plt.figure(figsize=(4, 4), dpi=dpi)
            # Draw the diagonal
            plt.plot([0, 1], [0, 1], linestyle="--", lw=1, color="r", alpha=0.8)
            plt.grid(which="major", axis="both", linestyle="-.", alpha=0.08, color="grey")
            from scipy.interpolate import interp1d

            tpr_test_unique, tpr_test_index = np.unique(fpr_test, return_index=True)
            fpr_test_new = np.linspace(min(fpr_test), max(fpr_test), len(fpr_test))
            f = interp1d(tpr_test_unique, tpr_test[tpr_test_index], kind="linear")  # alternative: kind="cubic"
            tpr_test_new = f(fpr_test_new)
            if testComp:
                roc_len = len(methods)
            else:
                roc_len = 0
            if resultType == 1:
                test_label = '%s(%s) (AUC=%0.3f 95%%CI (%0.3f-%0.3f))' % (
                    name_dict[method[0]], fusionMethod,
                    df_test['AUC'][roc_len], df_test['AUC_L'][roc_len], df_test['AUC_U'][roc_len],
                )
            else:
                test_label = '%s(%s) (AUC=%0.3f)' % (name_dict[method[0]], fusionMethod, df_test['AUC'][roc_len])
            plt.plot(fpr_test_new, tpr_test_new, lw=1.5, alpha=0.6, color=palette_dict[style][len(methods)], label=test_label)
            if testComp:
                for i in range(len(methods)):
                    plt.plot(
                        mean_fpr,
                        # average the per-fold test curves of method i (mirrors the training/validation plots)
                        np.mean(tpr_tests[i * n_splits : (i + 1) * n_splits], axis=0),
                        lw=1, alpha=0.4, c=palette_dict[style][i],
                        label=name_dict[methods[i]] + "(AUC = "
                        + df_test['AUC'][i][:df_test['AUC'][i].find('(')].strip()
                        + sdorci
                        + df_test['AUC'][i][df_test['AUC'][i].find('('):].strip() + ")",
                    )
            plt.xlim([-0.02, 1.02])
            plt.ylim([-0.02, 1.02])
            plt.xlabel("1-Specificity")
            plt.ylabel("Sensitivity")
            plt.title("ROC curve(Test)")
            plt.legend(loc="lower right", fontsize=5)
            if savePath is not None:
                plot_name_dict["ROC curve of the test set"] = save_fig(savePath, "ROC_curve_test", "png", fig, str_time=str_time)
                plot_name_dict_save["ROC curve of the test set"] = save_fig(savePath, "ROC_curve_test", picFormat, fig, str_time=str_time)
            plt.close()
    df_train = df_train.drop(["AUC_L", "AUC_U"], axis=1)
    df_valid = df_valid.drop(["AUC_L", "AUC_U"], axis=1)
    df_test = df_test.drop(["AUC_L", "AUC_U"], axis=1)
    if resultType == 0 and fusionMethod == "Stacking":
        sd_cols = {
            "AUC": "AUC(SD)", "cutoff": "cutoff(SD)", "Accuracy": "Accuracy(SD)",
            "Sensitivity": "Sensitivity(SD)", "Specificity": "Specificity(SD)",
            "positive predictive value": "positive predictive value(SD)",
            "negative predictive value": "negative predictive value(SD)",
            "F1 score": "F1 score(SD)", "Kappa": "Kappa(SD)",
        }
        df_train.rename(columns=sd_cols, inplace=True)
        df_valid.rename(columns=sd_cols, inplace=True)
        df_test.rename(columns=sd_cols, inplace=True)
    elif resultType == 1 and fusionMethod == "Stacking":
        ci_cols = {
            "AUC": "AUC(95%CI)", "cutoff": "cutoff(95%CI)", "Accuracy": "Accuracy(95%CI)",
            "Sensitivity": "Sensitivity(95%CI)", "Specificity": "Specificity(95%CI)",
            "positive predictive value": "positive predictive value(95%CI)",
            "negative predictive value": "negative predictive value(95%CI)",
            "F1 score": "F1 score(95%CI)", "Kappa": "Kappa(95%CI)",
        }
        df_train.rename(columns=ci_cols, inplace=True)
        df_valid.rename(columns=ci_cols, inplace=True)
        # the test-set rename previously repeated "positive predictive value" and omitted
        # "negative predictive value"; the shared mapping above fixes that
        df_test.rename(columns=ci_cols, inplace=True)
    df_dict.update({"Table of results for training set analysis": df_train})
    df_dict.update({"Table of results for validation set analysis": df_valid})
    df_dict.update({"Table of results for test set analysis": df_test})
    pic_keys = list(plot_name_dict.keys())
    pic_values = list(plot_name_dict.values())
    pic_keys.reverse()
    pic_values.reverse()
    plot_name_dict = dict(zip(pic_keys, pic_values))
    result_dict = {
        "str_result": {"Description of analysis results": str_result},
        "tables": df_dict,
        "pics": plot_name_dict,
        "save_pics": plot_name_dict_save,
        "model": result_model_save,
    }
    return result_dict