cosmoguard-bd/src/pif_compiler/classes/models.py
2026-02-08 14:31:50 +01:00

423 lines
No EOL
15 KiB
Python

from pydantic import BaseModel, Field, field_validator, ConfigDict, model_validator, computed_field
import re
from typing import List, Optional
from datetime import datetime as dt
from pif_compiler.services.srv_echa import extract_levels, at_extractor, rdt_extractor, orchestrator
from pif_compiler.functions.db_utils import postgres_connect
from pif_compiler.services.srv_pubchem import pubchem_dap
from pif_compiler.services.srv_cosing import cosing_entry
class DapInfo(BaseModel):
cas: str
molecular_weight: Optional[float] = Field(default=None, description="In Daltons (Da)")
high_ionization: Optional[float] = Field(default=None, description="High degree of ionization")
log_pow: Optional[float] = Field(default=None, description="Partition coefficient")
tpsa: Optional[float] = Field(default=None, description="Topological polar surface area")
melting_point: Optional[float] = Field(default=None, description="In Celsius (°C)")
# --- Il valore DAP Calcolato ---
# Lo impostiamo di default a 0.5 (50%), verrà sovrascritto dal validator
dap_value: float = 0.5
@model_validator(mode='after')
def compute_dap(self):
# Lista delle condizioni (True se la condizione riduce l'assorbimento)
conditions = []
# 1. MW > 500 Da
if self.molecular_weight is not None:
conditions.append(self.molecular_weight > 500)
# 2. High Ionization (Se è True, riduce l'assorbimento)
if self.high_ionization is not None:
conditions.append(self.high_ionization is True)
# 3. Log Pow <= -1 OR >= 4
if self.log_pow is not None:
conditions.append(self.log_pow <= -1 or self.log_pow >= 4)
# 4. TPSA > 120 Å2
if self.tpsa is not None:
conditions.append(self.tpsa > 120)
# 5. Melting Point > 200°C
if self.melting_point is not None:
conditions.append(self.melting_point > 200)
# LOGICA FINALE:
# Se c'è almeno una condizione "sicura" (True), il DAP è 0.1
if any(conditions):
self.dap_value = 0.1
else:
self.dap_value = 0.5
return self
@classmethod
def dap_builder(cls, dap_data: dict):
"""
Costruisce un oggetto DapInfo a partire dai dati grezzi.
"""
desiderated_keys = ['CAS', 'MolecularWeight', 'XLogP', 'TPSA', 'Melting Point', 'Dissociation Constants']
actual_keys = [key for key in dap_data.keys() if key in desiderated_keys]
dict = {}
for key in actual_keys:
if key == 'CAS':
dict['cas'] = dap_data[key]
if key == 'MolecularWeight':
mw = float(dap_data[key])
dict['molecular_weight'] = mw
if key == 'XLogP':
log_pow = float(dap_data[key])
dict['log_pow'] = log_pow
if key == 'TPSA':
tpsa = float(dap_data[key])
dict['tpsa'] = tpsa
if key == 'Melting Point':
try:
for item in dap_data[key]:
if '°C' in item['Value']:
mp = dap_data[key]['Value']
mp_value = re.findall(r"[-+]?\d*\.\d+|\d+", mp)
if mp_value:
dict['melting_point'] = float(mp_value[0])
except:
continue
if key == 'Dissociation Constants':
try:
for item in dap_data[key]:
if 'pKa' in item['Value']:
pk = dap_data[key]['Value']
pk_value = re.findall(r"[-+]?\d*\.\d+|\d+", pk)
if pk_value:
dict['high_ionization'] = float(mp_value[0])
except:
continue
return cls(**dict)
class CosingInfo(BaseModel):
cas : List[str] = Field(default_factory=list)
common_names : List[str] = Field(default_factory=list)
inci : List[str] = Field(default_factory=list)
annex : List[str] = Field(default_factory=list)
functionName : List[str] = Field(default_factory=list)
otherRestrictions : List[str] = Field(default_factory=list)
cosmeticRestriction : Optional[str]
@classmethod
def cosing_builder(cls, cosing_data : dict):
cosing_keys = ['nameOfCommonIngredientsGlossary', 'casNo', 'functionName', 'annexNo', 'refNo', 'otherRestrictions', 'cosmeticRestriction', 'inciName']
keys = [k for k in cosing_data.keys() if k in cosing_keys]
cosing_dict = {}
for k in keys:
if k == 'nameOfCommonIngredientsGlossary':
names = []
for name in cosing_data[k]:
names.append(name)
cosing_dict['common_names'] = names
if k == 'inciName':
inci = []
for inc in cosing_data[k]:
inci.append(inc)
cosing_dict['inci'] = inci
if k == 'casNo':
cas_list = []
for casNo in cosing_data[k]:
cas_list.append(casNo)
cosing_dict['cas'] = cas_list
if k == 'functionName':
functions = []
for func in cosing_data[k]:
functions.append(func)
cosing_dict['functionName'] = functions
if k == 'annexNo':
annexes = []
i = 0
for ann in cosing_data[k]:
restriction = ann + ' / ' + cosing_data['refNo'][i]
annexes.append(restriction)
i = i+1
cosing_dict['annex'] = annexes
if k == 'otherRestrictions':
other_restrictions = []
for ores in cosing_data[k]:
other_restrictions.append(ores)
cosing_dict['otherRestrictions'] = other_restrictions
if k == 'cosmeticRestriction':
cosing_dict['cosmeticRestriction'] = cosing_data[k]
return cls(**cosing_dict)
@classmethod
def cycle_identified(cls, cosing_data : dict):
cosing_entries = []
if 'identifiedIngredient' in cosing_data.keys():
identified_cosing = cls.cosing_builder(cosing_data['identifiedIngredient'])
cosing_entries.append(identified_cosing)
main = cls.cosing_builder(cosing_data)
cosing_entries.append(main)
return cosing_entries
class ToxIndicator(BaseModel):
indicator : str
value : int
unit : str
route : str
toxicity_type : Optional[str] = None
ref : Optional[str] = None
@property
def priority_rank(self):
"""Returns the numerical priority based on the toxicological indicator."""
mapping = {
'LD50': 1,
'DL50': 1,
'LOAEL': 3,
'NOAEL': 4
}
return mapping.get(self.indicator, -1)
@property
def factor(self):
"""Returns the factor based on the toxicity type."""
if self.priority_rank == 1:
return 10
elif self.priority_rank == 3:
return 3
return 1
class Toxicity(BaseModel):
cas: str
indicators: list[ToxIndicator]
best_case: Optional[ToxIndicator] = None
factor: Optional[int] = None
@model_validator(mode='after')
def set_best_case(self) -> 'Toxicity':
if self.indicators:
self.best_case = max(self.indicators, key=lambda x: x.priority_rank)
self.factor = self.best_case.factor
return self
@classmethod
def from_result(cls, cas: str, result):
toxicity_types = ['repeated_dose_toxicity', 'acute_toxicity']
indicators_list = []
for tt in toxicity_types:
if tt not in result:
continue
try:
extractor = at_extractor if tt == 'acute_toxicity' else rdt_extractor
fetch = extract_levels(result[tt], extractor=extractor)
link = result.get(f"{tt}_link", "")
for key, lvl in fetch.items():
lvl['ref'] = link
elem = ToxIndicator(**lvl)
indicators_list.append(elem)
except Exception as e:
print(f"Errore durante l'estrazione di {tt}: {e}")
continue
return cls(
cas=cas,
indicators=indicators_list
)
class Ingredient(BaseModel):
cas: str
inci: Optional[List[str]] = None
dap_info: Optional[DapInfo] = None
cosing_info: Optional[List[CosingInfo]] = None
toxicity: Optional[Toxicity] = None
creation_date: Optional[str] = None
@classmethod
def ingredient_builder(
cls,
cas: str,
inci: Optional[List[str]] = None,
dap_data: Optional[dict] = None,
cosing_data: Optional[dict] = None,
toxicity_data: Optional[dict] = None):
dap_info = DapInfo.dap_builder(dap_data) if dap_data else None
cosing_info = CosingInfo.cycle_identified(cosing_data) if cosing_data else None
toxicity = Toxicity.from_result(cas, toxicity_data) if toxicity_data else None
return cls(
cas=cas,
inci=inci,
dap_info=dap_info,
cosing_info=cosing_info,
toxicity=toxicity
)
@model_validator(mode='after')
def set_creation_date(self) -> 'Ingredient':
self.creation_date = dt.now().isoformat()
return self
def update_ingredient(self, attr : str, data : dict):
setattr(self, attr, data)
def to_mongo_dict(self):
mongo_dict = self.model_dump()
return mongo_dict
def get_stats(self):
stats = {
"has_dap_info": self.dap_info is not None,
"has_cosing_info": self.cosing_info is not None,
"has_toxicity_info": self.toxicity is not None,
"num_tox_indicators": len(self.toxicity.indicators) if self.toxicity else 0,
"has_best_tox_indicator": self.toxicity.best_case is not None if self.toxicity else False,
"has_restrictions_in_cosing": any(self.cosing_info[0].annex) if self.cosing_info else False,
"has_noael_indicator": any(ind.indicator == 'NOAEL' for ind in self.toxicity.indicators) if self.toxicity else False,
"has_ld50_indicator": any(ind.indicator == 'LD50' for ind in self.toxicity.indicators) if self.toxicity else False,
"has_loael_indicator": any(ind.indicator == 'LOAEL' for ind in self.toxicity.indicators) if self.toxicity else False
}
return stats
def is_old(self, threshold_days: int = 365) -> bool:
if not self.creation_date:
return True
creation_dt = dt.fromisoformat(self.creation_date)
current_dt = dt.now()
delta = current_dt - creation_dt
return delta.days > threshold_days
def add_inci_name(self, inci_name: str):
if self.inci is None:
self.inci = []
if inci_name not in self.inci:
self.inci.append(inci_name)
def return_best_toxicity(self) -> Optional[ToxIndicator]:
if self.toxicity and self.toxicity.best_case:
return self.toxicity.best_case
return None
def return_cosing_restrictions(self) -> List[str]:
restrictions = []
if self.cosing_info:
for cosing in self.cosing_info:
restrictions.extend(cosing.annex)
return restrictions
class RetentionFactors:
LEAVE_ON = 1.0
RINSE_OFF = 0.01
DENTIFRICE = 0.05
MOUTHWASH = 0.10
DYE = 0.10
class Esposition(BaseModel):
preset_name : str
tipo_prodotto: str
popolazione_target: str = "Adulti"
peso_target_kg: float = 60.0
luogo_applicazione: str
esp_normali: List[str]
esp_secondarie: List[str]
esp_nano: List[str]
sup_esposta: int = Field(ge=1, le=17500, description="Area di applicazione in cm2")
freq_applicazione: int = Field(default=1, description="Numero di applicazioni al giorno")
qta_giornaliera: float = Field(..., description="Quantità di prodotto applicata (g/die)")
ritenzione: float = Field(default=1.0, ge=0, le=1.0, description="Fattore di ritenzione")
note: Optional[str] = None
@field_validator('esp_normali', 'esp_secondarie', 'esp_nano', mode='before')
@classmethod
def parse_postgres_array(cls, v):
# Se Postgres restituisce una stringa tipo '{a,b}' la trasformiamo in ['a','b']
if isinstance(v, str):
cleaned = v.strip('{}[]')
return [item.strip() for item in cleaned.split(',')] if cleaned else []
return v
@computed_field
@property
def esposizione_calcolata(self) -> float:
return self.qta_giornaliera * self.ritenzione
@computed_field
@property
def esposizione_relativa(self) -> float:
return (self.esposizione_calcolata * 1000) / self.peso_target_kg
def save_to_postgres(self):
data = self.model_dump(mode='json')
query = """INSERT INTO tipi_prodotti (
preset_name, tipo_prodotto, luogo_applicazione,
esp_normali, esp_secondarie, esp_nano,
sup_esposta, freq_applicazione, qta_giornaliera, ritenzione
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id_preset;"""
conn = postgres_connect()
try:
with conn.cursor() as cur:
cur.execute(query, (
data.get("preset_name"), data.get("tipo_prodotto"),
data.get("luogo_applicazione"), data.get("esp_normali"),
data.get("esp_secondarie"), data.get("esp_nano"),
data.get("sup_esposta"), data.get("freq_applicazione"),
data.get("qta_giornaliera"), data.get("ritenzione")
))
result = cur.fetchone()
conn.commit()
return result[0] if result else None
except Exception as e:
print(f"Errore salvataggio: {e}")
conn.rollback()
return False
finally:
conn.close()
@classmethod
def get_presets(cls):
conn = postgres_connect()
try:
with conn.cursor() as cur:
cur.execute("SELECT preset_name, tipo_prodotto, luogo_applicazione, esp_normali, esp_secondarie, esp_nano, sup_esposta, freq_applicazione, qta_giornaliera, ritenzione FROM tipi_prodotti;")
results = cur.fetchall()
lista_oggetti = []
for r in results:
obj = cls(
preset_name=r[0],
tipo_prodotto=r[1],
luogo_applicazione=r[2],
esp_normali=r[3],
esp_secondarie=r[4],
esp_nano=r[5],
sup_esposta=r[6],
freq_applicazione=r[7],
qta_giornaliera=r[8],
ritenzione=r[9]
)
lista_oggetti.append(obj)
return lista_oggetti
except Exception as e:
print(f"Errore: {e}")
return []
finally:
conn.close()