Merge branch 'main' of github.com:adish-rmr/cosmoguard_backend

This commit is contained in:
Adish 2026-02-22 19:50:43 +01:00
commit 448060155a
18 changed files with 2303 additions and 278 deletions

.gitignore (vendored): 1 change

@@ -210,3 +210,4 @@ __marimo__/
pdfs/
streamlit/
exports/

CLAUDE.md: 129 changes

@@ -26,19 +26,23 @@ src/pif_compiler/
│ └── routes/
│ ├── api_echa.py # ECHA endpoints (single + batch search)
│ ├── api_cosing.py # COSING endpoints (single + batch search)
│ ├── api_ingredients.py # Ingredient search by CAS + list all ingested
│ ├── api_esposition.py # Esposition preset creation + list all presets
│ ├── api_ingredients.py # Ingredient search by CAS + list all ingested + add tox indicator + clients CRUD
│ ├── api_esposition.py # Esposition preset CRUD (create, list, delete)
│ ├── api_orders.py # Order creation, retry, manual pipeline trigger, Excel/PDF export
│ └── common.py # PDF generation, PubChem, CIR search endpoints
├── classes/
│ ├── __init__.py # Re-exports all models from models.py and main_cls.py
│ ├── __init__.py # Re-exports all models from models.py and main_workflow.py
│ ├── models.py # Pydantic models: Ingredient, DapInfo, CosingInfo,
│ │ # ToxIndicator, Toxicity, Esposition, RetentionFactors, StatoOrdine
│ └── main_cls.py # Orchestrator classes: Order (raw input layer),
│ # Project (processed layer), IngredientInput
│ └── main_workflow.py # Order/Project workflow: Order (DB + raw JSON layer),
│ # Project (enriched layer), ProjectIngredient,
│ # orchestrator functions (receive_order, process_order_pipeline,
│ # retry_order, trigger_pipeline)
├── functions/
│ ├── common_func.py # PDF generation with Playwright
│ ├── common_func.py # PDF generation with Playwright, tox+COSING source PDF batch generation, COSING PDF download, ZIP creation
│ ├── common_log.py # Centralized logging configuration
│ └── db_utils.py # MongoDB + PostgreSQL connection helpers
│ ├── db_utils.py # MongoDB + PostgreSQL connection helpers
│ └── excel_export.py # Excel export (4 sheets: Anagrafica, Esposizione, SED, MoS)
└── services/
├── srv_echa.py # ECHA scraping, HTML parsing, toxicology extraction,
│ # orchestrator (validate -> check cache -> fetch -> store)
@@ -52,7 +56,8 @@ src/pif_compiler/
- `data/` - Input data files (`input.json` with sample INCI/CAS/percentage lists), DB schema reference (`db_schema.sql`), old CSV data
- `logs/` - Rotating log files (debug.log, error.log) - auto-generated
- `pdfs/` - Generated PDF files from ECHA dossier pages
- `streamlit/` - Streamlit UI pages (`ingredients_page.py`, `exposition_page.py`)
- `streamlit/` - Streamlit UI pages (`ingredients_page.py`, `exposition_page.py`, `order_page.py`, `orders_page.py`)
- `scripts/` - Utility scripts (`create_mock_order.py` - inserts a test order with 4 ingredients)
- `marimo/` - **Ignore this folder.** Debug/test notebooks, not part of the main application
## Architecture & Data Flow
@@ -74,10 +79,61 @@ src/pif_compiler/
- `Ingredient.get_or_create(cas)` checks PostgreSQL -> MongoDB cache, returns cached if not older than 365 days, otherwise re-scrapes
- Search history is logged to PostgreSQL (`logs.search_history` table)
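The 365-day freshness rule can be sketched in a few lines (a minimal sketch; the helper name `is_cache_fresh` and the timestamp argument are illustrative, not the actual implementation):

```python
from datetime import datetime, timedelta

def is_cache_fresh(scraped_at: datetime, max_age_days: int = 365) -> bool:
    """Return True if a cached ingredient is recent enough to reuse."""
    return datetime.now() - scraped_at < timedelta(days=max_age_days)

# A recent entry is reused; a two-year-old one triggers a re-scrape.
fresh = is_cache_fresh(datetime.now() - timedelta(days=30))
stale = is_cache_fresh(datetime.now() - timedelta(days=730))
```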
### Order / Project architecture
- **Order** (`main_cls.py`): Raw input layer. Receives JSON with client, compiler, product type, ingredients list (CAS + percentage). Cleans CAS numbers (strips `\n`, splits by `;`). Saves to MongoDB `orders` collection. Registers client/compiler in PostgreSQL.
- **Project** (`main_cls.py`): Processed layer. Created from an Order via `Project.from_order()`. Holds enriched `Ingredient` objects, percentages mapping (CAS -> %), and `Esposition` preset. `process_ingredients()` calls `Ingredient.get_or_create()` for each CAS. Saves to MongoDB `projects` collection.
- An order can update an older project — they are decoupled.
### Order / Project workflow (`main_workflow.py`)
The order processing uses a **background pipeline** with state machine tracking via `StatoOrdine`:
```
POST /orders/create → receive_order() → BackgroundTasks → process_order_pipeline()
│ │
▼ ▼
Save raw JSON to MongoDB Order.pick_next() (oldest with stato=1)
+ create ordini record (stato=1) │
+ return id_ordine immediately ▼
order.validate_anagrafica() → stato=2
Project.from_order() → stato=3
(loads Esposition preset, parses ingredients)
project.process_ingredients() → Ingredient.get_or_create()
(skip if skip_tox=True or CAS empty)
stato=5 (ARRICCHITO)
project.save() → MongoDB + progetti table + ingredients_lineage
On error → stato=9 (ERRORE) + note with error message
```
- **Order** (`main_workflow.py`): Pydantic model with DB table attributes + raw JSON from MongoDB. `pick_next()` classmethod picks the oldest pending order (FIFO). `validate_anagrafica()` upserts client in `clienti` table. `update_stato()` is the reusable state transition method.
- **Project** (`main_workflow.py`): Created from Order via `Project.from_order()`. Holds `Esposition` preset (loaded by name from DB), list of `ProjectIngredient` with enriched `Ingredient` objects. `process_ingredients()` calls `Ingredient.get_or_create()` for each CAS. `save()` dumps to MongoDB `projects` collection, creates `progetti` entry, and populates `ingredients_lineage`.
- **ProjectIngredient**: Helper model with cas, inci, percentage, is_colorante, skip_tox, and optional `Ingredient` object.
- **Retry**: `retry_order(id_ordine)` resets an ERRORE order back to RICEVUTO for reprocessing.
- **Manual trigger**: `trigger_pipeline()` launches the pipeline on-demand for any pending order.
- Pipeline is **on-demand only** (no periodic polling). Each API call to `/orders/create` or `/orders/retry` triggers one background execution.
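The state transitions above can be sketched as follows (the numeric values 1, 5, and 9 and the names RICEVUTO/ARRICCHITO/ERRORE come from the diagram; the names for states 2 and 3 are hypothetical placeholders):

```python
from enum import IntEnum

class StatoOrdine(IntEnum):
    RICEVUTO = 1    # raw JSON saved, waiting for the pipeline
    VALIDATO = 2    # hypothetical name: after order.validate_anagrafica()
    PROGETTO = 3    # hypothetical name: after Project.from_order()
    ARRICCHITO = 5  # ingredients enriched and project saved
    ERRORE = 9      # any step failed; the order note holds the error message

def run_pipeline(order: dict) -> StatoOrdine:
    """Walk one order through the happy path, falling to ERRORE on failure."""
    stato = StatoOrdine.RICEVUTO
    try:
        order["validated"] = True                 # stand-in for validate_anagrafica()
        stato = StatoOrdine.VALIDATO
        order["project"] = {"ingredients": order["ingredients"]}
        stato = StatoOrdine.PROGETTO
        # the real pipeline calls Ingredient.get_or_create() per CAS here
        stato = StatoOrdine.ARRICCHITO
    except Exception as exc:
        order["note"] = str(exc)
        stato = StatoOrdine.ERRORE
    return stato

final = run_pipeline({"ingredients": [{"cas": "56-81-5"}]})
```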
### Excel export (`excel_export.py`)
`export_project_excel(project, output_path)` generates a 4-sheet Excel file:
1. **Anagrafica** — Client info (nome, prodotto, preset) + ingredient table (INCI, CAS, %, colorante, skip_tox)
2. **Esposizione** — All esposition parameters + computed fields via Excel formulas (`=B12*B13`, `=B15*1000/B5`)
3. **SED** — SED calculation per ingredient. Formula: `=(C{r}/100)*Esposizione!$B$12*Esposizione!$B$13/Esposizione!$B$5*1000`. COSING restrictions highlighted in red.
4. **MoS** — 14 columns (Nome, %, SED, DAP, SED con DAP, Indicatore, Valore, Fattore, MoS, Fonte, Info DAP, Restrizioni, Altre Restrizioni, Note). MoS formula: `=IF(AND(E{r}>0,H{r}>0),G{r}/(E{r}*H{r}),"")`. Includes legend row.
Called via `Project.export_excel()` method, exposed at `GET /orders/export/{id_ordine}`.
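The MoS cell formula translates to plain Python as below (a sketch of the spreadsheet logic only; the parameter names are descriptive, not taken from the codebase):

```python
def margin_of_safety(tox_value: float, sed_with_dap: float, factor: float):
    """Mirror of the sheet formula =IF(AND(E>0,H>0), G/(E*H), "")."""
    if sed_with_dap > 0 and factor > 0:
        return tox_value / (sed_with_dap * factor)
    return None  # the sheet leaves the cell empty

# e.g. NOAEL 1000 mg/kg bw/day, SED con DAP 0.5 mg/kg bw/day, factor 1
mos = margin_of_safety(1000, 0.5, 1.0)
empty = margin_of_safety(1000, 0, 1.0)
```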
### Source PDF generation (`common_func.py`)
- `generate_project_source_pdfs(project)` — for each ingredient, generates two types of source PDFs:
1. **Tox best_case PDF**: downloads the ECHA dossier page of the `best_case` indicator via Playwright. Naming: `{CAS}_{source}.pdf`, where `source` is the `ToxIndicator.source` attribute (e.g., `56-81-5_repeated_dose_toxicity.pdf`)
2. **COSING PDF**: downloads the official COSING regulation PDF via the EU API for each `CosingInfo` with a `reference` attribute. Naming: `{CAS}_cosing.pdf`
- `cosing_download(ref_no)` — downloads the COSING regulation PDF from `api.tech.ec.europa.eu` by reference number. Returns PDF bytes or error string
- `create_sources_zip(pdf_paths, zip_path)` — bundles all source PDFs into a ZIP archive
- Exposed at `GET /orders/export-sources/{id_ordine}` — returns ZIP as FileResponse
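`create_sources_zip` is straightforward with the standard library; a minimal sketch (same signature as described above, but the internals are assumed):

```python
import os
import tempfile
import zipfile
from pathlib import Path

def create_sources_zip(pdf_paths, zip_path):
    """Bundle source PDFs into a ZIP archive, keeping only the file names."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for pdf in pdf_paths:
            zf.write(pdf, arcname=Path(pdf).name)
    return zip_path

# Demo with throwaway files in a temp directory
tmp = tempfile.mkdtemp()
paths = []
for name in ("56-81-5_source.pdf", "56-81-5_cosing.pdf"):
    p = os.path.join(tmp, name)
    with open(p, "wb") as fh:
        fh.write(b"%PDF-1.4 mock")
    paths.append(p)

zip_out = create_sources_zip(paths, os.path.join(tmp, "sources.zip"))
names = zipfile.ZipFile(zip_out).namelist()
```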
### PostgreSQL schema (see `data/db_schema.sql`)
@@ -102,10 +158,22 @@ All routes are under `/api/v1`:
| POST | `/echa/batch-search` | Batch ECHA search for multiple CAS numbers |
| POST | `/cosing/search` | COSING search (by name, CAS, EC, or ID) |
| POST | `/cosing/batch-search` | Batch COSING search |
| POST | `/ingredients/search` | Get full ingredient by CAS (cached or scraped) |
| POST | `/ingredients/search` | Get full ingredient by CAS (cached or scraped, `force` param to bypass cache) |
| POST | `/ingredients/add-tox-indicator` | Add custom ToxIndicator to an ingredient |
| GET | `/ingredients/list` | List all ingested ingredients from PostgreSQL |
| GET | `/ingredients/clients` | List all registered clients |
| POST | `/ingredients/clients` | Create or retrieve a client |
| POST | `/esposition/create` | Create a new esposition preset |
| DELETE | `/esposition/delete/{preset_name}` | Delete an esposition preset by name |
| GET | `/esposition/presets` | List all esposition presets |
| POST | `/orders/create` | Create order + start background processing |
| POST | `/orders/retry/{id_ordine}` | Retry a failed order (ERRORE → RICEVUTO) |
| POST | `/orders/trigger-pipeline` | Manually trigger pipeline for next pending order |
| GET | `/orders/export/{id_ordine}` | Download Excel export for a completed order |
| GET | `/orders/export-sources/{id_ordine}` | Download ZIP of tox + COSING source PDFs for an order |
| GET | `/orders/list` | List all orders with client/compiler/status info |
| GET | `/orders/detail/{id_ordine}` | Full order detail with ingredients from MongoDB |
| DELETE | `/orders/{id_ordine}` | Delete order and all related data (PostgreSQL + MongoDB) |
| POST | `/common/pubchem` | PubChem property lookup by CAS |
| POST | `/common/generate-pdf` | Generate PDF from URL via Playwright |
| GET | `/common/download-pdf/{name}` | Download a generated PDF |
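A typical `/orders/create` call, mirroring the request model's own example payload (the actual POST is shown commented out since it needs the API running on `localhost:8000`):

```python
payload = {
    "client_name": "Cosmetica Italia srl",
    "product_name": "Crema 'Cremosa'",
    "preset_esposizione": "Test Preset",
    "ingredients": [
        {"inci": "AQUA", "cas": "", "percentage": 90, "is_colorante": False, "skip_tox": True},
        {"inci": None, "cas": "56-81-5", "percentage": 6, "is_colorante": False, "skip_tox": False},
        {"inci": None, "cas": "9007-16-3", "percentage": 3, "is_colorante": False, "skip_tox": False},
        {"inci": None, "cas": "JYY-807", "percentage": 1, "is_colorante": True, "skip_tox": False},
    ],
}
# Percentages should sum to 100 for a complete formulation
total = sum(i["percentage"] for i in payload["ingredients"])

# With the API running:
# import requests
# r = requests.post("http://localhost:8000/api/v1/orders/create", json=payload)
# r.json() returns success flag, id_ordine, and a message
```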
@@ -142,29 +210,58 @@ uv run uvicorn pif_compiler.main:app --reload --host 0.0.0.0 --port 8000
### Key conventions
- Services in `services/` handle external API calls and data extraction
- Models in `classes/models.py` use Pydantic `@model_validator` and `@classmethod` builders for construction from raw API data
- Orchestrator classes in `classes/main_cls.py` handle Order (raw input) and Project (processed) layers
- Workflow classes in `classes/main_workflow.py` handle Order (DB + raw JSON) and Project (enriched) layers
- Order processing runs as a FastAPI `BackgroundTasks` callback (on-demand, not polled)
- The `orchestrator` pattern (see `srv_echa.py`) handles: validate input -> check local cache -> fetch from external -> store locally -> return
- `Ingredient.ingredient_builder(cas)` calls scraping functions directly (`pubchem_dap`, `cosing_entry`, `orchestrator`)
- `Ingredient.save()` upserts to both MongoDB and PostgreSQL, `Ingredient.from_cas()` retrieves via PostgreSQL index -> MongoDB
- `Ingredient.get_or_create(cas)` is the main entry point: checks cache freshness (365 days), scrapes if needed
- `Ingredient.get_or_create(cas, force=False)` is the main entry point: checks cache freshness (365 days), scrapes if needed. `force=True` bypasses cache entirely and re-scrapes
- All modules use the shared logger from `common_log.get_logger()`
- API routes define Pydantic request/response models inline in each route file
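The orchestrator pattern described above (validate -> check local cache -> fetch from external -> store locally -> return) reduces to a small function; a generic sketch with a dict standing in for the cache and a fake fetcher standing in for the ECHA call:

```python
def orchestrator(cas, cache, fetch):
    """validate -> check local cache -> fetch external -> store -> return."""
    if not cas or not cas.strip():   # validate input
        return None
    if cas in cache:                 # cache hit: return stored data
        return cache[cas]
    data = fetch(cas)                # external call (ECHA in srv_echa.py)
    cache[cas] = data                # store locally for next time
    return data

calls = []
def fake_fetch(cas):
    calls.append(cas)
    return {"cas": cas, "tox": "stub"}

cache = {}
first = orchestrator("56-81-5", cache, fake_fetch)
second = orchestrator("56-81-5", cache, fake_fetch)  # served from cache
```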
### db_utils.py functions
**Core:**
- `db_connect(db_name, collection_name)` - MongoDB collection accessor
- `postgres_connect()` - PostgreSQL connection
**Ingredients:**
- `upsert_ingrediente(cas, mongo_id, dap, cosing, tox)` - Upsert ingredient in PostgreSQL
- `get_ingrediente_by_cas(cas)` - Get ingredient row by CAS
- `get_ingrediente_id_by_cas(cas)` - Get PostgreSQL ID by CAS (for lineage FK)
- `get_all_ingredienti()` - List all ingredients from PostgreSQL
**Clients / Compilers:**
- `upsert_cliente(nome_cliente)` - Upsert client, returns `id_cliente`
- `upsert_compilatore(nome_compilatore)` - Upsert compiler, returns `id_compilatore`
- `get_all_clienti()` - List all clients from PostgreSQL
**Orders:**
- `insert_ordine(uuid_ordine, id_cliente)` - Insert new order, returns `id_ordine`
- `get_ordine_by_id(id_ordine)` - Get full order row
- `get_oldest_pending_order()` - Get oldest order with stato=RICEVUTO
- `aggiorna_stato_ordine(id_ordine, nuovo_stato)` - Update order status
- `update_ordine_cliente(id_ordine, id_cliente)` - Set client on order
- `update_ordine_progetto(id_ordine, uuid_progetto)` - Set project UUID on order
- `update_ordine_note(id_ordine, note)` - Set note on order
- `reset_ordine_per_retry(id_ordine)` - Reset ERRORE order to RICEVUTO
- `get_all_ordini()` - List all orders with JOINs to clienti/compilatori/stati_ordini
- `delete_ordine(id_ordine)` - Delete order + related data (lineage, progetti, MongoDB docs)
**Projects:**
- `get_preset_id_by_name(preset_name)` - Get preset FK by name
- `insert_progetto(mongo_id, id_preset)` - Insert project, returns `id`
- `insert_ingredient_lineage(id_progetto, id_ingrediente)` - Insert project-ingredient join
**Logging:**
- `log_ricerche(cas, target, esito)` - Log search history
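The `upsert_*` helpers follow an insert-or-fetch pattern that always returns a stable id; a sketch against an in-memory SQLite table (the real code targets PostgreSQL, so the SQL and connection handling differ):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE clienti ("
    "id_cliente INTEGER PRIMARY KEY AUTOINCREMENT, "
    "nome_cliente TEXT UNIQUE)"
)

def upsert_cliente(conn, nome_cliente):
    """Insert the client if new, then return its id_cliente either way."""
    conn.execute(
        "INSERT OR IGNORE INTO clienti (nome_cliente) VALUES (?)",
        (nome_cliente,),
    )
    row = conn.execute(
        "SELECT id_cliente FROM clienti WHERE nome_cliente = ?",
        (nome_cliente,),
    ).fetchone()
    return row[0]

first_id = upsert_cliente(conn, "Cosmetica Test Srl")
again_id = upsert_cliente(conn, "Cosmetica Test Srl")  # same id, no duplicate
```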
### Streamlit UI
- `streamlit/ingredients_page.py` - Ingredient search by CAS + result display + inventory of ingested ingredients
- `streamlit/exposition_page.py` - Esposition preset creation form + list of existing presets
- Both pages call the FastAPI endpoints via `requests` (API must be running on `localhost:8000`)
- `streamlit/order_page.py` - Order creation form (client dropdown, preset selection, ingredient data_editor with CAS/INCI/percentage, AQUA auto-detection, validation, submit with background processing)
- `streamlit/orders_page.py` - Order management: list with filters (date, client, status), detail view with ingredients, actions (refresh, retry, Excel download, PDF sources ZIP, delete with confirmation), notes/log display
- All pages call the FastAPI endpoints via `requests` (API must be running on `localhost:8000`)
- Run with: `streamlit run streamlit/<page>.py`
### Important domain concepts


@@ -43,7 +43,7 @@ CREATE TABLE public.ordini (
id_cliente integer,
id_compilatore integer,
uuid_ordine character varying NOT NULL,
uuid_progetto character varying NOT NULL UNIQUE,
uuid_progetto character varying UNIQUE,
data_ordine timestamp without time zone NOT NULL,
stato_ordine integer DEFAULT 1,
note text,


@@ -27,6 +27,7 @@ dependencies = [
"python-dotenv>=1.2.1",
"requests>=2.32.5",
"streamlit>=1.50.0",
"openpyxl>=3.1.0",
"uvicorn>=0.35.0",
"weasyprint>=66.0",
]


@@ -0,0 +1,246 @@
"""
Script per creare un ordine mock con 4 ingredienti per testare la UI.
Inserisce direttamente nei database senza passare dalla pipeline (no scraping).
Uso: uv run python scripts/create_mock_order.py
"""
import sys
import os
# Add the project src path to sys.path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
from pif_compiler.functions.db_utils import (
db_connect, upsert_cliente, insert_ordine, aggiorna_stato_ordine,
update_ordine_cliente, upsert_ingrediente
)
from pif_compiler.classes.models import (
StatoOrdine, Ingredient, DapInfo, CosingInfo, ToxIndicator, Toxicity, Esposition
)
from pif_compiler.classes.main_workflow import Project, ProjectIngredient
def ensure_preset_exists(preset_name="Test Preset"):
"""Verifica che il preset esista, altrimenti lo crea."""
preset = Esposition.get_by_name(preset_name)
if preset:
print(f"Preset '{preset_name}' già esistente")
return preset
print(f"Creazione preset '{preset_name}'...")
preset = Esposition(
preset_name=preset_name,
tipo_prodotto="Crema corpo",
luogo_applicazione="Corpo",
esp_normali=["Dermal"],
esp_secondarie=["Oral"],
esp_nano=[],
sup_esposta=15670,
freq_applicazione=1,
qta_giornaliera=7.82,
ritenzione=1.0
)
result = preset.save_to_postgres()
if result:
print(f"Preset creato con id_preset={result}")
else:
print("ERRORE: impossibile creare il preset")
sys.exit(1)
return preset
def create_mock_ingredients():
"""Crea ingredienti mock con dati finti di tossicologia e DAP."""
# GLYCERIN (56-81-5), with a NOAEL
glycerin = Ingredient(
cas="56-81-5",
inci=["GLYCERIN"],
dap_info=DapInfo(
cas="56-81-5",
molecular_weight=92.09,
log_pow=-1.76,
tpsa=60.69,
melting_point=18.0
),
cosing_info=[CosingInfo(
cas=["56-81-5"],
common_names=["Glycerol"],
inci=["GLYCERIN"],
annex=[],
functionName=["Humectant", "Solvent", "Skin conditioning"],
otherRestrictions=[],
cosmeticRestriction=None
)],
toxicity=Toxicity(
cas="56-81-5",
indicators=[
ToxIndicator(
indicator="NOAEL", value=1000, unit="mg/kg bw/day",
route="oral", toxicity_type="repeated_dose_toxicity",
ref="https://chem.echa.europa.eu/100.003.264"
),
ToxIndicator(
indicator="LD50", value=12600, unit="mg/kg bw",
route="oral", toxicity_type="acute_toxicity",
ref="https://chem.echa.europa.eu/100.003.264"
)
]
)
)
# CETYL ALCOHOL (36653-82-4), with a NOAEL
cetyl = Ingredient(
cas="36653-82-4",
inci=["CETYL ALCOHOL"],
dap_info=DapInfo(
cas="36653-82-4",
molecular_weight=242.44,
log_pow=6.83,
tpsa=20.23,
melting_point=49.0
),
cosing_info=[CosingInfo(
cas=["36653-82-4"],
common_names=["Cetyl alcohol", "1-Hexadecanol"],
inci=["CETYL ALCOHOL"],
annex=[],
functionName=["Emollient", "Emulsifying", "Opacifying"],
otherRestrictions=[],
cosmeticRestriction=None
)],
toxicity=Toxicity(
cas="36653-82-4",
indicators=[
ToxIndicator(
indicator="NOAEL", value=1000, unit="mg/kg bw/day",
route="oral", toxicity_type="repeated_dose_toxicity",
ref="https://chem.echa.europa.eu/100.004.098"
)
]
)
)
# TOCOPHEROL (59-02-9), with a LOAEL
tocopherol = Ingredient(
cas="59-02-9",
inci=["TOCOPHEROL"],
dap_info=DapInfo(
cas="59-02-9",
molecular_weight=430.71,
log_pow=10.51,
tpsa=29.46,
melting_point=3.0
),
cosing_info=[CosingInfo(
cas=["59-02-9"],
common_names=["alpha-Tocopherol"],
inci=["TOCOPHEROL"],
annex=[],
functionName=["Antioxidant", "Skin conditioning"],
otherRestrictions=[],
cosmeticRestriction=None
)],
toxicity=Toxicity(
cas="59-02-9",
indicators=[
ToxIndicator(
indicator="LOAEL", value=500, unit="mg/kg bw/day",
route="oral", toxicity_type="repeated_dose_toxicity",
ref="https://chem.echa.europa.eu/100.000.375"
)
]
)
)
# Save each ingredient to MongoDB + PostgreSQL
for ing in [glycerin, cetyl, tocopherol]:
mongo_id = ing.save()
print(f"Ingrediente {ing.cas} ({ing.inci[0]}) salvato (mongo_id={mongo_id})")
return glycerin, cetyl, tocopherol
def create_mock_order(preset, glycerin, cetyl, tocopherol):
"""Crea un ordine mock completo."""
# 1. Upsert cliente
client_name = "Cosmetica Test Srl"
id_cliente = upsert_cliente(client_name)
print(f"Cliente '{client_name}' → id_cliente={id_cliente}")
# 2. Raw order JSON
raw_json = {
"client_name": client_name,
"product_name": "Crema Idratante Test",
"preset_esposizione": preset.preset_name,
"ingredients": [
{"inci": "AQUA", "cas": "", "percentage": 70.0, "is_colorante": False, "skip_tox": True},
{"inci": "GLYCERIN", "cas": "56-81-5", "percentage": 15.0, "is_colorante": False, "skip_tox": False},
{"inci": "CETYL ALCOHOL", "cas": "36653-82-4", "percentage": 10.0, "is_colorante": False, "skip_tox": False},
{"inci": "TOCOPHEROL", "cas": "59-02-9", "percentage": 5.0, "is_colorante": False, "skip_tox": False},
]
}
# 3. Save to the MongoDB orders collection
orders_col = db_connect(collection_name='orders')
result = orders_col.insert_one(raw_json.copy())
uuid_ordine = str(result.inserted_id)
print(f"Ordine salvato su MongoDB: uuid_ordine={uuid_ordine}")
# 4. Insert into the PostgreSQL ordini table
id_ordine = insert_ordine(uuid_ordine, id_cliente)
print(f"Ordine inserito in PostgreSQL: id_ordine={id_ordine}")
# 5. Update the order status to ARRICCHITO
update_ordine_cliente(id_ordine, id_cliente)
aggiorna_stato_ordine(id_ordine, int(StatoOrdine.ARRICCHITO))
print(f"Stato ordine aggiornato a ARRICCHITO ({StatoOrdine.ARRICCHITO})")
# 6. Create the project with enriched ingredients
project = Project(
order_id=id_ordine,
product_name="Crema Idratante Test",
client_name=client_name,
esposition=preset,
ingredients=[
ProjectIngredient(cas=None, inci="AQUA", percentage=70.0, skip_tox=True),
ProjectIngredient(cas="56-81-5", inci="GLYCERIN", percentage=15.0, ingredient=glycerin),
ProjectIngredient(cas="36653-82-4", inci="CETYL ALCOHOL", percentage=10.0, ingredient=cetyl),
ProjectIngredient(cas="59-02-9", inci="TOCOPHEROL", percentage=5.0, ingredient=tocopherol),
]
)
# 7. Save the project (MongoDB + PostgreSQL)
uuid_progetto = project.save()
print(f"Progetto salvato: uuid_progetto={uuid_progetto}")
print("\n" + "=" * 60)
print("MOCK ORDER CREATO CON SUCCESSO")
print("=" * 60)
print(f" id_ordine: {id_ordine}")
print(f" uuid_ordine: {uuid_ordine}")
print(f" uuid_progetto: {uuid_progetto}")
print(f" cliente: {client_name}")
print(f" prodotto: Crema Idratante Test")
print(f" preset: {preset.preset_name}")
print(f" ingredienti: 4 (AQUA, GLYCERIN, CETYL ALCOHOL, TOCOPHEROL)")
print(f" stato: ARRICCHITO ({StatoOrdine.ARRICCHITO})")
print("=" * 60)
return id_ordine
if __name__ == "__main__":
print("Creazione ordine mock...")
print()
# 1. Ensure the preset exists
preset = ensure_preset_exists()
# 2. Create the mock ingredients
glycerin, cetyl, tocopherol = create_mock_ingredients()
# 3. Create the order
create_mock_order(preset, glycerin, cetyl, tocopherol)


@@ -81,6 +81,37 @@ async def create_esposition(request: EspositionRequest):
)
@router.delete("/esposition/delete/{preset_name}", response_model=EspositionResponse, tags=["Esposition"])
async def delete_esposition(preset_name: str):
"""Elimina un preset di esposizione tramite il nome."""
logger.info(f"Eliminazione preset esposizione: {preset_name}")
try:
deleted = Esposition.delete_by_name(preset_name)
if not deleted:
logger.warning(f"Preset '{preset_name}' non trovato")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Preset '{preset_name}' non trovato"
)
logger.info(f"Preset '{preset_name}' eliminato con successo")
return EspositionResponse(
success=True,
data={"preset_name": preset_name, "deleted": True}
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Errore eliminazione preset {preset_name}: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Errore interno: {str(e)}"
)
@router.get("/esposition/presets", response_model=EspositionListResponse, tags=["Esposition"])
async def get_all_presets():
"""Recupera tutti i preset di esposizione da PostgreSQL."""


@@ -2,8 +2,8 @@ from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from pif_compiler.classes.models import Ingredient
from pif_compiler.functions.db_utils import get_all_ingredienti
from pif_compiler.classes.models import Ingredient, ToxIndicator
from pif_compiler.functions.db_utils import get_all_ingredienti, get_all_clienti, upsert_cliente
from pif_compiler.functions.common_log import get_logger
logger = get_logger()
@@ -12,11 +12,13 @@ router = APIRouter()
class IngredientRequest(BaseModel):
cas: str = Field(..., description="CAS number dell'ingrediente da cercare")
force: bool = Field(default=False, description="Se True, ignora la cache e riesegue lo scraping")
class Config:
json_schema_extra = {
"example": {
"cas": "56-81-5"
"cas": "56-81-5",
"force": False
}
}
@@ -31,10 +33,10 @@ class IngredientResponse(BaseModel):
@router.post("/ingredients/search", response_model=IngredientResponse, tags=["Ingredients"])
async def get_ingredient(request: IngredientRequest):
"""Recupera un ingrediente per CAS. Se esiste in cache lo restituisce, altrimenti lo crea da scraping."""
logger.info(f"Richiesta ingrediente per CAS: {request.cas}")
logger.info(f"Richiesta ingrediente per CAS: {request.cas}, force refresh: {request.force}")
try:
ingredient = Ingredient.get_or_create(request.cas)
ingredient = Ingredient.get_or_create(request.cas, force=request.force)
if ingredient is None:
logger.warning(f"Nessun dato trovato per CAS: {request.cas}")
@@ -59,6 +61,72 @@
)
class AddToxIndicatorRequest(BaseModel):
cas: str = Field(..., description="CAS number dell'ingrediente")
indicator: str = Field(..., description="Tipo di indicatore (NOAEL, LOAEL, LD50)")
value: int = Field(..., description="Valore dell'indicatore")
unit: str = Field(..., description="Unità di misura (es. mg/kg bw/day)")
route: str = Field(..., description="Via di esposizione (es. oral, dermal, inhalation)")
toxicity_type: Optional[str] = Field(default=None, description="Tipo di tossicità (es. acute_toxicity, repeated_dose_toxicity)")
ref: Optional[str] = Field(default=None, description="Riferimento o fonte del dato")
class Config:
json_schema_extra = {
"example": {
"cas": "56-81-5",
"indicator": "NOAEL",
"value": 1000,
"unit": "mg/kg bw/day",
"route": "oral",
"toxicity_type": "repeated_dose_toxicity",
"ref": "Custom - studio interno"
}
}
@router.post("/ingredients/add-tox-indicator", response_model=IngredientResponse, tags=["Ingredients"])
async def add_tox_indicator(request: AddToxIndicatorRequest):
"""Aggiunge un indicatore tossicologico custom a un ingrediente e ricalcola il best_case."""
logger.info(f"Aggiunta indicatore tox custom per CAS: {request.cas}")
try:
ingredient = Ingredient.from_cas(request.cas)
if ingredient is None:
logger.warning(f"Ingrediente non trovato per CAS: {request.cas}")
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Ingrediente con CAS '{request.cas}' non trovato in cache"
)
new_indicator = ToxIndicator(
indicator=request.indicator,
value=request.value,
unit=request.unit,
route=request.route,
toxicity_type=request.toxicity_type,
ref=request.ref
)
ingredient.add_tox_indicator(new_indicator)
logger.info(f"Indicatore tox aggiunto per CAS {request.cas}, best_case: {ingredient.toxicity.best_case.indicator if ingredient.toxicity.best_case else 'nessuno'}")
return IngredientResponse(
success=True,
cas=request.cas,
data=ingredient.model_dump()
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Errore aggiunta indicatore tox per {request.cas}: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Errore interno: {str(e)}"
)
class IngredientListResponse(BaseModel):
success: bool
total: int
@@ -100,3 +168,67 @@ async def list_ingredients():
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Errore interno: {str(e)}"
)
class ClientListResponse(BaseModel):
success: bool
data: Optional[List[Dict[str, Any]]] = None
error: Optional[str] = None
class ClientCreateRequest(BaseModel):
nome_cliente: str = Field(..., description="Nome del cliente")
class ClientCreateResponse(BaseModel):
success: bool
data: Optional[Dict[str, Any]] = None
error: Optional[str] = None
@router.get("/ingredients/clients", response_model=ClientListResponse, tags=["Clients"])
async def list_clients():
"""Restituisce tutti i clienti registrati."""
logger.info("Recupero lista clienti")
try:
rows = get_all_clienti()
clients = [{"id_cliente": r[0], "nome_cliente": r[1]} for r in rows]
logger.info(f"Recuperati {len(clients)} clienti")
return ClientListResponse(success=True, data=clients)
except Exception as e:
logger.error(f"Errore recupero clienti: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Errore interno: {str(e)}"
)
@router.post("/ingredients/clients", response_model=ClientCreateResponse, tags=["Clients"])
async def create_client(request: ClientCreateRequest):
"""Crea o recupera un cliente. Ritorna id_cliente."""
logger.info(f"Creazione/recupero cliente: {request.nome_cliente}")
try:
id_cliente = upsert_cliente(request.nome_cliente)
if id_cliente is None:
return ClientCreateResponse(
success=False,
error=f"Errore nel salvataggio del cliente '{request.nome_cliente}'"
)
logger.info(f"Cliente '{request.nome_cliente}' con id_cliente={id_cliente}")
return ClientCreateResponse(
success=True,
data={"id_cliente": id_cliente, "nome_cliente": request.nome_cliente}
)
except Exception as e:
logger.error(f"Errore creazione cliente: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Errore interno: {str(e)}"
)


@@ -0,0 +1,440 @@
from fastapi import APIRouter, HTTPException, BackgroundTasks, status
from fastapi.responses import FileResponse
from pydantic import BaseModel, Field
from typing import List, Optional, Dict, Any
from pif_compiler.classes.main_workflow import receive_order, process_order_pipeline, retry_order, trigger_pipeline, Project
from pif_compiler.functions.db_utils import db_connect, get_ordine_by_id, get_all_ordini, delete_ordine
from pif_compiler.functions.common_func import generate_project_source_pdfs, create_sources_zip
from pif_compiler.functions.common_log import get_logger
logger = get_logger()
router = APIRouter()
# ==================== REQUEST / RESPONSE MODELS ====================
class OrderIngredientInput(BaseModel):
inci: Optional[str] = None
cas: Optional[str] = None
percentage: float = 0.0
is_colorante: bool = False
skip_tox: bool = False
class OrderCreateRequest(BaseModel):
client_name: str = Field(..., description="Nome del cliente")
product_name: str = Field(..., description="Nome del prodotto cosmetico")
preset_esposizione: str = Field(..., description="Nome del preset di esposizione")
ingredients: List[OrderIngredientInput] = Field(..., description="Lista ingredienti")
class Config:
json_schema_extra = {
"example": {
"client_name": "Cosmetica Italia srl",
"product_name": "Crema 'Cremosa'",
"preset_esposizione": "Test Preset",
"ingredients": [
{"inci": "AQUA", "cas": "", "percentage": 90, "is_colorante": False, "skip_tox": True},
{"inci": None, "cas": "56-81-5", "percentage": 6, "is_colorante": False, "skip_tox": False},
{"inci": None, "cas": "9007-16-3", "percentage": 3, "is_colorante": False, "skip_tox": False},
{"inci": None, "cas": "JYY-807", "percentage": 1, "is_colorante": True, "skip_tox": False}
]
}
}
class OrderCreateResponse(BaseModel):
success: bool
id_ordine: Optional[int] = None
message: Optional[str] = None
error: Optional[str] = None
# ==================== ROUTES ====================
@router.post("/orders/create", response_model=OrderCreateResponse, tags=["Orders"])
async def create_order(request: OrderCreateRequest, background_tasks: BackgroundTasks):
"""
Crea un nuovo ordine e avvia l'elaborazione in background.
Il JSON viene salvato su MongoDB, il record su PostgreSQL (stato=RICEVUTO).
L'arricchimento degli ingredienti avviene in background.
"""
logger.info(f"Nuovo ordine ricevuto: cliente={request.client_name}, prodotto={request.product_name}")
try:
raw_json = request.model_dump()
id_ordine = receive_order(raw_json)
if id_ordine is None:
return OrderCreateResponse(
success=False,
error="Errore nel salvataggio dell'ordine"
)
# Start processing in the background
background_tasks.add_task(process_order_pipeline)
logger.info(f"Ordine {id_ordine} creato, elaborazione avviata in background")
return OrderCreateResponse(
success=True,
id_ordine=id_ordine,
message=f"Ordine {id_ordine} ricevuto. Elaborazione avviata in background."
)
except Exception as e:
logger.error(f"Errore creazione ordine: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Errore interno: {str(e)}"
)
@router.post("/orders/retry/{id_ordine}", response_model=OrderCreateResponse, tags=["Orders"])
async def retry_failed_order(id_ordine: int, background_tasks: BackgroundTasks):
"""
Resetta un ordine in stato ERRORE a RICEVUTO e rilancia la pipeline.
"""
logger.info(f"Retry ordine {id_ordine}")
try:
success = retry_order(id_ordine)
if not success:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Ordine {id_ordine} non trovato o non in stato ERRORE"
)
background_tasks.add_task(process_order_pipeline)
return OrderCreateResponse(
success=True,
id_ordine=id_ordine,
message=f"Ordine {id_ordine} resettato. Rielaborazione avviata in background."
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Errore retry ordine {id_ordine}: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Errore interno: {str(e)}"
)
@router.post("/orders/trigger-pipeline", response_model=OrderCreateResponse, tags=["Orders"])
async def manual_trigger_pipeline(background_tasks: BackgroundTasks):
"""
Lancia manualmente la pipeline di elaborazione.
Processa il prossimo ordine pendente (più vecchio con stato RICEVUTO).
"""
logger.info("Trigger manuale pipeline")
try:
background_tasks.add_task(trigger_pipeline)
return OrderCreateResponse(
success=True,
message="Pipeline avviata in background."
)
except Exception as e:
logger.error(f"Errore trigger pipeline: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Errore interno: {str(e)}"
)
@router.get("/orders/export/{id_ordine}", tags=["Orders"])
async def export_order_excel(id_ordine: int):
"""
Genera e scarica il file Excel per un ordine completato.
L'ordine deve avere un uuid_progetto associato (stato >= ARRICCHITO).
"""
logger.info(f"Export Excel per ordine {id_ordine}")
try:
# Recupera l'ordine dal DB
row = get_ordine_by_id(id_ordine)
if not row:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Ordine {id_ordine} non trovato"
)
uuid_progetto = row[4] # uuid_progetto
if not uuid_progetto:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Ordine {id_ordine} non ha un progetto associato (elaborazione non completata?)"
)
# Recupera il progetto da MongoDB
from bson import ObjectId
collection = db_connect(collection_name='projects')
doc = collection.find_one({"_id": ObjectId(uuid_progetto)})
if not doc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Progetto {uuid_progetto} non trovato in MongoDB"
)
doc.pop("_id", None)
project = Project(**doc)
# Genera Excel
output_path = project.export_excel()
logger.info(f"Excel generato per ordine {id_ordine}: {output_path}")
return FileResponse(
path=output_path,
filename=f"progetto_{id_ordine}.xlsx",
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Errore export Excel ordine {id_ordine}: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Errore interno: {str(e)}"
)
@router.get("/orders/export-sources/{id_ordine}", tags=["Orders"])
async def export_order_sources(id_ordine: int):
"""
Genera i PDF delle fonti per ogni ingrediente di un ordine e li restituisce in un archivio ZIP.
Include: PDF tossicologico del best_case (CAS_source.pdf) + PDF COSING (CAS_cosing.pdf).
"""
logger.info(f"Export fonti PDF per ordine {id_ordine}")
try:
# Recupera l'ordine dal DB
row = get_ordine_by_id(id_ordine)
if not row:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Ordine {id_ordine} non trovato"
)
uuid_progetto = row[4]
if not uuid_progetto:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Ordine {id_ordine} non ha un progetto associato"
)
# Recupera il progetto da MongoDB
from bson import ObjectId
collection = db_connect(collection_name='projects')
doc = collection.find_one({"_id": ObjectId(uuid_progetto)})
if not doc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Progetto {uuid_progetto} non trovato in MongoDB"
)
doc.pop("_id", None)
project = Project(**doc)
# Genera i PDF delle fonti
pdf_paths = await generate_project_source_pdfs(project)
if not pdf_paths:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Nessuna fonte (tox/COSING) disponibile per questo progetto"
)
# Crea ZIP
import os
os.makedirs("exports", exist_ok=True)
zip_path = f"exports/fonti_ordine_{id_ordine}.zip"
create_sources_zip(pdf_paths, zip_path)
logger.info(f"ZIP fonti generato per ordine {id_ordine}: {zip_path} ({len(pdf_paths)} PDF)")
return FileResponse(
path=zip_path,
filename=f"fonti_ordine_{id_ordine}.zip",
media_type="application/zip"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Errore export fonti ordine {id_ordine}: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Errore interno: {str(e)}"
)
@router.get("/orders/list", tags=["Orders"])
async def list_orders():
"""
Recupera la lista di tutti gli ordini con info cliente, compilatore e stato.
Per ciascun ordine recupera product_name dal documento MongoDB.
"""
logger.info("Richiesta lista ordini")
try:
rows = get_all_ordini()
orders = []
orders_col = db_connect(collection_name='orders')
for row in rows:
id_ordine, uuid_ordine, uuid_progetto, data_ordine, stato_ordine, note, \
nome_cliente, nome_compilatore, nome_stato = row
# Recupera product_name da MongoDB
product_name = None
if uuid_ordine:
try:
from bson import ObjectId
doc = orders_col.find_one({"_id": ObjectId(uuid_ordine)}, {"product_name": 1})
if doc:
product_name = doc.get("product_name")
                except Exception as e:
                    logger.debug(f"Ordine {id_ordine}: product_name non recuperato ({e})")
orders.append({
"id_ordine": id_ordine,
"uuid_ordine": uuid_ordine,
"uuid_progetto": uuid_progetto,
"data_ordine": data_ordine.isoformat() if data_ordine else None,
"stato_ordine": stato_ordine,
"note": note,
"nome_cliente": nome_cliente,
"nome_compilatore": nome_compilatore,
"nome_stato": nome_stato,
"product_name": product_name,
})
return {"success": True, "data": orders}
except Exception as e:
logger.error(f"Errore lista ordini: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Errore interno: {str(e)}"
)
@router.get("/orders/detail/{id_ordine}", tags=["Orders"])
async def get_order_detail(id_ordine: int):
"""
Recupera il dettaglio completo di un ordine: dati PostgreSQL + documento MongoDB.
Include product_name, preset, lista ingredienti dal JSON originale.
"""
logger.info(f"Dettaglio ordine {id_ordine}")
try:
row = get_ordine_by_id(id_ordine)
if not row:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Ordine {id_ordine} non trovato"
)
id_ord, id_cliente, id_compilatore, uuid_ordine, uuid_progetto, \
data_ordine, stato_ordine, note = row
# Recupera nomi da PostgreSQL
from pif_compiler.functions.db_utils import postgres_connect
        conn = postgres_connect()
        try:
            with conn.cursor() as cur:
                nome_cliente = None
                if id_cliente:
                    cur.execute("SELECT nome_cliente FROM clienti WHERE id_cliente = %s", (id_cliente,))
                    r = cur.fetchone()
                    nome_cliente = r[0] if r else None
                nome_compilatore = None
                if id_compilatore:
                    cur.execute("SELECT nome_compilatore FROM compilatori WHERE id_compilatore = %s", (id_compilatore,))
                    r = cur.fetchone()
                    nome_compilatore = r[0] if r else None
                cur.execute("SELECT nome_stato FROM stati_ordini WHERE id_stato = %s", (stato_ordine,))
                r = cur.fetchone()
                nome_stato = r[0] if r else None
        finally:
            # Chiude sempre la connessione, anche in caso di eccezione
            conn.close()
# Recupera dati dal documento MongoDB ordine
product_name = None
preset_esposizione = None
ingredients = []
if uuid_ordine:
from bson import ObjectId
orders_col = db_connect(collection_name='orders')
try:
doc = orders_col.find_one({"_id": ObjectId(uuid_ordine)})
if doc:
product_name = doc.get("product_name")
preset_esposizione = doc.get("preset_esposizione")
ingredients = doc.get("ingredients", [])
            except Exception as e:
                logger.debug(f"Ordine {id_ordine}: documento MongoDB non leggibile ({e})")
return {
"success": True,
"order": {
"id_ordine": id_ord,
"uuid_ordine": uuid_ordine,
"uuid_progetto": uuid_progetto,
"data_ordine": data_ordine.isoformat() if data_ordine else None,
"stato_ordine": stato_ordine,
"nome_stato": nome_stato,
"note": note,
"nome_cliente": nome_cliente,
"nome_compilatore": nome_compilatore,
"product_name": product_name,
"preset_esposizione": preset_esposizione,
"ingredients": ingredients,
}
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Errore dettaglio ordine {id_ordine}: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Errore interno: {str(e)}"
)
@router.delete("/orders/{id_ordine}", tags=["Orders"])
async def remove_order(id_ordine: int):
"""
Elimina un ordine e tutti i dati correlati (progetto, lineage, documenti MongoDB).
"""
logger.info(f"Eliminazione ordine {id_ordine}")
try:
success = delete_ordine(id_ordine)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Ordine {id_ordine} non trovato o errore nell'eliminazione"
)
return {"success": True, "message": f"Ordine {id_ordine} eliminato con successo"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Errore eliminazione ordine {id_ordine}: {str(e)}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Errore interno: {str(e)}"
)


@@ -15,10 +15,14 @@ from pif_compiler.classes.models import (
Esposition,
)
from pif_compiler.classes.main_cls import (
IngredientInput,
from pif_compiler.classes.main_workflow import (
ProjectIngredient,
Order,
Project,
receive_order,
process_order_pipeline,
retry_order,
trigger_pipeline,
)
__all__ = [
@@ -30,7 +34,9 @@ __all__ = [
"Ingredient",
"RetentionFactors",
"Esposition",
"IngredientInput",
"ProjectIngredient",
"Order",
"Project",
"receive_order",
"process_order_pipeline",
]


@@ -1,209 +0,0 @@
import uuid
from pydantic import BaseModel, Field, model_validator
from typing import Dict, List, Optional
from datetime import datetime as dt
from pif_compiler.classes.models import (
StatoOrdine,
Ingredient,
Esposition,
)
from pif_compiler.functions.db_utils import (
db_connect,
upsert_cliente,
upsert_compilatore,
)
from pif_compiler.functions.common_log import get_logger
logger = get_logger()
class IngredientInput(BaseModel):
cas_raw: str
cas_list: List[str] = Field(default_factory=list)
percentage: float
@model_validator(mode='after')
def clean_cas(self):
cleaned = self.cas_raw.replace('\n', '')
self.cas_list = [c.strip() for c in cleaned.split(';') if c.strip()]
return self
class Order(BaseModel):
"""Layer grezzo: riceve l'input, lo valida e lo salva su MongoDB."""
uuid_ordine: str = Field(default_factory=lambda: str(uuid.uuid4()))
client_name: str
compiler_name: str
product_type: str
date: str
notes: str = ""
stato: StatoOrdine = StatoOrdine.RICEVUTO
ingredients_input: List[IngredientInput]
total_percentage: float = 0
num_ingredients: int = 0
created_at: Optional[str] = None
@model_validator(mode='after')
def set_defaults(self):
if self.created_at is None:
self.created_at = dt.now().isoformat()
if self.total_percentage == 0:
self.total_percentage = sum(i.percentage for i in self.ingredients_input)
if self.num_ingredients == 0:
self.num_ingredients = len(self.ingredients_input)
return self
@classmethod
def from_input(cls, data: dict):
"""Costruisce un Order a partire dal JSON grezzo di input."""
ingredients_input = []
for ing in data.get('ingredients', []):
ingredients_input.append(IngredientInput(
cas_raw=ing.get('CAS Number', ''),
percentage=ing.get('Percentage (%)', 0)
))
return cls(
client_name=data.get('client_name', ''),
compiler_name=data.get('compiler_name', ''),
product_type=data.get('product_type', ''),
date=data.get('date', ''),
notes=data.get('notes', ''),
ingredients_input=ingredients_input,
)
def save(self):
"""Salva l'ordine su MongoDB (collection 'orders'). Ritorna il mongo_id."""
collection = db_connect(collection_name='orders')
mongo_dict = self.model_dump()
result = collection.replace_one(
{"uuid_ordine": self.uuid_ordine},
mongo_dict,
upsert=True
)
if result.upserted_id:
mongo_id = str(result.upserted_id)
else:
doc = collection.find_one({"uuid_ordine": self.uuid_ordine}, {"_id": 1})
mongo_id = str(doc["_id"])
logger.info(f"Ordine {self.uuid_ordine} salvato su MongoDB: {mongo_id}")
return mongo_id
def register(self):
"""Registra cliente e compilatore su PostgreSQL. Ritorna (id_cliente, id_compilatore)."""
id_cliente = upsert_cliente(self.client_name)
id_compilatore = upsert_compilatore(self.compiler_name)
logger.info(f"Registrato cliente={id_cliente}, compilatore={id_compilatore}")
return id_cliente, id_compilatore
class Project(BaseModel):
"""Layer elaborato: contiene gli ingredienti arricchiti, l'esposizione e le statistiche."""
uuid_progetto: str = Field(default_factory=lambda: str(uuid.uuid4()))
uuid_ordine: str
stato: StatoOrdine = StatoOrdine.VALIDATO
ingredients: List[Ingredient] = Field(default_factory=list)
percentages: Dict[str, float] = Field(default_factory=dict)
esposition: Optional[Esposition] = None
created_at: Optional[str] = None
@model_validator(mode='after')
def set_created_at(self):
if self.created_at is None:
self.created_at = dt.now().isoformat()
return self
@classmethod
def from_order(cls, order: Order):
"""Crea un progetto a partire da un ordine, estraendo la lista CAS e le percentuali."""
percentages = {}
for ing_input in order.ingredients_input:
for cas in ing_input.cas_list:
percentages[cas] = ing_input.percentage
return cls(
uuid_ordine=order.uuid_ordine,
percentages=percentages,
)
def process_ingredients(self):
"""Arricchisce tutti gli ingredienti tramite Ingredient.get_or_create."""
self.stato = StatoOrdine.ARRICCHIMENTO
self.ingredients = []
errori = 0
for cas in self.percentages:
try:
ingredient = Ingredient.get_or_create(cas)
self.ingredients.append(ingredient)
logger.info(f"Ingrediente {cas} processato")
except Exception as e:
logger.error(f"Errore processando CAS {cas}: {e}")
errori += 1
self.stato = StatoOrdine.ARRICCHIMENTO_PARZIALE if errori > 0 else StatoOrdine.ARRICCHITO
self.save()
return self.ingredients
def set_esposition(self, preset_name: str):
"""Carica un preset di esposizione da PostgreSQL per nome."""
presets = Esposition.get_presets()
for p in presets:
if p.preset_name == preset_name:
self.esposition = p
return p
logger.warning(f"Preset '{preset_name}' non trovato")
return None
def save(self):
"""Salva il progetto su MongoDB (collection 'projects'). Ritorna il mongo_id."""
collection = db_connect(collection_name='projects')
mongo_dict = self.model_dump()
result = collection.replace_one(
{"uuid_progetto": self.uuid_progetto},
mongo_dict,
upsert=True
)
if result.upserted_id:
mongo_id = str(result.upserted_id)
else:
doc = collection.find_one({"uuid_progetto": self.uuid_progetto}, {"_id": 1})
mongo_id = str(doc["_id"])
logger.info(f"Progetto {self.uuid_progetto} salvato su MongoDB: {mongo_id}")
return mongo_id
def get_stats(self):
"""Ritorna statistiche sul progetto e sullo stato di arricchimento."""
stats = {
"uuid_progetto": self.uuid_progetto,
"uuid_ordine": self.uuid_ordine,
"stato": self.stato.name,
"has_esposition": self.esposition is not None,
"num_ingredients": len(self.ingredients),
"num_cas_input": len(self.percentages),
}
if self.ingredients:
stats["enrichment"] = {
"with_dap": sum(1 for i in self.ingredients if i.dap_info is not None),
"with_cosing": sum(1 for i in self.ingredients if i.cosing_info is not None),
"with_tox": sum(1 for i in self.ingredients if i.toxicity is not None),
"with_noael": sum(
1 for i in self.ingredients
if i.toxicity and any(ind.indicator == 'NOAEL' for ind in i.toxicity.indicators)
),
}
return stats


@@ -0,0 +1,368 @@
"""
PIF Compiler - Workflow di elaborazione ordini
Contiene le classi Order e Project e le funzioni orchestratore
per il flusso di elaborazione degli ordini cosmetici.
"""
from datetime import datetime as dt
from typing import List, Optional
from pydantic import BaseModel, Field, ConfigDict, model_validator
from pif_compiler.classes.models import (
StatoOrdine, Ingredient, Esposition
)
from pif_compiler.functions.db_utils import (
db_connect, upsert_cliente, aggiorna_stato_ordine,
insert_ordine, get_oldest_pending_order,
update_ordine_cliente, update_ordine_progetto, update_ordine_note,
get_preset_id_by_name, insert_progetto,
insert_ingredient_lineage, get_ingrediente_id_by_cas,
get_ordine_by_id, reset_ordine_per_retry
)
from pif_compiler.functions.common_log import get_logger
logger = get_logger()
# ==================== MODELS ====================
class ProjectIngredient(BaseModel):
"""Rappresenta un ingrediente nel contesto di un progetto."""
cas: Optional[str] = None
inci: Optional[str] = None
percentage: float = 0.0
is_colorante: bool = False
skip_tox: bool = False
ingredient: Optional[Ingredient] = None
class Order(BaseModel):
"""
Ordine grezzo ricevuto dal front-end.
Contiene i dati della tabella ordini + il JSON grezzo da MongoDB.
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
# Attributi dalla tabella ordini
id_ordine: int
uuid_ordine: str
id_cliente: Optional[int] = None
id_compilatore: Optional[int] = None
data_ordine: dt
stato_ordine: StatoOrdine = StatoOrdine.RICEVUTO
uuid_progetto: Optional[str] = None
note: Optional[str] = None
# JSON grezzo da MongoDB
raw_json: dict = Field(default_factory=dict)
# Campi derivati dal raw_json
client_name: Optional[str] = None
product_name: Optional[str] = None
preset_name: Optional[str] = None
ingredients_raw: list = Field(default_factory=list)
@model_validator(mode='after')
def parse_raw_json(self):
"""Parsa il raw_json per estrarre i campi principali."""
if self.raw_json:
self.client_name = self.raw_json.get('client_name')
self.product_name = self.raw_json.get('product_name')
self.preset_name = self.raw_json.get('preset_esposizione')
self.ingredients_raw = self.raw_json.get('ingredients', [])
return self
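The derived-field extraction performed by `parse_raw_json` can be illustrated with a standalone plain-dict sketch (field names are taken from the validator above; the sample payload values are invented):

```python
def extract_order_fields(raw_json: dict) -> dict:
    """Replica in forma standalone l'estrazione dei campi derivati dal raw_json."""
    return {
        "client_name": raw_json.get("client_name"),
        "product_name": raw_json.get("product_name"),
        "preset_name": raw_json.get("preset_esposizione"),
        "ingredients_raw": raw_json.get("ingredients", []),
    }

# Esempio di payload (valori inventati, solo a scopo illustrativo)
sample = {
    "client_name": "ACME Cosmetics",
    "product_name": "Crema mani",
    "preset_esposizione": "hand_cream",
    "ingredients": [{"cas": "56-81-5", "percentage": 5.0}],
}
fields = extract_order_fields(sample)
```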
@classmethod
def pick_next(cls) -> Optional['Order']:
"""
Recupera il prossimo ordine da elaborare.
Prende il più vecchio con stato_ordine = RICEVUTO (1).
"""
row = get_oldest_pending_order()
if not row:
logger.info("Nessun ordine pendente trovato")
return None
id_ordine, id_cliente, id_compilatore, uuid_ordine, uuid_progetto, data_ordine, stato_ordine, note = row
# Recupera il JSON grezzo da MongoDB
from bson import ObjectId
collection = db_connect(collection_name='orders')
doc = collection.find_one({"_id": ObjectId(uuid_ordine)})
if not doc:
logger.error(f"Documento MongoDB non trovato per uuid_ordine={uuid_ordine}")
return None
doc.pop("_id", None)
logger.info(f"Ordine {id_ordine} recuperato (uuid={uuid_ordine})")
return cls(
id_ordine=id_ordine,
uuid_ordine=uuid_ordine,
id_cliente=id_cliente,
id_compilatore=id_compilatore,
data_ordine=data_ordine,
stato_ordine=StatoOrdine(stato_ordine),
uuid_progetto=uuid_progetto,
note=note,
raw_json=doc
)
def update_stato(self, nuovo_stato: StatoOrdine):
"""Aggiorna lo stato dell'ordine nel DB e nell'oggetto."""
aggiorna_stato_ordine(self.id_ordine, nuovo_stato)
self.stato_ordine = nuovo_stato
logger.info(f"Ordine {self.id_ordine}: stato -> {nuovo_stato.name} ({nuovo_stato.value})")
def validate_anagrafica(self):
"""
Valida i dati anagrafici dell'ordine.
- Verifica/crea il cliente nella tabella clienti
- Aggiorna id_cliente sull'ordine
- Stato -> VALIDATO
"""
if not self.client_name:
raise ValueError("Nome cliente mancante nel JSON dell'ordine")
id_cliente = upsert_cliente(self.client_name)
if id_cliente is None:
raise ValueError(f"Errore nella validazione del cliente '{self.client_name}'")
self.id_cliente = id_cliente
update_ordine_cliente(self.id_ordine, id_cliente)
logger.info(f"Ordine {self.id_ordine}: cliente '{self.client_name}' validato (id={id_cliente})")
self.update_stato(StatoOrdine.VALIDATO)
class Project(BaseModel):
"""
Progetto di valutazione cosmetica.
Creato a partire da un Order, contiene gli ingredienti arricchiti
e il preset di esposizione.
"""
model_config = ConfigDict(arbitrary_types_allowed=True)
order_id: int
product_name: str
client_name: str
esposition: Esposition
ingredients: List[ProjectIngredient] = Field(default_factory=list)
@classmethod
def from_order(cls, order: Order) -> 'Project':
"""Crea un Project a partire da un Order validato."""
# Recupera il preset di esposizione
preset_name = order.preset_name
if not preset_name:
raise ValueError("Nome preset esposizione mancante nell'ordine")
esposition = Esposition.get_by_name(preset_name)
if not esposition:
raise ValueError(f"Preset esposizione '{preset_name}' non trovato nel database")
logger.info(f"Ordine {order.id_ordine}: preset '{preset_name}' recuperato")
# Parsa gli ingredienti dal JSON grezzo
project_ingredients = []
for ing_raw in order.ingredients_raw:
            cas_value = ing_raw.get('cas') or None  # stringa vuota o mancante -> None
pi = ProjectIngredient(
cas=cas_value,
inci=ing_raw.get('inci'),
percentage=ing_raw.get('percentage', 0),
is_colorante=ing_raw.get('is_colorante', False),
skip_tox=ing_raw.get('skip_tox', False)
)
project_ingredients.append(pi)
logger.info(f"Ordine {order.id_ordine}: {len(project_ingredients)} ingredienti parsati")
return cls(
order_id=order.id_ordine,
product_name=order.product_name or "",
client_name=order.client_name or "",
esposition=esposition,
ingredients=project_ingredients
)
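The per-ingredient normalization inside `from_order` (empty CAS collapsed to `None`, defaults for the optional flags) can be sketched as a pure function over the raw dict:

```python
def normalize_ingredient(ing_raw: dict) -> dict:
    """Normalizza una voce ingrediente come fa Project.from_order (sketch)."""
    return {
        "cas": ing_raw.get("cas") or None,   # stringa vuota -> None
        "inci": ing_raw.get("inci"),
        "percentage": ing_raw.get("percentage", 0),
        "is_colorante": ing_raw.get("is_colorante", False),
        "skip_tox": ing_raw.get("skip_tox", False),
    }
```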
def process_ingredients(self):
"""
Arricchisce gli ingredienti tramite scraping (ECHA, PubChem, COSING).
Salta gli ingredienti con skip_tox=True o CAS vuoto.
"""
enriched = 0
skipped = 0
for pi in self.ingredients:
if pi.skip_tox or not pi.cas:
logger.info(f"Skip ingrediente: inci={pi.inci}, cas={pi.cas}, skip_tox={pi.skip_tox}")
skipped += 1
continue
try:
logger.info(f"Arricchimento ingrediente CAS={pi.cas}...")
inci_list = [pi.inci] if pi.inci else None
ingredient = Ingredient.get_or_create(pi.cas, inci=inci_list)
pi.ingredient = ingredient
enriched += 1
logger.info(f"Ingrediente CAS={pi.cas} arricchito: {ingredient.get_stats()}")
except Exception as e:
logger.error(f"Errore arricchimento CAS={pi.cas}: {e}", exc_info=True)
continue
logger.info(f"Arricchimento completato: {enriched} arricchiti, {skipped} saltati")
def save(self) -> Optional[str]:
"""
Salva il progetto su MongoDB e PostgreSQL.
1. Dump su MongoDB collection 'projects'
2. Aggiorna ordini.uuid_progetto
3. Inserisci nella tabella progetti
4. Inserisci relazioni ingredients_lineage
"""
# 1. Dump su MongoDB
collection = db_connect(collection_name='projects')
mongo_dict = self.model_dump(mode='json')
result = collection.insert_one(mongo_dict)
uuid_progetto = str(result.inserted_id)
logger.info(f"Progetto salvato su MongoDB: uuid_progetto={uuid_progetto}")
# 2. Aggiorna ordini.uuid_progetto
update_ordine_progetto(self.order_id, uuid_progetto)
# 3. Recupera id_preset e inserisci in progetti
id_preset = get_preset_id_by_name(self.esposition.preset_name)
id_progetto = insert_progetto(uuid_progetto, id_preset)
if id_progetto:
logger.info(f"Progetto inserito in PostgreSQL: id={id_progetto}")
# 4. Inserisci ingredients_lineage per ogni ingrediente arricchito
for pi in self.ingredients:
if pi.ingredient and pi.cas:
id_ingrediente = get_ingrediente_id_by_cas(pi.cas)
if id_ingrediente:
insert_ingredient_lineage(id_progetto, id_ingrediente)
logger.info(f"Lineage: progetto={id_progetto}, ingrediente CAS={pi.cas}")
else:
logger.error("Errore inserimento progetto in PostgreSQL")
return uuid_progetto
def export_excel(self, output_path: str = None) -> str:
"""Esporta il progetto in un file Excel. Ritorna il percorso del file."""
from pif_compiler.functions.excel_export import export_project_excel
return export_project_excel(self, output_path)
# ==================== ORCHESTRATOR ====================
def receive_order(raw_json: dict) -> Optional[int]:
"""
Riceve un ordine dal front-end, lo salva su MongoDB e crea il record in PostgreSQL.
Ritorna id_ordine.
"""
# 1. Salva il JSON grezzo su MongoDB collection 'orders'
collection = db_connect(collection_name='orders')
result = collection.insert_one(raw_json.copy()) # copy per evitare side-effects su _id
uuid_ordine = str(result.inserted_id)
logger.info(f"Ordine salvato su MongoDB: uuid_ordine={uuid_ordine}")
# 2. Crea il record nella tabella ordini (stato = RICEVUTO)
id_ordine = insert_ordine(uuid_ordine)
if id_ordine is None:
logger.error(f"Errore creazione record ordini per uuid={uuid_ordine}")
return None
logger.info(f"Ordine {id_ordine} creato in PostgreSQL (stato=RICEVUTO)")
return id_ordine
def process_order_pipeline():
"""
Pipeline di elaborazione ordine. Eseguita come background task.
1. Recupera il prossimo ordine pendente (più vecchio con stato RICEVUTO)
2. Valida anagrafica -> stato VALIDATO
3. Crea il Project -> stato COMPILAZIONE
4. Arricchisce gli ingredienti
    5. Stato COMPLETATO
6. Salva il progetto su MongoDB + PostgreSQL
"""
order = None
try:
# 1. Recupera il prossimo ordine
order = Order.pick_next()
if order is None:
logger.warning("Pipeline: nessun ordine pendente da elaborare")
return
logger.info(f"Pipeline: inizio elaborazione ordine {order.id_ordine}")
# 2. Valida anagrafica -> VALIDATO
order.validate_anagrafica()
# 3. Crea il Project -> COMPILAZIONE
order.update_stato(StatoOrdine.COMPILAZIONE)
project = Project.from_order(order)
logger.info(f"Pipeline: progetto creato con {len(project.ingredients)} ingredienti")
# 4. Arricchisci gli ingredienti
project.process_ingredients()
        # 5. Stato COMPLETATO
order.update_stato(StatoOrdine.COMPLETATO)
# 6. Salva il progetto
uuid_progetto = project.save()
order.uuid_progetto = uuid_progetto
logger.info(f"Pipeline: ordine {order.id_ordine} completato (uuid_progetto={uuid_progetto})")
except Exception as e:
logger.error(f"Pipeline: errore elaborazione ordine: {e}", exc_info=True)
if order:
order.update_stato(StatoOrdine.ERRORE)
update_ordine_note(order.id_ordine, f"Errore pipeline: {str(e)}")
def retry_order(id_ordine: int) -> bool:
"""
Resetta un ordine in stato ERRORE a RICEVUTO per rielaborarlo.
Ritorna True se il reset è avvenuto, False altrimenti.
"""
row = get_ordine_by_id(id_ordine)
if not row:
logger.warning(f"Retry: ordine {id_ordine} non trovato")
return False
stato_attuale = row[6] # stato_ordine
if stato_attuale != StatoOrdine.ERRORE:
logger.warning(f"Retry: ordine {id_ordine} non è in stato ERRORE (stato={stato_attuale})")
return False
result = reset_ordine_per_retry(id_ordine)
if result:
logger.info(f"Retry: ordine {id_ordine} resettato a RICEVUTO")
return True
logger.error(f"Retry: errore nel reset dell'ordine {id_ordine}")
return False
def trigger_pipeline():
"""
    Lancia manualmente un'esecuzione della pipeline.
Processa il prossimo ordine pendente (più vecchio con stato RICEVUTO).
"""
logger.info("Pipeline manuale: avvio")
process_order_pipeline()
logger.info("Pipeline manuale: completata")


@@ -4,18 +4,19 @@ from enum import IntEnum
from typing import List, Optional
from datetime import datetime as dt
from pif_compiler.functions.common_log import get_logger
logger = get_logger()
class StatoOrdine(IntEnum):
"""Stati ordine per orchestrare il flusso di elaborazione PIF."""
RICEVUTO = 1 # Input grezzo ricevuto, caricato su MongoDB
VALIDATO = 2 # Input validato (compilatore, cliente, tipo cosmetico ok)
ARRICCHIMENTO = 3 # Arricchimento in corso (COSING, PubChem, ECHA)
ARRICCHIMENTO_PARZIALE = 4 # Arricchimento completato ma con dati mancanti
ARRICCHITO = 5 # Arricchimento completato con successo
    VALIDATO = 2      # Input validato (compilatore, cliente, tipo cosmetico ok)
    COMPILAZIONE = 3  # L'oggetto Project è stato creato ed è in fase di arricchimento
    ARRICCHITO = 5    # L'oggetto Project è finalizzato
CALCOLO = 6 # Calcolo DAP, SED, MoS in corso
IN_REVISIONE = 7 # Calcoli completati, in attesa di revisione umana
COMPLETATO = 8 # PIF finalizzato
ERRORE = 9 # Errore durante l'elaborazione
ANNULLATO = 10 # Ordine annullato
    ERRORE = 9        # Errore durante l'elaborazione, richiede intervento umano
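The export endpoint earlier in this commit gates on "stato >= ARRICCHITO"; because `StatoOrdine` is an `IntEnum`, the integer stored in PostgreSQL compares directly against the enum members. A standalone copy of the values above demonstrates this:

```python
from enum import IntEnum

class StatoOrdine(IntEnum):
    """Copia standalone dell'enum sopra, per mostrare il confronto IntEnum."""
    RICEVUTO = 1
    VALIDATO = 2
    COMPILAZIONE = 3
    ARRICCHITO = 5
    CALCOLO = 6
    IN_REVISIONE = 7
    COMPLETATO = 8
    ERRORE = 9
    ANNULLATO = 10

# Il valore intero letto da PostgreSQL si confronta direttamente con l'enum
stato_da_db = 8
pronto_per_export = StatoOrdine(stato_da_db) >= StatoOrdine.ARRICCHITO
```

Note that ERRORE (9) and ANNULLATO (10) also satisfy a bare `>= ARRICCHITO` comparison, so state checks may need to exclude them explicitly.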
from pif_compiler.services.srv_echa import extract_levels, at_extractor, rdt_extractor, orchestrator
from pif_compiler.functions.db_utils import postgres_connect, upsert_ingrediente, get_ingrediente_by_cas
@@ -96,21 +97,23 @@ class DapInfo(BaseModel):
try:
for item in dap_data[key]:
if '°C' in item['Value']:
mp = dap_data[key]['Value']
mp = item['Value']
mp_value = re.findall(r"[-+]?\d*\.\d+|\d+", mp)
if mp_value:
dict['melting_point'] = float(mp_value[0])
except:
except Exception as e:
logger.warning(f"DapInfo: parsing melting_point fallito per CAS={dict.get('cas', '?')}: {e}")
continue
if key == 'Dissociation Constants':
try:
for item in dap_data[key]:
if 'pKa' in item['Value']:
pk = dap_data[key]['Value']
pk = item['Value']
pk_value = re.findall(r"[-+]?\d*\.\d+|\d+", pk)
if pk_value:
dict['high_ionization'] = float(mp_value[0])
except:
dict['high_ionization'] = float(pk_value[0])
except Exception as e:
logger.warning(f"DapInfo: parsing dissociation fallito per CAS={dict.get('cas', '?')}: {e}")
continue
return cls(**dict)
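Both parsing branches above (melting point and pKa) use the same regex to pull the first number out of a free-text value; isolated, the pattern behaves like this (sample strings are invented):

```python
import re

def first_number(text: str):
    """Estrae il primo numero (intero o decimale, con segno) da una stringa."""
    matches = re.findall(r"[-+]?\d*\.\d+|\d+", text)
    return float(matches[0]) if matches else None
```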
@@ -122,11 +125,24 @@ class CosingInfo(BaseModel):
annex : List[str] = Field(default_factory=list)
functionName : List[str] = Field(default_factory=list)
otherRestrictions : List[str] = Field(default_factory=list)
cosmeticRestriction : Optional[str]
cosmeticRestriction : Optional[str] = None
reference : Optional[str] = None
sccsOpinionUrls : List[str] = Field(default_factory=list)
@classmethod
def cosing_builder(cls, cosing_data : dict):
cosing_keys = ['nameOfCommonIngredientsGlossary', 'casNo', 'functionName', 'annexNo', 'refNo', 'otherRestrictions', 'cosmeticRestriction', 'inciName']
cosing_keys = [
'nameOfCommonIngredientsGlossary',
'casNo',
'functionName',
'annexNo',
'refNo',
'otherRestrictions',
'cosmeticRestriction',
'reference',
'inciName',
'sccsOpinionUrls'
]
keys = [k for k in cosing_data.keys() if k in cosing_keys]
cosing_dict = {}
@@ -167,6 +183,13 @@ class CosingInfo(BaseModel):
cosing_dict['otherRestrictions'] = other_restrictions
if k == 'cosmeticRestriction':
cosing_dict['cosmeticRestriction'] = cosing_data[k]
if k == 'reference':
cosing_dict['reference'] = cosing_data[k]
        if k == 'sccsOpinionUrls':
            cosing_dict['sccsOpinionUrls'] = list(cosing_data[k])
return cls(**cosing_dict)
@@ -184,11 +207,12 @@ class CosingInfo(BaseModel):
class ToxIndicator(BaseModel):
indicator : str
value : int
value : float
unit : str
route : str
toxicity_type : Optional[str] = None
ref : Optional[str] = None
source : Optional[str] = None
@property
def priority_rank(self):
@@ -230,21 +254,28 @@ class Toxicity(BaseModel):
for tt in toxicity_types:
if tt not in result:
logger.debug(f"Toxicity CAS={cas}: nessun dato per {tt}")
continue
try:
extractor = at_extractor if tt == 'acute_toxicity' else rdt_extractor
fetch = extract_levels(result[tt], extractor=extractor)
link = result.get(f"{tt}_link", "")
if not fetch:
logger.warning(f"Toxicity CAS={cas}: {tt} presente ma nessun indicatore estratto")
continue
                links = result.get("index") or {}
                link = links.get(f"{tt}_link", "")
for key, lvl in fetch.items():
lvl['ref'] = link
lvl['source'] = tt
elem = ToxIndicator(**lvl)
indicators_list.append(elem)
except Exception as e:
print(f"Errore durante l'estrazione di {tt}: {e}")
logger.error(f"Toxicity.from_result CAS={cas}: estrazione {tt} fallita: {e}")
continue
return cls(
@@ -262,17 +293,24 @@ class Ingredient(BaseModel):
@classmethod
def ingredient_builder(cls, cas: str, inci: Optional[List[str]] = None):
# Recupera dati DAP da PubChem
logger.info(f"ingredient_builder CAS={cas}: inizio scraping")
dap_data = pubchem_dap(cas)
dap_info = DapInfo.dap_builder(dap_data) if isinstance(dap_data, dict) else None
if not dap_info:
logger.warning(f"CAS={cas}: nessun dato DAP da PubChem")
# Recupera dati COSING
cosing_data = cosing_entry(cas)
cosing_info = CosingInfo.cycle_identified(cosing_data) if cosing_data else None
if not cosing_info:
logger.warning(f"CAS={cas}: nessun dato COSING")
# Recupera dati tossicologici da ECHA
toxicity_data = orchestrator(cas)
toxicity = Toxicity.from_result(cas, toxicity_data) if toxicity_data else None
if not toxicity or not toxicity.indicators:
logger.warning(f"CAS={cas}: nessun indicatore tossicologico trovato")
logger.info(f"CAS={cas}: scraping completato (dap={'OK' if dap_info else '-'}, cosing={'OK' if cosing_info else '-'}, tox={len(toxicity.indicators) if toxicity else 0} ind.)")
return cls(
cas=cas,
@@ -300,30 +338,31 @@ class Ingredient(BaseModel):
from pif_compiler.functions.db_utils import db_connect
collection = db_connect(collection_name='ingredients')
if collection is None:
logger.error(f"Ingredient.save CAS={self.cas}: connessione MongoDB fallita")
return None
mongo_dict = self.to_mongo_dict()
# Upsert su MongoDB usando il CAS come chiave
result = collection.replace_one(
{"cas": self.cas},
mongo_dict,
upsert=True
)
# Recupera l'ObjectId del documento (inserito o esistente)
if result.upserted_id:
mongo_id = str(result.upserted_id)
else:
doc = collection.find_one({"cas": self.cas}, {"_id": 1})
mongo_id = str(doc["_id"])
# Segna i flag di arricchimento
has_dap = self.dap_info is not None
has_cosing = self.cosing_info is not None
has_tox = self.toxicity is not None
# Upsert su PostgreSQL
upsert_ingrediente(self.cas, mongo_id, dap=has_dap, cosing=has_cosing, tox=has_tox)
logger.debug(f"Ingredient.save CAS={self.cas}: mongo_id={mongo_id}")
return mongo_id
@classmethod
@@ -332,32 +371,39 @@ class Ingredient(BaseModel):
from pif_compiler.functions.db_utils import db_connect
from bson import ObjectId
# Look up PostgreSQL to obtain the mongo_id
pg_entry = get_ingrediente_by_cas(cas)
if not pg_entry:
return None
_, _, mongo_id, _, _, _ = pg_entry
if not mongo_id:
logger.warning(f"from_cas CAS={cas}: presente in PG ma mongo_id è NULL")
return None
# Fetch the document from MongoDB
collection = db_connect(collection_name='ingredients')
if collection is None:
logger.error(f"from_cas CAS={cas}: connessione MongoDB fallita")
return None
doc = collection.find_one({"_id": ObjectId(mongo_id)})
if not doc:
logger.warning(f"from_cas CAS={cas}: mongo_id={mongo_id} non trovato in MongoDB")
return None
doc.pop("_id", None)
return cls(**doc)
@classmethod
def get_or_create(cls, cas: str, inci: Optional[List[str]] = None, force: bool = False):
"""Return the cached ingredient if it exists and is not stale; otherwise rebuild it.
With force=True, bypass the cache, re-run the scraping and update the stored document."""
cached = None
if not force:
cached = cls.from_cas(cas)
if cached and not cached.is_old():
logger.debug(f"get_or_create CAS={cas}: cache hit")
return cached
if force:
logger.info(f"get_or_create CAS={cas}: force refresh")
elif cached:
logger.info(f"get_or_create CAS={cas}: cache scaduta, re-scraping")
else:
logger.info(f"get_or_create CAS={cas}: cache miss, scraping")
# Build a new ingredient (scraping) and persist it
ingredient = cls.ingredient_builder(cas, inci=inci)
ingredient.save()
return ingredient
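The hit / stale / force branching above can be exercised in isolation. This is a toy TTL cache sketching the same policy — a hypothetical `build` callback stands in for `ingredient_builder`, a timestamp check for `is_old()`; it is not the real class:

```python
import time

class TTLCache:
    """Minimal get-or-create cache with TTL and force-refresh (sketch)."""

    def __init__(self, ttl_seconds: float):
        self.ttl = ttl_seconds
        self._store = {}  # key -> (value, created_at)

    def get_or_create(self, key, build, force: bool = False):
        if not force:
            entry = self._store.get(key)
            if entry is not None and (time.time() - entry[1]) < self.ttl:
                return entry[0]  # cache hit, still fresh
        # cache miss, stale entry, or forced refresh: rebuild and store
        value = build(key)
        self._store[key] = (value, time.time())
        return value

calls = []

def build(cas):
    calls.append(cas)
    return f"ingredient:{cas}"

cache = TTLCache(ttl_seconds=3600)
a = cache.get_or_create("123-45-6", build)               # builds
b = cache.get_or_create("123-45-6", build)               # fresh hit, no rebuild
c = cache.get_or_create("123-45-6", build, force=True)   # forced rebuild
```

The real method adds a persistence step (`ingredient.save()`) after the rebuild, but the branching is the same.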
@ -404,6 +450,15 @@ class Ingredient(BaseModel):
restrictions.extend(cosing.annex)
return restrictions
def add_tox_indicator(self, indicator: ToxIndicator):
"""Add a custom toxicological indicator and recompute the best_case."""
if self.toxicity is None:
self.toxicity = Toxicity(cas=self.cas, indicators=[indicator])
else:
new_indicators = self.toxicity.indicators + [indicator]
self.toxicity = Toxicity(cas=self.cas, indicators=new_indicators)
self.save()
class RetentionFactors:
LEAVE_ON = 1.0
RINSE_OFF = 0.01
@ -470,7 +525,7 @@ class Esposition(BaseModel):
conn.commit()
return result[0] if result else None
except Exception as e:
logger.error(f"Esposition.save_to_postgres '{self.preset_name}': {e}")
conn.rollback()
return False
finally:
@ -501,7 +556,51 @@ class Esposition(BaseModel):
lista_oggetti.append(obj)
return lista_oggetti
except Exception as e:
logger.error(f"Esposition.get_presets: {e}")
return []
finally:
conn.close()
@classmethod
def get_by_name(cls, preset_name: str):
"""Fetch an exposure preset by name."""
conn = postgres_connect()
try:
with conn.cursor() as cur:
cur.execute(
"""SELECT preset_name, tipo_prodotto, luogo_applicazione,
esp_normali, esp_secondarie, esp_nano,
sup_esposta, freq_applicazione, qta_giornaliera, ritenzione
FROM tipi_prodotti WHERE preset_name = %s""",
(preset_name,)
)
r = cur.fetchone()
if r:
return cls(
preset_name=r[0], tipo_prodotto=r[1], luogo_applicazione=r[2],
esp_normali=r[3], esp_secondarie=r[4], esp_nano=r[5],
sup_esposta=r[6], freq_applicazione=r[7],
qta_giornaliera=r[8], ritenzione=r[9]
)
return None
except Exception as e:
logger.error(f"Esposition.get_by_name '{preset_name}': {e}")
return None
finally:
conn.close()
@classmethod
def delete_by_name(cls, preset_name: str) -> bool:
conn = postgres_connect()
try:
with conn.cursor() as cur:
cur.execute("DELETE FROM tipi_prodotti WHERE preset_name = %s RETURNING id_preset;", (preset_name,))
result = cur.fetchone()
conn.commit()
return result is not None
except Exception as e:
logger.error(f"Esposition.delete_by_name '{preset_name}': {e}")
conn.rollback()
return False
finally:
conn.close()
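The `RetentionFactors` constants and the preset fields combine into the relative daily exposure used downstream: daily quantity × retention factor, converted to mg and normalised by body weight. A minimal sketch with illustrative numbers (not taken from any real preset):

```python
LEAVE_ON = 1.0    # product retained entirely on the skin
RINSE_OFF = 0.01  # 1% assumed to remain after rinsing

def relative_exposure_mg_per_kg(qta_giornaliera_g: float,
                                ritenzione: float,
                                peso_kg: float) -> float:
    """Relative daily exposure in mg/kg bw/day."""
    esposizione_g = qta_giornaliera_g * ritenzione  # g/day actually retained
    return esposizione_g * 1000.0 / peso_kg         # convert g -> mg, per kg bw

# Rinse-off product: 8 g/day applied, 60 kg adult
rel = relative_exposure_mg_per_kg(8.0, RINSE_OFF, 60.0)  # ≈ 1.33 mg/kg bw/day
```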


@ -1,7 +1,7 @@
from playwright.async_api import async_playwright
import os
from pymongo import MongoClient
import zipfile
import requests
from pif_compiler.functions.common_log import get_logger
@ -95,5 +95,108 @@ async def generate_pdf(link: str, name: str):
return False
async def generate_project_source_pdfs(project, output_dir: str = "pdfs") -> list:
"""
Generates the source PDFs for every ingredient of a project:
- Toxicology: PDF of the best_case (naming: CAS_source.pdf)
- COSING: PDF downloaded via the COSING API when a reference is available (naming: CAS_cosing.pdf)
Args:
project: Project object with enriched ingredients
output_dir: output directory for the PDFs
Returns:
List of paths of the generated PDFs
"""
os.makedirs(output_dir, exist_ok=True)
generated = []
for pi in project.ingredients:
if pi.skip_tox or not pi.cas or not pi.ingredient:
continue
ing = pi.ingredient
# --- Tox best_case PDF ---
best = ing.toxicity.best_case if ing.toxicity else None
if best and best.ref:
pdf_name = f"{pi.cas}_{best.source}" if best.source else pi.cas
log.info(f"Generazione PDF tox: {pdf_name} da {best.ref}")
success = await generate_pdf(best.ref, pdf_name)
if success:
generated.append(os.path.join(output_dir, f"{pdf_name}.pdf"))
else:
log.warning(f"PDF tox non generato per {pdf_name}")
# --- COSING PDF (one per CAS: the filename does not include the reference) ---
refs = [c.reference for c in (ing.cosing_info or []) if c.reference]
if refs:
pdf_name = f"{pi.cas}_cosing"
pdf_path = os.path.join(output_dir, f"{pdf_name}.pdf")
if os.path.exists(pdf_path):
generated.append(pdf_path)
else:
log.info(f"Download COSING PDF: {pdf_name} (ref={refs[0]})")
content = cosing_download(refs[0])
if isinstance(content, bytes):
with open(pdf_path, 'wb') as f:
f.write(content)
generated.append(pdf_path)
else:
log.warning(f"COSING PDF non scaricato per {pdf_name}: {content}")
log.info(f"Generazione fonti completata: {len(generated)} PDF generati")
return generated
def cosing_download(ref_no: str):
"""Downloads the COSING PDF export for a substance from the EC public API; returns bytes on success, an error string otherwise."""
url = f'https://api.tech.ec.europa.eu/cosing20/1.0/api/cosmetics/{ref_no}/export-pdf'
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:147.0) Gecko/20100101 Firefox/147.0',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'it-IT,it;q=0.9',
'Cache-Control': 'No-Cache',
'Origin': 'https://ec.europa.eu',
'Referer': 'https://ec.europa.eu/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-site',
}
response = requests.get(url, headers=headers, timeout=30)
if response.status_code == 200:
return response.content
else:
return f"Error: {response.status_code} - {response.text}"
def create_sources_zip(pdf_paths: list, zip_path: str) -> str:
"""
Creates a ZIP archive containing the source PDFs.
Args:
pdf_paths: list of PDF paths to include
zip_path: path of the output ZIP file
Returns:
Path of the created ZIP file
"""
zip_dir = os.path.dirname(zip_path)
if zip_dir:
os.makedirs(zip_dir, exist_ok=True)
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
for path in pdf_paths:
if os.path.exists(path):
zf.write(path, os.path.basename(path))
log.info(f"ZIP creato: {zip_path} ({len(pdf_paths)} file)")
return zip_path
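`create_sources_zip` silently skips paths that no longer exist. A quick, self-contained check of that behaviour, using throwaway files in place of real source PDFs (the local `create_zip` below has the same shape as the function above):

```python
import os
import tempfile
import zipfile

def create_zip(pdf_paths, zip_path):
    # Same shape as create_sources_zip: include existing files only,
    # stored flat in the archive under their basename.
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
        for path in pdf_paths:
            if os.path.exists(path):
                zf.write(path, os.path.basename(path))
    return zip_path

with tempfile.TemporaryDirectory() as tmp:
    real = os.path.join(tmp, "123-45-6_cosing.pdf")
    with open(real, "wb") as f:
        f.write(b"%PDF-1.4 fake")
    out = create_zip([real, os.path.join(tmp, "missing.pdf")],
                     os.path.join(tmp, "sources.zip"))
    with zipfile.ZipFile(out) as zf:
        names = zf.namelist()  # only the existing file made it in
```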


@ -157,6 +157,19 @@ def upsert_compilatore(nome_compilatore):
logger.error(f"Errore upsert compilatore {nome_compilatore}: {e}")
return None
def get_all_clienti():
"""Fetch all clients from the clienti table."""
try:
conn = postgres_connect()
with conn.cursor() as cur:
cur.execute("SELECT id_cliente, nome_cliente FROM clienti ORDER BY nome_cliente")
results = cur.fetchall()
conn.close()
return results if results else []
except Exception as e:
logger.error(f"Errore recupero clienti: {e}")
return []
def log_ricerche(cas, target, esito):
try:
conn = postgres_connect()
@ -168,5 +181,259 @@ def log_ricerche(cas, target, esito):
logger.error(f"Error: {e}")
return
def insert_ordine(uuid_ordine, id_cliente=None):
"""Insert a new order into the ordini table. Returns id_ordine."""
from datetime import datetime as dt
from pif_compiler.classes.models import StatoOrdine
try:
conn = postgres_connect()
with conn.cursor() as cur:
cur.execute(
"""INSERT INTO ordini (uuid_ordine, id_cliente, data_ordine, stato_ordine)
VALUES (%s, %s, %s, %s) RETURNING id_ordine;""",
(uuid_ordine, id_cliente, dt.now(), int(StatoOrdine.RICEVUTO))
)
result = cur.fetchone()
conn.commit()
conn.close()
return result[0] if result else None
except Exception as e:
logger.error(f"Errore inserimento ordine: {e}")
return None
def get_oldest_pending_order():
"""Fetch the oldest order with stato_ordine = RICEVUTO (1)."""
from pif_compiler.classes.models import StatoOrdine
try:
conn = postgres_connect()
with conn.cursor() as cur:
cur.execute(
"""SELECT id_ordine, id_cliente, id_compilatore, uuid_ordine,
uuid_progetto, data_ordine, stato_ordine, note
FROM ordini
WHERE stato_ordine = %s
ORDER BY data_ordine ASC
LIMIT 1""",
(int(StatoOrdine.RICEVUTO),)
)
result = cur.fetchone()
conn.close()
return result
except Exception as e:
logger.error(f"Errore recupero ordine pendente: {e}")
return None
def update_ordine_cliente(id_ordine, id_cliente):
"""Update id_cliente on the order."""
try:
conn = postgres_connect()
with conn.cursor() as cur:
cur.execute(
"UPDATE ordini SET id_cliente = %s WHERE id_ordine = %s",
(id_cliente, id_ordine)
)
conn.commit()
conn.close()
except Exception as e:
logger.error(f"Errore aggiornamento cliente ordine {id_ordine}: {e}")
def update_ordine_progetto(id_ordine, uuid_progetto):
"""Update uuid_progetto on the order."""
try:
conn = postgres_connect()
with conn.cursor() as cur:
cur.execute(
"UPDATE ordini SET uuid_progetto = %s WHERE id_ordine = %s",
(uuid_progetto, id_ordine)
)
conn.commit()
conn.close()
except Exception as e:
logger.error(f"Errore aggiornamento progetto ordine {id_ordine}: {e}")
def update_ordine_note(id_ordine, note):
"""Update the note field on the order."""
try:
conn = postgres_connect()
with conn.cursor() as cur:
cur.execute(
"UPDATE ordini SET note = %s WHERE id_ordine = %s",
(note, id_ordine)
)
conn.commit()
conn.close()
except Exception as e:
logger.error(f"Errore aggiornamento note ordine {id_ordine}: {e}")
def get_ordine_by_id(id_ordine):
"""Fetch an order by id_ordine."""
try:
conn = postgres_connect()
with conn.cursor() as cur:
cur.execute(
"""SELECT id_ordine, id_cliente, id_compilatore, uuid_ordine,
uuid_progetto, data_ordine, stato_ordine, note
FROM ordini WHERE id_ordine = %s""",
(id_ordine,)
)
result = cur.fetchone()
conn.close()
return result
except Exception as e:
logger.error(f"Errore recupero ordine {id_ordine}: {e}")
return None
def reset_ordine_per_retry(id_ordine):
"""Reset an order in ERRORE state back to RICEVUTO, clearing note and uuid_progetto."""
from pif_compiler.classes.models import StatoOrdine
try:
conn = postgres_connect()
with conn.cursor() as cur:
cur.execute(
"""UPDATE ordini
SET stato_ordine = %s, note = NULL, uuid_progetto = NULL
WHERE id_ordine = %s AND stato_ordine = %s
RETURNING id_ordine;""",
(int(StatoOrdine.RICEVUTO), id_ordine, int(StatoOrdine.ERRORE))
)
result = cur.fetchone()
conn.commit()
conn.close()
return result[0] if result else None
except Exception as e:
logger.error(f"Errore reset ordine {id_ordine}: {e}")
return None
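The conditional `UPDATE ... WHERE stato_ordine = ERRORE ... RETURNING` above acts as a compare-and-set: the reset only fires if the order is still in the error state, so a repeated retry is a no-op. The same guard in plain Python terms (enum values illustrative; the real ones live in `models.StatoOrdine`):

```python
from enum import IntEnum

class StatoOrdine(IntEnum):
    RICEVUTO = 1   # per the docstrings above
    ERRORE = 99    # hypothetical value for this sketch

def reset_per_retry(order: dict) -> bool:
    """Compare-and-set: reset to RICEVUTO only if the order is in ERRORE."""
    if order["stato_ordine"] != StatoOrdine.ERRORE:
        return False  # mirrors the UPDATE matching zero rows
    order.update(stato_ordine=StatoOrdine.RICEVUTO, note=None, uuid_progetto=None)
    return True

order = {"id_ordine": 7, "stato_ordine": StatoOrdine.ERRORE,
         "note": "pipeline failed", "uuid_progetto": "abc"}
first = reset_per_retry(order)   # True: ERRORE -> RICEVUTO, fields cleared
second = reset_per_retry(order)  # False: already RICEVUTO, nothing changes
```

In the SQL version the `RETURNING` clause is what tells the caller which branch was taken: a row id on success, `None` on the no-op.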
def get_preset_id_by_name(preset_name):
"""Fetch the id_preset from the tipi_prodotti table by name."""
try:
conn = postgres_connect()
with conn.cursor() as cur:
cur.execute(
"SELECT id_preset FROM tipi_prodotti WHERE preset_name = %s",
(preset_name,)
)
result = cur.fetchone()
conn.close()
return result[0] if result else None
except Exception as e:
logger.error(f"Errore recupero id preset '{preset_name}': {e}")
return None
def insert_progetto(mongo_id, id_preset):
"""Insert a new project into the progetti table. Returns id."""
try:
conn = postgres_connect()
with conn.cursor() as cur:
cur.execute(
"""INSERT INTO progetti (mongo_id, preset_tipo_prodotto)
VALUES (%s, %s) RETURNING id;""",
(mongo_id, id_preset)
)
result = cur.fetchone()
conn.commit()
conn.close()
return result[0] if result else None
except Exception as e:
logger.error(f"Errore inserimento progetto: {e}")
return None
def insert_ingredient_lineage(id_progetto, id_ingrediente):
"""Insert the project-ingredient relation into the ingredients_lineage table."""
try:
conn = postgres_connect()
with conn.cursor() as cur:
cur.execute(
"""INSERT INTO ingredients_lineage (id_progetto, id_ingrediente)
VALUES (%s, %s);""",
(id_progetto, id_ingrediente)
)
conn.commit()
conn.close()
except Exception as e:
logger.error(f"Errore inserimento lineage progetto={id_progetto}, ingrediente={id_ingrediente}: {e}")
def get_all_ordini():
"""Fetch all orders, JOINed with clienti, compilatori and stati_ordini."""
try:
conn = postgres_connect()
with conn.cursor() as cur:
cur.execute("""
SELECT o.id_ordine, o.uuid_ordine, o.uuid_progetto, o.data_ordine,
o.stato_ordine, o.note, c.nome_cliente, comp.nome_compilatore, s.nome_stato
FROM ordini o
LEFT JOIN clienti c ON o.id_cliente = c.id_cliente
LEFT JOIN compilatori comp ON o.id_compilatore = comp.id_compilatore
LEFT JOIN stati_ordini s ON o.stato_ordine = s.id_stato
ORDER BY o.data_ordine DESC
""")
results = cur.fetchall()
conn.close()
return results if results else []
except Exception as e:
logger.error(f"Errore recupero ordini: {e}")
return []
def delete_ordine(id_ordine):
"""Delete an order and its related data from PostgreSQL and MongoDB."""
try:
row = get_ordine_by_id(id_ordine)
if not row:
logger.warning(f"Ordine {id_ordine} non trovato per eliminazione")
return False
uuid_ordine = row[3]
uuid_progetto = row[4]
conn = postgres_connect()
with conn.cursor() as cur:
if uuid_progetto:
cur.execute("SELECT id FROM progetti WHERE mongo_id = %s", (uuid_progetto,))
prog_row = cur.fetchone()
if prog_row:
cur.execute("DELETE FROM ingredients_lineage WHERE id_progetto = %s", (prog_row[0],))
cur.execute("DELETE FROM progetti WHERE id = %s", (prog_row[0],))
cur.execute("DELETE FROM ordini WHERE id_ordine = %s", (id_ordine,))
conn.commit()
conn.close()
from bson import ObjectId
orders_col = db_connect(collection_name='orders')
if uuid_ordine:
try:
orders_col.delete_one({"_id": ObjectId(uuid_ordine)})
except Exception:
logger.warning(f"Eliminazione documento MongoDB ordine fallita: {uuid_ordine}")
if uuid_progetto:
projects_col = db_connect(collection_name='projects')
try:
projects_col.delete_one({"_id": ObjectId(uuid_progetto)})
except Exception:
logger.warning(f"Eliminazione documento MongoDB progetto fallita: {uuid_progetto}")
logger.info(f"Ordine {id_ordine} eliminato completamente")
return True
except Exception as e:
logger.error(f"Errore eliminazione ordine {id_ordine}: {e}")
return False
def get_ingrediente_id_by_cas(cas):
"""Fetch the PostgreSQL ID of an ingredient by CAS."""
try:
conn = postgres_connect()
with conn.cursor() as cur:
cur.execute("SELECT id FROM ingredienti WHERE cas = %s", (cas,))
result = cur.fetchone()
conn.close()
return result[0] if result else None
except Exception as e:
logger.error(f"Errore recupero id ingrediente {cas}: {e}")
return None
if __name__ == "__main__":
log_ricerche("123-45-6", "ECHA", True)


@ -0,0 +1,414 @@
"""
Excel export for PIF projects.
Generates an Excel file with 4 sheets:
1. Anagrafica - order information and ingredient list
2. Esposizione - exposure parameters
3. SED - Systemic Exposure Dosage calculation (without DAP)
4. MoS - Margin of Safety calculation (with DAP, toxicological indicator, restrictions)
"""
import os
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side, numbers
from openpyxl.utils import get_column_letter
from pif_compiler.functions.common_log import get_logger
logger = get_logger()
# ==================== STYLES ====================
TITLE_FONT = Font(bold=True, size=14)
LABEL_FONT = Font(bold=True, size=11)
HEADER_FONT = Font(bold=True, color="FFFFFF", size=11)
HEADER_FILL = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
LIGHT_FILL = PatternFill(start_color="D9E2F3", end_color="D9E2F3", fill_type="solid")
WARNING_FILL = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
THIN_BORDER = Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
)
def _style_header_row(ws, row, num_cols):
"""Style the header cells."""
for col in range(1, num_cols + 1):
cell = ws.cell(row=row, column=col)
cell.font = HEADER_FONT
cell.fill = HEADER_FILL
cell.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
cell.border = THIN_BORDER
def _apply_borders(ws, row, num_cols):
"""Apply borders to a row."""
for col in range(1, num_cols + 1):
ws.cell(row=row, column=col).border = THIN_BORDER
WRAP_ALIGNMENT = Alignment(vertical='top', wrap_text=True)
WRAP_CENTER = Alignment(horizontal='center', vertical='top', wrap_text=True)
def _set_column_widths(ws, widths):
"""Set fixed column widths and apply wrap_text to every populated cell."""
for i, w in enumerate(widths, 1):
ws.column_dimensions[get_column_letter(i)].width = w
for row in ws.iter_rows(min_row=1, max_row=ws.max_row, max_col=len(widths)):
for cell in row:
if cell.alignment.horizontal == 'center':
cell.alignment = Alignment(horizontal='center', vertical='top', wrap_text=True)
else:
cell.alignment = WRAP_ALIGNMENT
def _get_ingredient_name(pi):
"""Return the best display name for an ingredient."""
if pi.inci:
return pi.inci
if pi.ingredient and pi.ingredient.inci:
return pi.ingredient.inci[0]
return pi.cas or ""
def _get_cosing_restrictions(ingredient):
"""Extract the COSING restrictions from an ingredient."""
annex = []
other = []
if ingredient and ingredient.cosing_info:
for cosing in ingredient.cosing_info:
annex.extend(cosing.annex)
other.extend(cosing.otherRestrictions)
return "; ".join(annex), "; ".join(other)
def _get_dap_info_text(ingredient):
"""Format the DAP information as human-readable text."""
if not ingredient or not ingredient.dap_info:
return ""
d = ingredient.dap_info
parts = []
if d.molecular_weight is not None:
parts.append(f"Peso Molecolare: {d.molecular_weight} Da")
if d.log_pow is not None:
parts.append(f"LogP: {d.log_pow}")
if d.tpsa is not None:
parts.append(f"TPSA: {d.tpsa} A\u00b2")
if d.melting_point is not None:
parts.append(f"Punto di Fusione: {d.melting_point}\u00b0C")
if d.high_ionization is not None:
parts.append(f"pKa: {d.high_ionization}")
if d.dap_value is not None:
parts.append(f"DAP: {d.dap_value * 100:.0f}%")
return ", ".join(parts)
# ==================== SHEET BUILDERS ====================
def _build_anagrafica(wb, project):
"""Sheet 1: order information and ingredient list."""
ws = wb.active
ws.title = "Anagrafica"
ws.merge_cells('A1:E1')
ws['A1'] = "INFORMAZIONI ORDINE"
ws['A1'].font = TITLE_FONT
info_rows = [
("Cliente", project.client_name),
("Nome Prodotto", project.product_name),
("Preset Esposizione", project.esposition.preset_name),
]
for i, (label, value) in enumerate(info_rows):
ws.cell(row=i + 3, column=1, value=label).font = LABEL_FONT
ws.cell(row=i + 3, column=2, value=value)
# Ingredient table
tbl_row = 7
headers = [
"INCI",
"CAS",
"Percentuale (%)",
"Colorante",
"Escluso da Valutazione Tossicologica"
]
for col, h in enumerate(headers, 1):
ws.cell(row=tbl_row, column=col, value=h)
_style_header_row(ws, tbl_row, len(headers))
for i, pi in enumerate(project.ingredients):
r = tbl_row + 1 + i
ws.cell(row=r, column=1, value=pi.inci or "")
ws.cell(row=r, column=2, value=pi.cas or "")
ws.cell(row=r, column=3, value=pi.percentage)
ws.cell(row=r, column=4, value="Si" if pi.is_colorante else "No")
ws.cell(row=r, column=5, value="Si" if pi.skip_tox else "No")
_apply_borders(ws, r, len(headers))
_set_column_widths(ws, [20, 14, 14, 12, 18])
def _build_esposizione(wb, project):
"""
Sheet 2: exposure parameters.
Layout of the reference cells (used by the SED/MoS formulas):
B5 = target body weight (kg)
B12 = daily quantity (g/day)
B13 = retention factor
"""
ws = wb.create_sheet("Esposizione")
esp = project.esposition
ws.merge_cells('A1:B1')
ws['A1'] = "PARAMETRI DI ESPOSIZIONE"
ws['A1'].font = TITLE_FONT
params = [
# label, value, row (starting from 3)
("Tipo Prodotto", esp.tipo_prodotto),
("Popolazione Target", esp.popolazione_target),
("Peso Corporeo Target (kg)", esp.peso_target_kg), # B5
("Luogo di Applicazione", esp.luogo_applicazione),
("Vie di Esposizione Normali", ", ".join(esp.esp_normali)),
("Vie di Esposizione Secondarie", ", ".join(esp.esp_secondarie)),
("Vie di Esposizione Nano", ", ".join(esp.esp_nano)),
("Superficie Esposta (cm\u00b2)", esp.sup_esposta),
("Frequenza di Applicazione (applicazioni/giorno)", esp.freq_applicazione),
("Quantita Giornaliera (g/giorno)", esp.qta_giornaliera), # B12
("Fattore di Ritenzione", esp.ritenzione), # B13
]
for i, (label, value) in enumerate(params):
row = i + 3
ws.cell(row=row, column=1, value=label).font = LABEL_FONT
ws.cell(row=row, column=2, value=value)
# Computed fields with Excel formulas
ws.cell(row=15, column=1, value="Esposizione Calcolata (g/giorno)").font = LABEL_FONT
ws['B15'] = "=B12*B13"
ws['B15'].number_format = '0.0000'
ws.cell(row=16, column=1, value="Esposizione Relativa (mg/kg bw/giorno)").font = LABEL_FONT
ws['B16'] = "=B15*1000/B5"
ws['B16'].number_format = '0.0000'
ws.column_dimensions['A'].width = 50
ws.column_dimensions['B'].width = 25
def _build_sed(wb, project):
"""
Sheet 3: SED calculation without DAP, with COSING restrictions.
SED formula (mg/kg bw/day):
= (percentage / 100) * daily quantity * retention / body weight * 1000
As Excel references:
= (C{row}/100) * Esposizione!$B$12 * Esposizione!$B$13 / Esposizione!$B$5 * 1000
"""
ws = wb.create_sheet("SED")
ws.merge_cells('A1:F1')
ws['A1'] = "CALCOLO SYSTEMIC EXPOSURE DOSAGE (SED)"
ws['A1'].font = TITLE_FONT
hdr = 3
headers = [
"Ingrediente",
"CAS",
"Percentuale (%)",
"Systemic Exposure Dosage - SED (mg/kg bw/giorno)",
"Restrizioni COSING (Annex)",
"Altre Restrizioni",
]
for col, h in enumerate(headers, 1):
ws.cell(row=hdr, column=col, value=h)
_style_header_row(ws, hdr, len(headers))
for i, pi in enumerate(project.ingredients):
r = hdr + 1 + i
name = _get_ingredient_name(pi)
ws.cell(row=r, column=1, value=name)
ws.cell(row=r, column=2, value=pi.cas or "")
ws.cell(row=r, column=3, value=pi.percentage)
# SED formula only for non-excluded ingredients
if not pi.skip_tox and pi.cas:
ws.cell(row=r, column=4).value = (
f"=(C{r}/100)*Esposizione!$B$12*Esposizione!$B$13"
f"/Esposizione!$B$5*1000"
)
ws.cell(row=r, column=4).number_format = '0.000000'
# COSING restrictions
annex, other = _get_cosing_restrictions(pi.ingredient if not pi.skip_tox else None)
ws.cell(row=r, column=5, value=annex)
ws.cell(row=r, column=6, value=other)
# Highlight the row if it carries Annex restrictions
if annex:
for col in range(1, len(headers) + 1):
ws.cell(row=r, column=col).fill = WARNING_FILL
_apply_borders(ws, r, len(headers))
_set_column_widths(ws, [18, 14, 14, 20, 18, 18])
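The SED cell formula can be sanity-checked outside Excel; this is the same arithmetic as `=(C{r}/100)*Esposizione!$B$12*Esposizione!$B$13/Esposizione!$B$5*1000`, with illustrative inputs:

```python
def sed_mg_per_kg_bw(percentuale: float, qta_giornaliera_g: float,
                     ritenzione: float, peso_kg: float) -> float:
    """SED (mg/kg bw/day): concentration fraction * daily amount * retention,
    converted to mg and normalised by body weight."""
    return (percentuale / 100.0) * qta_giornaliera_g * ritenzione / peso_kg * 1000.0

# 2% ingredient in a leave-on product: 1.5 g/day, retention 1.0, 60 kg adult
sed = sed_mg_per_kg_bw(2.0, 1.5, 1.0, 60.0)  # 0.5 mg/kg bw/day
```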
def _build_mos(wb, project):
"""
Sheet 4: Margin of Safety calculation.
Excel formulas:
SED (col C) = (B{r}/100) * Esposizione!$B$12 * Esposizione!$B$13 / Esposizione!$B$5 * 1000
DAP-corrected SED (E) = C{r} * D{r}
MoS (I) = G{r} / (E{r} * H{r}) [if E > 0 and H > 0]
"""
ws = wb.create_sheet("MoS")
ws.merge_cells('A1:N1')
ws['A1'] = "CALCOLO MARGIN OF SAFETY (MoS)"
ws['A1'].font = TITLE_FONT
hdr = 3
headers = [
"Nome Ingrediente", # A
"Percentuale (%)", # B
"Systemic Exposure Dosage - SED (mg/kg bw/giorno)", # C
"Dermal Absorption Percentage (DAP)", # D
"SED corretto con DAP (mg/kg bw/giorno)", # E
"Indicatore Tossicologico (NOAEL / LOAEL / LD50)", # F
"Valore Indicatore (mg/kg bw/giorno)", # G
"Fattore di Sicurezza", # H
"Margin of Safety (MoS)", # I
"Fonte del Dato", # J
"Informazioni DAP (Peso Molecolare, LogP, TPSA, Punto Fusione)",# K
"Restrizioni COSING (Annex)", # L
"Altre Restrizioni", # M
"Note", # N
]
for col, h in enumerate(headers, 1):
ws.cell(row=hdr, column=col, value=h)
_style_header_row(ws, hdr, len(headers))
num_cols = len(headers)
r = hdr + 1
for pi in project.ingredients:
# Skip excluded ingredients (skip_tox or missing CAS)
if pi.skip_tox or not pi.cas:
continue
name = _get_ingredient_name(pi)
ing = pi.ingredient
best = ing.toxicity.best_case if ing and ing.toxicity else None
# A: Name
ws.cell(row=r, column=1, value=name)
# B: Percentage
ws.cell(row=r, column=2, value=pi.percentage)
# C: SED (without DAP) - Excel formula
ws.cell(row=r, column=3).value = (
f"=(B{r}/100)*Esposizione!$B$12*Esposizione!$B$13"
f"/Esposizione!$B$5*1000"
)
ws.cell(row=r, column=3).number_format = '0.000000'
# D: DAP (falls back to 0.5, i.e. a conservative 50% absorption, when no DAP data is available)
dap_value = ing.dap_info.dap_value if ing and ing.dap_info else 0.5
ws.cell(row=r, column=4, value=dap_value)
# E: DAP-corrected SED - Excel formula
ws.cell(row=r, column=5).value = f"=C{r}*D{r}"
ws.cell(row=r, column=5).number_format = '0.000000'
# F: Indicator type
ws.cell(row=r, column=6, value=best.indicator if best else "")
# G: Indicator value
ws.cell(row=r, column=7, value=best.value if best else "")
# H: Safety factor
factor = best.factor if best else 1
ws.cell(row=r, column=8, value=factor)
# I: MoS - Excel formula
ws.cell(row=r, column=9).value = (
f'=IF(AND(E{r}>0,H{r}>0),G{r}/(E{r}*H{r}),"")'
)
ws.cell(row=r, column=9).number_format = '0.00'
# J: Source
ws.cell(row=r, column=10, value=best.ref if best else "")
# K: DAP information
ws.cell(row=r, column=11, value=_get_dap_info_text(ing))
# L, M: Restrictions
annex, other = _get_cosing_restrictions(ing)
ws.cell(row=r, column=12, value=annex)
ws.cell(row=r, column=13, value=other)
# N: Note (left empty)
ws.cell(row=r, column=14, value="")
# Styling: borders
_apply_borders(ws, r, num_cols)
# Alternate row colors for readability
if (r - hdr) % 2 == 0:
for col in range(1, num_cols + 1):
ws.cell(row=r, column=col).fill = LIGHT_FILL
r += 1
# Legend below the table
legend_row = r + 2
ws.cell(row=legend_row, column=1, value="LEGENDA").font = LABEL_FONT
ws.cell(row=legend_row + 1, column=1, value="Fattore di Sicurezza:")
ws.cell(row=legend_row + 1, column=2, value="NOAEL = 1, LOAEL = 3, LD50 = 10")
ws.cell(row=legend_row + 2, column=1, value="MoS accettabile:")
ws.cell(row=legend_row + 2, column=2, value=">= 100 (secondo linee guida SCCS)")
ws.cell(row=legend_row + 3, column=1, value="Formula MoS:")
ws.cell(row=legend_row + 3, column=2, value="Valore Indicatore / (SED con DAP x Fattore di Sicurezza)")
_set_column_widths(ws, [18, 10, 14, 8, 14, 12, 12, 10, 10, 18, 22, 18, 18, 14])
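Likewise, the MoS formula in column I (`G/(E*H)`, blank when the denominator is not positive) reduces to a one-liner; 100 is the SCCS acceptability threshold mentioned in the legend:

```python
def margin_of_safety(indicator_mg_per_kg: float,
                     sed_with_dap: float,
                     safety_factor: float):
    """MoS = toxicological indicator / (DAP-corrected SED * safety factor).
    Returns None when the denominator is not positive (Excel shows an empty cell)."""
    if sed_with_dap <= 0 or safety_factor <= 0:
        return None
    return indicator_mg_per_kg / (sed_with_dap * safety_factor)

# NOAEL 50 mg/kg bw/day, DAP-corrected SED 0.25 mg/kg bw/day, factor 1 (NOAEL)
mos = margin_of_safety(50.0, 0.25, 1.0)  # 200.0 -> comfortably above 100
```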
# ==================== MAIN EXPORT ====================
def export_project_excel(project, output_path: str = None) -> str:
"""
Exports a complete Project to an Excel (.xlsx) file.
Args:
project: Project object to export
output_path: output file path (default: exports/progetto_{order_id}.xlsx)
Returns:
The path of the generated Excel file.
"""
if output_path is None:
os.makedirs("exports", exist_ok=True)
output_path = f"exports/progetto_{project.order_id}.xlsx"
else:
dir_name = os.path.dirname(output_path)
if dir_name:
os.makedirs(dir_name, exist_ok=True)
wb = Workbook()
_build_anagrafica(wb, project)
_build_esposizione(wb, project)
_build_sed(wb, project)
_build_mos(wb, project)
wb.save(output_path)
logger.info(f"Excel esportato: {output_path}")
return output_path


@ -8,7 +8,7 @@ import time
from pif_compiler.functions.common_log import get_logger
# Router imports
from pif_compiler.api.routes import api_echa, api_cosing, common, api_ingredients, api_esposition, api_orders
# Logging configuration
logger = get_logger()
@ -147,6 +147,12 @@ app.include_router(
tags=["Esposition"]
)
app.include_router(
api_orders.router,
prefix="/api/v1",
tags=["Orders"]
)
# ==================== ROOT ENDPOINTS ====================
@app.get("/", tags=["Root"])


@ -81,13 +81,13 @@ def clean_cosing(json_data: dict, full: bool = True) -> dict:
logger.info(f"Cleaning COSING data for: {substance_id}")
string_cols = [
"itemType", "phEurName", "chemicalName", "innName", "substanceId", "cosmeticRestriction", "reference"
]
list_cols = [
"casNo", "ecNo", "functionName", "otherRestrictions", "refNo",
"sccsOpinion", "sccsOpinionUrls", "identifiedIngredient",
"annexNo", "otherRegulations", "nameOfCommonIngredientsGlossary", "inciName"
]
base_url = "https://ec.europa.eu/growth/tools-databases/cosing/details/"

uv.lock

@ -410,6 +410,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/30/79/4f544d73fcc0513b71296cb3ebb28a227d22e80dec27204977039b9fa875/duckdb-1.4.1-cp313-cp313-win_amd64.whl", hash = "sha256:280fd663dacdd12bb3c3bf41f3e5b2e5b95e00b88120afabb8b8befa5f335c6f", size = 12336460, upload-time = "2025-10-07T10:37:12.154Z" },
]
[[package]]
name = "et-xmlfile"
version = "2.0.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d3/38/af70d7ab1ae9d4da450eeec1fa3918940a5fafb9055e934af8d6eb0c2313/et_xmlfile-2.0.0.tar.gz", hash = "sha256:dab3f4764309081ce75662649be815c4c9081e88f0837825f90fd28317d4da54", size = 17234 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c1/8b/5fe2cc11fee489817272089c4203e679c63b570a5aaeb18d852ae3cbba6a/et_xmlfile-2.0.0-py3-none-any.whl", hash = "sha256:7a91720bc756843502c3b7504c77b8fe44217c85c537d85037f0f536151b2caa", size = 18059 },
]
[[package]]
name = "fastapi"
version = "0.121.2"
@ -889,6 +898,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/06/b9/33bba5ff6fb679aa0b1f8a07e853f002a6b04b9394db3069a1270a7784ca/numpy-2.3.3-cp314-cp314t-win_arm64.whl", hash = "sha256:78c9f6560dc7e6b3990e32df7ea1a50bbd0e2a111e05209963f5ddcab7073b0b", size = 10545953, upload-time = "2025-09-09T15:58:40.576Z" },
]
[[package]]
name = "openpyxl"
version = "3.1.5"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "et-xmlfile" },
]
sdist = { url = "https://files.pythonhosted.org/packages/3d/f9/88d94a75de065ea32619465d2f77b29a0469500e99012523b91cc4141cd1/openpyxl-3.1.5.tar.gz", hash = "sha256:cf0e3cf56142039133628b5acffe8ef0c12bc902d2aadd3e0fe5878dc08d1050", size = 186464 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c0/da/977ded879c29cbd04de313843e76868e6e13408a94ed6b987245dc7c8506/openpyxl-3.1.5-py2.py3-none-any.whl", hash = "sha256:5282c12b107bffeef825f4617dc029afaf41d0ea60823bbb665ef3079dc79de2", size = 250910 },
]
[[package]]
name = "packaging"
version = "25.0"
@ -966,6 +987,7 @@ dependencies = [
{ name = "marimo" },
{ name = "markdown-to-json" },
{ name = "markdownify" },
{ name = "openpyxl" },
{ name = "playwright" },
{ name = "psycopg2-binary" },
{ name = "pubchemprops" },
@ -991,6 +1013,7 @@ requires-dist = [
{ name = "marimo", specifier = ">=0.16.5" },
{ name = "markdown-to-json", specifier = ">=2.1.2" },
{ name = "markdownify", specifier = ">=1.2.0" },
{ name = "openpyxl", specifier = ">=3.1.0" },
{ name = "playwright", specifier = ">=1.55.0" },
{ name = "psycopg2-binary", specifier = ">=2.9.11" },
{ name = "pubchemprops", specifier = ">=0.1.1" },