676 lines
20 KiB
Python
676 lines
20 KiB
Python
import marimo
|
|
|
|
__generated_with = "0.16.5"
|
|
app = marimo.App(width="medium")
|
|
|
|
|
|
@app.cell
def _():
    # Notebook-wide third-party imports. Only the names in the return tuple
    # are visible to other cells; `standardre` and `json` are imported here
    # but not exported (marimo cells share only returned names).
    import marimo as mo
    import urllib.parse
    import re as standardre
    import json
    from bs4 import BeautifulSoup
    import requests
    return BeautifulSoup, mo, requests, urllib
|
|
|
|
|
|
@app.cell
def _():
    # Project logger shared by the cells below.
    from pif_compiler.functions.common_log import get_logger

    log = get_logger()
    return (log,)
|
|
|
|
|
|
@app.cell
def _(log):
    # Smoke-test that the project logger works inside marimo.
    log.info("testing with marimo")
    return
|
|
|
|
|
|
@app.cell
def _():
    # CAS registry numbers that previously caused problems in the pipeline
    # (hence "problematici"); used below as test inputs.
    cas_problematici = [
        "25525-21-7",
        "113170-55-1",
        "26172-55-4"
    ]
    return (cas_problematici,)
|
|
|
|
|
|
@app.cell
def _(cas_problematici):
    # Pick one problematic CAS number to exercise the workflow end to end.
    cas_test = cas_problematici[2]
    return (cas_test,)
|
|
|
|
|
|
@app.cell
def _(cas_test, urllib):
    # Display the URL-encoded form of the CAS number (marimo renders the
    # value of the last expression).
    urllib.parse.quote(cas_test)
    return
|
|
|
|
|
|
@app.cell
def _():
    # ECHA CHEM API endpoints used throughout this notebook.
    BASE_SEARCH = "https://chem.echa.europa.eu/api-substance/v1/substance?pageIndex=1&pageSize=100&searchText="
    BASE_DOSSIER = "https://chem.echa.europa.eu/api-dossier-list/v1/dossier?pageIndex=1&pageSize=100&rmlId="
    SUBSTANCE_SUMMARY = "https://chem.echa.europa.eu/api-substance/v1/substance/"  # + id
    CLASSIFICATION_ID = "https://chem.echa.europa.eu/api-cnl-inventory/prominent/overview/classifications/harmonised/459160"
    TOXICOLOGICAL_INFO = "https://chem.echa.europa.eu/html-pages-prod/e4c88c6e-06c7-4daa-b0fb-1a55459ac22f/documents/IUC5-5f55d8ec-7a71-4e2c-9955-8469ead9fe84_0035f3f8-7467-4944-9028-1db2e9c99565.html"  # external + rootkey
    REPEATED_DOSE = "https://chem.echa.europa.eu/html-pages-prod/e4c88c6e-06c7-4daa-b0fb-1a55459ac22f/documents/IUC5-82402b09-8d8f-495c-b673-95b205be60e0_0035f3f8-7467-4944-9028-1db2e9c99565.html"

    # Query-string fragments appended after BASE_DOSSIER + rmlId.
    # BUG FIX: these previously began with the literal "®" character — the
    # HTML entity "&reg" had been collapsed into "®" — which produced an
    # invalid query string. They must begin with "&registrationStatuses".
    active = "&registrationStatuses=Active"
    inactive = "&registrationStatuses=Inactive"
    legislation = "&legislation=REACH"
    return BASE_DOSSIER, BASE_SEARCH, active, inactive, legislation
|
|
|
|
|
|
@app.cell
def _(BASE_SEARCH, cas_test, requests):
    # Raw substance search against the ECHA API for the chosen CAS.
    test_search_request = requests.get(BASE_SEARCH + cas_test)
    return (test_search_request,)
|
|
|
|
|
|
@app.cell
def _(test_search_request):
    # Decode the search response body as JSON for the cells below.
    response = test_search_request.json()
    return (response,)
|
|
|
|
|
|
@app.cell
def _(test_search_request):
    # Display the raw JSON payload (duplicates the decode in the previous
    # cell; kept purely for interactive inspection in the notebook UI).
    test_search_request.json()
    return
|
|
|
|
|
|
@app.cell
def _(cas_test, response):
    # Reduce the search results to a flat dict for the item whose CAS
    # exactly matches cas_test. If several items match, the last one wins
    # (same as the original loop).
    substance = {}

    for result in response['items']:
        if result["substanceIndex"]["rmlCas"] == cas_test:
            substance["rmlCas"] = result["substanceIndex"]["rmlCas"]
            substance["rmlId"] = result["substanceIndex"]["rmlId"]
            substance["rmlEc"] = result["substanceIndex"]["rmlEc"]
            substance["rmlName"] = result["substanceIndex"]["rmlName"]
            # BUG FIX: a second, identical assignment to "rmlId" was removed.
    return (substance,)
|
|
|
|
|
|
@app.cell
def _(substance):
    # Display the collected substance record.
    substance
    return
|
|
|
|
|
|
@app.cell
def _(BASE_DOSSIER, active, substance):
    # Build and display the dossier-list URL for active registrations.
    url = BASE_DOSSIER + substance['rmlId'] + active
    url
    return
|
|
|
|
|
|
@app.cell
def _(BASE_DOSSIER, inactive, legislation, requests, substance):
    # Fetch the REACH dossier list for *inactive* registrations of this
    # substance (presumably the test CAS has no active dossier — confirm).
    response_dossier = requests.get(BASE_DOSSIER + substance['rmlId'] + inactive + legislation)
    return (response_dossier,)
|
|
|
|
|
|
@app.cell
def _(response_dossier):
    # Decode and display the dossier-list response.
    response_dossier_json = response_dossier.json()
    response_dossier_json
    return (response_dossier_json,)
|
|
|
|
|
|
@app.cell
def _(response_dossier_json, substance):
    # Enrich the substance record with metadata from the first dossier hit.
    # NOTE(review): this mutates `substance`, which is defined in another
    # cell — it works, but it makes cell execution order significant.
    first_hit = response_dossier_json['items'][0]
    substance['lastUpdatedDate'] = first_hit['lastUpdatedDate']
    substance['registrationStatus'] = first_hit['registrationStatus']
    substance['registrationStatusChangedDate'] = first_hit['registrationStatusChangedDate']
    substance['registrationRole'] = first_hit['reachDossierInfo']['registrationRole']
    substance['assetExternalId'] = first_hit['assetExternalId']
    substance['rootKey'] = first_hit['rootKey']
    substance
    return
|
|
|
|
|
|
@app.cell
def _():
    # Open the MongoDB client and select the "toxinfo" database.
    from pif_compiler.functions.db_utils import get_client

    client = get_client()

    db = client.get_database(name="toxinfo")
    return (db,)
|
|
|
|
|
|
@app.cell
def _(db):
    # Open the substance cache collection and print what the DB contains.
    collection = db.get_collection("substance_index")
    # Renamed from `list`, which shadowed the builtin of the same name.
    collection_names = db.list_collection_names()
    print(collection_names)
    return (collection,)
|
|
|
|
|
|
@app.cell
def _(cas_test, collection, substance):
    # Cache the substance record in MongoDB unless it is already present.
    sub = collection.find_one({"rmlCas": cas_test})
    if not sub:
        collection.insert_one(substance)
    return
|
|
|
|
|
|
@app.cell
def _(substance):
    # Asset id of the dossier's published HTML pages.
    assetExternalId = substance['assetExternalId']
    return (assetExternalId,)
|
|
|
|
|
|
@app.cell
def _(assetExternalId):
    # Candidate index URL for the published dossier pages.
    # NOTE(review): unused (not returned), and it uses "html-pages" while
    # other cells use "html-pages-prod" — confirm which path is correct.
    INDEX_HTML = "https://chem.echa.europa.eu/html-pages/" + assetExternalId + "/index.html"
    return
|
|
|
|
|
|
@app.cell
def _(BASE_SEARCH, log, requests):
    def search_substance(cas: str) -> dict:
        """Search the ECHA substance API for an exact CAS match.

        Args:
            cas: CAS registry number, e.g. "100-41-4".

        Returns:
            Flat dict with rmlCas/rmlId/rmlEc/rmlName, or {} when the
            request fails, no results exist, or no item matches the CAS.
        """
        response = requests.get(BASE_SEARCH + cas)
        if response.status_code != 200:
            log.error(f"Network error: {response.status_code}")
            return {}

        payload = response.json()
        if payload['state']['totalItems'] == 0:
            log.info(f"No substance found for CAS {cas}")
            return {}

        for result in payload['items']:
            index = result["substanceIndex"]
            if index["rmlCas"] == cas:
                # BUG FIX: the original dict literal listed "rmlId" twice;
                # the duplicate key has been removed.
                return {
                    "rmlCas": index["rmlCas"],
                    "rmlId": index["rmlId"],
                    "rmlEc": index["rmlEc"],
                    "rmlName": index["rmlName"],
                }

        # Results came back but none matched the requested CAS exactly.
        log.error("Something went wrong")
        return {}
    return (search_substance,)
|
|
|
|
|
|
@app.cell
def _(BASE_DOSSIER, active, inactive, legislation, log, requests):
    # NOTE(review): the `type` parameter shadows the builtin; the name is
    # kept to preserve the function's call interface.
    def get_dossier_info(rmlId: str, type=active) -> dict:
        """Fetch dossier metadata for a substance from the ECHA API.

        Tries the given registration-status filter (active by default) and
        falls back once to inactive registrations when nothing is found.

        Returns:
            Dict with dossier metadata, or {} on network error or when
            neither status has a dossier.
        """
        url = BASE_DOSSIER + rmlId + type + legislation
        response_dossier = requests.get(url)
        if response_dossier.status_code != 200:
            log.error(f"Network error: {response_dossier.status_code}")
            return {}

        response_dossier_json = response_dossier.json()
        if response_dossier_json['state']['totalItems'] == 0:
            log.info(f"No dossier found for RML ID {rmlId}")
            # BUG FIX: fall back only once. The original recursed with
            # `inactive` unconditionally, so an rmlId with no inactive
            # dossier recursed forever (RecursionError).
            if type == active:
                return get_dossier_info(rmlId, inactive)
            return {}

        item = response_dossier_json['items'][0]
        return {
            "lastUpdatedDate": item['lastUpdatedDate'],
            "registrationStatus": item['registrationStatus'],
            "registrationStatusChangedDate": item['registrationStatusChangedDate'],
            "registrationRole": item['reachDossierInfo']['registrationRole'],
            "assetExternalId": item['assetExternalId'],
            "rootKey": item['rootKey']
        }
    return (get_dossier_info,)
|
|
|
|
|
|
@app.cell
def _():
    # Intentionally empty placeholder cell.
    return
|
|
|
|
|
|
@app.cell
def _(BeautifulSoup, log, requests):
    def get_substance_index(assetExternalId: str) -> dict:
        """Scrape the dossier's index.html for toxicology section links.

        Args:
            assetExternalId: asset id of the published dossier pages.

        Returns:
            Dict mapping section keys to absolute document URLs. Sections
            missing from the page are skipped (and logged) instead of
            raising AttributeError as the original did when a div or its
            das-leaf link was absent.
        """
        INDEX = "https://chem.echa.europa.eu/html-pages-prod/" + assetExternalId
        LINK_DOSSIER = INDEX + "/documents/"

        response = requests.get(INDEX + "/index.html")
        if response.status_code != 200:
            log.error(f"Network error: {response.status_code}")
            return {}

        soup = BeautifulSoup(response.content, 'html.parser')

        def section_link(div_id):
            # First das-leaf link inside the given section div, or None
            # when the section or its link is absent.
            div = soup.find('div', id=div_id)
            link = div.find('a', class_='das-leaf') if div else None
            if link is None or not link.get('href'):
                log.info(f"Section {div_id} not found in index page")
                return None
            return LINK_DOSSIER + link['href'] + '.html'

        # Section key -> div id on the index page.
        sections = {
            'toxicological_information_link': 'id_7_Toxicologicalinformation',
            'repeated_dose_toxicity_link': 'id_75_Repeateddosetoxicity',
            'acute_toxicity_link': 'id_72_AcuteToxicity',
        }
        index_data = {}
        for key, div_id in sections.items():
            href = section_link(div_id)
            if href:
                index_data[key] = href

        return index_data

    # Ad-hoc check against a known asset id (runs when the cell executes).
    get_substance_index("e4c88c6e-06c7-4daa-b0fb-1a55459ac22f")
    return (get_substance_index,)
|
|
|
|
|
|
@app.cell
def _(search_substance):
    # Exercise search_substance with a known CAS number.
    val = search_substance("100-41-4")
    return (val,)
|
|
|
|
|
|
@app.cell
def _(val):
    # Display the search result.
    val
    return
|
|
|
|
|
|
@app.cell
def _(get_dossier_info, val):
    # Fetch dossier metadata for the substance found above.
    info_dossier = get_dossier_info(val['rmlId'])
    return (info_dossier,)
|
|
|
|
|
|
@app.cell
def _(info_dossier):
    # Display the dossier metadata.
    info_dossier
    return
|
|
|
|
|
|
@app.cell
def _(assetExternalId, get_substance_index):
    # Scrape and display the section links for this asset.
    index = get_substance_index(assetExternalId)
    index
    return (index,)
|
|
|
|
|
|
@app.cell
def _(index, requests):
    # Download the toxicological-information summary page.
    summary_link = index['toxicological_information_link']

    response_summary = requests.get(summary_link)
    return (response_summary,)
|
|
|
|
|
|
@app.cell
def _(index, requests):
    # Download the acute-toxicity page.
    acute_link = index['acute_toxicity_link']

    response_acute = requests.get(acute_link)
    return (response_acute,)
|
|
|
|
|
|
@app.cell
def _(index, requests):
    # Download the repeated-dose-toxicity page.
    repeated_link = index['repeated_dose_toxicity_link']

    response_repeated = requests.get(repeated_link)
    return (response_repeated,)
|
|
|
|
|
|
@app.cell
def _(BeautifulSoup, response_summary):
    # Parse and display the toxicological summary page.
    soup_summary = BeautifulSoup(response_summary.content, 'html.parser')
    # The original also called soup_summary.prettify(formatter='html') and
    # discarded the returned string; that dead statement was removed.

    soup_summary
    return
|
|
|
|
|
|
@app.cell
def _(BeautifulSoup, re):
    def get_field_name(field_div):
        """Extract the field name from the label div's class list.

        The field name is whichever class on the label div is not one of
        the generic layout classes; None when no label/name is present.
        """
        label_div = field_div.find('div', class_='das-field_label')
        if not label_div:
            return None

        classes = label_div.get('class', [])

        for cls in classes:
            if cls not in ['das-field_label', 'das-empty-value', 'das-empty-label']:
                return cls

        return None


    def extract_field_value(field_div):
        """Extract a {field_name: value} pair from a das-field div.

        Returns None for unnamed, OriginalStudy, valueless or redacted
        fields. Values may be strings, {"value", "unit"} dicts for
        physical quantities, or booleans for checkboxes.
        """
        field_name = get_field_name(field_div)
        if not field_name:
            return None

        # Skip OriginalStudy fields
        if field_name == 'OriginalStudy':
            return None

        value_div = field_div.find('div', class_='das-field_value')
        if not value_div:
            return None

        # Exclude redacted / not-publishable values entirely.
        redacted = value_div.find('span', class_='das-redacted-value')
        if redacted:
            return None

        # Empty-value marker. (The original re-checked for a redacted span
        # here; that condition was unreachable after the early return above
        # and has been dropped.)
        empty_span = value_div.find('span', class_='das-empty-value')
        if empty_span:
            return {field_name: ""}

        # Pick-list value: take the phrase text.
        pick_list = value_div.find('span', class_='das-field_value_pick-list')
        if pick_list:
            phrase = pick_list.find('span', class_='phrase')
            if phrase:
                return {field_name: phrase.get_text(strip=True)}
            if pick_list.find('span', class_='das-empty-value'):
                return {field_name: ""}

        # Physical quantity: numeric value plus optional unit phrase.
        quantity = value_div.find('span', class_='i6PhysicalQuantity')
        if quantity:
            value_span = quantity.find('span', class_='value')
            unit_span = quantity.find('span', class_='unit')

            value_text = value_span.get_text(strip=True) if value_span else ""
            unit_text = ""
            if unit_span:
                unit_phrase = unit_span.find('span', class_='phrase')
                if unit_phrase:
                    unit_text = unit_phrase.get_text(strip=True)
                elif unit_span.find('span', class_='das-empty-value'):
                    unit_text = ""

            if value_text:
                return {field_name: {"value": value_text, "unit": unit_text}}
            else:
                return {field_name: ""}

        # Checkbox: map checked/unchecked spans to a boolean.
        checkbox_checked = value_div.find('span', class_='das-value_checkbox-checked')
        checkbox_unchecked = value_div.find('span', class_='das-value_checkbox-unchecked')
        if checkbox_checked is not None or checkbox_unchecked is not None:
            return {field_name: checkbox_checked is not None}

        # Decimal / plain-text fields.
        if 'das-field_decimal' in field_div.get('class', []) or 'das-field_text' in field_div.get('class', []):
            text = value_div.get_text(strip=True)
            if '[Empty]' in text or not text:
                return {field_name: ""}
            return {field_name: text}

        # Rich HTML content: flatten to space-separated text.
        if value_div.find('div', class_='das-field_value_html'):
            html_content = value_div.find('div', class_='das-field_value_html')
            text = html_content.get_text(separator=' ', strip=True)
            text = re.sub(r'\[Empty\]', '', text).strip()
            if not text:
                return {field_name: ""}
            return {field_name: text}

        # Default: any remaining text content.
        text = value_div.get_text(strip=True)
        text = re.sub(r'\[Empty\]', '', text).strip()
        return {field_name: text if text else ""}


    def extract_table_data(table):
        """Extract an HTML table as a list of {header: cell_text} dicts.

        The first row supplies the headers (these tables use <td> in the
        header row too). Single-cell colspan rows and rows whose cell
        count differs from the header count are skipped.
        """
        rows = table.find_all('tr')
        if len(rows) < 2:
            return []

        header_row = rows[0]
        headers = []
        for th in header_row.find_all('td'):
            header_text = th.get_text(strip=True)
            headers.append(header_text)

        data = []
        for row in rows[1:]:
            cells = row.find_all('td')

            if len(cells) == 1 and cells[0].get('colspan'):
                continue

            if len(cells) == len(headers):
                row_data = {}
                for i, cell in enumerate(cells):
                    cell_text = cell.get_text(strip=True)
                    row_data[headers[i]] = cell_text
                data.append(row_data)

        return data


    def extract_section(section):
        """Recursively extract label, fields, tables and subsections from a das-block."""
        section_data = {}

        label_h3 = section.find('h3', class_='das-block_label', recursive=False)
        if label_h3:
            section_data['label'] = label_h3.get_text(strip=True)

        direct_fields = section.find_all('div', class_='das-field', recursive=False)
        for field in direct_fields:
            field_data = extract_field_value(field)
            if field_data:
                section_data.update(field_data)

        tables = section.find_all('table', recursive=False)
        for i, table in enumerate(tables):
            table_data = extract_table_data(table)
            if table_data:
                table_key = f'table_{i+1}' if len(tables) > 1 else 'table'
                section_data[table_key] = table_data

        nested_sections = section.find_all('section', class_='das-block', recursive=False)
        if nested_sections:
            section_data['subsections'] = []
            for nested in nested_sections:
                nested_data = extract_section(nested)
                if nested_data:
                    section_data['subsections'].append(nested_data)

        return section_data


    def parse_toxicology_html(html_content):
        """Parse an ECHA toxicology HTML document into a nested dict.

        Returns {'document_title': ..., 'sections': [...]}; 'sections' is
        omitted when the page has no das-document article.
        """
        soup = BeautifulSoup(html_content, 'html.parser')

        result = {}

        title = soup.find('h4', class_='document-header')
        if title:
            result['document_title'] = title.get_text(strip=True)

        article = soup.find('article', class_='das-document')
        if not article:
            return result

        top_sections = article.find_all('section', class_='das-block', recursive=False)
        result['sections'] = []

        for section in top_sections:
            section_data = extract_section(section)
            if section_data:
                result['sections'].append(section_data)

        return result
    return (parse_toxicology_html,)
|
|
|
|
|
|
@app.cell
def _():
    # `re` lives in its own cell so the parser cell can depend on it
    # through marimo's dependency graph.
    import re
    return (re,)
|
|
|
|
|
|
@app.cell
def _(parse_toxicology_html, response_summary):
    # Parse the toxicological-information summary page into structured data.
    summary_json = parse_toxicology_html(response_summary.content)
    return (summary_json,)
|
|
|
|
|
|
@app.cell
def _(summary_json):
    # Display the parsed summary.
    summary_json
    return
|
|
|
|
|
|
@app.cell
def _(parse_toxicology_html, response_acute):
    # Parse the acute-toxicity page into structured data.
    acute_json = parse_toxicology_html(response_acute.content)
    return (acute_json,)
|
|
|
|
|
|
@app.cell
def _(acute_json):
    # Display the parsed acute-toxicity data.
    acute_json
    return
|
|
|
|
|
|
@app.cell
def _(parse_toxicology_html, response_repeated):
    # Parse the repeated-dose-toxicity page into structured data.
    # NOTE(review): `response_json` is a misleading name for parsed data —
    # something like `repeated_json` would match the sibling cells.
    response_json = parse_toxicology_html(response_repeated.content)
    return (response_json,)
|
|
|
|
|
|
@app.cell
def _(response_json):
    # Display the parsed repeated-dose data.
    response_json
    return
|
|
|
|
|
|
@app.cell
def _(index):
    # Render the toxicological-information page to PDF with headless
    # Chromium; writes "output.pdf" in the working directory.
    from playwright.sync_api import sync_playwright

    with sync_playwright() as p:
        browser = p.chromium.launch()
        page = browser.new_page()
        page.goto(index['toxicological_information_link'])
        page.pdf(path='output.pdf')
        browser.close()
    return
|
|
|
|
|
|
@app.cell
def _(
    get_dossier_info,
    get_substance_index,
    parse_toxicology_html,
    requests,
    search_substance,
):
    def orchestration(cas) -> dict:
        """Run the full pipeline for one CAS number.

        Steps: substance search -> dossier metadata -> index links ->
        fetch + parse each toxicology section.

        Returns:
            Dict with substance, dossier_info, index and the three parsed
            sections ({} for a section whose link is missing or whose
            download fails), or {} when any prerequisite step fails.
        """
        substance = search_substance(cas)
        if not substance:
            return {}

        dossier_info = get_dossier_info(substance['rmlId'])
        if not dossier_info:
            return {}

        index = get_substance_index(dossier_info['assetExternalId'])
        if not index:
            return {}

        def fetch_parsed(link) -> dict:
            # Download one dossier document and parse it; {} on any failure.
            # (Replaces three copy-pasted fetch/parse stanzas.)
            if not link:
                return {}
            resp = requests.get(link)
            if resp.status_code != 200:
                return {}
            return parse_toxicology_html(resp.content)

        return {
            "substance": substance,
            "dossier_info": dossier_info,
            "index": index,
            "toxicological_information": fetch_parsed(index.get('toxicological_information_link')),
            "acute_toxicity": fetch_parsed(index.get('acute_toxicity_link')),
            "repeated_dose_toxicity": fetch_parsed(index.get('repeated_dose_toxicity_link')),
        }
    return
|
|
|
|
|
|
# NOTE(review): marimo stored this cell as unparsable source. The code in
# the string has a syntax error — `return {})` carries a stray ")" — and
# both helpers reference `get_client`/`substance` defined in other cells.
# Fix the string and convert it back to a normal @app.cell to use it.
app._unparsable_cell(
    r"""
    def check_sub_locally(cas: str) -> dict:
        client = get_client()
        db = client.get_database(name=\"toxinfo\")
        collection = db.get_collection(\"substance_index\")
        sub = collection.find_one({\"rmlCas\": cas})
        if sub:
            return sub
        return {})

    def add_sub_locally(cas : str) -> None:
        client = get_client()
        db = client.get_database(name=\"toxinfo\")
        collection = db.get_collection(\"substance_index\")
        sub = collection.find_one({\"rmlCas\": substance['rmlCas']})
        if not sub:
            collection.insert_one(substance)
        else:
            return sub
    """,
    name="_"
)
|
|
|
|
|
|
@app.cell(hide_code=True)
def _(mo):
    # Italian TODO list for this notebook, rendered as markdown.
    # (Markdown text is a runtime string and is left untranslated.)
    mo.md(
        r"""
    # Cosa manca da fare

    1. Creare un nuovo orchestratore per la parte search, caching in mongodb e creare un metodo unico per la ricerca
    2. Metodo per validare i json salvati nel database, verificare la data
    3. Creare i metodi per astrarre gli html in json
    4. Creare i test per ciascuna funzione
    5. Creare la documentazione per ciascuna funzione
    """
    )
    return
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app.run()
|