"""ECHA substance data scraper.

Searches substances by CAS number, fetches the active REACH dossier and
toxicology HTML pages, parses them into dicts, and caches results in MongoDB.
"""
import json
import os
import re
from typing import Optional

import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from playwright.sync_api import sync_playwright

from pif_compiler.functions.common_log import get_logger
from pif_compiler.functions.db_utils import db_connect
|
|
|
|
log = get_logger()
load_dotenv()

# ECHA public API endpoints.
BASE_SEARCH = "https://chem.echa.europa.eu/api-substance/v1/substance?pageIndex=1&pageSize=100&searchText="
BASE_DOSSIER = "https://chem.echa.europa.eu/api-dossier-list/v1/dossier?pageIndex=1&pageSize=100&rmlId="
SUBSTANCE_SUMMARY = "https://chem.echa.europa.eu/api-substance/v1/substance/"  # + id
CLASSIFICATION_ID = "https://chem.echa.europa.eu/api-cnl-inventory/prominent/overview/classifications/harmonised/459160"
TOXICOLOGICAL_INFO = "https://chem.echa.europa.eu/html-pages-prod/e4c88c6e-06c7-4daa-b0fb-1a55459ac22f/documents/IUC5-5f55d8ec-7a71-4e2c-9955-8469ead9fe84_0035f3f8-7467-4944-9028-1db2e9c99565.html"  # external + rootkey
REPEATED_DOSE = "https://chem.echa.europa.eu/html-pages-prod/e4c88c6e-06c7-4daa-b0fb-1a55459ac22f/documents/IUC5-82402b09-8d8f-495c-b673-95b205be60e0_0035f3f8-7467-4944-9028-1db2e9c99565.html"

# Query-string fragments appended to BASE_DOSSIER.
# NOTE(fix): the source contained "®istrationStatuses" — "&reg" had been
# corrupted into the HTML entity "®". Restored the intended
# "&registrationStatuses" parameter; the corrupted form broke the query.
active = "&registrationStatuses=Active"
inactive = "&registrationStatuses=Inactive"
legislation = "&legislation=REACH"
|
|
|
|
#region ECHA scraping functions
|
|
|
|
def search_substance(cas: str) -> dict:
    """Search the ECHA substance API for an exact CAS match.

    Args:
        cas: CAS number to search for (e.g. "50-00-0").

    Returns:
        A dict with keys rmlCas, rmlId, rmlEc, rmlName for the first item
        whose rmlCas equals *cas* exactly, or {} when the request fails,
        no results exist, or no item matches the CAS exactly.
    """
    response = requests.get(BASE_SEARCH + cas, timeout=30)
    if response.status_code != 200:
        log.error(f"Network error: {response.status_code}")
        return {}

    payload = response.json()
    if payload['state']['totalItems'] == 0:
        log.info(f"No substance found for CAS {cas}")
        return {}

    for result in payload['items']:
        index = result["substanceIndex"]
        # The search is fuzzy; only accept an exact CAS match.
        if index["rmlCas"] == cas:
            # NOTE(fix): the original dict literal listed "rmlId" twice;
            # the duplicate (same value) has been removed.
            substance = {
                "rmlCas": index["rmlCas"],
                "rmlId": index["rmlId"],
                "rmlEc": index["rmlEc"],
                "rmlName": index["rmlName"],
            }
            log.info(f"Substance found for CAS {cas}: {substance['rmlName']}")
            return substance

    log.error(f"Something went wrong searching the substance for CAS {cas}")
    return {}
|
|
|
|
|
|
def get_dossier_info(rmlId: str) -> dict:
    """Fetch the active REACH dossier metadata for a substance RML id.

    Args:
        rmlId: ECHA RML identifier as returned by search_substance().

    Returns:
        A dict with dossier dates, status, role, assetExternalId and
        rootKey for the first dossier in the listing, or {} when the
        request fails or no dossier exists.
    """
    url = BASE_DOSSIER + rmlId + active + legislation
    response_dossier = requests.get(url, timeout=30)
    if response_dossier.status_code != 200:
        log.error(f"Network error: {response_dossier.status_code}")
        return {}

    response_dossier_json = response_dossier.json()
    if response_dossier_json['state']['totalItems'] == 0:
        log.info(f"No dossier found for RML ID {rmlId}")
        return {}

    # Only the first dossier of the listing is used; hoisted the repeated
    # ['items'][0] indexing into a local.
    item = response_dossier_json['items'][0]
    dossier_info = {
        "lastUpdatedDate": item['lastUpdatedDate'],
        "registrationStatus": item['registrationStatus'],
        "registrationStatusChangedDate": item['registrationStatusChangedDate'],
        "registrationRole": item['reachDossierInfo']['registrationRole'],
        "assetExternalId": item['assetExternalId'],
        "rootKey": item['rootKey'],
    }
    log.info(f"Dossier info retrieved for RML ID {rmlId}")
    return dossier_info
|
|
|
|
|
|
def get_substance_index(assetExternalId: str) -> dict:
    """Fetch a dossier's index.html and resolve links to toxicology sections.

    For each known section div, the first 'das-leaf' anchor href is turned
    into an absolute document URL.

    NOTE(fix): the original called .find() on the result of soup.find()
    without a None check, so a single missing section raised
    AttributeError and aborted the whole flow. Missing sections are now
    skipped with a warning and the remaining links are still returned.

    Args:
        assetExternalId: dossier asset id from get_dossier_info().

    Returns:
        A dict with up to three keys (toxicological_information_link,
        repeated_dose_toxicity_link, acute_toxicity_link), or {} on
        network error.
    """
    INDEX = "https://chem.echa.europa.eu/html-pages-prod/" + assetExternalId
    LINK_DOSSIER = INDEX + "/documents/"

    response = requests.get(INDEX + "/index.html", timeout=30)
    if response.status_code != 200:
        log.error(f"Network error: {response.status_code}")
        return {}

    soup = BeautifulSoup(response.content, 'html.parser')

    # (output key, id of the section's div in index.html)
    sections = [
        ('toxicological_information_link', 'id_7_Toxicologicalinformation'),
        ('repeated_dose_toxicity_link', 'id_75_Repeateddosetoxicity'),
        ('acute_toxicity_link', 'id_72_AcuteToxicity'),
    ]

    index_data = {}
    for key, div_id in sections:
        div = soup.find('div', id=div_id)
        link = div.find('a', class_='das-leaf') if div else None
        if link is not None and link.get('href'):
            index_data[key] = LINK_DOSSIER + link['href'] + '.html'
        else:
            log.warning(f"Section {div_id} not found in index for {assetExternalId}")

    log.info(f"Substance index retrieved for Asset External ID {assetExternalId}")
    return index_data
|
|
|
|
#endregion
|
|
|
|
#region ECHA parsing functions of html pages
|
|
|
|
def get_field_name(field_div):
    """Return the semantic field name encoded in the label div's classes.

    The label div carries one extra CSS class that names the field; the
    classes in *reserved* are presentation-only. Returns None when the
    label div is absent or no candidate class remains.
    """
    label_div = field_div.find('div', class_='das-field_label')
    if label_div is None:
        return None

    reserved = ('das-field_label', 'das-empty-value', 'das-empty-label')
    candidates = (cls for cls in label_div.get('class', []) if cls not in reserved)
    return next(candidates, None)
|
|
|
|
|
|
def extract_field_value(field_div):
    """Extract value from a das-field div.

    Returns one of:
      * None -- unnamed field, 'OriginalStudy' field, missing value div,
        or a redacted (not publishable) value;
      * {field_name: ""} -- explicitly empty values;
      * {field_name: text} -- pick-list phrase, decimal/text field,
        HTML block, or plain text content;
      * {field_name: True/False} -- checkbox fields;
      * {field_name: {"value": ..., "unit": ...}} -- physical quantities.

    The branch ORDER matters: redacted > empty > pick-list > quantity >
    checkbox > decimal/text > html > default text.
    """
    field_name = get_field_name(field_div)
    if not field_name:
        return None

    # Skip OriginalStudy fields
    if field_name == 'OriginalStudy':
        return None

    value_div = field_div.find('div', class_='das-field_value')
    if not value_div:
        return None

    # Exclude redacted/not publishable
    redacted = value_div.find('span', class_='das-redacted-value')
    if redacted:
        return None

    # Check if empty
    # NOTE(review): the redacted re-check below is redundant -- a redacted
    # value has already returned None above.
    empty_span = value_div.find('span', class_='das-empty-value')
    if empty_span and not value_div.find('span', class_='das-redacted-value'):
        return {field_name: ""}

    # Extract pick-list value
    # A pick-list with neither a phrase nor an empty marker falls through
    # to the later extraction branches.
    pick_list = value_div.find('span', class_='das-field_value_pick-list')
    if pick_list:
        phrase = pick_list.find('span', class_='phrase')
        if phrase:
            return {field_name: phrase.get_text(strip=True)}
        if pick_list.find('span', class_='das-empty-value'):
            return {field_name: ""}

    # Extract quantity value (value + unit)
    quantity = value_div.find('span', class_='i6PhysicalQuantity')
    if quantity:
        value_span = quantity.find('span', class_='value')
        unit_span = quantity.find('span', class_='unit')

        value_text = value_span.get_text(strip=True) if value_span else ""
        unit_text = ""
        if unit_span:
            unit_phrase = unit_span.find('span', class_='phrase')
            if unit_phrase:
                unit_text = unit_phrase.get_text(strip=True)
            elif unit_span.find('span', class_='das-empty-value'):
                unit_text = ""

        # A quantity without a numeric value collapses to "" (unit alone
        # is not reported).
        if value_text:
            return {field_name: {"value": value_text, "unit": unit_text}}
        else:
            return {field_name: ""}

    # Extract checkbox value
    checkbox_checked = value_div.find('span', class_='das-value_checkbox-checked')
    checkbox_unchecked = value_div.find('span', class_='das-value_checkbox-unchecked')
    if checkbox_checked is not None or checkbox_unchecked is not None:
        # True when the checked marker is present, False otherwise.
        return {field_name: checkbox_checked is not None}

    # Extract decimal/numeric value
    if 'das-field_decimal' in field_div.get('class', []) or 'das-field_text' in field_div.get('class', []):
        text = value_div.get_text(strip=True)
        if '[Empty]' in text or not text:
            return {field_name: ""}
        return {field_name: text}

    # Extract HTML/text content
    if value_div.find('div', class_='das-field_value_html'):
        html_content = value_div.find('div', class_='das-field_value_html')
        text = html_content.get_text(separator=' ', strip=True)
        # Strip literal '[Empty]' placeholders left by the export.
        text = re.sub(r'\[Empty\]', '', text).strip()
        if not text:
            return {field_name: ""}
        return {field_name: text}

    # Default: get text content
    text = value_div.get_text(strip=True)
    text = re.sub(r'\[Empty\]', '', text).strip()
    return {field_name: text if text else ""}
|
|
|
|
|
|
def extract_table_data(table):
    """Convert an HTML table into a list of {header: cell-text} dicts.

    The first row supplies the headers. Single-cell colspan rows (section
    separators) and rows whose cell count differs from the header count
    are skipped. Tables with fewer than two rows yield [].
    """
    all_rows = table.find_all('tr')
    if len(all_rows) < 2:
        return []

    headers = [td.get_text(strip=True) for td in all_rows[0].find_all('td')]

    records = []
    for tr in all_rows[1:]:
        tds = tr.find_all('td')
        # Spanning separator row: not data.
        if len(tds) == 1 and tds[0].get('colspan'):
            continue
        if len(tds) != len(headers):
            continue
        records.append(dict(zip(headers, (td.get_text(strip=True) for td in tds))))

    return records
|
|
|
|
|
|
def extract_section(section):
    """Recursively flatten one das-block section into a dict.

    Collects the section label (key 'label'), the values of direct
    das-field divs, direct tables (key 'table', or 'table_N' when there
    is more than one), and nested das-block sections under 'subsections'.
    Only non-empty field/table/section results are included.
    """
    data = {}

    heading = section.find('h3', class_='das-block_label', recursive=False)
    if heading:
        data['label'] = heading.get_text(strip=True)

    for field_div in section.find_all('div', class_='das-field', recursive=False):
        extracted = extract_field_value(field_div)
        if extracted:
            data.update(extracted)

    own_tables = section.find_all('table', recursive=False)
    numbered = len(own_tables) > 1
    for idx, tbl in enumerate(own_tables, start=1):
        rows = extract_table_data(tbl)
        if rows:
            data[f'table_{idx}' if numbered else 'table'] = rows

    children = section.find_all('section', class_='das-block', recursive=False)
    if children:
        data['subsections'] = [
            d for d in (extract_section(child) for child in children) if d
        ]

    return data
|
|
|
|
|
|
def parse_toxicology_html(html_content):
    """Parse an ECHA toxicology HTML document into a nested dict.

    Produces 'document_title' (when the h4 header exists) and 'sections',
    a list built by extract_section() from each top-level das-block inside
    the das-document article. When no article is present, the partial
    result (without 'sections') is returned.
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    parsed = {}

    header = soup.find('h4', class_='document-header')
    if header:
        parsed['document_title'] = header.get_text(strip=True)

    article = soup.find('article', class_='das-document')
    if article is None:
        return parsed

    parsed['sections'] = []
    for block in article.find_all('section', class_='das-block', recursive=False):
        extracted = extract_section(block)
        if extracted:
            parsed['sections'].append(extracted)

    return parsed
|
|
|
|
#endregion
|
|
|
|
#region PDF extraction functions
|
|
|
|
def generate_pdf_from_toxicology_info(index: dict):
    """Render the toxicological-information page to pdfs/<CAS>.pdf.

    Uses headless Chromium via Playwright.

    Fixes over the original: the output path f-string was rebuilt four
    times (now computed once); the pdfs/ directory is created if missing
    (page.pdf() does not create it); the browser is closed even when
    navigation or rendering raises.

    Args:
        index: result dict from echa_flow() -- must contain
            index['substance']['rmlCas'] and
            index['toxicological_information_link'].

    Returns:
        True when the PDF file exists afterwards, False otherwise.
    """
    pdf_path = f'pdfs/{index["substance"]["rmlCas"]}.pdf'
    os.makedirs('pdfs', exist_ok=True)

    with sync_playwright() as p:
        browser = p.chromium.launch()
        try:
            page = browser.new_page()
            page.goto(index['toxicological_information_link'])
            page.pdf(path=pdf_path)
        finally:
            browser.close()

    if os.path.exists(pdf_path):
        log.info(f"PDF generated for CAS {index['substance']['rmlCas']}")
        return True
    log.error(f"PDF generation failed for CAS {index['substance']['rmlCas']}")
    return False
|
|
|
|
#endregion
|
|
|
|
#region Orchestrator functions
|
|
|
|
def _fetch_parsed_section(url) -> dict:
    """Download one toxicology HTML page and parse it; {} on any failure."""
    if not url:
        return {}
    response = requests.get(url, timeout=30)
    if response.status_code != 200:
        return {}
    return parse_toxicology_html(response.content)


def echa_flow(cas) -> dict:
    """Run the full ECHA pipeline for one CAS number.

    Chains search_substance -> get_dossier_info -> get_substance_index,
    then fetches and parses the three toxicology section pages. The three
    copy-pasted fetch/parse stanzas of the original are factored into
    _fetch_parsed_section().

    Args:
        cas: validated CAS number string.

    Returns:
        A dict with substance, dossier_info, index and the three parsed
        toxicology sections (each {} when unavailable), or {} when the
        initial lookup chain fails.
    """
    try:
        substance = search_substance(cas)
        dossier_info = get_dossier_info(substance['rmlId'])
        index = get_substance_index(dossier_info['assetExternalId'])
    except Exception as e:
        # Any KeyError/network failure in the chain aborts the flow.
        log.error(f"Error in ECHA flow for CAS {cas}: {e}")
        return {}

    log.debug("ECHA flow intermediate result")

    result = {
        "substance": substance,
        "dossier_info": dossier_info,
        "index": index,
        "toxicological_information": _fetch_parsed_section(
            index.get('toxicological_information_link')),
        "acute_toxicity": _fetch_parsed_section(
            index.get('acute_toxicity_link')),
        "repeated_dose_toxicity": _fetch_parsed_section(
            index.get('repeated_dose_toxicity_link')),
    }

    # Report which parts of the record are populated.
    for key, value in result.items():
        if value in (None, "", [], {}):
            log.warning(f"Missing data for key: {key} in CAS {cas}")
        else:
            log.info(f"Data retrieved for key: {key} in CAS {cas}")

    return result
|
|
|
|
def cas_validation(cas: str) -> Optional[str]:
    """Loosely validate a CAS number and return it stripped.

    A CAS is accepted when, after stripping surrounding whitespace and
    removing dashes, it is all digits and at most 12 characters long.
    (No check-digit verification is performed -- hence "maybe valid".)

    NOTE(fix): the original removed dashes BEFORE stripping whitespace,
    so padded-but-valid inputs like " 50-00-0 " were rejected; it also
    claimed a plain `-> str` return although None is returned on failure.

    Args:
        cas: raw CAS string (may be None or padded).

    Returns:
        The stripped CAS on success, None otherwise.
    """
    log.info(f"Starting ECHA data extraction for CAS: {cas}")
    if cas is None or cas.strip() == "":
        log.error("No CAS number provided.")
        return None

    cas = cas.strip()
    cas_stripped = cas.replace("-", "")
    if cas_stripped.isdigit() and len(cas_stripped) <= 12:
        log.info(f"CAS number {cas} maybe is valid.")
        return cas
    log.error(f"CAS number {cas} is not valid.")
    return None
|
|
|
|
def check_local(cas: str) -> Optional[dict]:
    """Look up a previously stored ECHA record by CAS in MongoDB.

    NOTE(fix): the original annotation claimed `-> bool`, but the
    function returns the Mongo document (a dict-like) or None.

    Args:
        cas: validated CAS number.

    Returns:
        The stored document when found; None when not found or when no
        collection is available.
    """
    collection = db_connect()
    if collection is None:
        log.error("No MongoDB collection available.")
        return None

    record = collection.find_one({"substance.rmlCas": cas})
    if record:
        log.info(f"Record for CAS {cas} found in local database.")
        return record
    log.info(f"No record for CAS {cas} found in local database.")
    return None
|
|
|
|
def add_to_local(data: dict) -> bool:
    """Persist one ECHA result document into the local MongoDB collection.

    Returns True on a successful insert, False when no collection is
    available or the insert (or the success-log key lookup) raises.
    """
    store = db_connect()
    if store is None:
        log.error("No MongoDB collection available.")
        return False

    try:
        store.insert_one(data)
        # Kept inside the try: a missing 'substance'/'rmlCas' key is
        # treated as a failed insert, matching the original behavior.
        log.info(f"Data for CAS {data['substance']['rmlCas']} added to local database.")
    except Exception as e:
        log.error(f"Error inserting data into MongoDB: {e}")
        return False
    return True
|
|
|
|
def orchestrator(cas: str) -> Optional[dict]:
    """Entry point: validate the CAS, serve from cache, or run the ECHA flow.

    Freshly scraped records are written back to the local database before
    being returned. Nested else-branches of the original are flattened
    into guard clauses.

    NOTE(fix): annotation corrected from `-> dict` -- None is returned on
    validation or scraping failure.

    Args:
        cas: raw CAS number string.

    Returns:
        The record dict (cached or freshly scraped), or None on failure.
    """
    log.debug(f"Initiating search for CAS {cas} in ECHA service.")
    cas_validated = cas_validation(cas)
    if not cas_validated:
        return None
    log.info(f"CAS {cas} validated successfully.")

    local_record = check_local(cas_validated)
    if local_record:
        log.info(f"Returning local record for CAS {cas}.")
        return local_record

    log.info(f"No local record, starting echa flow")
    echa_data = echa_flow(cas_validated)
    if echa_data:
        log.info(f"Echa flow successful")
        add_to_local(echa_data)
        return echa_data

    log.error(f"Failed to retrieve ECHA data for CAS {cas}.")
    return None
|
|
|
|
# to do: check if document is complete
|
|
# to do: check lastupdate
|
|
#endregion
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: formaldehyde (CAS 50-00-0).
    cas_test = "50-00-0"
    result = orchestrator(cas_test)
    print(result)