# cosmoguard-bd/src/pif_compiler/services/srv_echa.py
# Last modified: 2025-11-10 22:02:59 +01:00
# 419 lines, 14 KiB, Python, no EOL at end of file
import os
import requests
import json
import re
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from playwright.sync_api import sync_playwright
from pif_compiler.services.common_log import get_logger
from pif_compiler.services.db_utils import db_connect
log = get_logger()  # shared application logger
load_dotenv()  # load configuration from a local .env file, if present
# ECHA "chem" API: full-text substance search (fixed first page, 100 results);
# append the search text (CAS number) to the URL.
BASE_SEARCH = "https://chem.echa.europa.eu/api-substance/v1/substance?pageIndex=1&pageSize=100&searchText="
# ECHA dossier list for a given rmlId; append the id plus the filter
# fragments defined below (active/inactive, legislation).
BASE_DOSSIER = "https://chem.echa.europa.eu/api-dossier-list/v1/dossier?pageIndex=1&pageSize=100&rmlId="
SUBSTANCE_SUMMARY = "https://chem.echa.europa.eu/api-substance/v1/substance/" #+id
# NOTE(review): the three URLs below embed hard-coded asset/document ids for
# one specific substance — presumably kept as reference samples; confirm they
# are not used to fetch data for arbitrary substances.
CLASSIFICATION_ID = "https://chem.echa.europa.eu/api-cnl-inventory/prominent/overview/classifications/harmonised/459160"
TOXICOLOGICAL_INFO = "https://chem.echa.europa.eu/html-pages-prod/e4c88c6e-06c7-4daa-b0fb-1a55459ac22f/documents/IUC5-5f55d8ec-7a71-4e2c-9955-8469ead9fe84_0035f3f8-7467-4944-9028-1db2e9c99565.html" # external + rootkey
REPEATED_DOSE = "https://chem.echa.europa.eu/html-pages-prod/e4c88c6e-06c7-4daa-b0fb-1a55459ac22f/documents/IUC5-82402b09-8d8f-495c-b673-95b205be60e0_0035f3f8-7467-4944-9028-1db2e9c99565.html"
# Query-string fragments appended to BASE_DOSSIER requests.
active = "&registrationStatuses=Active"
inactive = "&registrationStatuses=Inactive"
legislation = "&legislation=REACH"
#region ECHA scraping functions
def search_substance(cas : str) -> dict:
    """Query the ECHA substance-search API for an exact CAS match.

    NOTE(review): this function is shadowed by a second `search_substance`
    defined later in this module (the orchestrator), so at runtime this
    definition is unreachable by name — confirm and rename one of the two.

    Args:
        cas: CAS registry number, exactly as the ECHA index stores it.

    Returns:
        A dict with rmlCas / rmlId / rmlEc / rmlName for the exact match,
        or {} on network error, empty result set, or no exact CAS match.
    """
    response = requests.get(BASE_SEARCH + cas)
    if response.status_code != 200:
        log.error(f"Network error: {response.status_code}")
        return {}
    payload = response.json()
    if payload['state']['totalItems'] == 0:
        log.info(f"No substance found for CAS {cas}")
        return {}
    for result in payload['items']:
        index = result["substanceIndex"]
        if index["rmlCas"] == cas:
            # BUG FIX: the original literal listed "rmlId" twice; the
            # duplicate key is removed (the last value won anyway).
            return {
                "rmlCas": index["rmlCas"],
                "rmlId": index["rmlId"],
                "rmlEc": index["rmlEc"],
                "rmlName": index["rmlName"],
            }
    # Results came back but none matched the CAS exactly.
    log.error(f"Something went wrong")
    return {}
def get_dossier_info(rmlId: str) -> dict:
    """Fetch the first active REACH dossier for an ECHA rmlId.

    Args:
        rmlId: ECHA substance identifier (from the substance search).

    Returns:
        A dict with dossier metadata (dates, status, role, asset ids),
        or {} on network error or when no dossier exists.
    """
    url = BASE_DOSSIER + rmlId + active + legislation
    response_dossier = requests.get(url)
    if response_dossier.status_code != 200:
        log.error(f"Network error: {response_dossier.status_code}")
        return {}
    response_dossier_json = response_dossier.json()
    if response_dossier_json['state']['totalItems'] == 0:
        log.info(f"No dossier found for RML ID {rmlId}")
        return {}
    # Only the first (most relevant) dossier entry is used; hoist it
    # instead of re-indexing ['items'][0] for every field.
    item = response_dossier_json['items'][0]
    return {
        "lastUpdatedDate": item['lastUpdatedDate'],
        "registrationStatus": item['registrationStatus'],
        "registrationStatusChangedDate": item['registrationStatusChangedDate'],
        "registrationRole": item['reachDossierInfo']['registrationRole'],
        "assetExternalId": item['assetExternalId'],
        "rootKey": item['rootKey'],
    }
def get_substance_index(assetExternalId : str) -> dict:
    """Scrape the dossier index page for links to the toxicology sections.

    Args:
        assetExternalId: dossier asset id used to build the html-pages URL.

    Returns:
        A dict with 'toxicological_information_link',
        'repeated_dose_toxicity_link' and 'acute_toxicity_link' (absolute
        URLs), or {} on network error.

    Raises:
        AttributeError/TypeError when an expected section or link is
        missing from the index page — callers (echa_flow) catch this.
    """
    INDEX = "https://chem.echa.europa.eu/html-pages-prod/" + assetExternalId
    LINK_DOSSIER = INDEX + "/documents/"
    response = requests.get(INDEX + "/index.html")
    if response.status_code != 200:
        log.error(f"Network error: {response.status_code}")
        return {}
    soup = BeautifulSoup(response.content, 'html.parser')

    def section_link(div_id):
        # Resolve one index section to its document URL; raises (like the
        # original inline code) when the section or its link is missing.
        div = soup.find('div', id=div_id)
        href = div.find('a', class_='das-leaf')['href']
        return LINK_DOSSIER + href + '.html'

    return {
        # Toxicological information : txi
        'toxicological_information_link': section_link('id_7_Toxicologicalinformation'),
        # Repeated dose toxicity : rdt
        'repeated_dose_toxicity_link': section_link('id_75_Repeateddosetoxicity'),
        # Acute toxicity : at
        'acute_toxicity_link': section_link('id_72_AcuteToxicity'),
    }
#endregion
#region ECHA parsing functions of html pages
def get_field_name(field_div):
    """Return the identifying class of a das-field's label div, or None.

    The field name is encoded as the first class on the label div that is
    not one of the structural/empty-marker classes.
    """
    label = field_div.find('div', class_='das-field_label')
    if not label:
        return None
    structural = ('das-field_label', 'das-empty-value', 'das-empty-label')
    return next(
        (cls for cls in label.get('class', []) if cls not in structural),
        None,
    )
def extract_field_value(field_div):
    """Extract one das-field div as a {field_name: value} dict.

    Value shapes, tried in this order:
      * None              — unnamed field, OriginalStudy reference,
                            missing value div, or redacted content
      * ""                — explicitly empty value
      * {"value","unit"}  — physical quantity
      * bool              — checkbox (True when checked)
      * str               — pick-list phrase or plain/HTML text, with any
                            "[Empty]" markers stripped
    """
    field_name = get_field_name(field_div)
    if not field_name:
        return None
    # Skip references to original studies.
    if field_name == 'OriginalStudy':
        return None
    value_div = field_div.find('div', class_='das-field_value')
    if not value_div:
        return None
    # Redacted / not-publishable content is dropped entirely.
    if value_div.find('span', class_='das-redacted-value'):
        return None
    # Explicitly empty value. (The original re-tested for a redacted span
    # here; that test was unreachable after the return above — removed.)
    if value_div.find('span', class_='das-empty-value'):
        return {field_name: ""}
    # Pick-list value: prefer the phrase text; an empty marker inside the
    # pick-list maps to "". Otherwise fall through to the next checks.
    pick_list = value_div.find('span', class_='das-field_value_pick-list')
    if pick_list:
        phrase = pick_list.find('span', class_='phrase')
        if phrase:
            return {field_name: phrase.get_text(strip=True)}
        if pick_list.find('span', class_='das-empty-value'):
            return {field_name: ""}
    # Physical quantity: numeric value plus optional unit phrase.
    quantity = value_div.find('span', class_='i6PhysicalQuantity')
    if quantity:
        value_span = quantity.find('span', class_='value')
        unit_span = quantity.find('span', class_='unit')
        value_text = value_span.get_text(strip=True) if value_span else ""
        unit_text = ""
        if unit_span:
            unit_phrase = unit_span.find('span', class_='phrase')
            if unit_phrase:
                unit_text = unit_phrase.get_text(strip=True)
        if value_text:
            return {field_name: {"value": value_text, "unit": unit_text}}
        return {field_name: ""}
    # Checkbox: map presence of the checked/unchecked span to a boolean.
    checkbox_checked = value_div.find('span', class_='das-value_checkbox-checked')
    checkbox_unchecked = value_div.find('span', class_='das-value_checkbox-unchecked')
    if checkbox_checked is not None or checkbox_unchecked is not None:
        return {field_name: checkbox_checked is not None}
    # Decimal / plain-text fields: raw text, treating "[Empty]" as empty.
    if 'das-field_decimal' in field_div.get('class', []) or 'das-field_text' in field_div.get('class', []):
        text = value_div.get_text(strip=True)
        if '[Empty]' in text or not text:
            return {field_name: ""}
        return {field_name: text}
    # Rich HTML content: flatten to space-separated text.
    html_content = value_div.find('div', class_='das-field_value_html')
    if html_content:
        text = html_content.get_text(separator=' ', strip=True)
        text = re.sub(r'\[Empty\]', '', text).strip()
        return {field_name: text if text else ""}
    # Fallback: plain text content of the value div.
    text = value_div.get_text(strip=True)
    text = re.sub(r'\[Empty\]', '', text).strip()
    return {field_name: text if text else ""}
def extract_table_data(table):
    """Convert an HTML table into a list of {header: cell-text} dicts.

    The first row supplies the headers; single-cell colspan rows (section
    separators) and rows whose cell count differs from the header count
    are skipped. Tables with fewer than two rows yield [].
    """
    rows = table.find_all('tr')
    if len(rows) < 2:
        return []
    # ECHA tables use <td> cells in the header row as well.
    headers = [td.get_text(strip=True) for td in rows[0].find_all('td')]
    records = []
    for row in rows[1:]:
        cells = row.find_all('td')
        # Skip spanning separator rows.
        if len(cells) == 1 and cells[0].get('colspan'):
            continue
        # Skip malformed rows that don't line up with the headers.
        if len(cells) != len(headers):
            continue
        records.append(
            {header: cell.get_text(strip=True) for header, cell in zip(headers, cells)}
        )
    return records
def extract_section(section):
    """Recursively collect one das-block section into a dict.

    Gathers the section label, its direct das-field values, its direct
    tables ('table' or 'table_N' keys when several), and a 'subsections'
    list for nested das-block sections.
    """
    data = {}
    heading = section.find('h3', class_='das-block_label', recursive=False)
    if heading:
        data['label'] = heading.get_text(strip=True)
    # Direct fields only — nested sections handle their own fields.
    for field in section.find_all('div', class_='das-field', recursive=False):
        extracted = extract_field_value(field)
        if extracted:
            data.update(extracted)
    tables = section.find_all('table', recursive=False)
    several = len(tables) > 1
    for idx, table in enumerate(tables, start=1):
        rows = extract_table_data(table)
        if rows:
            data[f'table_{idx}' if several else 'table'] = rows
    children = section.find_all('section', class_='das-block', recursive=False)
    if children:
        collected = []
        for child in children:
            child_data = extract_section(child)
            if child_data:
                collected.append(child_data)
        data['subsections'] = collected
    return data
def parse_toxicology_html(html_content):
    """Parse an ECHA toxicology HTML document into a nested dict.

    Returns a dict with 'document_title' (when a document header exists)
    and 'sections', a list produced by extract_section() for every
    top-level das-block inside the das-document article. When no article
    is present, only the title (if any) is returned.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    parsed = {}
    header = soup.find('h4', class_='document-header')
    if header:
        parsed['document_title'] = header.get_text(strip=True)
    article = soup.find('article', class_='das-document')
    if not article:
        # No document body to walk — return whatever metadata we found.
        return parsed
    parsed['sections'] = []
    for block in article.find_all('section', class_='das-block', recursive=False):
        block_data = extract_section(block)
        if block_data:
            parsed['sections'].append(block_data)
    return parsed
#endregion
#region PDF extraction functions
def generate_pdf_from_toxicology_info(index: dict):
    """Render the toxicological-information page to a PDF via headless Chromium.

    Args:
        index: dict providing 'toxicological_information_link' and a
            nested 'substance' dict with 'rmlCas' (used as the file name).
            NOTE(review): get_substance_index() does not produce a
            'substance' key — confirm what callers actually pass here.

    Side effects:
        Writes pdfs/<rmlCas>.pdf, creating the directory if needed.
    """
    with sync_playwright() as p:
        browser = p.chromium.launch()
        try:
            page = browser.new_page()
            page.goto(index['toxicological_information_link'])
            # Ensure the output directory exists so page.pdf() can write.
            os.makedirs('pdfs', exist_ok=True)
            page.pdf(path=f'pdfs/{index["substance"]["rmlCas"]}.pdf')
        finally:
            # Release the browser even when navigation/rendering fails.
            browser.close()
#endregion
#region Orchestrator functions
def echa_flow(cas) -> dict:
    """Run the full ECHA retrieval pipeline for a CAS number.

    Looks up the substance, its dossier and the section index, then
    downloads and parses the three toxicology documents.

    Returns:
        The assembled record dict, or {} when any lookup step raises or
        any section of the result ends up empty.
        BUG FIX: the original returned False for incomplete results,
        contradicting the declared dict return type; {} is equally falsy
        for existing truth-testing callers.
    """
    try:
        # NOTE(review): `search_substance` is redefined later in this
        # module as an orchestrator that itself calls echa_flow(); at
        # runtime this call resolves to THAT definition, which can
        # recurse indefinitely when no local record exists. One of the
        # two functions needs renaming — flagged, not fixed here, to
        # keep this change self-contained.
        substance = search_substance(cas)
        dossier_info = get_dossier_info(substance['rmlId'])
        index = get_substance_index(dossier_info['assetExternalId'])
    except Exception as e:
        log.error(f"Error in ECHA flow for CAS {cas}: {e}")
        return {}

    def fetch_parsed(link):
        # Download one toxicology HTML page and parse it; {} on any failure.
        if not link:
            return {}
        response = requests.get(link)
        if response.status_code != 200:
            return {}
        return parse_toxicology_html(response.content)

    result = {
        "substance": substance,
        "dossier_info": dossier_info,
        "index": index,
        "toxicological_information": fetch_parsed(index.get('toxicological_information_link')),
        "acute_toxicity": fetch_parsed(index.get('acute_toxicity_link')),
        "repeated_dose_toxicity": fetch_parsed(index.get('repeated_dose_toxicity_link')),
    }
    # A record with any empty section is treated as a failure.
    if any(not value for value in result.values()):
        return {}
    return result
def cas_validation(cas: str) -> str:
    """Lightly validate a CAS number and return its stripped form.

    Only checks that, with hyphens removed, the value is all digits and at
    most 12 characters — NOT a full CAS check-digit validation (hence the
    "maybe is valid" log message).

    Returns:
        The stripped CAS string, or None when missing or invalid.
    """
    log.info(f"Starting ECHA data extraction for CAS: {cas}")
    if cas is None or cas.strip() == "":
        log.error("No CAS number provided.")
        return None
    # BUG FIX: strip whitespace BEFORE validating. Previously a CAS with
    # leading/trailing spaces failed isdigit() and was rejected, even
    # though the stripped value would have been returned on success.
    cas_clean = cas.strip()
    digits = cas_clean.replace("-", "")
    if digits.isdigit() and len(digits) <= 12:
        log.info(f"CAS number {cas} maybe is valid.")
        return cas_clean
    log.error(f"CAS number {cas} is not valid.")
    return None
def check_local(cas: str) -> dict:
    """Look up a previously stored ECHA record for a CAS number in MongoDB.

    Returns:
        The stored document, or None when the collection is unavailable or
        no record exists.
        (Fixed: the original annotated the return type as bool, but it
        always returned a document or None.)
    """
    client, db, collection = db_connect()
    if not collection:
        log.error("No MongoDB collection available.")
        return None
    record = collection.find_one({"substance.rmlCas": cas})
    if record:
        log.info(f"Record for CAS {cas} found in local database.")
        return record
    log.info(f"No record for CAS {cas} found in local database.")
    return None
def add_to_local(data: dict) -> bool:
    """Persist one ECHA record into the local MongoDB collection.

    Returns:
        True when the insert succeeds, False when the collection is
        unavailable or the insert (or success logging) raises.
    """
    _client, _db, collection = db_connect()
    if not collection:
        log.error("No MongoDB collection available.")
        return False
    try:
        collection.insert_one(data)
        log.info(f"Data for CAS {data['substance']['rmlCas']} added to local database.")
        return True
    except Exception as exc:
        log.error(f"Error inserting data into MongoDB: {exc}")
        return False
def search_substance(cas: str) -> dict:
    """Top-level entry point: return ECHA data for a CAS number.

    Validates the CAS, serves a cached record from the local MongoDB when
    one exists, otherwise runs the full ECHA flow and caches the result.

    NOTE(review): this redefines (shadows) the ECHA-API `search_substance`
    declared earlier in this module. Because echa_flow() calls
    `search_substance` by name, that call resolves to THIS function at
    runtime, which calls echa_flow() again — a potential infinite
    recursion whenever no local record exists. One of the two definitions
    must be renamed; flagged rather than fixed here so this edit stands
    alone.

    Returns:
        The record dict, or None on validation or retrieval failure.
    """
    cas_validated = cas_validation(cas)
    if not cas_validated:
        return None
    local_record = check_local(cas_validated)
    if local_record:
        return local_record
    echa_data = echa_flow(cas_validated)
    if echa_data:
        add_to_local(echa_data)
        return echa_data
    log.error(f"Failed to retrieve ECHA data for CAS {cas}.")
    return None
# TODO: verify that a locally cached document is complete before serving it
# TODO: compare the dossier's lastUpdatedDate against the cached record and refresh when stale
#endregion