minor adjustments

This commit is contained in:
adish-rmr 2025-12-01 19:04:09 +01:00
parent d588c7cc2f
commit 5fd12cb7a7
8 changed files with 409 additions and 65 deletions

View file

@ -31,7 +31,17 @@ def _(log):
@app.cell
def _():
cas_test = "100-41-4"
cas_problematici = [
"25525-21-7",
"113170-55-1",
"26172-55-4"
]
return (cas_problematici,)
@app.cell
def _(cas_problematici):
cas_test = cas_problematici[2]
return (cas_test,)
@ -53,7 +63,7 @@ def _():
active = "&registrationStatuses=Active"
inactive = "&registrationStatuses=Inactive"
legislation = "&legislation=REACH"
return BASE_DOSSIER, BASE_SEARCH, active, legislation
return BASE_DOSSIER, BASE_SEARCH, active, inactive, legislation
@app.cell
@ -102,8 +112,8 @@ def _(BASE_DOSSIER, active, substance):
@app.cell
def _(BASE_DOSSIER, active, legislation, requests, substance):
response_dossier = requests.get(BASE_DOSSIER + substance['rmlId'] + active + legislation)
def _(BASE_DOSSIER, inactive, legislation, requests, substance):
response_dossier = requests.get(BASE_DOSSIER + substance['rmlId'] + inactive + legislation)
return (response_dossier,)
@ -152,6 +162,12 @@ def _(cas_test, collection, substance):
return
@app.cell
def _(substance):
assetExternalId = substance['assetExternalId']
return (assetExternalId,)
@app.cell
def _(assetExternalId):
INDEX_HTML = "https://chem.echa.europa.eu/html-pages/" + assetExternalId + "/index.html"
@ -187,9 +203,9 @@ def _(BASE_SEARCH, log, requests):
@app.cell
def _(BASE_DOSSIER, active, legislation, log, requests):
def get_dossier_info(rmlId: str) -> dict:
url = BASE_DOSSIER + rmlId + active + legislation
def _(BASE_DOSSIER, active, inactive, legislation, log, requests):
def get_dossier_info(rmlId: str, type = active) -> dict:
url = BASE_DOSSIER + rmlId + type + legislation
response_dossier = requests.get(url)
if response_dossier.status_code != 200:
log.error(f"Network error: {response_dossier.status_code}")
@ -197,7 +213,7 @@ def _(BASE_DOSSIER, active, legislation, log, requests):
response_dossier_json = response_dossier.json()
if response_dossier_json['state']['totalItems'] == 0:
log.info(f"No dossier found for RML ID {rmlId}")
return {}
return get_dossier_info(rmlId, inactive)
dossier_info = {
"lastUpdatedDate": response_dossier_json['items'][0]['lastUpdatedDate'],
"registrationStatus": response_dossier_json['items'][0]['registrationStatus'],
@ -210,6 +226,11 @@ def _(BASE_DOSSIER, active, legislation, log, requests):
return (get_dossier_info,)
@app.cell
def _():
return
@app.cell
def _(BeautifulSoup, log, requests):
def get_substance_index(assetExternalId : str) -> dict:
@ -254,7 +275,6 @@ def _(BeautifulSoup, log, requests):
@app.cell
def _(search_substance):
val = search_substance("100-41-4")
return (val,)
@ -277,8 +297,8 @@ def _(info_dossier):
@app.cell
def _(get_substance_index, info_dossier):
index = get_substance_index(info_dossier['assetExternalId'])
def _(assetExternalId, get_substance_index):
index = get_substance_index(assetExternalId)
index
return (index,)

View file

@ -1,6 +1,6 @@
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any
from typing import Optional, Dict, Any, List
from pif_compiler.services.srv_echa import orchestrator
from pif_compiler.functions.common_log import get_logger
@ -28,6 +28,25 @@ class EchaResponse(BaseModel):
error: Optional[str] = None
class EchaBatchRequest(BaseModel):
    """Request body for the batch ECHA search endpoint."""

    # CAS registry numbers to look up; one orchestrator run per entry.
    cas_list: List[str] = Field(..., description="List of CAS numbers to search for")

    class Config:
        # Example payload surfaced in the generated OpenAPI/Swagger docs.
        json_schema_extra = {
            "example": {
                "cas_list": ["50-00-0", "64-17-5", "67-56-1"]
            }
        }
class EchaBatchResponse(BaseModel):
    """Aggregate result of a batch ECHA search."""

    # Set True by the endpoint once the whole batch has been processed
    # (individual items may still have failed — see `failed`).
    success: bool
    # Number of CAS numbers received in the request.
    total: int
    # Count of items for which data was found.
    successful: int
    # Count of items that returned no data or raised an error.
    failed: int
    # Per-CAS outcomes, in the same order as the request's cas_list.
    results: List[EchaResponse]
@router.post("/echa/search", response_model=EchaResponse, tags=["ECHA"])
async def search_echa_substance(request: EchaRequest):
"""
@ -84,6 +103,77 @@ async def search_echa_substance(request: EchaRequest):
)
@router.post("/echa/batch-search", response_model=EchaBatchResponse, tags=["ECHA"])
async def batch_search_echa_substances(request: EchaBatchRequest):
    """
    Run the ECHA orchestration pipeline for a list of CAS numbers.

    Each CAS in ``request.cas_list`` is handled independently, exactly as
    the single ``/echa/search`` endpoint would handle it; a failure on one
    item never aborts the rest of the batch.

    Args:
        request: EchaBatchRequest carrying the CAS numbers to process.

    Returns:
        EchaBatchResponse with per-item results plus success/failure counts.
    """
    logger.info(f"Batch API request received for {len(request.cas_list)} CAS numbers")

    outcomes = []
    ok_count, err_count = 0, 0

    for cas in request.cas_list:
        try:
            logger.info(f"Processing CAS: {cas}")
            data = orchestrator(cas)
        except Exception as exc:
            # Item-level failure: record it and keep going with the batch.
            logger.error(f"Error processing CAS {cas}: {str(exc)}", exc_info=True)
            outcomes.append(EchaResponse(
                success=False,
                cas=cas,
                data=None,
                error=f"Internal error: {str(exc)}"
            ))
            err_count += 1
            continue

        if data is None:
            logger.warning(f"No data found for CAS: {cas}")
            outcomes.append(EchaResponse(
                success=False,
                cas=cas,
                data=None,
                error="No data found for the provided CAS number. The CAS may be invalid or not registered in ECHA."
            ))
            err_count += 1
        else:
            # Strip the MongoDB identifier if the orchestrator left one in.
            data.pop("_id", None)
            logger.info(f"Successfully retrieved data for CAS: {cas}")
            outcomes.append(EchaResponse(
                success=True,
                cas=cas,
                data=data,
                error=None
            ))
            ok_count += 1

    logger.info(f"Batch request completed: {ok_count} successful, {err_count} failed out of {len(request.cas_list)} total")

    return EchaBatchResponse(
        success=True,
        total=len(request.cas_list),
        successful=ok_count,
        failed=err_count,
        results=outcomes
    )
@router.get("/echa/health", tags=["ECHA"])
async def echa_health_check():
"""
@ -99,4 +189,4 @@ async def echa_health_check():
"scraper": "operational",
"parser": "operational"
}
}
}

View file

@ -0,0 +1,243 @@
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any, List, Literal
from pif_compiler.services.srv_cosing import cosing_search, clean_cosing
from pif_compiler.functions.common_log import get_logger
logger = get_logger()
router = APIRouter()
class CosingSearchRequest(BaseModel):
    """Request body for a single COSING lookup."""

    # Free-text query; how it is matched depends on `mode`.
    text: str = Field(..., description="Text to search for (name, CAS, EC, or substance ID)")
    # Which identifier kind the query is matched against.
    mode: Literal["name", "cas", "ec", "id"] = Field(
        default="name",
        description="Search mode: 'name' for INCI/chemical names, 'cas' for CAS numbers, 'ec' for EC numbers, 'id' for substance ID"
    )
    # When True, clean_cosing also resolves identified ingredients.
    full: bool = Field(
        default=True,
        description="If True, includes identified ingredients in the response"
    )

    class Config:
        # Example payload surfaced in the generated OpenAPI/Swagger docs.
        json_schema_extra = {
            "example": {
                "text": "Water",
                "mode": "name",
                "full": True
            }
        }
class CosingSearchResponse(BaseModel):
    """Outcome of one COSING lookup (also used as a batch item)."""

    # False when nothing was found or the item errored.
    success: bool
    # Echo of the search text that produced this result.
    query: str
    # Echo of the search mode that was used.
    mode: str
    # Cleaned COSING record; None on failure.
    data: Optional[Dict[str, Any]] = None
    # Human-readable failure reason; None on success.
    error: Optional[str] = None
class CosingBatchRequest(BaseModel):
    """Request body for the COSING batch endpoint."""

    # Each item is a dict with 'text' (required) plus optional 'mode' and
    # 'full'; defaults mirror CosingSearchRequest ('name', True).
    searches: List[Dict[str, Any]] = Field(
        ...,
        description="List of search queries, each with 'text' and optionally 'mode' and 'full'"
    )

    class Config:
        # Example payload surfaced in the generated OpenAPI/Swagger docs.
        json_schema_extra = {
            "example": {
                "searches": [
                    {"text": "Water", "mode": "name"},
                    {"text": "7732-18-5", "mode": "cas"},
                    {"text": "231-791-2", "mode": "ec"}
                ]
            }
        }
class CosingBatchResponse(BaseModel):
    """Aggregate result of a batch COSING search."""

    # Set True by the endpoint once the whole batch has been processed
    # (individual items may still have failed — see `failed`).
    success: bool
    # Number of search items received in the request.
    total: int
    # Count of items that returned data.
    successful: int
    # Count of items that failed or returned nothing.
    failed: int
    # Per-item outcomes, in the same order as the request's searches.
    results: List[CosingSearchResponse]
@router.post("/cosing/search", response_model=CosingSearchResponse, tags=["COSING"])
async def search_cosing_substance(request: CosingSearchRequest):
    """
    Look up a single substance in the COSING database.

    Supported search modes:
    - **name**: INCI name, chemical name, INN name, Ph.Eur name, etc.
    - **cas**: CAS number
    - **ec**: EC number
    - **id**: substance ID

    The raw hit is passed through ``clean_cosing`` before being returned;
    with ``full=True`` identified ingredients are fetched recursively.

    Args:
        request: CosingSearchRequest with search text, mode, and full flag.

    Returns:
        CosingSearchResponse carrying the cleaned record or an error message.

    Raises:
        HTTPException: 400 on validation errors, 500 on unexpected failures.
    """
    logger.info(f"API request received for COSING search: text='{request.text}', mode='{request.mode}', full={request.full}")

    try:
        raw = cosing_search(request.text, request.mode)

        if raw is None:
            logger.warning(f"No data found for text='{request.text}', mode='{request.mode}'")
            return CosingSearchResponse(
                success=False,
                query=request.text,
                mode=request.mode,
                data=None,
                error=f"No data found for the provided search query. The {request.mode} may be invalid or not found in COSING."
            )

        cleaned = clean_cosing(raw, full=request.full)
        logger.info(f"Successfully retrieved and cleaned data for text='{request.text}', mode='{request.mode}'")
        return CosingSearchResponse(
            success=True,
            query=request.text,
            mode=request.mode,
            data=cleaned,
            error=None
        )

    except ValueError as err:
        logger.error(f"Validation error for request: {str(err)}")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(err))
    except Exception as err:
        logger.error(f"Error processing COSING request for text='{request.text}': {str(err)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Internal error while processing COSING search: {str(err)}"
        )
@router.post("/cosing/batch-search", response_model=CosingBatchResponse, tags=["COSING"])
async def batch_search_cosing_substances(request: CosingBatchRequest):
    """
    Search for multiple substances in COSING database.

    This endpoint processes multiple search queries in a single request.
    Each search can have its own mode (name, cas, ec, id) and full flag;
    a failure on one item never aborts the rest of the batch.

    Args:
        request: CosingBatchRequest containing a list of search queries.

    Returns:
        CosingBatchResponse with results for all searches, including
        success/failure counts and individual results.
    """
    logger.info(f"Batch API request received for {len(request.searches)} COSING searches")
    results = []
    successful = 0
    failed = 0
    for search_item in request.searches:
        # Per-item options fall back to the single-search defaults.
        text = search_item.get("text")
        mode = search_item.get("mode", "name")
        full = search_item.get("full", True)
        if not text:
            # Malformed item: record the failure but keep processing the batch.
            logger.warning("Batch item missing 'text' field, skipping")
            results.append(CosingSearchResponse(
                success=False,
                query="",
                mode=mode,
                data=None,
                error="Missing 'text' field in search item"
            ))
            failed += 1
            continue
        try:
            logger.info(f"Processing COSING search: text='{text}', mode='{mode}'")
            result = cosing_search(text, mode)
            if result is None:
                logger.warning(f"No data found for text='{text}', mode='{mode}'")
                results.append(CosingSearchResponse(
                    success=False,
                    query=text,
                    mode=mode,
                    data=None,
                    # Fix: was an f-string with no placeholders (f"...").
                    error="No data found for the provided search query."
                ))
                failed += 1
            else:
                # Clean the result; honors the per-item `full` flag.
                cleaned_result = clean_cosing(result, full=full)
                logger.info(f"Successfully retrieved data for text='{text}', mode='{mode}'")
                results.append(CosingSearchResponse(
                    success=True,
                    query=text,
                    mode=mode,
                    data=cleaned_result,
                    error=None
                ))
                successful += 1
        except ValueError as e:
            # cosing_search rejects bad input (e.g. an unknown mode).
            logger.error(f"Validation error for text='{text}': {str(e)}")
            results.append(CosingSearchResponse(
                success=False,
                query=text,
                mode=mode,
                data=None,
                error=f"Validation error: {str(e)}"
            ))
            failed += 1
        except Exception as e:
            logger.error(f"Error processing text '{text}': {str(e)}", exc_info=True)
            results.append(CosingSearchResponse(
                success=False,
                query=text,
                mode=mode,
                data=None,
                error=f"Internal error: {str(e)}"
            ))
            failed += 1
    logger.info(f"Batch request completed: {successful} successful, {failed} failed out of {len(request.searches)} total")
    return CosingBatchResponse(
        success=True,
        total=len(request.searches),
        successful=successful,
        failed=failed,
        results=results
    )
@router.get("/cosing/health", tags=["COSING"])
async def cosing_health_check():
    """
    Health check endpoint for COSING service.

    Returns the status of the COSING service components.
    """
    # Component statuses are static: this endpoint only confirms the
    # router itself is up and responding.
    component_status = {
        "api": "operational",
        "search": "operational",
        "parser": "operational"
    }
    return {
        "status": "healthy",
        "service": "cosing-search",
        "components": component_status
    }

View file

@ -16,7 +16,7 @@ Modules:
# ECHA Services
# COSING Service
from pif_compiler.services.cosing_service import (
from pif_compiler.services.srv_cosing import (
cosing_search,
clean_cosing,
parse_cas_numbers,

View file

@ -1,25 +1,14 @@
import os
from contextlib import contextmanager
import pubchempy as pcp
from pubchemprops.pubchemprops import get_second_layer_props
import logging
logging.basicConfig(
format="{asctime} - {levelname} - {message}",
style="{",
datefmt="%Y-%m-%d %H:%M",
filename="echa.log",
encoding="utf-8",
filemode="a",
level=logging.INFO,
)
from pif_compiler.functions.common_log import get_logger
logger = get_logger()
@contextmanager
def temporary_certificate(cert_path):
# Sto robo serve perchè per usare l'API di PubChem serve cambiare temporaneamente il certificato con il quale
# si fanno le richieste
"""
Context manager to temporarily change the certificate used for requests.
@ -112,38 +101,38 @@ def pubchem_dap(cas):
'''
with temporary_certificate('src/data/ncbi-nlm-nih-gov-catena.pem'):
try:
# Ricerca iniziale
out = pcp.get_synonyms(cas, 'name')
if out:
out = out[0]
output = {'CID' : out['CID'],
'CAS' : cas,
'first_pubchem_name' : out['Synonym'][0],
'pubchem_link' : f"https://pubchem.ncbi.nlm.nih.gov/compound/{out['CID']}"}
else:
return f'No results on PubChem for {cas}'
try:
# Ricerca iniziale
out = pcp.get_synonyms(cas, 'name')
if out:
out = out[0]
output = {'CID' : out['CID'],
'CAS' : cas,
'first_pubchem_name' : out['Synonym'][0],
'pubchem_link' : f"https://pubchem.ncbi.nlm.nih.gov/compound/{out['CID']}"}
else:
return f'No results on PubChem for {cas}'
except Exception as E:
logging.error(f'various_utils.pubchem.pubchem_dap(). Some error during pubchem search for {cas}', exc_info=True)
except Exception as E:
logger.error(f'various_utils.pubchem.pubchem_dap(). Some error during pubchem search for {cas}', exc_info=True)
try:
# Ricerca delle proprietà
properties = pcp.get_properties(['xlogp', 'molecular_weight', 'tpsa', 'exact_mass'], identifier = out['CID'], namespace='cid', searchtype=None, as_dataframe=False)
if properties:
output = {**output, **properties[0]}
else:
return output
except Exception as E:
logging.error(f'various_utils.pubchem.pubchem_dap(). Some error during pubchem first level properties extraction for {cas}', exc_info=True)
try:
# Ricerca del Melting Point
second_layer_props = get_second_layer_props(output['first_pubchem_name'], ['Melting Point', 'Dissociation Constants', 'pH'])
if second_layer_props:
second_layer_props = clean_property_data(second_layer_props)
output = {**output, **second_layer_props}
except Exception as E:
logging.error(f'various_utils.pubchem.pubchem_dap(). Some error during pubchem second level properties extraction (Melting Point) for {cas}', exc_info=True)
return output
try:
# Ricerca delle proprietà
properties = pcp.get_properties(['xlogp', 'molecular_weight', 'tpsa', 'exact_mass'], identifier = out['CID'], namespace='cid', searchtype=None, as_dataframe=False)
if properties:
output = {**output, **properties[0]}
else:
return output
except Exception as E:
logger.error(f'various_utils.pubchem.pubchem_dap(). Some error during pubchem first level properties extraction for {cas}', exc_info=True)
try:
# Ricerca del Melting Point
second_layer_props = get_second_layer_props(output['first_pubchem_name'], ['Melting Point', 'Dissociation Constants', 'pH'])
if second_layer_props:
second_layer_props = clean_property_data(second_layer_props)
output = {**output, **second_layer_props}
except Exception as E:
logger.error(f'various_utils.pubchem.pubchem_dap(). Some error during pubchem second level properties extraction (Melting Point) for {cas}', exc_info=True)
return output

View file

@ -52,8 +52,8 @@ def search_substance(cas : str) -> dict:
return {}
def get_dossier_info(rmlId: str) -> dict:
url = BASE_DOSSIER + rmlId + active + legislation
def get_dossier_info(rmlId: str, type = active) -> dict:
url = BASE_DOSSIER + rmlId + type + legislation
response_dossier = requests.get(url)
if response_dossier.status_code != 200:
log.error(f"Network error: {response_dossier.status_code}")
@ -61,6 +61,8 @@ def get_dossier_info(rmlId: str) -> dict:
response_dossier_json = response_dossier.json()
if response_dossier_json['state']['totalItems'] == 0:
log.info(f"No dossier found for RML ID {rmlId}")
if type == active:
return get_dossier_info(rmlId, inactive)
return {}
dossier_info = {
"lastUpdatedDate": response_dossier_json['items'][0]['lastUpdatedDate'],
@ -438,6 +440,6 @@ def orchestrator(cas: str) -> dict:
#endregion
if __name__ == "__main__":
cas_test = "50-00-0"
cas_test = "113170-55-1"
result = orchestrator(cas_test)
print(result)

View file

@ -9,7 +9,7 @@ Test coverage:
import pytest
from unittest.mock import Mock, patch
from pif_compiler.services.cosing_service import (
from pif_compiler.services.srv_cosing import (
parse_cas_numbers,
cosing_search,
clean_cosing,