Commit ee5bc57c authored by Francisco Arcila Salamanca's avatar Francisco Arcila Salamanca
Browse files

update import functions for the generic receptors db, remoce hdf5

parent ab8eb9c7
......@@ -25,7 +25,6 @@ import pandas as pd
import json
import argparse
import airr
import h5py
from argparse import RawTextHelpFormatter
# custom imports
......@@ -60,9 +59,13 @@ parser.add_argument(
# get variables
args = parser.parse_args()
project_name = args.project_name
config_db = args.mariadb_group
files_path = args.input_path
TEST_ARGS = {
"project_name": args.project_name,
"config_db": args.mariadb_group,
"ssh_group": args.ssh_group,
"input_path": args.input_path,
}
def main():
......@@ -71,6 +74,11 @@ def main():
main function of the importer
:return:
"""
config_db = TEST_ARGS["config_db"]
project_name = TEST_ARGS["project_name"]
files_path = TEST_ARGS["input_path"]
# make a dictionary with file_names:paths
files_dictionary = {
file.split("_")[0]: os.path.join(files_path, file)
......@@ -80,16 +88,18 @@ def main():
map_json = sr.airr_scireptor_json
mysql_file = sr.mysql_file
if args.ssh_group:
if TEST_ARGS["ssh_group"] and "test" not in TEST_ARGS["ssh_group"]:
try:
# first toplevel connection for shh tunneling
server, db = sr.connect(conf_db=config_db, ssh_group=args.ssh_group)
server, db = sr.connect(conf_db=config_db, ssh_group=TEST_ARGS["ssh_group"])
safe_create_schema(db, project_name)
server.stop()
# main connection
server, db = sr.connect(
conf_db=config_db, database=project_name, ssh_group=args.ssh_group
conf_db=config_db,
database=project_name,
ssh_group=TEST_ARGS["ssh_group"],
)
except Exception as error:
......@@ -147,15 +157,16 @@ def main():
# todo don't use pandas to open a hd5 file due to pickling security warning
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_hdf.html
f = h5py.File(files_dictionary["expression"], "r")
expression_hdf = pd.DataFrame(f["flowdata"][:])
import_repertoires(repertoires_yaml, map_json, project_name, db)
expression_hdf = pd.read_csv(files_dictionary["expression"], sep="\t")
receptor_type = import_repertoires(repertoires_yaml, map_json, project_name, db)
import_rearrangements(rearrangements, map_json, project_name, db)
import_receptors(receptors_json, db)
import_receptors(receptors_json, db, receptor_type)
import_expression(expression_hdf, project_name, db)
db.close()
if args.ssh_group:
if TEST_ARGS["ssh_group"] and "test" not in TEST_ARGS["ssh_group"]:
server.stop()
......
......@@ -29,38 +29,34 @@ def safe_create_schema(db: MySQLcon, project_name: str) -> None:
db.close()
# 1 receptors import function
def import_receptors(receptors_json: dict, db: MySQLcon) -> None:
# 1 receptors import function # todo adapt for TCR
def import_receptors(
receptors_json: dict, db: MySQLcon, receptor_type: str, mode=""
) -> None:
"""
:param mode: test
:param receptor_type: type of receptor TCR|BCR
:param receptors_json: receptors dictionary with format {id:{"H_seq":"IGH","L_seq":"IGL|IGK"}}
:param db: database connection object
:return: None, inserts new observed receptors into the `scireptor.receptors` database
"""
if "test" in mode:
receptors_db = "test_receptors"
else:
receptors_db = "scireptor_receptors"
# normalize receptors dictionary
receptors_dic = {}
for x, v in receptors_json.items():
receptors_dic[x] = {locus: seq for seq, locus in v.items()}
# receptors DF
receptors = pd.DataFrame.from_dict(receptors_dic, orient="index")
light_seqs = (
receptors[["IGL", "IGK"]]
.stack()
.rename_axis(["receptor_id", "IG_light_locus"])
.rename("IG_light")
.reset_index(level=["receptor_id", "IG_light_locus"])
)
receptors = (
receptors[["IGH"]]
.rename_axis(["receptor_id"])
.reset_index(level=["receptor_id"])
.merge(light_seqs, on="receptor_id")
)
logger.info("Receptors processing started")
c = db.cursor()
c.execute(
"SELECT receptor_id FROM `scireptor_receptors`.`IG_database` ORDER BY receptor_id DESC LIMIT 1;"
f"SELECT receptor_id FROM `{receptors_db}`.`receptors` ORDER BY receptor_id DESC LIMIT 1;"
)
last_id = c.fetchone() # get the last inserted id on the db
......@@ -70,24 +66,27 @@ def import_receptors(receptors_json: dict, db: MySQLcon) -> None:
receptor_id_count = itertools.count(int(last_id[0]) + 1)
print_info_dic = {"found_receptors": [], "new_receptors": []}
for i, row in receptors.iterrows():
found_receptor = query_receptor(
c, (row["IGH"], row["IG_light"], row["IG_light_locus"])
)
if found_receptor:
print_info_dic["found_receptors"].append(found_receptor[0])
else:
fix_receptor_id = next(receptor_id_count)
values_insert = (
fix_receptor_id,
row["IGH"],
row["IG_light"],
row["IG_light_locus"],
)
sql_insert = """INSERT INTO `scireptor_receptors`.`IG_database`
(`receptor_id`, `IG_Heavy`, `IG_light`, `IG_light_locus`) VALUES (%s, %s,%s,%s);"""
c.execute(sql_insert, values_insert)
print_info_dic["new_receptors"].append(fix_receptor_id)
for receptor in receptors_dic:
# look up for both positions in the receptors table to avoid inserting duplicates.
this_list = [(receptors_dic[receptor][x], x) for x in receptors_dic[receptor]]
combos = [(this_list[1] + this_list[0]), (this_list[0] + this_list[1])]
for x in combos:
found_receptor = query_receptor(c, x, mode)
if found_receptor:
print_info_dic["found_receptors"].append(found_receptor[0])
else:
fix_receptor_id = next(receptor_id_count)
values_insert = (fix_receptor_id,) + x
sql_insert = f"""INSERT INTO `{receptors_db}`.`receptors` (
`receptor_id`, `chain_1`, `chain_2`, `locus_1`, `locus_2`) VALUES (%s,%s,%s,%s,%s);"""
c.execute(sql_insert, values_insert)
print_info_dic["new_receptors"].append(fix_receptor_id)
# print info of inserted receptors
logger.info(
......@@ -410,7 +409,7 @@ def sample_donor_insert(
def import_repertoires(
repertoires_yaml: dict, map_json: dict, project_name: str, db: MySQLcon
) -> None:
) -> str:
"""
import repertoire yaml file into the scireptor database
:param repertoires_yaml: repertoire YAML file
......@@ -445,6 +444,12 @@ def import_repertoires(
prop[k]["study_title"] = f"{project_name}"
if "keywords_study" in prop[k]:
prop[k]["keywords_study"] = ",".join(prop[k]["keywords_study"])
if "contains_ig" in prop[k]["keywords_study"]:
receptor_type = "IG"
elif "contains_tcr" in prop[k]["keywords_study"]:
receptor_type = "TCR"
else:
receptor_type = None
to_insert = {x: prop[k][x] for x in prop[k] if x in study_cols}
missing_from_sr = [x for x in prop[k] if x not in study_cols]
......@@ -549,6 +554,8 @@ def import_repertoires(
db.commit()
return receptor_type
def import_expression(flow_hdf: pd.DataFrame, project_name: str, db: MySQLcon) -> None:
"""
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment