4 changes: 4 additions & 0 deletions VERSIONLOG.md
@@ -1,5 +1,9 @@
 # Scilifelab_epps Version Log

+## 20251023.1
+
+Update ONT EPPs to use cloudant instead of couchdb
+
 ## 20251021.1

 Update EPP for parsing Run information to LIMS by correcting the path to the metadata on Preproc.
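For readers skimming the changelog entry above: the substance of the migration is that the `couchdb` package's server and database objects are replaced by an `ibmcloudant` service client plus an explicit database name. A minimal before/after sketch (the URL and credentials are placeholders, not real values; the actual scripts read them from `~/.statusdb_cred.yaml`):

    # Before: python-couchdb, credentials embedded in the URL
    import couchdb

    couch = couchdb.Server(url="https://user:pass@statusdb.example.org")
    db = couch["nanopore_runs"]
    doc = db["some_doc_id"]

    # After: ibmcloudant, session-authenticated client, db name passed per call
    from ibmcloudant import CouchDbSessionAuthenticator, cloudant_v1

    client = cloudant_v1.CloudantV1(
        authenticator=CouchDbSessionAuthenticator("user", "pass")
    )
    client.set_service_url("https://statusdb.example.org")
    doc = client.get_document(db="nanopore_runs", doc_id="some_doc_id").get_result()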
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,9 +1,9 @@
-CouchDB
 flowcell_parser @ git+https://github.com/SciLifeLab/flowcell_parser
 genologics
 google_api_python_client
 httplib2
 interop
+ibmcloudant
 jwcrypto
 Markdown
 numpy
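The dependency swap above is the whole of the environment change. A quick post-install sanity check (hypothetical snippet; these are the only names the updated scripts import from the new package):

    # Should run without ImportError in a freshly built environment
    from ibmcloudant import CouchDbSessionAuthenticator, cloudant_v1

    print(cloudant_v1.CloudantV1)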
38 changes: 23 additions & 15 deletions scripts/ont_send_reloading_info_to_db.py
@@ -6,12 +6,11 @@
 from argparse import ArgumentParser
 from datetime import datetime as dt

-import couchdb
 import yaml
-from couchdb.client import Database, Document, Row, ViewResults
 from genologics.config import BASEURI, PASSWORD, USERNAME
 from genologics.entities import Artifact, Process
 from genologics.lims import Lims
+from ibmcloudant import CouchDbSessionAuthenticator, cloudant_v1

 from scilifelab_epps.wrapper import epp_decorator

@@ -38,15 +37,21 @@ def send_reloading_info_to_db(process: Process):
         if run:
             runs.append(run)

-    db: Database = get_ONT_db()
-    view: ViewResults = db.view("info/all_stats")
+    client, db_name = get_ONT_db()
+
+    view: dict = client.post_view(
+        db=db_name,
+        ddoc="info",
+        view="all_stats",
+        include_docs=False,
+    ).get_result()

     errors = False
     for run in runs:
-        rows_matching_run: list[Row] = [
+        rows_matching_run: list[dict] = [
             row
-            for row in view.rows
-            if f"{run['run_name']}" in row.value["TACA_run_path"]
+            for row in view["rows"]
+            if f"{run['run_name']}" in row["value"]["TACA_run_path"]
         ]

         try:
@@ -57,8 +62,8 @@ def send_reloading_info_to_db(process: Process):
                 f"The database contains multiple documents with run name '{run['run_name']}'. Contact a database administrator."
             )

-            doc_id: str = rows_matching_run[0].id
-            doc: Document = db[doc_id]
+            doc_id: str = rows_matching_run[0]["id"]
+            doc: dict = client.get_document(db=db_name, doc_id=doc_id).get_result()

             dict_to_add = {
                 "step_name": process.type.name,
@@ -76,7 +81,7 @@ def send_reloading_info_to_db(process: Process):
                 doc["lims"]["reloading"] = []
             doc["lims"]["reloading"].append(dict_to_add)

-            db[doc.id] = doc
+            client.put_document(db=db_name, doc_id=doc_id, document=doc).get_result()

             logging.info(f"Run '{run['run_name']}' was updated successfully.")

@@ -157,17 +162,20 @@ def check_times_list(times_list: list[str]):
         prev_hours, prev_minutes = hours, minutes


-def get_ONT_db() -> Database:
-    """Mostly copied from write_notes_to_couchdb.py"""
+def get_ONT_db() -> tuple[cloudant_v1.CloudantV1, str]:
     configf = "~/.statusdb_cred.yaml"

     with open(os.path.expanduser(configf)) as config_file:
         config = yaml.safe_load(config_file)

-    url_string = f"https://{config['statusdb'].get('username')}:{config['statusdb'].get('password')}@{config['statusdb'].get('url')}"
-    couch = couchdb.Server(url=url_string)
+    cloudant = cloudant_v1.CloudantV1(
+        authenticator=CouchDbSessionAuthenticator(
+            config["statusdb"].get("username"), config["statusdb"].get("password")
+        )
+    )
+    cloudant.set_service_url(config["statusdb"].get("url"))

-    return couch["nanopore_runs"]
+    return (cloudant, "nanopore_runs")


 def check_csv_udf_list(pattern: str, csv_udf_list: list[str]) -> bool:
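Across this file the new access pattern is uniform: `get_ONT_db()` returns a `(client, db_name)` pair, views are read via `post_view(...)`, and documents are fetched and written back by id. A condensed round trip under the same assumptions as the code above (the `info/all_stats` view exists and at least one row matches):

    client, db_name = get_ONT_db()

    # .get_result() unwraps the HTTP response into a plain dict
    view = client.post_view(
        db=db_name, ddoc="info", view="all_stats", include_docs=False
    ).get_result()

    # Rows are plain dicts ("id"/"key"/"value") rather than couchdb Row objects
    doc_id = view["rows"][0]["id"]
    doc = client.get_document(db=db_name, doc_id=doc_id).get_result()

    # Mutate locally, then write back; this replaces the old db[doc_id] = doc
    doc.setdefault("lims", {}).setdefault("reloading", [])
    client.put_document(db=db_name, doc_id=doc_id, document=doc).get_result()

This is a sketch of the pattern rather than a drop-in snippet; the error handling and payload construction live in the script itself.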
15 changes: 10 additions & 5 deletions scripts/ont_suggest_ports.py
@@ -21,8 +21,7 @@ def main(lims, args):
     outputs = [op for op in currentStep.all_outputs() if op.type == "Analyte"]

     # Get database
-    db = get_ONT_db()
-    view = db.view("info/all_stats")
+    client, db_name = get_ONT_db()

     # Instantiate dict for counting port usage
     ports = {}
@@ -34,10 +33,16 @@ def main(lims, args):
         r"/\d{8}_\d{4}_([1-8][A-H])_"
     )  # Matches start of run name, capturing position as a group

+    view = client.post_view(
+        db=db_name,
+        ddoc="info",
+        view="all_stats",
+        include_docs=False,
+    ).get_result()
     # Count port usage
-    for row in view.rows:
-        if re.search(pattern, row.value["TACA_run_path"]):
-            position = re.search(pattern, row.value["TACA_run_path"]).groups()[0]
+    for row in view["rows"]:
+        if re.search(pattern, row["value"]["TACA_run_path"]):
+            position = re.search(pattern, row["value"]["TACA_run_path"]).groups()[0]
             ports[position] += 1

     # Sort ports (a sort of port sort, if you will)
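The counting logic hinges on the compiled pattern capturing the flow-cell position from each run path. A small illustration with a made-up `TACA_run_path` value (the path is hypothetical; only the `/YYYYMMDD_HHMM_<position>_` segment matters):

    import re

    pattern = re.compile(r"/\d{8}_\d{4}_([1-8][A-H])_")

    run_path = "/data/20251023_1432_1A_PAO12345_deadbeef"  # hypothetical
    match = re.search(pattern, run_path)
    if match:
        print(match.groups()[0])  # -> "1A", the instrument position/port

As a side note, the loop runs the same `re.search` twice per matching row; binding the match object once would be the more usual idiom, but the behavior is identical.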
64 changes: 45 additions & 19 deletions scripts/ont_sync_to_db.py
@@ -6,7 +6,6 @@
 from argparse import ArgumentParser, Namespace
 from datetime import datetime as dt

-from couchdb.client import Database, Document, Row, ViewResults
 from generate_minknow_samplesheet import (
     generate_MinKNOW_samplesheet,
     get_ont_library_contents,
@@ -15,6 +14,7 @@
 from genologics.config import BASEURI, PASSWORD, USERNAME
 from genologics.entities import Artifact, Process
 from genologics.lims import Lims
+from ibmcloudant import cloudant_v1
 from ont_send_reloading_info_to_db import get_ONT_db

 from scilifelab_epps.utils import udf_tools
@@ -89,15 +89,17 @@ def udfs_matches_run_name(art: Artifact) -> bool:
 def get_matching_db_rows(
     art: Artifact,
     process: Process,
-    view: ViewResults,
+    view: tuple[str, str, str],
     run_name: str | None,
-) -> list[Row]:
+    client: cloudant_v1.CloudantV1,
+) -> list[dict]:
     """Find the rows in the database view that match the given artifact."""
     matching_rows = []

     qc = True if "QC" in process.type.name else False
     logging.info(f"QC run: {qc}")

+    db_name, ddoc, view_name = view
     # If run name is not supplied, try to find it in the database, assuming it follows the samplesheet naming convention
     if run_name is None:
         # Define query pattern
@@ -117,8 +119,15 @@ def get_matching_db_rows(
             f"No run name supplied. Querying the database for run with path pattern '{pattern}'."
         )

-        for row in view.rows:
-            query = row.value["TACA_run_path"]
+        rows: list[dict] = client.post_view(
+            db=db_name,
+            ddoc=ddoc,
+            view=view_name,
+            include_docs=False,
+        ).get_result()["rows"]
+
+        for row in rows:
+            query = row["value"]["TACA_run_path"]
             if re.match(pattern, query):
                 matching_rows.append(row)

@@ -127,17 +136,31 @@ def get_matching_db_rows(
         logging.info(
             f"Full run name supplied. Querying the database for run '{run_name}'."
         )
-        for row in view.rows:
-            if run_name == row.key:
-                matching_rows.append(row)
+        rows = client.post_view(
+            db=db_name,
+            ddoc=ddoc,
+            view=view_name,
+            include_docs=False,
+            key=run_name,
+        ).get_result()["rows"]
+
+        if rows:
+            matching_rows.append(rows[0])

     return matching_rows


 def write_to_doc(
-    doc: Document, db: Database, process: Process, art: Artifact, args: Namespace
+    doc: dict,
+    db: str,
+    process: Process,
+    art: Artifact,
+    args: Namespace,
+    client: cloudant_v1.CloudantV1,
 ):
-    """Update a given document with the given artifact's loading information."""
+    """
+    Update a given document with the given artifact's loading information.
+    """

     library_df = get_ont_library_contents(
         ont_library=art,
@@ -164,7 +187,7 @@ def write_to_doc(
         doc["lims"]["loading"] = []
     doc["lims"]["loading"].append(dict_to_add)

-    db[doc.id] = doc
+    client.put_document(db=db, doc_id=doc["_id"], document=doc).get_result()


 def sync_runs_to_db(process: Process, args: Namespace, lims: Lims):
@@ -186,8 +209,9 @@ def sync_runs_to_db(process: Process, args: Namespace, lims: Lims):
     # Keep track of which artifacts were successfully updated
     arts_successful = []

-    nanopore_runs_db: Database = get_ONT_db()
-    view: ViewResults = nanopore_runs_db.view("info/all_stats")
+    client, nanopore_runs_db = get_ONT_db()
+
+    db_view: tuple = (nanopore_runs_db, "info", "all_stats")

     for art in arts:
         logging.info(f"Processing '{art.name}'...")
@@ -200,23 +224,25 @@ def sync_runs_to_db(process: Process, args: Namespace, lims: Lims):
             continue

         # Get matching run docs
-        matching_rows: list[Row] = get_matching_db_rows(art, process, view, run_name)
+        matching_rows: list[dict] = get_matching_db_rows(
+            art, process, db_view, run_name, client
+        )

         if len(matching_rows) == 0:
             logging.warning("Run was not found in the database. Skipping.")
             continue

         elif len(matching_rows) > 1:
-            matching_run_names = [row.key for row in matching_rows]
+            matching_run_names = [row["key"] for row in matching_rows]
             logging.warning("Query was found in multiple instances in the database: ")
             for matching_run_name in matching_run_names:
                 logging.warning(f"Matching run name: '{matching_run_name}'.")
             logging.warning("Contact a database administrator. Skipping.")
             continue

-        doc_run_name: str = matching_rows[0].key
-        doc_id: str = matching_rows[0].id
-        doc: Document = nanopore_runs_db[doc_id]
+        doc_run_name: str = matching_rows[0]["key"]
+        doc_id: str = matching_rows[0]["id"]
+        doc: dict = client.get_document(db=nanopore_runs_db, doc_id=doc_id).get_result()

         logging.info(f"Found matching run '{doc_run_name}' in the database.")

@@ -229,7 +255,7 @@ def sync_runs_to_db(process: Process, args: Namespace, lims: Lims):
             logging.info(f"Assigning UDF 'ONT run name': '{doc_run_name}'.")
             udf_tools.put(art, "ONT run name", doc_run_name)

-        write_to_doc(doc, nanopore_runs_db, process, art, args)
+        write_to_doc(doc, nanopore_runs_db, process, art, args, client)
         logging.info(f"'{doc_run_name}' was found and updated successfully.")
         arts_successful.append(art)

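A design note on the two query paths in `get_matching_db_rows`: when only a path pattern is known, the view still has to be fetched in full and filtered client-side, since CouchDB views cannot regex-match keys; but when the full run name is known, the lookup is now pushed to the server with the `key=` parameter instead of scanning every row. A minimal sketch of the keyed form (same view as above; the run name value is hypothetical):

    client, db_name = get_ONT_db()

    # Server-side lookup: only rows whose key equals the run name are returned
    rows = client.post_view(
        db=db_name,
        ddoc="info",
        view="all_stats",
        include_docs=False,
        key="20251023_1432_1A_PAO12345_deadbeef",  # hypothetical run name
    ).get_result()["rows"]

    if rows:
        print(rows[0]["id"], rows[0]["key"])

This works because the `all_stats` view is keyed by run name, which the `matching_rows[0]["key"]` logic in `sync_runs_to_db` already relies on.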