Department Computational Biology / deepFPlearn · Commits

Commit 5263dbae, authored Apr 29, 2022 by Matthias Bernt

    reset dfpl/ to master

Parent: 39c960ce
Changes: 14 files
dfpl/__init__.py

# read version from installed package
from importlib.metadata import version

__version__ = version("dfpl")
\ No newline at end of file
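The lookup above resolves the version from the installed package metadata, so it only succeeds after dfpl has been installed (e.g. with pip). A minimal sketch, not part of this commit, of how the same lookup can be guarded when running from a plain source checkout; the fallback string is an assumption:

    from importlib.metadata import version, PackageNotFoundError

    try:
        __version__ = version("dfpl")
    except PackageNotFoundError:
        # hypothetical fallback when the package metadata is not available
        __version__ = "0.0.0.dev0"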
dfpl/__main__.py

@@ -4,74 +4,67 @@ import pathlib
import dataclasses
from os import path

from tensorflow import keras
import wandb

from dfpl.utils import makePathAbsolute, createDirectory
from dfpl import options
from dfpl import fingerprint as fp
from dfpl import autoencoder as ac
from dfpl import feedforwardNN as fNN
from dfpl import predictions
from dfpl import single_label_model as sl
project_directory = pathlib.Path(__file__).parent.parent.absolute()
opts = options.TrainOptions(
    inputFile=f"{project_directory}/data/muv.pkl",
    outputDir=f"{project_directory}/modeltraining",
    ecWeightsFile="/home/hertelj/git-hertelj/deepFPlearn_CODE/validation/case_00/results_AC_D/ac_D.encoder.hdf5",
    type='smiles',
    fpType='topological',
    epochs=3000,
    fpSize=2048,
    encFPSize=256,
    enableMultiLabel=False,
    testingFraction=0.2,
    kFolds=5,
    verbose=2,
    trainAC=False,
    trainFNN=True,
    compressFeatures=True
)
project_directory = pathlib.Path(".").parent.parent.absolute()
opts = options.TrainOptions(
    inputFile=f"{project_directory}/data/MoleculeNet/Biophysics/muv.pkl",
    outputDir=f"{project_directory}/validation/case_Tox21/results_AC-specific/",
    ecWeightsFile="",
    # f"{project_directory}/validation/case_Tox21/results_AC-specific/ac_pcba.encoder.hdf5",
test_train_opts = options.Options(
    inputFile=f'{project_directory}/input_datasets/S_dataset.pkl',
    outputDir=f'{project_directory}/output_data/console_test',
    ecWeightsFile=f'{project_directory}/output_data/case_00/AE_S/ae_S.encoder.hdf5',
    ecModelDir=f'{project_directory}/output_data/case_00/AE_S/saved_model',
    type='smiles',
    fpType='topological',
    epochs=20,
    epochs=100,
    batchSize=1024,
    fpSize=2048,
    encFPSize=256,
    enableMultiLabel=False,
    testingFraction=0.2,
    kFolds=5,
    testSize=0.2,
    kFolds=2,
    verbose=2,
    trainAC=True,
    trainAC=False,
    trainFNN=True,
    compressFeatures=True
    compressFeatures=True,
    activationFunction="selu",
    lossFunction='bce',
    optimizer='Adam',
    fnnType='FNN'
)

logging.basicConfig(level=logging.INFO)
test_predict_args = options.PredictOptions(
    inputFile=f"{project_directory}/data/Sun_etal_dataset.cids.predictionSet.csv",
    outputDir=f"{project_directory}/validation/case_01/results/",
    ecWeightsFile=f"/home/hertelj/git-hertelj/deepFPlearn_CODE/validation/case_00/results_AC_S/ac_S.encoder.hdf5",
    model=f"{project_directory}/validation/case_01/results/AR_compressed-True.full.FNN-.model.hdf5",
    target="AR",
    fpSize=2048,
test_pred_opts = options.Options(
    inputFile=f"{project_directory}/input_datasets/S_dataset.pkl",
    outputDir=f"{project_directory}/output_data/console_test",
    outputFile=f"{project_directory}/output_data/console_test/S_dataset.predictions_ER.csv",
    ecModelDir=f"{project_directory}/output_data/case_00/AE_S/saved_model",
    fnnModelDir=f"{project_directory}/output_data/console_test/ER_saved_model",
    type="smiles",
    fpType="topological"
)
def train(opts: options.TrainOptions):
def train(opts: options.Options):
    """
    Run the main training procedure
    :param opts: Options defining the details of the training
    """
    if opts.wabTracking:
        wandb.init(project=f"dfpl-training-{opts.wabTarget}", config=vars(opts))
        # opts = wandb.config

    df = fp.importDataFile(opts.inputFile, import_function=fp.importSmilesCSV, fp_size=opts.fpSize)

    # Create output dir if it doesn't exist
    createDirectory(opts.outputDir)
    createDirectory(opts.outputDir)
    # why? we just created that directory in the function before??

    encoder = None
    if opts.trainAC:
...
@@ -82,22 +75,22 @@ def train(opts: options.TrainOptions):
    if not opts.trainAC:
        # load trained model for autoencoder
        (_, encoder) = ac.define_ac_model(input_size=opts.fpSize, encoding_dim=opts.encFPSize)
        encoder.load_weights(makePathAbsolute(opts.ecWeightsFile))
        encoder = keras.models.load_model(opts.ecModelDir)

    # compress the fingerprints using the autoencoder
    df = ac.compress_fingerprints(df, encoder)

    if opts.trainFNN:
        # train single label models
        fNN.train_nn_models(df=df, opts=opts)
        # fNN.train_single_label_models(df=df, opts=opts)
        sl.train_single_label_models(df=df, opts=opts)

    # train multi-label models
    if opts.enableMultiLabel:
        fNN.train_nn_models_multi(df=df, opts=opts)
def predict(opts: options.PredictOptions) -> None:
def predict(opts: options.Options) -> None:
    """
    Run prediction given specific options
    :param opts: Options defining the details of the prediction
...
@@ -108,26 +101,20 @@ def predict(opts: options.PredictOptions) -> None:
    # Create output dir if it doesn't exist
    createDirectory(opts.outputDir)

    use_compressed = False
    if opts.ecWeightsFile:
        logging.info(f"Using fingerprint compression with AC {opts.ecWeightsFile}")
        use_compressed = True
    if opts.compressFeatures:
        # load trained model for autoencoder
        (_, encoder) = ac.define_ac_model(input_size=opts.fpSize, encoding_dim=opts.encFPSize)
        encoder.load_weights(opts.ecWeightsFile)
        encoder = keras.models.load_model(opts.ecModelDir)
        # compress the fingerprints using the autoencoder
        df = ac.compress_fingerprints(df, encoder)

    # predict
    df2 = predictions.predict_values(df=df,
                                     opts=opts,
                                     use_compressed=use_compressed)
                                     opts=opts)

    names_columns = [c for c in df2.columns if c not in ['fp', 'fpcompressed']]
    output_file = path.join(opts.outputDir,
                            path.basename(path.splitext(opts.inputFile)[0]) + ".predictions.csv")
    df2[names_columns].to_csv(path_or_buf=output_file)
    df2[names_columns].to_csv(path_or_buf=path.join(opts.outputDir, opts.outputFile))
    logging.info(f"Prediction successful. Results written to '{path.join(opts.outputDir, opts.outputFile)}'")
def createLogger(filename: str) -> None:
...
@@ -170,7 +157,7 @@ def main():
        else:
            raise ValueError("Input directory is not a directory")
    if prog_args.method == "train":
        train_opts = options.TrainOptions.fromCmdArgs(prog_args)
        train_opts = options.Options.fromCmdArgs(prog_args)
        fixed_opts = dataclasses.replace(train_opts,
                                         inputFile=makePathAbsolute(train_opts.inputFile),
...
@@ -182,11 +169,16 @@ def main():
        train(fixed_opts)
        exit(0)
    elif prog_args.method == "predict":
        predict_opts = options.PredictOptions.fromCmdArgs(prog_args)
        predict_opts = options.Options.fromCmdArgs(prog_args)
        fixed_opts = dataclasses.replace(predict_opts,
                                         inputFile=makePathAbsolute(predict_opts.inputFile),
                                         outputDir=makePathAbsolute(predict_opts.outputDir)
                                         outputDir=makePathAbsolute(predict_opts.outputDir),
                                         outputFile=makePathAbsolute(path.join(predict_opts.outputDir,
                                                                               predict_opts.outputFile)),
                                         ecModelDir=makePathAbsolute(predict_opts.ecModelDir),
                                         fnnModelDir=makePathAbsolute(predict_opts.fnnModelDir),
                                         trainAC=False,
                                         trainFNN=False)
        createDirectory(fixed_opts.outputDir)
        createLogger(path.join(fixed_opts.outputDir, "predict.log"))
...
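In main(), the parsed options object is not mutated; dataclasses.replace() is used to derive a corrected copy whose paths have been made absolute before train() or predict() is called. A small self-contained sketch of that pattern, with a made-up stand-in for options.Options:

    import dataclasses
    from os.path import abspath

    @dataclasses.dataclass
    class Opts:  # hypothetical stand-in for options.Options
        inputFile: str
        outputDir: str

    parsed = Opts(inputFile="data/muv.pkl", outputDir="modeltraining")
    # replace() returns a new instance with only the named fields changed
    fixed = dataclasses.replace(parsed,
                                inputFile=abspath(parsed.inputFile),
                                outputDir=abspath(parsed.outputDir))
    print(fixed)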
dfpl/autoencoder.py

import os.path
from os.path import basename
import math
import numpy as np
import pandas as pd
import logging

from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
import tensorflow.keras.metrics as metrics
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras import optimizers
from tensorflow.keras import optimizers, losses, initializers
from sklearn.model_selection import train_test_split

from dfpl import options
from dfpl import callbacks
from dfpl import history as ht
from dfpl import settings
def define_ac_model(
        input_size: int = 2048,
        encoding_dim: int = 256,
        my_loss: str = "binary_crossentropy",
        my_lr: float = 0.001,
        my_decay: float = 0.01) -> (Model, Model):
def define_ac_model(
        opts: options.Options,
        output_bias=None) -> (Model, Model):
    """
    This function provides an autoencoder model to reduce a certain input to a compressed version.
    :param encoding_dim: Size of the compressed representation. Default: 85
    :param input_size: Size of the input. Default: 2048
    :param my_loss: Loss function, see Keras Loss functions for potential values. Default: binary_crossentropy
    :param my_lr:
    :param my_decay:
    :param opts: Training options that provide values for adjusting the neural net
    :param output_bias: Bias used to initialize the last layer. It gives the net a head start in training on
    imbalanced data (which the fingerprints are, because they have many more 0's than 1's in them).
    :return: a tuple of autoencoder and encoder models
    """
    input_size = opts.fpSize
    encoding_dim = opts.encFPSize
    ac_optimizer = optimizers.Adam(learning_rate=opts.aeLearningRate,
                                   decay=opts.aeLearningRateDecay)
    ac_optimizer = optimizers.Adam(learning_rate=my_lr,
                                   decay=my_decay)

    if output_bias is not None:
        output_bias = initializers.Constant(output_bias)

    # get the number of meaningful hidden layers (latent space included)
    hidden_layer_count = round(math.log2(input_size / encoding_dim))
...
@@ -44,81 +43,71 @@ def define_ac_model(
    input_vec = Input(shape=(input_size,))
    # 1st hidden layer, that receives weights from input layer
    # equals bottle neck layer, if hidden_layer_count==1!
    encoded = Dense(units=int(input_size / 2), activation='relu')(input_vec)
    # equals bottleneck layer, if hidden_layer_count==1!
    if opts.aeActivationFunction != "selu":
        encoded = Dense(units=int(input_size / 2),
                        activation=opts.aeActivationFunction)(input_vec)
    else:
        encoded = Dense(units=int(input_size / 2),
                        activation=opts.aeActivationFunction,
                        kernel_initializer="lecun_normal")(input_vec)
    if hidden_layer_count > 1:
        # encoding layers, incl. bottle neck
        # encoding layers, incl. bottle-neck
        for i in range(1, hidden_layer_count):
            factor_units = 2 ** (i + 1)
            # print(f'{factor_units}: {int(input_size / factor_units)}')
            encoded = Dense(units=int(input_size / factor_units), activation='relu')(encoded)
            if opts.aeActivationFunction != "selu":
                encoded = Dense(units=int(input_size / factor_units),
                                activation=opts.aeActivationFunction)(encoded)
            else:
                encoded = Dense(units=int(input_size / factor_units),
                                activation=opts.aeActivationFunction,
                                kernel_initializer="lecun_normal")(encoded)
        # 1st decoding layer
        factor_units = 2 ** (hidden_layer_count - 1)
        decoded = Dense(units=int(input_size / factor_units), activation='relu')(encoded)
        if opts.aeActivationFunction != "selu":
            decoded = Dense(units=int(input_size / factor_units),
                            activation=opts.aeActivationFunction)(encoded)
        else:
            decoded = Dense(units=int(input_size / factor_units),
                            activation=opts.aeActivationFunction,
                            kernel_initializer="lecun_normal")(encoded)
        # decoding layers
        for i in range(hidden_layer_count - 2, 0, -1):
            factor_units = 2 ** i
            # print(f'{factor_units}: {int(input_size/factor_units)}')
            decoded = Dense(units=int(input_size / factor_units), activation='relu')(decoded)
            if opts.aeActivationFunction != "selu":
                decoded = Dense(units=int(input_size / factor_units),
                                activation=opts.aeActivationFunction)(decoded)
            else:
                decoded = Dense(units=int(input_size / factor_units),
                                activation=opts.aeActivationFunction,
                                kernel_initializer="lecun_normal")(decoded)
        # output layer
        # The output layer needs to predict the probability of an output which needs
        # to either 0 or 1 and hence we use sigmoid activation function.
        decoded = Dense(units=input_size, activation='sigmoid')(decoded)
        decoded = Dense(units=input_size, activation='sigmoid', bias_initializer=output_bias)(decoded)
    else:
        # output layer
        decoded = Dense(units=input_size, activation='sigmoid')(encoded)
        decoded = Dense(units=input_size, activation='sigmoid', bias_initializer=output_bias)(encoded)
    autoencoder = Model(input_vec, decoded)
    encoder = Model(input_vec, encoded)
    autoencoder.summary(print_fn=logging.info)
    encoder.summary(print_fn=logging.info)

    # We compile the autoencoder model with adam optimizer.
    # As fingerprint positions have a value of 0 or 1 we use binary_crossentropy as the loss function
    autoencoder.compile(optimizer=ac_optimizer,
                        loss=my_loss)
                        loss=losses.BinaryCrossentropy(),
                        metrics=[metrics.AUC(), metrics.Precision(), metrics.Recall()])

    return autoencoder, encoder
def autoencoder_callback(checkpoint_path: str) -> list:
    """
    Callbacks for fitting the autoencoder
    :param checkpoint_path: The output directory to store the checkpoint weight files
    :return: List of ModelCheckpoint and EarlyStopping class.
    """
    # enable this checkpoint to restore the weights of the best performing model
    checkpoint = ModelCheckpoint(checkpoint_path,
                                 verbose=1,
                                 period=settings.ac_train_check_period,
                                 save_best_only=True,
                                 mode='min',
                                 save_weights_only=True)

    # enable early stopping if val_loss is not improving anymore
    early_stop = EarlyStopping(patience=settings.ac_train_patience,
                               min_delta=settings.ac_train_min_delta,
                               verbose=1,
                               restore_best_weights=True)

    return [checkpoint, early_stop]
def train_full_ac(df: pd.DataFrame, opts: options.TrainOptions) -> Model:
def train_full_ac(df: pd.DataFrame, opts: options.Options) -> Model:
    """
    Train an autoencoder on the given feature matrix X. Response matrix is only used to
    split meaningfully in test and train data set.
...
@@ -128,25 +117,21 @@ def train_full_ac(df: pd.DataFrame, opts: options.TrainOptions) -> Model:
    :return: The encoder model of the trained autoencoder
    """
    # Set up the model of the AC w.r.t. the input size and the dimension of the bottle neck (z!)
    (autoencoder, encoder) = define_ac_model(input_size=opts.fpSize, encoding_dim=opts.encFPSize)

    # define output file for autoencoder and encoder weights
    if opts.ecWeightsFile == "":
        logging.info("No AC encoder weights file specified")
        logging.info("No AE encoder weights file specified")
        base_file_name = os.path.splitext(basename(opts.inputFile))[0]
        logging.info(f"(auto)encoder weights will be saved in {base_file_name}.[auto]encoder.hdf5")
        ac_weights_file = os.path.join(opts.outputDir, base_file_name + ".autoencoder.hdf5")
        ec_weights_file = os.path.join(opts.outputDir, base_file_name + ".encoder.hdf5")
        ac_weights_file = os.path.join(opts.outputDir, base_file_name + ".autoencoder.weights.hdf5")
        ec_weights_file = os.path.join(opts.outputDir, base_file_name + ".encoder.weights.hdf5")
    else:
        logging.info(f"AC encoder will be saved in {opts.ecWeightsFile}")
        logging.info(f"AE encoder will be saved in {opts.ecWeightsFile}")
        base_file_name = os.path.splitext(basename(opts.ecWeightsFile))[0]
        ac_weights_file = os.path.join(opts.outputDir, base_file_name + ".autoencoder.hdf5")
        ac_weights_file = os.path.join(opts.outputDir, base_file_name + ".autoencoder.weights.hdf5")
        ec_weights_file = os.path.join(opts.outputDir, opts.ecWeightsFile)

    # collect the callbacks for training
    callback_list = autoencoder_callback(checkpoint_path=ac_weights_file)
    callback_list = callbacks.autoencoder_callback(checkpoint_path=ac_weights_file, opts=opts)
    # Select all fps that are valid and turn them into a numpy array
    # This step is crucial for speed!!!
...
@@ -155,27 +140,61 @@ def train_full_ac(df: pd.DataFrame, opts: options.TrainOptions) -> Model:
                                      copy=settings.numpy_copy_values)
    logging.info(f"Training AC on a matrix of shape {fp_matrix.shape} with type {fp_matrix.dtype}")
    # split data into test and training data
    x_train, x_test = train_test_split(fp_matrix, test_size=0.2, random_state=42)
    logging.info(f"AC train data shape {x_train.shape} with type {x_train.dtype}")
    logging.info(f"AC test data shape {x_test.shape} with type {x_test.dtype}")

    # When training the final AE, we don't want any test data. We want to train it on the all available
    # fingerprints.
    assert (0.0 <= opts.testSize <= 0.5)
    if opts.testSize > 0.0:
        # split data into test and training data
        if opts.wabTracking:
            x_train, x_test = train_test_split(fp_matrix, test_size=opts.testSize, random_state=42)
        else:
            x_train, x_test = train_test_split(fp_matrix, test_size=opts.testSize)
    else:
        x_train = fp_matrix
        x_test = None

    # Calculate the initial bias aka the log ratio between 1's and 0'1 in all fingerprints
    ids, counts = np.unique(x_train.flatten(), return_counts=True)
    count_dict = dict(zip(ids, counts))
    if count_dict[0] == 0:
        initial_bias = None
        logging.info("No zeroes in training labels. Setting initial_bias to None.")
    else:
        initial_bias = np.log([count_dict[1] / count_dict[0]])
        logging.info(f"Initial bias for last sigmoid layer: {initial_bias[0]}")
    if opts.testSize > 0.0:
        logging.info(f"AE training/testing mode with train- and test-samples")
        logging.info(f"AC train data shape {x_train.shape} with type {x_train.dtype}")
        logging.info(f"AC test data shape {x_test.shape} with type {x_test.dtype}")
    else:
        logging.info(f"AE full train mode without test-samples")
        logging.info(f"AC train data shape {x_train.shape} with type {x_train.dtype}")
    # Set up the model of the AC w.r.t. the input size and the dimension of the bottle neck (z!)
    (autoencoder, encoder) = define_ac_model(opts, output_bias=initial_bias)

    auto_hist = autoencoder.fit(x_train, x_train,
                                callbacks=callback_list,
                                epochs=opts.epochs, batch_size=256,
                                epochs=opts.aeEpochs, batch_size=opts.aeBatchSize,
                                verbose=opts.verbose,
                                validation_data=(x_test, x_test))
                                validation_data=(x_test, x_test) if opts.testSize > 0.0 else None)
    logging.info(f"Autoencoder weights stored in file: {ac_weights_file}")

    ht.store_and_plot_history(base_file_name=os.path.join(opts.outputDir, base_file_name + ".AC"),
                              hist=auto_hist)
    encoder.save_weights(ec_weights_file)
    logging.info(f"Encoder weights stored in file: {ec_weights_file}")
    # encoder.save_weights(ec_weights_file)  # these are the wrong weights! we need those from the callback model
    # logging.info(f"Encoder weights stored in file: {ec_weights_file}")

    # save AE callback model
    if opts.testSize > 0.0:
        (callback_autoencoder, callback_encoder) = define_ac_model(opts)
        callback_autoencoder.load_weights(filepath=ac_weights_file)
        callback_encoder.save(filepath=opts.ecModelDir)
    else:
        encoder.save(filepath=opts.ecModelDir)

    return encoder
...
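Two details of train_full_ac() are easy to miss in the diff. First, with the defaults fpSize=2048 and encFPSize=256, round(log2(2048 / 256)) = 3, so the encoder stacks three layers of 1024, 512 and 256 units before the decoder mirrors them. Second, the bias of the final sigmoid layer is initialised with the log ratio of ones to zeroes in the training fingerprints, which gives the autoencoder a head start on such sparse targets. A small numpy sketch of that bias calculation on toy data, not part of the commit:

    import numpy as np

    # toy fingerprint matrix: roughly 2% of the bits are set, as in sparse real fingerprints
    x_train = np.random.binomial(1, 0.02, size=(1000, 2048))
    ids, counts = np.unique(x_train.flatten(), return_counts=True)
    count_dict = dict(zip(ids, counts))
    initial_bias = np.log([count_dict[1] / count_dict[0]])  # log(#ones / #zeroes)
    print(initial_bias)  # roughly -3.9 for a 2% bit density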
dfpl/callbacks.py (new file, mode 100644)

# for NN model functions
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
# for testing in Weights & Biases
from wandb.keras import WandbCallback

from dfpl import options
from dfpl import settings


def autoencoder_callback(checkpoint_path: str, opts: options.Options) -> list:
    """
    Callbacks for fitting the autoencoder
    :param checkpoint_path: The output directory to store the checkpoint weight files
    :param opts: Training options provided to the run
    :return: List of ModelCheckpoint and EarlyStopping class.
    """
    callbacks = []
    if opts.testSize > 0.0:
        target = "val_loss"
    else:
        target = "loss"

    # enable this checkpoint to restore the weights of the best performing model
    checkpoint = ModelCheckpoint(checkpoint_path,
                                 monitor=target,
                                 mode='min',
                                 verbose=1,
                                 period=settings.ac_train_check_period,
                                 save_best_only=True,
                                 save_weights_only=True)
    callbacks.append(checkpoint)

    # enable early stopping if val_loss is not improving anymore
    early_stop = EarlyStopping(monitor=target,
                               mode='min',
                               patience=settings.ac_train_patience,
                               min_delta=settings.ac_train_min_delta,
                               verbose=1,
                               restore_best_weights=True)
    callbacks.append(early_stop)

    if opts.wabTracking:
        callbacks.append(WandbCallback(save_model=False))

    return callbacks
def nn_callback(checkpoint_path: str, opts: options.Options) -> list:
    """
    Callbacks for fitting the feed forward network (FNN)
    :param checkpoint_path: The output directory to store the checkpoint weight files
    :param opts: Training options provided to the run
    :return: List of ModelCheckpoint and EarlyStopping class.
    """
    callbacks = []
    if opts.testSize > 0.0:
        # enable this checkpoint to restore the weights of the best performing model
        checkpoint = ModelCheckpoint(checkpoint_path,
                                     verbose=1,
                                     period=settings.nn_train_check_period,
                                     save_best_only=True,
                                     monitor="val_loss",
                                     mode='min',
                                     save_weights_only=True)
        callbacks.append(checkpoint)

        # enable early stopping if val_loss is not improving anymore
        early_stop = EarlyStopping(patience=settings.nn_train_patience,
                                   monitor="val_loss",
                                   mode="min",
                                   min_delta=settings.nn_train_min_delta,
                                   verbose=1,
                                   restore_best_weights=True)
        callbacks.append(early_stop)

    if opts.wabTracking:
        callbacks.append(WandbCallback(save_model=False))

    return callbacks
\ No newline at end of file
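The lists returned here are consumed by the fit() calls in autoencoder.py and the FNN training code. A hedged sketch of that wiring, with a SimpleNamespace standing in for the real options.Options object:

    from types import SimpleNamespace
    from dfpl import callbacks

    # hypothetical options object; only the fields read by autoencoder_callback() are set
    opts = SimpleNamespace(testSize=0.2, wabTracking=False)
    callback_list = callbacks.autoencoder_callback(checkpoint_path="ac.checkpoint.hdf5", opts=opts)
    # autoencoder.fit(x_train, x_train, callbacks=callback_list, ...)  # as in train_full_ac()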
dfpl/deepFPlearn-HyperParameterTuning.py

...
@@ -12,10 +12,17 @@ from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
from time import time

# --------------------------------------------------------------------------- #
# model for tuning optmizer, activation functions and initialization of hidden layers
def tuning_model(optimizer, activation, init, dropout=0.2):
    """
    model for tuning optimizer, activation functions and initialization of hidden layers
    :param optimizer:
    :param activation:
    :param init:
    :param dropout:
    :return:
    """
    model = Sequential()
    model.add(Dense(1024, activation=activation, init=init))
    model.add(Dropout(dropout))
...
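A build function like tuning_model() is typically handed to scikit-learn's grid search through the KerasClassifier wrapper imported in the hunk header above. The following self-contained sketch is illustrative only (its model, data and parameter grid are made up) and is not part of the commit:

    import numpy as np
    from sklearn.model_selection import GridSearchCV
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense, Dropout
    from tensorflow.keras.wrappers.scikit_learn import KerasClassifier

    def build_model(optimizer="Adam", activation="relu", init="glorot_uniform", dropout=0.2):
        # small stand-in for tuning_model(): 64-bit toy fingerprints instead of 2048
        model = Sequential()
        model.add(Dense(32, input_dim=64, activation=activation, kernel_initializer=init))
        model.add(Dropout(dropout))
        model.add(Dense(1, activation="sigmoid"))
        model.compile(optimizer=optimizer, loss="binary_crossentropy", metrics=["accuracy"])
        return model

    X = np.random.randint(0, 2, size=(128, 64)).astype(float)  # toy fingerprint matrix
    y = np.random.randint(0, 2, size=128)                       # toy labels

    clf = KerasClassifier(build_fn=build_model, epochs=5, batch_size=32, verbose=0)
    grid = GridSearchCV(clf, param_grid={"optimizer": ["SGD", "Adam"], "dropout": [0.2, 0.5]}, cv=3)
    grid.fit(X, y)
    print(grid.best_params_)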
dfpl/dfplmodule.py

import argparse
# Python module for deepFPlearn tools
import re
import math
import csv
import numpy as np
import pandas as pd
import shutil
import matplotlib.pyplot as plt
import matplotlib
# matplotlib.use('Agg')
import matplotlib.patches as mpatches
from matplotlib.colors import LinearSegmentedColormap
# %matplotlib inline

# for drawing the heatmaps
import seaborn as sns
...
@@ -28,8 +33,15 @@ from tensorflow.keras.optimizers import SGD
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.callbacks import ReduceLROnPlateau
import sklearn
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve
from sklearn.metrics import confusion_matrix
from sklearn.metrics import auc
from sklearn.metrics import matthews_corrcoef
from sklearn.model_selection import KFold
from time import time