cosmoguard-bd/debug_echa_find.py
2025-12-01 19:04:09 +01:00

676 lines
20 KiB
Python

import marimo
# marimo version recorded for this notebook file.
__generated_with = "0.16.5"
# Application object; every cell below registers itself on it via @app.cell.
app = marimo.App(width="medium")
@app.cell
def _():
    # Notebook-wide imports. Only the names in the return tuple are handed to
    # other cells; `json` and `standardre` are imported but not exported.
    import json
    import re as standardre
    import urllib.parse

    import requests
    from bs4 import BeautifulSoup

    import marimo as mo
    return BeautifulSoup, mo, requests, urllib
@app.cell
def _():
    # Logger obtained from the project's logging helper and shared with the
    # cells that take `log` as an argument.
    from pif_compiler.functions.common_log import get_logger

    log = get_logger()
    return (log,)
@app.cell
def _(log):
    # Smoke test: emit one INFO record through the shared logger.
    log.info("testing with marimo")
    return
@app.cell
def _():
    # CAS numbers used as test inputs by the exploratory cells below.
    cas_problematici = [
        "25525-21-7",
        "113170-55-1",
        "26172-55-4",
    ]
    return (cas_problematici,)
@app.cell
def _(cas_problematici):
    # Pick the third entry of the list as the CAS under investigation.
    cas_test = cas_problematici[2]
    return (cas_test,)
@app.cell
def _(cas_test, urllib):
    # Show the percent-encoded form of the CAS number (cell output only).
    urllib.parse.quote(cas_test)
    return
@app.cell
def _():
    # --- ECHA endpoints -----------------------------------------------------
    # Substance search; the search text is appended to the URL.
    BASE_SEARCH = "https://chem.echa.europa.eu/api-substance/v1/substance?pageIndex=1&pageSize=100&searchText="
    # Dossier list; the rmlId is appended, followed by the filters below.
    BASE_DOSSIER = "https://chem.echa.europa.eu/api-dossier-list/v1/dossier?pageIndex=1&pageSize=100&rmlId="

    # Sample / reference URLs; none of these is exported to other cells.
    SUBSTANCE_SUMMARY = "https://chem.echa.europa.eu/api-substance/v1/substance/"  # + id
    CLASSIFICATION_ID = "https://chem.echa.europa.eu/api-cnl-inventory/prominent/overview/classifications/harmonised/459160"
    TOXICOLOGICAL_INFO = "https://chem.echa.europa.eu/html-pages-prod/e4c88c6e-06c7-4daa-b0fb-1a55459ac22f/documents/IUC5-5f55d8ec-7a71-4e2c-9955-8469ead9fe84_0035f3f8-7467-4944-9028-1db2e9c99565.html"  # external + rootkey
    REPEATED_DOSE = "https://chem.echa.europa.eu/html-pages-prod/e4c88c6e-06c7-4daa-b0fb-1a55459ac22f/documents/IUC5-82402b09-8d8f-495c-b673-95b205be60e0_0035f3f8-7467-4944-9028-1db2e9c99565.html"

    # --- Query-string fragments appended to BASE_DOSSIER --------------------
    active = "&registrationStatuses=Active"
    inactive = "&registrationStatuses=Inactive"
    legislation = "&legislation=REACH"
    return BASE_DOSSIER, BASE_SEARCH, active, inactive, legislation
@app.cell
def _(BASE_SEARCH, cas_test, requests):
    # Query the search endpoint with the test CAS appended to the URL.
    test_search_request = requests.get(BASE_SEARCH + cas_test)
    return (test_search_request,)
@app.cell
def _(test_search_request):
    # Decode the search response body as JSON for the cells below.
    response = test_search_request.json()
    return (response,)
@app.cell
def _(test_search_request):
    # Display the decoded search payload as the cell output.
    test_search_request.json()
    return
@app.cell
def _(cas_test, response):
    # Copy the identifying fields of every search hit whose CAS equals the
    # test CAS; when several hits match, the last one wins.
    substance = {}
    for result in response['items']:
        if result["substanceIndex"]["rmlCas"] != cas_test:
            continue
        substance.update(
            {
                field: result["substanceIndex"][field]
                for field in ("rmlCas", "rmlId", "rmlEc", "rmlName")
            }
        )
    return (substance,)
@app.cell
def _(substance):
    # Display the substance record collected so far.
    substance
    return
@app.cell
def _(BASE_DOSSIER, active, substance):
    # Preview the dossier-list URL for active registrations of the substance.
    url = "".join((BASE_DOSSIER, substance['rmlId'], active))
    url
    return
@app.cell
def _(BASE_DOSSIER, inactive, legislation, requests, substance):
    # Request the dossier list for inactive registrations under REACH.
    response_dossier = requests.get(
        "".join((BASE_DOSSIER, substance['rmlId'], inactive, legislation))
    )
    return (response_dossier,)
@app.cell
def _(response_dossier):
    # Decode the dossier-list response and show it as the cell output.
    response_dossier_json = response_dossier.json()
    response_dossier_json
    return (response_dossier_json,)
@app.cell
def _(response_dossier_json, substance):
    # Enrich the substance record in place with fields taken from the first
    # dossier of the list (raises if the list is empty).
    _first_dossier = response_dossier_json['items'][0]
    substance['lastUpdatedDate'] = _first_dossier['lastUpdatedDate']
    substance['registrationStatus'] = _first_dossier['registrationStatus']
    substance['registrationStatusChangedDate'] = _first_dossier['registrationStatusChangedDate']
    substance['registrationRole'] = _first_dossier['reachDossierInfo']['registrationRole']
    substance['assetExternalId'] = _first_dossier['assetExternalId']
    substance['rootKey'] = _first_dossier['rootKey']
    substance
    return
@app.cell
def _():
    # Open the "toxinfo" database through the project's client helper.
    from pif_compiler.functions.db_utils import get_client

    client = get_client()
    db = client.get_database(name="toxinfo")
    return (db,)
@app.cell
def _(db):
    # Handle on the collection used to cache substance records.
    collection = db.get_collection("substance_index")
    # Print the available collection names. The result is kept in an
    # underscore-prefixed name so it stays private to this cell and no longer
    # rebinds the builtin `list` for the rest of the notebook.
    _collection_names = db.list_collection_names()
    print(_collection_names)
    return (collection,)
@app.cell
def _(cas_test, collection, substance):
    # Store the substance record unless one with the same CAS already exists.
    sub = collection.find_one({"rmlCas": cas_test})
    if sub:
        pass
    else:
        collection.insert_one(substance)
    return
@app.cell
def _(substance):
    # Expose the dossier asset id stored on the substance record.
    assetExternalId = substance['assetExternalId']
    return (assetExternalId,)
@app.cell
def _(assetExternalId):
    # URL of the dossier index page; not exported to other cells.
    # NOTE(review): this uses ".../html-pages/" whereas get_substance_index
    # builds its URL from ".../html-pages-prod/" — confirm which is correct.
    INDEX_HTML = "".join(
        ("https://chem.echa.europa.eu/html-pages/", assetExternalId, "/index.html")
    )
    return
@app.cell
def _(BASE_SEARCH, log, requests):
    def search_substance(cas: str) -> dict:
        """Look up a substance on the ECHA search endpoint by exact CAS number.

        Args:
            cas: CAS number to search for. It is percent-encoded for the
                request and compared verbatim against each item's ``rmlCas``.

        Returns:
            A dict with the keys ``rmlCas``, ``rmlId``, ``rmlEc`` and
            ``rmlName`` for the first item whose CAS matches exactly, or an
            empty dict when the request fails, the status is not 200, the
            search yields no items, or no item matches exactly.
        """
        from urllib.parse import quote

        try:
            # Encode the CAS so stray characters cannot alter the query string,
            # and bound the wait so a stalled server cannot hang the caller.
            response = requests.get(BASE_SEARCH + quote(cas, safe=""), timeout=30)
        except requests.RequestException as exc:
            # Transport failures follow the same "log and return {}" contract
            # as HTTP error statuses.
            log.error(f"Network error: {exc}")
            return {}

        if response.status_code != 200:
            log.error(f"Network error: {response.status_code}")
            return {}

        payload = response.json()
        if payload['state']['totalItems'] == 0:
            log.info(f"No substance found for CAS {cas}")
            return {}

        for result in payload['items']:
            entry = result["substanceIndex"]
            if entry["rmlCas"] == cas:
                return {
                    "rmlCas": entry["rmlCas"],
                    "rmlId": entry["rmlId"],
                    "rmlEc": entry["rmlEc"],
                    "rmlName": entry["rmlName"],
                }

        # Items were returned but none carries exactly the requested CAS.
        log.error("Something went wrong")
        return {}
    return (search_substance,)
@app.cell
def _(BASE_DOSSIER, active, inactive, legislation, log, requests):
    def get_dossier_info(rmlId: str, type=active) -> dict:
        """Fetch registration details of the first REACH dossier of a substance.

        Args:
            rmlId: Substance identifier appended to the dossier-list URL.
            type: Registration-status query fragment. Defaults to the active
                filter; when that yields no dossier, the inactive filter is
                tried once.

        Returns:
            A dict with ``lastUpdatedDate``, ``registrationStatus``,
            ``registrationStatusChangedDate``, ``registrationRole``,
            ``assetExternalId`` and ``rootKey`` taken from the first item, or
            an empty dict when the request fails, the status is not 200, or no
            dossier exists for either registration status.
        """
        url = BASE_DOSSIER + rmlId + type + legislation
        try:
            # Bound the wait so a stalled server cannot hang the caller.
            response_dossier = requests.get(url, timeout=30)
        except requests.RequestException as exc:
            log.error(f"Network error: {exc}")
            return {}

        if response_dossier.status_code != 200:
            log.error(f"Network error: {response_dossier.status_code}")
            return {}

        response_dossier_json = response_dossier.json()
        if response_dossier_json['state']['totalItems'] == 0:
            log.info(f"No dossier found for RML ID {rmlId}")
            if type == inactive:
                # The fallback itself came back empty: stop here instead of
                # retrying the same request forever.
                return {}
            return get_dossier_info(rmlId, inactive)

        first = response_dossier_json['items'][0]
        return {
            "lastUpdatedDate": first['lastUpdatedDate'],
            "registrationStatus": first['registrationStatus'],
            "registrationStatusChangedDate": first['registrationStatusChangedDate'],
            "registrationRole": first['reachDossierInfo']['registrationRole'],
            "assetExternalId": first['assetExternalId'],
            "rootKey": first['rootKey'],
        }
    return (get_dossier_info,)
@app.cell
def _():
    # Intentionally empty cell.
    return
@app.cell
def _(BeautifulSoup, log, requests):
    def get_substance_index(assetExternalId: str) -> dict:
        """Collect the links to the toxicology pages from a dossier index page.

        Args:
            assetExternalId: Dossier asset id used to build the index URL.

        Returns:
            A dict that may contain ``toxicological_information_link``,
            ``repeated_dose_toxicity_link`` and ``acute_toxicity_link``. A key
            is omitted when its section or link is absent from the index page.
            An empty dict is returned when the request fails or the status is
            not 200.
        """
        INDEX = "https://chem.echa.europa.eu/html-pages-prod/" + assetExternalId
        LINK_DOSSIER = INDEX + "/documents/"

        try:
            # Bound the wait so a stalled server cannot hang the caller.
            response = requests.get(INDEX + "/index.html", timeout=30)
        except requests.RequestException as exc:
            log.error(f"Network error: {exc}")
            return {}
        if response.status_code != 200:
            log.error(f"Network error: {response.status_code}")
            return {}

        soup = BeautifulSoup(response.content, 'html.parser')

        # Result key -> id of the index <div> holding that section's link.
        section_ids = {
            'toxicological_information_link': 'id_7_Toxicologicalinformation',
            'repeated_dose_toxicity_link': 'id_75_Repeateddosetoxicity',
            'acute_toxicity_link': 'id_72_AcuteToxicity',
        }

        index_data = {}
        for key, div_id in section_ids.items():
            section_div = soup.find('div', id=div_id)
            leaf = None
            if section_div is not None:
                leaf = section_div.find('a', class_='das-leaf')
            href = leaf.get('href') if leaf is not None else None
            if not href:
                # Not every dossier has every section; skip rather than crash.
                log.error(f"Section {div_id} not found in index of {assetExternalId}")
                continue
            index_data[key] = LINK_DOSSIER + href + '.html'
        return index_data

    # Exploratory call against a fixed sample dossier (performs a request).
    get_substance_index("e4c88c6e-06c7-4daa-b0fb-1a55459ac22f")
    return (get_substance_index,)
@app.cell
def _(search_substance):
    # Try the search helper on a fixed CAS number.
    val = search_substance("100-41-4")
    return (val,)
@app.cell
def _(val):
    # Display the search result.
    val
    return
@app.cell
def _(get_dossier_info, val):
    # Fetch dossier details for the substance found above.
    info_dossier = get_dossier_info(val['rmlId'])
    return (info_dossier,)
@app.cell
def _(info_dossier):
    # Display the dossier details.
    info_dossier
    return
@app.cell
def _(assetExternalId, get_substance_index):
    # Build the link index for the test substance's dossier and display it.
    index = get_substance_index(assetExternalId)
    index
    return (index,)
@app.cell
def _(index, requests):
    # Download the toxicological-information page.
    summary_link = index['toxicological_information_link']
    response_summary = requests.get(summary_link)
    return (response_summary,)
@app.cell
def _(index, requests):
    # Download the acute-toxicity page.
    acute_link = index['acute_toxicity_link']
    response_acute = requests.get(acute_link)
    return (response_acute,)
@app.cell
def _(index, requests):
    # Download the repeated-dose-toxicity page.
    repeated_link = index['repeated_dose_toxicity_link']
    response_repeated = requests.get(repeated_link)
    return (response_repeated,)
@app.cell
def _(BeautifulSoup, response_summary):
    # Parse the summary page and display the resulting soup.
    soup_summary = BeautifulSoup(response_summary.content, 'html.parser')
    # The string returned by prettify() is not used.
    soup_summary.prettify(formatter='html')
    soup_summary
    return
@app.cell
def _(BeautifulSoup, re):
    def get_field_name(field_div):
        """Return the field identifier carried by the label div's CSS classes.

        The first class of the ``das-field_label`` div that is not a generic
        marker class is returned; ``None`` when the field has no label div or
        the label has only marker classes.
        """
        marker_classes = ('das-field_label', 'das-empty-value', 'das-empty-label')
        label = field_div.find('div', class_='das-field_label')
        if not label:
            return None
        return next(
            (name for name in label.get('class', []) if name not in marker_classes),
            None,
        )

    def extract_field_value(field_div):
        """Convert one ``das-field`` div into a ``{field_name: value}`` dict.

        Returns ``None`` when the field has no name, is named
        ``OriginalStudy``, has no value div, or is marked as redacted.
        Otherwise the value is a string, a bool (checkbox), or a
        ``{"value", "unit"}`` dict (quantity); empty values become ``""``.
        """
        name = get_field_name(field_div)
        # Unnamed fields and OriginalStudy fields are skipped.
        if not name or name == 'OriginalStudy':
            return None

        container = field_div.find('div', class_='das-field_value')
        if not container:
            return None

        # Redacted / not publishable values are dropped entirely.
        if container.find('span', class_='das-redacted-value'):
            return None

        # An empty-value marker anywhere below the value div means "no value".
        # NOTE(review): the search is recursive, so if a quantity's empty unit
        # is rendered as span.das-empty-value (as the quantity branch below
        # seems to expect) this returns "" before the quantity is read —
        # confirm against real pages.
        if container.find('span', class_='das-empty-value'):
            return {name: ""}

        # Pick-list: the selected phrase, or "" for an empty pick-list.
        pick_list = container.find('span', class_='das-field_value_pick-list')
        if pick_list:
            phrase = pick_list.find('span', class_='phrase')
            if phrase:
                return {name: phrase.get_text(strip=True)}
            if pick_list.find('span', class_='das-empty-value'):
                return {name: ""}

        # Physical quantity: numeric text plus an optional unit phrase.
        quantity = container.find('span', class_='i6PhysicalQuantity')
        if quantity:
            amount = quantity.find('span', class_='value')
            unit = quantity.find('span', class_='unit')
            amount_text = amount.get_text(strip=True) if amount else ""
            if not amount_text:
                return {name: ""}
            unit_phrase = unit.find('span', class_='phrase') if unit else None
            unit_text = unit_phrase.get_text(strip=True) if unit_phrase else ""
            return {name: {"value": amount_text, "unit": unit_text}}

        # Checkbox: True when the checked marker is present.
        checked = container.find('span', class_='das-value_checkbox-checked')
        unchecked = container.find('span', class_='das-value_checkbox-unchecked')
        if checked is not None or unchecked is not None:
            return {name: checked is not None}

        # Decimal / plain-text fields: raw text, "" when flagged as empty.
        field_classes = field_div.get('class', [])
        if 'das-field_decimal' in field_classes or 'das-field_text' in field_classes:
            raw = container.get_text(strip=True)
            return {name: "" if ('[Empty]' in raw or not raw) else raw}

        # Rich-text block: flattened text with "[Empty]" markers removed.
        html_block = container.find('div', class_='das-field_value_html')
        if html_block:
            flattened = html_block.get_text(separator=' ', strip=True)
            return {name: re.sub(r'\[Empty\]', '', flattened).strip()}

        # Fallback: whatever text the value div contains.
        fallback = container.get_text(strip=True)
        return {name: re.sub(r'\[Empty\]', '', fallback).strip()}

    def extract_table_data(table):
        """Turn a table into a list of ``{header: cell_text}`` dicts.

        The first row's ``td`` cells supply the headers. Rows made of a single
        cell with a ``colspan`` attribute, and rows whose cell count differs
        from the header count, are skipped. Tables with fewer than two rows
        yield an empty list.
        """
        rows = table.find_all('tr')
        if len(rows) < 2:
            return []

        header_row, *body_rows = rows
        headers = [cell.get_text(strip=True) for cell in header_row.find_all('td')]

        records = []
        for row in body_rows:
            cells = row.find_all('td')
            if len(cells) == 1 and cells[0].get('colspan'):
                continue
            if len(cells) != len(headers):
                continue
            records.append(
                {header: cell.get_text(strip=True) for header, cell in zip(headers, cells)}
            )
        return records

    def extract_section(section):
        """Recursively convert a ``das-block`` section into a dict.

        Only direct children are inspected: the ``h3`` label, the
        ``das-field`` divs, the tables (stored under ``table`` or
        ``table_<n>`` when there are several) and the nested sections (stored
        under ``subsections``).
        """
        section_data = {}

        heading = section.find('h3', class_='das-block_label', recursive=False)
        if heading:
            section_data['label'] = heading.get_text(strip=True)

        for field in section.find_all('div', class_='das-field', recursive=False):
            field_data = extract_field_value(field)
            if field_data:
                section_data.update(field_data)

        tables = section.find_all('table', recursive=False)
        several_tables = len(tables) > 1
        for position, table in enumerate(tables, start=1):
            table_data = extract_table_data(table)
            if not table_data:
                continue
            key = f'table_{position}' if several_tables else 'table'
            section_data[key] = table_data

        nested_sections = section.find_all('section', class_='das-block', recursive=False)
        if nested_sections:
            section_data['subsections'] = [
                nested_data
                for nested_data in map(extract_section, nested_sections)
                if nested_data
            ]

        return section_data

    def parse_toxicology_html(html_content):
        """Parse a dossier HTML document into a nested dict.

        The result holds ``document_title`` when an ``h4.document-header`` is
        present, and ``sections`` with one entry per non-empty top-level
        section of the ``article.das-document`` element. Without that article
        only the title (if any) is returned.
        """
        soup = BeautifulSoup(html_content, 'html.parser')
        result = {}

        title = soup.find('h4', class_='document-header')
        if title:
            result['document_title'] = title.get_text(strip=True)

        article = soup.find('article', class_='das-document')
        if not article:
            return result

        top_sections = article.find_all('section', class_='das-block', recursive=False)
        result['sections'] = [
            section_data
            for section_data in map(extract_section, top_sections)
            if section_data
        ]
        return result
    return (parse_toxicology_html,)
@app.cell
def _():
    # Regular-expression module handed to the HTML parsing cell.
    import re
    return (re,)
@app.cell
def _(parse_toxicology_html, response_summary):
    # Parse the downloaded toxicological-information page.
    summary_json = parse_toxicology_html(response_summary.content)
    return (summary_json,)
@app.cell
def _(summary_json):
    # Display the parsed toxicological-information page.
    summary_json
    return
@app.cell
def _(parse_toxicology_html, response_acute):
    # Parse the downloaded acute-toxicity page.
    acute_json = parse_toxicology_html(response_acute.content)
    return (acute_json,)
@app.cell
def _(acute_json):
    # Display the parsed acute-toxicity page.
    acute_json
    return
@app.cell
def _(parse_toxicology_html, response_repeated):
    # Parse the downloaded repeated-dose-toxicity page.
    response_json = parse_toxicology_html(response_repeated.content)
    return (response_json,)
@app.cell
def _(response_json):
    # Display the parsed repeated-dose-toxicity page.
    response_json
    return
@app.cell
def _(index):
    # Render the toxicological-information page to "output.pdf" with Chromium
    # driven through Playwright's synchronous API.
    from playwright.sync_api import sync_playwright

    with sync_playwright() as p:
        browser = p.chromium.launch()
        page = browser.new_page()
        page.goto(index['toxicological_information_link'])
        page.pdf(path='output.pdf')
        browser.close()
    return
@app.cell
def _(
    get_dossier_info,
    get_substance_index,
    parse_toxicology_html,
    requests,
    search_substance,
):
    def orchestration(cas) -> dict:
        """Run the full lookup for one CAS number.

        Chains the substance search, the dossier lookup and the index
        scraping, then downloads and parses each toxicology page listed in
        the index.

        Args:
            cas: CAS number passed to ``search_substance``.

        Returns:
            An empty dict when any of the three lookup steps returns nothing.
            Otherwise a dict with ``substance``, ``dossier_info``, ``index``
            and the parsed ``toxicological_information``, ``acute_toxicity``
            and ``repeated_dose_toxicity`` pages; a page stays ``{}`` when its
            link is missing or its download does not return status 200.

        Raises:
            requests.RequestException: if a page download fails at transport
                level or exceeds the timeout.
        """
        substance = search_substance(cas)
        if not substance:
            return {}

        dossier_info = get_dossier_info(substance['rmlId'])
        if not dossier_info:
            return {}

        index = get_substance_index(dossier_info['assetExternalId'])
        if not index:
            return {}

        result = {
            "substance": substance,
            "dossier_info": dossier_info,
            "index": index,
            "toxicological_information": {},
            "acute_toxicity": {},
            "repeated_dose_toxicity": {},
        }

        # (result key, index key) for each page to download and parse.
        pages = (
            ("toxicological_information", "toxicological_information_link"),
            ("acute_toxicity", "acute_toxicity_link"),
            ("repeated_dose_toxicity", "repeated_dose_toxicity_link"),
        )
        for result_key, link_key in pages:
            link = index.get(link_key)
            if not link:
                continue
            # Bound the wait so a stalled server cannot hang the whole run.
            page_response = requests.get(link, timeout=30)
            if page_response.status_code == 200:
                result[result_key] = parse_toxicology_html(page_response.content)

        return result
    return
# Local MongoDB cache helpers. The stray ")" after "return {}" that made this
# cell unparsable has been removed; the cell text is otherwise unchanged apart
# from the review notes.
app._unparsable_cell(
    r"""
    def check_sub_locally(cas: str) -> dict:
        client = get_client()
        db = client.get_database(name=\"toxinfo\")
        collection = db.get_collection(\"substance_index\")
        sub = collection.find_one({\"rmlCas\": cas})
        if sub:
            return sub
        return {}

    # NOTE(review): `cas` is unused here; the function reads the notebook-level
    # `substance` instead, and returns `sub` despite the `-> None` annotation.
    # Confirm the intended signature before relying on it.
    def add_sub_locally(cas : str) -> None:
        client = get_client()
        db = client.get_database(name=\"toxinfo\")
        collection = db.get_collection(\"substance_index\")
        sub = collection.find_one({\"rmlCas\": substance['rmlCas']})
        if not sub:
            collection.insert_one(substance)
        else:
            return sub
    """,
    name="_"
)
@app.cell(hide_code=True)
def _(mo):
    # Markdown to-do list rendered as this cell's output.
    mo.md(
        r"""
    # Cosa manca da fare
    1. Creare un nuovo orchestratore per la parte search, caching in mongodb e creare un metodo unico per la ricerca
    2. Metodo per validare i json salvati nel database, verificare la data
    3. Creare i metodi per astrarre gli html in json
    4. Creare i test per ciascuna funzione
    5. Creare la documentazione per ciascuna funzione
    """
    )
    return
# Run the notebook as a script when the file is executed directly.
if __name__ == "__main__":
    app.run()