653 lines
No EOL
25 KiB
Python
653 lines
No EOL
25 KiB
Python
from pydantic import BaseModel, Field, field_validator, ConfigDict, model_validator, computed_field
|
|
import re
|
|
from enum import IntEnum
|
|
from typing import List, Optional
|
|
from datetime import datetime as dt
|
|
|
|
from pif_compiler.functions.common_log import get_logger
|
|
|
|
logger = get_logger()
|
|
|
|
class StatoOrdine(IntEnum):
    """Order states orchestrating the PIF processing flow."""
    RICEVUTO = 1      # Raw input received, loaded into MongoDB
    VALIDATO = 2      # Object created (compiler, client, cosmetic type OK)
    COMPILAZIONE = 3  # Progetto object created, currently being enriched
    ARRICCHITO = 5    # Progetto object finalized
    CALCOLO = 6       # DAP, SED, MoS computation in progress
    COMPLETATO = 8    # PIF finalized
    ERRORE = 9        # Processing error, human intervention required
|
|
|
|
from pif_compiler.services.srv_echa import extract_levels, at_extractor, rdt_extractor, orchestrator
|
|
from pif_compiler.functions.db_utils import postgres_connect, upsert_ingrediente, get_ingrediente_by_cas
|
|
from pif_compiler.services.srv_pubchem import pubchem_dap
|
|
from pif_compiler.services.srv_cosing import cosing_entry
|
|
|
|
|
|
class DapInfo(BaseModel):
    """Physico-chemical properties of a substance (by CAS number) used to
    derive the dermal absorption percentage (DAP)."""

    cas: str

    molecular_weight: Optional[float] = Field(default=None, description="In Daltons (Da)")
    high_ionization: Optional[float] = Field(default=None, description="High degree of ionization")
    log_pow: Optional[float] = Field(default=None, description="Partition coefficient")
    tpsa: Optional[float] = Field(default=None, description="Topological polar surface area")
    melting_point: Optional[float] = Field(default=None, description="In Celsius (°C)")

    # --- The computed DAP value ---
    # Defaults to 0.5 (50%); always overwritten by the validator below.
    dap_value: float = 0.5

    @model_validator(mode='after')
    def compute_dap(self):
        """Set dap_value to 0.1 (10%) if at least one absorption-reducing
        criterion holds, otherwise keep the conservative 0.5 (50%)."""
        # Each entry is True when the corresponding property reduces absorption.
        conditions = []

        # 1. MW > 500 Da
        if self.molecular_weight is not None:
            conditions.append(self.molecular_weight > 500)

        # 2. High ionization (True would reduce absorption).
        # NOTE(review): high_ionization holds a pKa *float* (see dap_builder),
        # so `is True` is always False and this criterion can never trigger.
        # Behavior intentionally preserved; the intended threshold must be
        # confirmed with the domain owner before fixing.
        if self.high_ionization is not None:
            conditions.append(self.high_ionization is True)

        # 3. Log Pow <= -1 OR >= 4
        if self.log_pow is not None:
            conditions.append(self.log_pow <= -1 or self.log_pow >= 4)

        # 4. TPSA > 120 Å2
        if self.tpsa is not None:
            conditions.append(self.tpsa > 120)

        # 5. Melting Point > 200°C
        if self.melting_point is not None:
            conditions.append(self.melting_point > 200)

        # If at least one "low absorption" condition is True, DAP is 0.1.
        if any(conditions):
            self.dap_value = 0.1
        else:
            self.dap_value = 0.5

        return self

    @classmethod
    def dap_builder(cls, dap_data: dict):
        """Build a DapInfo from a raw PubChem-style payload.

        Numeric fields are converted directly; 'Melting Point' and
        'Dissociation Constants' entries are scanned for the first numeric
        value (°C / pKa respectively). Parse failures are logged and skipped.
        """
        # Fix: the original used a local variable named `dict`, shadowing the
        # builtin, and filtered keys through an intermediate list for no gain.
        data: dict = {}

        if 'CAS' in dap_data:
            data['cas'] = dap_data['CAS']
        if 'MolecularWeight' in dap_data:
            data['molecular_weight'] = float(dap_data['MolecularWeight'])
        if 'XLogP' in dap_data:
            data['log_pow'] = float(dap_data['XLogP'])
        if 'TPSA' in dap_data:
            data['tpsa'] = float(dap_data['TPSA'])

        if 'Melting Point' in dap_data:
            try:
                # Keep the last °C value found, matching the original loop.
                for item in dap_data['Melting Point']:
                    if '°C' in item['Value']:
                        mp_value = re.findall(r"[-+]?\d*\.\d+|\d+", item['Value'])
                        if mp_value:
                            data['melting_point'] = float(mp_value[0])
            except Exception as e:
                logger.warning(f"DapInfo: parsing melting_point fallito per CAS={data.get('cas', '?')}: {e}")

        if 'Dissociation Constants' in dap_data:
            try:
                for item in dap_data['Dissociation Constants']:
                    if 'pKa' in item['Value']:
                        pk_value = re.findall(r"[-+]?\d*\.\d+|\d+", item['Value'])
                        if pk_value:
                            data['high_ionization'] = float(pk_value[0])
            except Exception as e:
                logger.warning(f"DapInfo: parsing dissociation fallito per CAS={data.get('cas', '?')}: {e}")

        return cls(**data)
|
|
|
|
class CosingInfo(BaseModel):
    """A single COSING (EU cosmetic ingredient database) entry."""

    cas : List[str] = Field(default_factory=list)
    common_names : List[str] = Field(default_factory=list)
    inci : List[str] = Field(default_factory=list)
    annex : List[str] = Field(default_factory=list)
    functionName : List[str] = Field(default_factory=list)
    otherRestrictions : List[str] = Field(default_factory=list)
    cosmeticRestriction : Optional[str] = None
    reference : Optional[str] = None
    substanceId : Optional[str] = None
    sccsOpinionUrls : List[str] = Field(default_factory=list)

    @classmethod
    def cosing_builder(cls, cosing_data : dict):
        """Map a raw COSING payload onto a CosingInfo.

        List-valued keys are copied verbatim; each 'annexNo' entry is joined
        with the matching 'refNo' entry as "<annex> / <ref>".
        """
        # Raw list-valued keys -> model field names.
        list_fields = {
            'nameOfCommonIngredientsGlossary': 'common_names',
            'inciName': 'inci',
            'casNo': 'cas',
            'functionName': 'functionName',
            'otherRestrictions': 'otherRestrictions',
            'sccsOpinionUrls': 'sccsOpinionUrls',
        }
        scalar_fields = ('cosmeticRestriction', 'reference', 'substanceId')

        cosing_dict = {}

        for raw_key, field_name in list_fields.items():
            if raw_key in cosing_data:
                cosing_dict[field_name] = list(cosing_data[raw_key])

        for key in scalar_fields:
            if key in cosing_data:
                cosing_dict[key] = cosing_data[key]

        if 'annexNo' in cosing_data:
            # Fix: the original indexed cosing_data['refNo'][i] unguarded and
            # raised KeyError/IndexError when 'refNo' was missing or shorter
            # than 'annexNo'. Unmatched annex entries now keep the bare value.
            refs = cosing_data.get('refNo', [])
            cosing_dict['annex'] = [
                ann + ' / ' + refs[i] if i < len(refs) else ann
                for i, ann in enumerate(cosing_data['annexNo'])
            ]

        return cls(**cosing_dict)

    @classmethod
    def cycle_identified(cls, cosing_data : dict):
        """Build CosingInfo objects for any nested 'identifiedIngredient'
        entries, then append the main entry last. Returns the full list."""
        cosing_entries = []
        if 'identifiedIngredient' in cosing_data:
            for each_entry in cosing_data['identifiedIngredient']:
                cosing_entries.append(cls.cosing_builder(each_entry))

        cosing_entries.append(cls.cosing_builder(cosing_data))
        return cosing_entries
|
|
|
|
class ToxIndicator(BaseModel):
    """A single toxicological endpoint (NOAEL, LOAEL, LD50/DL50, ...) with
    its value, unit, exposure route and provenance."""

    indicator : str
    value : float
    unit : str
    route : str
    toxicity_type : Optional[str] = None
    ref : Optional[str] = None
    source : Optional[str] = None
    is_custom : bool = False

    @property
    def priority_rank(self):
        """Numerical priority of this endpoint type (higher wins; -1 if unknown)."""
        ranks = {'LD50': 1, 'DL50': 1, 'LOAEL': 3, 'NOAEL': 4}
        return ranks.get(self.indicator, -1)

    @property
    def factor(self):
        """Safety factor tied to the endpoint type: 10 for LD50/DL50,
        3 for LOAEL, 1 otherwise."""
        factor_by_rank = {1: 10, 3: 3}
        return factor_by_rank.get(self.priority_rank, 1)
|
|
|
|
class Toxicity(BaseModel):
    """Toxicological profile for a CAS number: all extracted indicators plus
    the 'best case' endpoint selected by priority/route/value rules."""

    cas: str
    indicators: list[ToxIndicator]
    best_case: Optional[ToxIndicator] = None
    factor: Optional[int] = None

    @model_validator(mode='after')
    def set_best_case(self) -> 'Toxicity':
        """Select the most relevant indicator and derive its safety factor.

        Selection order: highest priority rank (NOAEL > LOAEL > LD50), then
        preferred route (dermal > oral > inhalation > other), then — for
        NOAEL/LOAEL ties — the lowest (most conservative) value.
        """
        if not self.indicators:
            return self

        # 1. Pick highest priority rank (NOAEL=4 > LOAEL=3 > LD50=1)
        max_rank = max(x.priority_rank for x in self.indicators)
        top = [x for x in self.indicators if x.priority_rank == max_rank]

        # 2. Tiebreak by route preference: dermal > oral > inhalation > other
        def _route_order(ind: ToxIndicator) -> int:
            r = ind.route.lower()
            if "dermal" in r:
                return 1
            if "oral" in r:
                return 2
            if "inhalation" in r:
                return 3
            return 4

        best_route = min(_route_order(x) for x in top)
        top = [x for x in top if _route_order(x) == best_route]

        # 3. For NOAEL/LOAEL, pick the lowest value (most conservative)
        if top[0].indicator in ('NOAEL', 'LOAEL'):
            self.best_case = min(top, key=lambda x: x.value)
        else:
            self.best_case = top[0]

        self.factor = self.best_case.factor
        return self

    @classmethod
    def from_result(cls, cas: str, result):
        """Build a Toxicity from a scraping result dict.

        Scans the repeated-dose and acute-toxicity sections, extracting
        indicators with the matching extractor. Failures in one section are
        logged and do not affect the other.
        """
        toxicity_types = ['repeated_dose_toxicity', 'acute_toxicity']
        indicators_list = []

        for tt in toxicity_types:
            if tt not in result:
                logger.debug(f"Toxicity CAS={cas}: nessun dato per {tt}")
                continue

            try:
                extractor = at_extractor if tt == 'acute_toxicity' else rdt_extractor
                fetch = extract_levels(result[tt], extractor=extractor)

                if not fetch:
                    logger.warning(f"Toxicity CAS={cas}: {tt} presente ma nessun indicatore estratto")
                    continue

                # Fix: result.get("index") could return None, making the
                # .get() below raise AttributeError inside this except block
                # and silently drop the whole section's indicators.
                links = result.get("index") or {}
                link = links.get(f"{tt}_link", "")

                # Keys of `fetch` are not used — only the level payloads.
                for lvl in fetch.values():
                    lvl['ref'] = link
                    lvl['source'] = tt
                    indicators_list.append(ToxIndicator(**lvl))

            except Exception as e:
                logger.error(f"Toxicity.from_result CAS={cas}: estrazione {tt} fallita: {e}")
                continue

        return cls(
            cas=cas,
            indicators=indicators_list
        )
|
|
|
|
class Ingredient(BaseModel):
    """Aggregated profile of a cosmetic ingredient (DAP, COSING, toxicity),
    persisted to MongoDB with a PostgreSQL index row keyed by CAS."""

    cas: str
    inci: Optional[List[str]] = None
    dap_info: Optional[DapInfo] = None
    cosing_info: Optional[List[CosingInfo]] = None
    toxicity: Optional[Toxicity] = None
    # ISO-8601 timestamp set on first validation (see set_creation_date).
    creation_date: Optional[str] = None

    @classmethod
    def ingredient_builder(cls, cas: str, inci: Optional[List[str]] = None):
        """Scrape all external sources (PubChem, COSING, ECHA orchestrator)
        and assemble a new Ingredient. Missing sources are logged and the
        corresponding field is left as None."""
        logger.info(f"ingredient_builder CAS={cas}: inizio scraping")

        dap_data = pubchem_dap(cas)
        dap_info = DapInfo.dap_builder(dap_data) if isinstance(dap_data, dict) else None
        if not dap_info:
            logger.warning(f"CAS={cas}: nessun dato DAP da PubChem")

        cosing_data = cosing_entry(cas)
        cosing_info = CosingInfo.cycle_identified(cosing_data) if cosing_data else None
        if not cosing_info:
            logger.warning(f"CAS={cas}: nessun dato COSING")

        toxicity_data = orchestrator(cas)
        toxicity = Toxicity.from_result(cas, toxicity_data) if toxicity_data else None
        if not toxicity or not toxicity.indicators:
            logger.warning(f"CAS={cas}: nessun indicatore tossicologico trovato")

        logger.info(f"CAS={cas}: scraping completato (dap={'OK' if dap_info else '-'}, cosing={'OK' if cosing_info else '-'}, tox={len(toxicity.indicators) if toxicity else 0} ind.)")

        return cls(
            cas=cas,
            inci=inci,
            dap_info=dap_info,
            cosing_info=cosing_info,
            toxicity=toxicity
        )

    @model_validator(mode='after')
    def set_creation_date(self) -> 'Ingredient':
        """Stamp creation_date on first creation; keep it when reloading."""
        if self.creation_date is None:
            self.creation_date = dt.now().isoformat()
        return self

    def update_ingredient(self, attr : str, data : dict):
        """Overwrite a single attribute in place (no extra validation)."""
        setattr(self, attr, data)

    def to_mongo_dict(self):
        """Serialize the model for MongoDB storage."""
        mongo_dict = self.model_dump()
        return mongo_dict

    def save(self):
        """Persist to MongoDB (collection 'ingredients') and upsert the
        PostgreSQL index row. Returns the Mongo document id as str, or None
        on failure."""
        from pif_compiler.functions.db_utils import db_connect

        collection = db_connect(collection_name='ingredients')
        if collection is None:
            logger.error(f"Ingredient.save CAS={self.cas}: connessione MongoDB fallita")
            return None

        mongo_dict = self.to_mongo_dict()

        result = collection.replace_one(
            {"cas": self.cas},
            mongo_dict,
            upsert=True
        )

        if result.upserted_id:
            mongo_id = str(result.upserted_id)
        else:
            doc = collection.find_one({"cas": self.cas}, {"_id": 1})
            # Fix: guard against the (racy) case where the replaced document
            # can no longer be found — the original crashed on doc["_id"].
            if doc is None:
                logger.error(f"Ingredient.save CAS={self.cas}: documento non trovato dopo replace_one")
                return None
            mongo_id = str(doc["_id"])

        has_dap = self.dap_info is not None
        has_cosing = self.cosing_info is not None
        has_tox = self.toxicity is not None

        upsert_ingrediente(self.cas, mongo_id, dap=has_dap, cosing=has_cosing, tox=has_tox)

        logger.debug(f"Ingredient.save CAS={self.cas}: mongo_id={mongo_id}")
        return mongo_id

    @classmethod
    def from_cas(cls, cas: str):
        """Fetch an ingredient from MongoDB by CAS, using PostgreSQL as index.
        Returns None when the CAS is not indexed, not stored, or a DB
        connection fails."""
        from pif_compiler.functions.db_utils import db_connect
        from bson import ObjectId

        pg_entry = get_ingrediente_by_cas(cas)
        if not pg_entry:
            return None

        _, _, mongo_id, _, _, _ = pg_entry
        if not mongo_id:
            logger.warning(f"from_cas CAS={cas}: presente in PG ma mongo_id è NULL")
            return None

        collection = db_connect(collection_name='ingredients')
        # Fix: db_connect can return None (handled in save()); the original
        # crashed here with AttributeError instead of failing gracefully.
        if collection is None:
            logger.error(f"from_cas CAS={cas}: connessione MongoDB fallita")
            return None

        doc = collection.find_one({"_id": ObjectId(mongo_id)})
        if not doc:
            logger.warning(f"from_cas CAS={cas}: mongo_id={mongo_id} non trovato in MongoDB")
            return None

        doc.pop("_id", None)
        return cls(**doc)

    @classmethod
    def get_or_create(cls, cas: str, inci: Optional[List[str]] = None, force: bool = False):
        """Return the cached ingredient if present and fresh, otherwise re-scrape.

        With force=True the cache is ignored and the document is refreshed.
        On re-scrape, fields that come back None are backfilled from the
        cached copy to avoid data regressions on temporary source failures.

        NOTE(review): with force=True the cached copy is never loaded, so
        custom toxicity indicators are NOT preserved across a forced refresh;
        also, the "force refresh" log line fires on any cache miss, not only
        when force=True — confirm both are intended.
        """
        cached = None
        if not force:
            cached = cls.from_cas(cas)
            if cached and not cached.is_old():
                logger.debug(f"get_or_create CAS={cas}: cache hit")
                return cached

        if cached:
            logger.info(f"get_or_create CAS={cas}: cache scaduta, re-scraping")
        else:
            logger.info(f"get_or_create CAS={cas}: force refresh")

        ingredient = cls.ingredient_builder(cas, inci=inci)

        if cached:
            if ingredient.dap_info is None and cached.dap_info is not None:
                logger.warning(f"get_or_create CAS={cas}: dap_info non ottenuto, mantengo dati cached")
                ingredient.dap_info = cached.dap_info
            if ingredient.cosing_info is None and cached.cosing_info is not None:
                logger.warning(f"get_or_create CAS={cas}: cosing_info non ottenuto, mantengo dati cached")
                ingredient.cosing_info = cached.cosing_info
            if ingredient.toxicity is None and cached.toxicity is not None:
                logger.warning(f"get_or_create CAS={cas}: toxicity non ottenuta, mantengo dati cached")
                ingredient.toxicity = cached.toxicity
            elif ingredient.toxicity is not None and cached.toxicity is not None:
                # Carry user-entered indicators over; rebuilding Toxicity
                # re-runs the best_case selection with them included.
                custom_indicators = [i for i in cached.toxicity.indicators if i.is_custom]
                if custom_indicators:
                    logger.info(f"get_or_create CAS={cas}: preservo {len(custom_indicators)} indicatori custom nel re-scraping")
                    ingredient.toxicity = Toxicity(
                        cas=ingredient.toxicity.cas,
                        indicators=ingredient.toxicity.indicators + custom_indicators
                    )

        ingredient.save()
        return ingredient

    def get_stats(self):
        """Summarize data-completeness flags for reporting/diagnostics."""
        stats = {
            "has_dap_info": self.dap_info is not None,
            "has_cosing_info": self.cosing_info is not None,
            "has_toxicity_info": self.toxicity is not None,
            "num_tox_indicators": len(self.toxicity.indicators) if self.toxicity else 0,
            "has_best_tox_indicator": self.toxicity.best_case is not None if self.toxicity else False,
            "has_restrictions_in_cosing": any(self.cosing_info[0].annex) if self.cosing_info else False,
            "has_noael_indicator": any(ind.indicator == 'NOAEL' for ind in self.toxicity.indicators) if self.toxicity else False,
            "has_ld50_indicator": any(ind.indicator == 'LD50' for ind in self.toxicity.indicators) if self.toxicity else False,
            "has_loael_indicator": any(ind.indicator == 'LOAEL' for ind in self.toxicity.indicators) if self.toxicity else False
        }
        return stats

    def is_old(self, threshold_days: int = 365) -> bool:
        """True when the record is older than threshold_days, or undated."""
        if not self.creation_date:
            return True

        creation_dt = dt.fromisoformat(self.creation_date)
        delta = dt.now() - creation_dt
        return delta.days > threshold_days

    def add_inci_name(self, inci_name: str):
        """Append an INCI name if not already present (list created lazily)."""
        if self.inci is None:
            self.inci = []
        if inci_name not in self.inci:
            self.inci.append(inci_name)

    def return_best_toxicity(self) -> Optional[ToxIndicator]:
        """The selected best-case toxicological endpoint, if any."""
        if self.toxicity and self.toxicity.best_case:
            return self.toxicity.best_case
        return None

    def return_cosing_restrictions(self) -> List[str]:
        """All annex restriction strings across the COSING entries."""
        restrictions = []
        if self.cosing_info:
            for cosing in self.cosing_info:
                restrictions.extend(cosing.annex)
        return restrictions

    def add_tox_indicator(self, indicator: ToxIndicator):
        """Add a custom toxicological indicator, recompute best_case
        (by rebuilding Toxicity) and persist the document."""
        indicator.is_custom = True
        if self.toxicity is None:
            self.toxicity = Toxicity(cas=self.cas, indicators=[indicator])
        else:
            new_indicators = self.toxicity.indicators + [indicator]
            self.toxicity = Toxicity(cas=self.cas, indicators=new_indicators)
        self.save()
|
|
|
|
class Esposition(BaseModel):
    """Exposure preset for a cosmetic product type, backed by the PostgreSQL
    table 'tipi_prodotti'."""

    preset_name: str
    tipo_prodotto: str
    popolazione_target: str = "Adulti"
    peso_target_kg: float = 60.0

    luogo_applicazione: str
    esp_normali: List[str]
    esp_secondarie: List[str]
    esp_nano: List[str]

    sup_esposta: int = Field(ge=1, le=20000, description="Area di applicazione in cm2")
    freq_applicazione: float = Field(default=1, description="Numero di applicazioni al giorno")
    qta_giornaliera: float = Field(..., description="Quantità di prodotto applicata (g/die)")
    ritenzione: float = Field(default=1.0, ge=0.01, le=1.0, description="Fattore di ritenzione")

    note: Optional[str] = None

    @field_validator('esp_normali', 'esp_secondarie', 'esp_nano', mode='before')
    @classmethod
    def parse_postgres_array(cls, v):
        """Accept Postgres array strings like '{a,b}' and turn them into
        ['a', 'b']; pass lists through untouched."""
        if isinstance(v, str):
            cleaned = v.strip('{}[]')
            return [item.strip() for item in cleaned.split(',')] if cleaned else []
        return v

    @computed_field
    @property
    def esposizione_calcolata(self) -> float:
        """Daily amount (g/die) after applying the retention factor."""
        return self.qta_giornaliera * self.ritenzione

    @computed_field
    @property
    def esposizione_relativa(self) -> float:
        """Relative daily exposure (mg per kg of target body weight)."""
        return (self.esposizione_calcolata * 1000) / self.peso_target_kg

    def save_to_postgres(self):
        """Insert this preset into 'tipi_prodotti'.

        Returns the new id_preset on success, None if no id was returned,
        False on failure.

        NOTE(review): popolazione_target, peso_target_kg and note are not
        persisted by this INSERT — confirm the table has no such columns.
        """
        data = self.model_dump(mode='json')
        query = """INSERT INTO tipi_prodotti (
            preset_name, tipo_prodotto, luogo_applicazione,
            esp_normali, esp_secondarie, esp_nano,
            sup_esposta, freq_applicazione, qta_giornaliera, ritenzione
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id_preset;"""

        conn = postgres_connect()
        # Fix: postgres_connect() may return None; the original crashed on
        # conn.cursor() and again on conn.close() in the finally block.
        if conn is None:
            logger.error(f"Esposition.save_to_postgres '{self.preset_name}': connessione PostgreSQL fallita")
            return False
        try:
            with conn.cursor() as cur:
                cur.execute(query, (
                    data.get("preset_name"), data.get("tipo_prodotto"),
                    data.get("luogo_applicazione"), data.get("esp_normali"),
                    data.get("esp_secondarie"), data.get("esp_nano"),
                    data.get("sup_esposta"), data.get("freq_applicazione"),
                    data.get("qta_giornaliera"), data.get("ritenzione")
                ))
                result = cur.fetchone()
                conn.commit()
                return result[0] if result else None
        except Exception as e:
            logger.error(f"Esposition.save_to_postgres '{self.preset_name}': {e}")
            conn.rollback()
            return False
        finally:
            conn.close()

    @classmethod
    def get_presets(cls):
        """Load all exposure presets from 'tipi_prodotti'; [] on failure."""
        conn = postgres_connect()
        # Fix: guard against a failed connection (see save_to_postgres).
        if conn is None:
            logger.error("Esposition.get_presets: connessione PostgreSQL fallita")
            return []
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT preset_name, tipo_prodotto, luogo_applicazione, esp_normali, esp_secondarie, esp_nano, sup_esposta, freq_applicazione, qta_giornaliera, ritenzione FROM tipi_prodotti;")
                results = cur.fetchall()

                lista_oggetti = []
                for r in results:
                    obj = cls(
                        preset_name=r[0],
                        tipo_prodotto=r[1],
                        luogo_applicazione=r[2],
                        esp_normali=r[3],
                        esp_secondarie=r[4],
                        esp_nano=r[5],
                        sup_esposta=r[6],
                        freq_applicazione=r[7],
                        qta_giornaliera=r[8],
                        ritenzione=r[9]
                    )
                    lista_oggetti.append(obj)
                return lista_oggetti
        except Exception as e:
            logger.error(f"Esposition.get_presets: {e}")
            return []
        finally:
            conn.close()

    @classmethod
    def get_by_name(cls, preset_name: str):
        """Fetch a single exposure preset by name; None if absent or on error."""
        conn = postgres_connect()
        # Fix: guard against a failed connection (see save_to_postgres).
        if conn is None:
            logger.error(f"Esposition.get_by_name '{preset_name}': connessione PostgreSQL fallita")
            return None
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """SELECT preset_name, tipo_prodotto, luogo_applicazione,
                    esp_normali, esp_secondarie, esp_nano,
                    sup_esposta, freq_applicazione, qta_giornaliera, ritenzione
                    FROM tipi_prodotti WHERE preset_name = %s""",
                    (preset_name,)
                )
                r = cur.fetchone()
                if r:
                    return cls(
                        preset_name=r[0], tipo_prodotto=r[1], luogo_applicazione=r[2],
                        esp_normali=r[3], esp_secondarie=r[4], esp_nano=r[5],
                        sup_esposta=r[6], freq_applicazione=r[7],
                        qta_giornaliera=r[8], ritenzione=r[9]
                    )
                return None
        except Exception as e:
            logger.error(f"Esposition.get_by_name '{preset_name}': {e}")
            return None
        finally:
            conn.close()

    @classmethod
    def delete_by_name(cls, preset_name: str) -> bool:
        """Delete a preset by name. True when a row was actually removed."""
        conn = postgres_connect()
        # Fix: guard against a failed connection (see save_to_postgres).
        if conn is None:
            logger.error(f"Esposition.delete_by_name '{preset_name}': connessione PostgreSQL fallita")
            return False
        try:
            with conn.cursor() as cur:
                cur.execute("DELETE FROM tipi_prodotti WHERE preset_name = %s RETURNING id_preset;", (preset_name,))
                result = cur.fetchone()
                conn.commit()
                return result is not None
        except Exception as e:
            logger.error(f"Esposition.delete_by_name '{preset_name}': {e}")
            conn.rollback()
            return False
        finally:
            conn.close()