"""Marimo exploration notebook: fetch ECHA CHEM substance, dossier, and
toxicology data by CAS number, parse the dossier HTML pages into JSON, and
cache substance index records in a local MongoDB ("toxinfo" database)."""

import marimo

__generated_with = "0.16.5"
app = marimo.App(width="medium")


@app.cell
def _():
    import marimo as mo
    import urllib.parse
    import re as standardre
    import json
    from bs4 import BeautifulSoup
    import requests
    return BeautifulSoup, mo, requests, urllib


@app.cell
def _():
    from pif_compiler.services.common_log import get_logger
    log = get_logger()
    return (log,)


@app.cell
def _(log):
    log.info("testing with marimo")
    return


@app.cell
def _():
    # Ethylbenzene — used as the worked example throughout the notebook.
    cas_test = "100-41-4"
    return (cas_test,)


@app.cell
def _(cas_test, urllib):
    urllib.parse.quote(cas_test)
    return


@app.cell
def _():
    # ECHA CHEM API endpoints and query-string fragments.
    BASE_SEARCH = "https://chem.echa.europa.eu/api-substance/v1/substance?pageIndex=1&pageSize=100&searchText="
    BASE_DOSSIER = "https://chem.echa.europa.eu/api-dossier-list/v1/dossier?pageIndex=1&pageSize=100&rmlId="
    SUBSTANCE_SUMMARY = "https://chem.echa.europa.eu/api-substance/v1/substance/"  # + id
    CLASSIFICATION_ID = "https://chem.echa.europa.eu/api-cnl-inventory/prominent/overview/classifications/harmonised/459160"
    TOXICOLOGICAL_INFO = "https://chem.echa.europa.eu/html-pages-prod/e4c88c6e-06c7-4daa-b0fb-1a55459ac22f/documents/IUC5-5f55d8ec-7a71-4e2c-9955-8469ead9fe84_0035f3f8-7467-4944-9028-1db2e9c99565.html"  # external + rootkey
    REPEATED_DOSE = "https://chem.echa.europa.eu/html-pages-prod/e4c88c6e-06c7-4daa-b0fb-1a55459ac22f/documents/IUC5-82402b09-8d8f-495c-b673-95b205be60e0_0035f3f8-7467-4944-9028-1db2e9c99565.html"
    # BUG FIX: the literal "&reg" had been mojibake-converted into the "®"
    # character ("®istrationStatuses=..."), producing invalid query strings
    # for every dossier request. Restored the intended parameter name.
    active = "&registrationStatuses=Active"
    inactive = "&registrationStatuses=Inactive"
    legislation = "&legislation=REACH"
    return BASE_DOSSIER, BASE_SEARCH, active, legislation


@app.cell
def _(BASE_SEARCH, cas_test, requests):
    test_search_request = requests.get(BASE_SEARCH + cas_test)
    return (test_search_request,)


@app.cell
def _(test_search_request):
    response = test_search_request.json()
    return (response,)


@app.cell
def _(test_search_request):
    test_search_request.json()
    return


@app.cell
def _(cas_test, response):
    # Keep only the search hit whose CAS matches exactly (the search API can
    # return multiple substances for one search string).
    substance = {}
    for result in response['items']:
        if result["substanceIndex"]["rmlCas"] == cas_test:
            substance["rmlCas"] = result["substanceIndex"]["rmlCas"]
            # BUG FIX: "rmlId" was assigned twice in the original cell.
            substance["rmlId"] = result["substanceIndex"]["rmlId"]
            substance["rmlEc"] = result["substanceIndex"]["rmlEc"]
            substance["rmlName"] = result["substanceIndex"]["rmlName"]
    return (substance,)


@app.cell
def _(substance):
    substance
    return


@app.cell
def _(BASE_DOSSIER, active, substance):
    url = BASE_DOSSIER + substance['rmlId'] + active
    url
    return


@app.cell
def _(BASE_DOSSIER, active, legislation, requests, substance):
    response_dossier = requests.get(
        BASE_DOSSIER + substance['rmlId'] + active + legislation
    )
    return (response_dossier,)


@app.cell
def _(response_dossier):
    response_dossier_json = response_dossier.json()
    response_dossier_json
    return (response_dossier_json,)


@app.cell
def _(response_dossier_json, substance):
    # Enrich the substance record with the first (active REACH) dossier's
    # registration metadata.
    substance['lastUpdatedDate'] = response_dossier_json['items'][0]['lastUpdatedDate']
    substance['registrationStatus'] = response_dossier_json['items'][0]['registrationStatus']
    substance['registrationStatusChangedDate'] = response_dossier_json['items'][0]['registrationStatusChangedDate']
    substance['registrationRole'] = response_dossier_json['items'][0]['reachDossierInfo']['registrationRole']
    substance['assetExternalId'] = response_dossier_json['items'][0]['assetExternalId']
    substance['rootKey'] = response_dossier_json['items'][0]['rootKey']
    substance
    return


@app.cell
def _():
    from pif_compiler.services.db_utils import get_client
    client = get_client()
    db = client.get_database(name="toxinfo")
    return (db,)


@app.cell
def _(db):
    collection = db.get_collection("substance_index")
    # BUG FIX: the original bound this to the name "list", shadowing the
    # builtin for the rest of the cell.
    collection_names = db.list_collection_names()
    print(collection_names)
    return (collection,)


@app.cell
def _(cas_test, collection, substance):
    # Cache the substance record locally unless it is already present.
    sub = collection.find_one({"rmlCas": cas_test})
    if not sub:
        collection.insert_one(substance)
    return


@app.cell
def _(assetExternalId):
    # NOTE(review): no cell defines `assetExternalId` as an output, so marimo
    # cannot resolve this dependency — dead exploratory cell, kept as-is.
    INDEX_HTML = "https://chem.echa.europa.eu/html-pages/" + assetExternalId + "/index.html"
    return


@app.cell
def _(BASE_SEARCH, log, requests):
    def search_substance(cas: str) -> dict:
        """Look up a substance on ECHA CHEM by exact CAS number.

        Returns a dict with rmlCas / rmlId / rmlEc / rmlName, or {} on
        network error, when nothing is found, or when no exact CAS match
        exists among the search hits.
        """
        response = requests.get(BASE_SEARCH + cas)
        if response.status_code != 200:
            log.error(f"Network error: {response.status_code}")
            return {}
        payload = response.json()
        if payload['state']['totalItems'] == 0:
            log.info(f"No substance found for CAS {cas}")
            return {}
        for result in payload['items']:
            if result["substanceIndex"]["rmlCas"] == cas:
                # BUG FIX: "rmlId" appeared twice in the original literal.
                return {
                    "rmlCas": result["substanceIndex"]["rmlCas"],
                    "rmlId": result["substanceIndex"]["rmlId"],
                    "rmlEc": result["substanceIndex"]["rmlEc"],
                    "rmlName": result["substanceIndex"]["rmlName"],
                }
        # Hits were returned but none matched the CAS exactly.
        log.error("Something went wrong")
        return {}
    return (search_substance,)


@app.cell
def _(BASE_DOSSIER, active, legislation, log, requests):
    def get_dossier_info(rmlId: str) -> dict:
        """Fetch registration metadata for the first active REACH dossier of
        the given RML id. Returns {} on network error or when no dossier
        exists."""
        url = BASE_DOSSIER + rmlId + active + legislation
        response_dossier = requests.get(url)
        if response_dossier.status_code != 200:
            log.error(f"Network error: {response_dossier.status_code}")
            return {}
        response_dossier_json = response_dossier.json()
        if response_dossier_json['state']['totalItems'] == 0:
            log.info(f"No dossier found for RML ID {rmlId}")
            return {}
        item = response_dossier_json['items'][0]
        return {
            "lastUpdatedDate": item['lastUpdatedDate'],
            "registrationStatus": item['registrationStatus'],
            "registrationStatusChangedDate": item['registrationStatusChangedDate'],
            "registrationRole": item['reachDossierInfo']['registrationRole'],
            "assetExternalId": item['assetExternalId'],
            "rootKey": item['rootKey'],
        }
    return (get_dossier_info,)


@app.cell
def _(BeautifulSoup, log, requests):
    def get_substance_index(assetExternalId: str) -> dict:
        """Scrape the dossier index page and collect links to the toxicology
        sections of interest.

        Returns a dict mapping section keys (e.g.
        'toxicological_information_link') to absolute document URLs; {} on
        network error.
        """
        INDEX = "https://chem.echa.europa.eu/html-pages-prod/" + assetExternalId
        LINK_DOSSIER = INDEX + "/documents/"
        response = requests.get(INDEX + "/index.html")
        if response.status_code != 200:
            log.error(f"Network error: {response.status_code}")
            return {}
        soup = BeautifulSoup(response.content, 'html.parser')
        # Section div id -> output key. Table-driven to avoid repeating the
        # same find/extract sequence per section.
        sections = {
            'id_7_Toxicologicalinformation': 'toxicological_information_link',
            'id_75_Repeateddosetoxicity': 'repeated_dose_toxicity_link',
            'id_72_AcuteToxicity': 'acute_toxicity_link',
        }
        index_data = {}
        for div_id, key in sections.items():
            section_div = soup.find('div', id=div_id)
            link = section_div.find('a', class_='das-leaf') if section_div else None
            if link is None:
                # ROBUSTNESS: the original raised AttributeError when a
                # section was missing from the index page.
                log.error(f"Index section {div_id} not found")
                continue
            index_data[key] = LINK_DOSSIER + link['href'] + '.html'
        return index_data

    get_substance_index("e4c88c6e-06c7-4daa-b0fb-1a55459ac22f")
    return (get_substance_index,)


@app.cell
def _(search_substance):
    val = search_substance("100-41-4")
    return (val,)


@app.cell
def _(val):
    val
    return


@app.cell
def _(get_dossier_info, val):
    info_dossier = get_dossier_info(val['rmlId'])
    return (info_dossier,)


@app.cell
def _(info_dossier):
    info_dossier
    return


@app.cell
def _(get_substance_index, info_dossier):
    index = get_substance_index(info_dossier['assetExternalId'])
    index
    return (index,)


@app.cell
def _(index, requests):
    summary_link = index['toxicological_information_link']
    response_summary = requests.get(summary_link)
    return (response_summary,)


@app.cell
def _(index, requests):
    acute_link = index['acute_toxicity_link']
    response_acute = requests.get(acute_link)
    return (response_acute,)


@app.cell
def _(index, requests):
    repeated_link = index['repeated_dose_toxicity_link']
    response_repeated = requests.get(repeated_link)
    return (response_repeated,)


@app.cell
def _(BeautifulSoup, response_summary):
    soup_summary = BeautifulSoup(response_summary.content, 'html.parser')
    soup_summary.prettify(formatter='html')
    soup_summary
    return


@app.cell
def _(BeautifulSoup, re):
    def get_field_name(field_div):
        """Extract the field name from the class attribute of the label div.

        ECHA pages encode the field name as the extra CSS class on the
        'das-field_label' div; returns None when no label is present.
        """
        label_div = field_div.find('div', class_='das-field_label')
        if not label_div:
            return None
        classes = label_div.get('class', [])
        for cls in classes:
            if cls not in ['das-field_label', 'das-empty-value', 'das-empty-label']:
                return cls
        return None

    def extract_field_value(field_div):
        """Extract {name: value} from a 'das-field' div.

        Handles pick-list, physical-quantity (value + unit), checkbox,
        decimal/text, and embedded-HTML value shapes. Returns None for
        unnamed, OriginalStudy, or redacted fields; empty values map to "".
        """
        field_name = get_field_name(field_div)
        if not field_name:
            return None
        # Skip OriginalStudy fields
        if field_name == 'OriginalStudy':
            return None
        value_div = field_div.find('div', class_='das-field_value')
        if not value_div:
            return None
        # Exclude redacted / not-publishable values
        redacted = value_div.find('span', class_='das-redacted-value')
        if redacted:
            return None
        # Empty value marker
        empty_span = value_div.find('span', class_='das-empty-value')
        if empty_span and not value_div.find('span', class_='das-redacted-value'):
            return {field_name: ""}
        # Pick-list value
        pick_list = value_div.find('span', class_='das-field_value_pick-list')
        if pick_list:
            phrase = pick_list.find('span', class_='phrase')
            if phrase:
                return {field_name: phrase.get_text(strip=True)}
            if pick_list.find('span', class_='das-empty-value'):
                return {field_name: ""}
        # Quantity value (value + unit)
        quantity = value_div.find('span', class_='i6PhysicalQuantity')
        if quantity:
            value_span = quantity.find('span', class_='value')
            unit_span = quantity.find('span', class_='unit')
            value_text = value_span.get_text(strip=True) if value_span else ""
            unit_text = ""
            if unit_span:
                unit_phrase = unit_span.find('span', class_='phrase')
                if unit_phrase:
                    unit_text = unit_phrase.get_text(strip=True)
                elif unit_span.find('span', class_='das-empty-value'):
                    unit_text = ""
            if value_text:
                return {field_name: {"value": value_text, "unit": unit_text}}
            else:
                return {field_name: ""}
        # Checkbox value
        checkbox_checked = value_div.find('span', class_='das-value_checkbox-checked')
        checkbox_unchecked = value_div.find('span', class_='das-value_checkbox-unchecked')
        if checkbox_checked is not None or checkbox_unchecked is not None:
            return {field_name: checkbox_checked is not None}
        # Decimal / numeric value
        if 'das-field_decimal' in field_div.get('class', []) or 'das-field_text' in field_div.get('class', []):
            text = value_div.get_text(strip=True)
            if '[Empty]' in text or not text:
                return {field_name: ""}
            return {field_name: text}
        # HTML/text content
        if value_div.find('div', class_='das-field_value_html'):
            html_content = value_div.find('div', class_='das-field_value_html')
            text = html_content.get_text(separator=' ', strip=True)
            text = re.sub(r'\[Empty\]', '', text).strip()
            if not text:
                return {field_name: ""}
            return {field_name: text}
        # Default: plain text content
        text = value_div.get_text(strip=True)
        text = re.sub(r'\[Empty\]', '', text).strip()
        return {field_name: text if text else ""}

    def extract_table_data(table):
        """Extract table data as a list of {header: cell} dicts.

        Uses the first row as headers; skips colspan spacer rows and any
        row whose cell count does not match the header count.
        """
        rows = table.find_all('tr')
        if len(rows) < 2:
            return []
        header_row = rows[0]
        headers = []
        for th in header_row.find_all('td'):
            header_text = th.get_text(strip=True)
            headers.append(header_text)
        data = []
        for row in rows[1:]:
            cells = row.find_all('td')
            if len(cells) == 1 and cells[0].get('colspan'):
                continue
            if len(cells) == len(headers):
                row_data = {}
                for i, cell in enumerate(cells):
                    cell_text = cell.get_text(strip=True)
                    row_data[headers[i]] = cell_text
                data.append(row_data)
        return data

    def extract_section(section):
        """Recursively extract label, fields, tables, and subsections from a
        'das-block' section element."""
        section_data = {}
        label_h3 = section.find('h3', class_='das-block_label', recursive=False)
        if label_h3:
            section_data['label'] = label_h3.get_text(strip=True)
        direct_fields = section.find_all('div', class_='das-field', recursive=False)
        for field in direct_fields:
            field_data = extract_field_value(field)
            if field_data:
                section_data.update(field_data)
        tables = section.find_all('table', recursive=False)
        for i, table in enumerate(tables):
            table_data = extract_table_data(table)
            if table_data:
                # Only suffix the key when the section has several tables.
                table_key = f'table_{i+1}' if len(tables) > 1 else 'table'
                section_data[table_key] = table_data
        nested_sections = section.find_all('section', class_='das-block', recursive=False)
        if nested_sections:
            section_data['subsections'] = []
            for nested in nested_sections:
                nested_data = extract_section(nested)
                if nested_data:
                    section_data['subsections'].append(nested_data)
        return section_data

    def parse_toxicology_html(html_content):
        """Parse an ECHA toxicology HTML document into a nested dict of
        sections (see extract_section). Returns at most 'document_title'
        when the expected 'das-document' article is absent."""
        soup = BeautifulSoup(html_content, 'html.parser')
        result = {}
        title = soup.find('h4', class_='document-header')
        if title:
            result['document_title'] = title.get_text(strip=True)
        article = soup.find('article', class_='das-document')
        if not article:
            return result
        top_sections = article.find_all('section', class_='das-block', recursive=False)
        result['sections'] = []
        for section in top_sections:
            section_data = extract_section(section)
            if section_data:
                result['sections'].append(section_data)
        return result
    return (parse_toxicology_html,)


@app.cell
def _():
    import re
    return (re,)


@app.cell
def _(parse_toxicology_html, response_summary):
    summary_json = parse_toxicology_html(response_summary.content)
    return (summary_json,)


@app.cell
def _(summary_json):
    summary_json
    return


@app.cell
def _(parse_toxicology_html, response_acute):
    acute_json = parse_toxicology_html(response_acute.content)
    return (acute_json,)


@app.cell
def _(acute_json):
    acute_json
    return


@app.cell
def _(parse_toxicology_html, response_repeated):
    response_json = parse_toxicology_html(response_repeated.content)
    return (response_json,)


@app.cell
def _(response_json):
    response_json
    return


@app.cell
def _(index):
    # Render the toxicological information page to PDF via headless Chromium.
    from playwright.sync_api import sync_playwright
    with sync_playwright() as p:
        browser = p.chromium.launch()
        page = browser.new_page()
        page.goto(index['toxicological_information_link'])
        page.pdf(path='output.pdf')
        browser.close()
    return


@app.cell
def _(
    get_dossier_info,
    get_substance_index,
    parse_toxicology_html,
    requests,
    search_substance,
):
    def orchestration(cas) -> dict:
        """Run the full pipeline for a CAS number: search -> dossier ->
        index -> fetch & parse the three toxicology documents.

        Returns {} as soon as any lookup stage fails; parsed sections
        default to {} when their document cannot be fetched.
        """
        substance = search_substance(cas)
        if not substance:
            return {}
        dossier_info = get_dossier_info(substance['rmlId'])
        if not dossier_info:
            return {}
        index = get_substance_index(dossier_info['assetExternalId'])
        if not index:
            return {}
        result = {
            "substance": substance,
            "dossier_info": dossier_info,
            "index": index,
            "toxicological_information": {},
            "acute_toxicity": {},
            "repeated_dose_toxicity": {},
        }
        # Fetch-and-parse the three documents; table-driven to avoid the
        # original copy/pasted request blocks.
        links = {
            "toxicological_information": index.get('toxicological_information_link'),
            "acute_toxicity": index.get('acute_toxicity_link'),
            "repeated_dose_toxicity": index.get('repeated_dose_toxicity_link'),
        }
        for key, link in links.items():
            if not link:
                continue
            response = requests.get(link)
            if response.status_code == 200:
                result[key] = parse_toxicology_html(response.content)
        return result
    return


@app.cell
def _():
    # BUG FIX: this cell was stored as app._unparsable_cell because of a
    # stray ')' after 'return {}' and a reference to an undefined
    # 'substance' variable in add_sub_locally. Reconstructed as valid code.
    from pif_compiler.services.db_utils import get_client as _get_client

    def _substance_collection():
        """Open the local 'toxinfo.substance_index' cache collection."""
        client = _get_client()
        db = client.get_database(name="toxinfo")
        return db.get_collection("substance_index")

    def check_sub_locally(cas: str) -> dict:
        """Return the cached substance document for *cas*, or {} if absent."""
        sub = _substance_collection().find_one({"rmlCas": cas})
        return sub if sub else {}

    def add_sub_locally(substance: dict) -> None:
        """Insert *substance* into the local cache unless a document with the
        same rmlCas already exists."""
        collection = _substance_collection()
        if not collection.find_one({"rmlCas": substance["rmlCas"]}):
            collection.insert_one(substance)
    return


@app.cell(hide_code=True)
def _(mo):
    mo.md(
        r"""
    # Cosa manca da fare

    1. Creare un nuovo orchestratore per la parte search, caching in mongodb e creare un metodo unico per la ricerca
    2. Metodo per validare i json salvati nel database, verificare la data
    3. Creare i metodi per astrarre gli html in json
    4. Creare i test per ciascuna funzione
    5. Creare la documentazione per ciascuna funzione
    """
    )
    return


if __name__ == "__main__":
    app.run()