# PIF Compiler - Comsoguard API

## Project Overview

**PIF Compiler** (branded as **Comsoguard API**) is a cosmetic safety assessment tool that compiles Product Information Files (PIF) for cosmetic products, as required by EU regulations. It aggregates toxicological, regulatory, and chemical data from multiple external sources to evaluate ingredient safety (DAP calculations, SED/MoS computations, NOAEL extraction).

The primary language is **Italian** (variable names, comments, some log messages). Code is written in **Python 3.12**.

## Tech Stack

- **Framework**: FastAPI + Uvicorn
- **Package manager**: uv (with `pyproject.toml` + `uv.lock`)
- **Data models**: Pydantic v2
- **Databases**: MongoDB (substance data cache via `pymongo`) + PostgreSQL (product presets, search logs, compilers via `psycopg2`)
- **Web scraping**: Playwright (PDF generation, browser automation), BeautifulSoup4 (HTML parsing)
- **External APIs**: PubChem (`pubchempy` + `pubchemprops`), COSING (EU Search API), ECHA (chem.echa.europa.eu)
- **Logging**: Python `logging` with rotating file handlers (`logs/debug.log`, `logs/error.log`)

## Project Structure

```
src/pif_compiler/
├── main.py                    # FastAPI app, middleware, exception handlers, routers
├── __init__.py
├── api/
│   └── routes/
│       ├── api_echa.py        # ECHA endpoints (single + batch search)
│       ├── api_cosing.py      # COSING endpoints (single + batch search)
│       ├── api_ingredients.py # Ingredient search by CAS + list all ingested + add tox indicator + clients CRUD
│       ├── api_esposition.py  # Esposition preset CRUD (create, list, delete)
│       ├── api_orders.py      # Order creation, retry, manual pipeline trigger, Excel/PDF export
│       └── common.py          # PDF generation, PubChem, CIR search, segnalazione (bug report) endpoints
├── classes/
│   ├── __init__.py            # Re-exports all models from models.py and main_workflow.py
│   ├── models.py              # Pydantic models: Ingredient, DapInfo, CosingInfo,
│   │                          # ToxIndicator, Toxicity, Esposition, RetentionFactors, StatoOrdine
│   └── main_workflow.py       # Order/Project workflow: Order (DB + raw JSON layer),
│                              # Project (enriched layer), ProjectIngredient,
│                              # orchestrator functions (receive_order, process_order_pipeline,
│                              # retry_order, trigger_pipeline)
├── functions/
│   ├── auth.py                # JWT verification via Supabase JWKS (RS256/ES256), get_current_user FastAPI dependency
│   ├── common_func.py         # PDF generation with Playwright, tox + COSING source PDF batch generation, COSING PDF download, ZIP creation
│   ├── common_log.py          # Centralized logging configuration
│   ├── db_utils.py            # MongoDB + PostgreSQL connection helpers
│   └── excel_export.py        # Excel export (4 sheets: Anagrafica, Esposizione, SED, MoS)
└── services/
    ├── srv_echa.py            # ECHA scraping, HTML parsing, toxicology extraction,
    │                          # orchestrator (validate -> check cache -> fetch -> store)
    ├── srv_cosing.py          # COSING API search + data cleaning
    ├── srv_pubchem.py         # PubChem property extraction (DAP data)
    └── srv_cir.py             # CIR (Cosmetic Ingredient Review) database search
```

### Other directories

- `data/` - Input data files (`input.json` with sample INCI/CAS/percentage lists), DB schema reference (`db_schema.sql`), old CSV data
- `logs/` - Rotating log files (`debug.log`, `error.log`) - auto-generated
- `pdfs/` - Generated PDF files from ECHA dossier pages
- `streamlit/` - Streamlit UI pages (`ingredients_page.py`, `exposition_page.py`, `order_page.py`, `orders_page.py`)
- `scripts/` - Utility scripts (`create_mock_order.py` inserts a test order with 4 ingredients)
- `marimo/` - **Ignore this folder.** Debug/test notebooks, not part of the main application

## Architecture & Data Flow

### Core workflow (per ingredient)

1. **Input**: CAS number (and optionally INCI name + percentage)
2. **COSING** (`srv_cosing.py`): Search the EU cosmetic ingredients database for regulatory restrictions, INCI match, annex references
3. **ECHA** (`srv_echa.py`): Search substance -> get dossier -> parse HTML index -> extract toxicological data (NOAEL, LD50, LOAEL) from acute & repeated dose toxicity pages
4. **PubChem** (`srv_pubchem.py`): Get molecular weight, XLogP, TPSA, melting point, dissociation constants
5. **DAP calculation** (`DapInfo` model): Dermal Absorption Percentage based on molecular properties (MW > 500, LogP, TPSA > 120, etc.)
6. **Toxicity ranking** (`Toxicity` model): Best toxicological indicator selection with a 3-tier priority: (1) indicator type NOAEL=4 > LOAEL=3 > LD50=1; (2) route preference dermal > oral > inhalation > other; (3) lowest value for NOAEL/LOAEL (most conservative). Safety factors: LD50 -> 10, LOAEL -> 3, NOAEL -> 1.

### Caching strategy

- **ECHA results** are cached in MongoDB (`toxinfo.substance_index` collection) keyed by `substance.rmlCas`
- **Ingredients** are cached in MongoDB (`toxinfo.ingredients` collection) keyed by `cas`, with the PostgreSQL `ingredienti` table as index (stores `mongo_id` + enrichment flags `dap`, `cosing`, `tox`)
- **Orders** are cached in MongoDB (`toxinfo.orders` collection) keyed by `uuid_ordine`
- **Projects** are cached in MongoDB (`toxinfo.projects` collection) keyed by `uuid_progetto`
- The orchestrator checks the local cache before making external requests
- `Ingredient.get_or_create(cas)` checks the PostgreSQL -> MongoDB cache, returns the cached document if not older than 365 days, otherwise re-scrapes
- Search history is logged to PostgreSQL (`logs.search_history` table)

### Order / Project workflow (`main_workflow.py`)

Order processing uses a **background pipeline** with state machine tracking via `StatoOrdine`:

```
POST /orders/create → receive_order() → BackgroundTasks → process_order_pipeline()
         │                                                       │
         ▼                                                       ▼
Save raw JSON to MongoDB                       Order.pick_next() (oldest with stato=1)
+ create ordini record (stato=1)                                 │
+ return id_ordine immediately                                   ▼
                                              order.validate_anagrafica() → stato=2
                                                                 │
                                                                 ▼
                                              Project.from_order() → stato=3
                                              (loads Esposition preset, parses ingredients)
                                                                 │
                                                                 ▼
                                              project.process_ingredients()
                                              → Ingredient.get_or_create()
                                              (skip if skip_tox=True or CAS empty)
                                                                 │
                                                                 ▼
                                              stato=5 (ARRICCHITO)
                                                                 │
                                                                 ▼
                                              project.save() → MongoDB + progetti table
                                              + ingredients_lineage

On error → stato=9 (ERRORE) + note with error message
```

- **Order** (`main_workflow.py`): Pydantic model with DB table attributes + raw JSON from MongoDB. The `pick_next()` classmethod picks the oldest pending order (FIFO). `validate_anagrafica()` upserts the client in the `clienti` table. `update_stato()` is the reusable state transition method. The compiler (`id_compilatore`) is resolved at order creation time via `receive_order(raw_json, compiler_name)` from the authenticated user's JWT, not during the pipeline.
- **Project** (`main_workflow.py`): Created from an Order via `Project.from_order()`. Holds the `Esposition` preset (loaded by name from DB) and a list of `ProjectIngredient` with enriched `Ingredient` objects. `process_ingredients()` calls `Ingredient.get_or_create()` for each CAS. `save()` dumps to the MongoDB `projects` collection, creates a `progetti` entry, and populates `ingredients_lineage`.
- **ProjectIngredient**: Helper model with `cas`, `inci`, `percentage`, `is_colorante`, `skip_tox`, and an optional `Ingredient` object.
- **Retry**: `retry_order(id_ordine)` resets an ERRORE order back to RICEVUTO for reprocessing.
- **Manual trigger**: `trigger_pipeline()` launches the pipeline on demand for any pending order.
- The pipeline is **on-demand only** (no periodic polling). Each API call to `/orders/create` or `/orders/retry` triggers one background execution.

### Excel export (`excel_export.py`)

`export_project_excel(project, output_path)` generates a 4-sheet Excel file:

1. **Anagrafica** - Client info (nome, prodotto, preset) + ingredient table (INCI, CAS, %, colorante, skip_tox)
2. **Esposizione** - All esposition parameters + computed fields via Excel formulas (`=B12*B13`, `=B15*1000/B5`)
3. **SED** - SED calculation per ingredient. Formula: `=(C{r}/100)*Esposizione!$B$12*Esposizione!$B$13/Esposizione!$B$5*1000`. COSING restrictions highlighted in red.
4. **MoS** - 14 columns (Nome, %, SED, DAP, SED con DAP, Indicatore, Valore, Fattore, MoS, Fonte, Info DAP, Restrizioni, Altre Restrizioni, Note). MoS formula: `=IF(AND(E{r}>0,H{r}>0),G{r}/(E{r}*H{r}),"")`. Includes a legend row.

Called via the `Project.export_excel()` method, exposed at `GET /orders/export/{id_ordine}`.

### Source PDF generation (`common_func.py`)

- `generate_project_source_pdfs(project)` - for each ingredient, generates two types of source PDFs:
  1. **Tox best_case PDF**: downloads the ECHA dossier page of `best_case` via Playwright. Naming: `CAS_source.pdf`, where `source` is the `ToxIndicator.source` attribute (e.g., `56-81-5_repeated_dose_toxicity.pdf`)
  2. **COSING PDF**: one PDF per ingredient, using the first `CosingInfo` entry with a valid `reference`. Naming: `CAS_cosing.pdf`. Note: an ingredient may have multiple `CosingInfo` entries, but only the first valid reference is used.
- `cosing_download(ref_no)` - downloads the COSING regulation PDF from `api.tech.ec.europa.eu` by reference number. Returns PDF bytes or an error string
- `create_sources_zip(pdf_paths, zip_path)` - bundles all source PDFs into a ZIP archive; deduplicates by filename to prevent duplicate entries
- Exposed at `GET /orders/export-sources/{id_ordine}` - returns the ZIP as a FileResponse

### PostgreSQL schema (see `data/db_schema.sql`)

- **`clienti`** - Customers (`id_cliente`, `nome_cliente`)
- **`compilatori`** - PIF compilers/assessors (`id_compilatore`, `nome_compilatore`)
- **`ordini`** - Orders linking a client + compiler to a project (`uuid_ordine`, `uuid_progetto`, `data_ordine`, `stato_ordine`). FK to `clienti`, `compilatori`, `stati_ordini`
- **`stati_ordini`** - Order status lookup table (`id_stato`, `nome_stato`). Values mapped to the `StatoOrdine` IntEnum in `models.py`
- **`ingredienti`** - Ingredient registry keyed by CAS.
  Tracks enrichment status via boolean flags (`dap`, `cosing`, `tox`) and links to the MongoDB document (`mongo_id`)
- **`inci`** - INCI name to CAS mapping. FK to `ingredienti(cas)`
- **`progetti`** - Projects linked to an order (`mongo_id` -> `ordini.uuid_progetto`) and a product type preset (`preset_tipo_prodotto` -> `tipi_prodotti`)
- **`ingredients_lineage`** - Many-to-many join between `progetti` and `ingredienti`, tracking which ingredients belong to which project
- **`tipi_prodotti`** - Product type presets with exposure parameters (`preset_name`, `tipo_prodotto`, `luogo_applicazione`, exposure routes, `sup_esposta`, `freq_applicazione`, `qta_giornaliera`, `ritenzione`). Maps to the `Esposition` Pydantic model
- **`logs.search_history`** - Search audit log (`cas_ricercato`, `target`, `esito`)

## API Endpoints

All routes are under `/api/v1`:

| Method | Path | Description |
|--------|------|-------------|
| POST | `/echa/search` | Single ECHA substance search by CAS |
| POST | `/echa/batch-search` | Batch ECHA search for multiple CAS numbers |
| POST | `/cosing/search` | COSING search (by name, CAS, EC, or ID) |
| POST | `/cosing/batch-search` | Batch COSING search |
| POST | `/ingredients/search` | Get full ingredient by CAS (cached or scraped, `force` param to bypass cache) |
| POST | `/ingredients/add-tox-indicator` | Add a custom ToxIndicator to an ingredient |
| GET | `/ingredients/list` | List all ingested ingredients from PostgreSQL |
| GET | `/ingredients/clients` | List all registered clients |
| POST | `/ingredients/clients` | Create or retrieve a client |
| POST | `/esposition/create` | Create a new esposition preset |
| DELETE | `/esposition/delete/{preset_name}` | Delete an esposition preset by name |
| GET | `/esposition/presets` | List all esposition presets |
| POST | `/orders/create` | Create order + start background processing |
| POST | `/orders/retry/{id_ordine}` | Retry a failed order (ERRORE -> RICEVUTO) |
| POST | `/orders/trigger-pipeline` | Manually trigger the pipeline for the next pending order |
| GET | `/orders/export/{id_ordine}` | Download Excel export for a completed order |
| GET | `/orders/export-sources/{id_ordine}` | Download ZIP of tox + COSING source PDFs for an order |
| GET | `/orders/list` | List all orders with client/compiler/status info |
| GET | `/orders/detail/{id_ordine}` | Full order detail with ingredients from MongoDB |
| DELETE | `/orders/{id_ordine}` | Delete order and all related data (PostgreSQL + MongoDB) |
| POST | `/auth/login` | Login with email + password, returns access + refresh token (public) |
| POST | `/auth/refresh` | Refresh access token via refresh token (public) |
| POST | `/auth/logout` | Invalidate session on Supabase (public) |
| GET | `/auth/me` | Returns current user info from JWT (id, email, name) |
| POST | `/common/pubchem` | PubChem property lookup by CAS |
| POST | `/common/generate-pdf` | Generate PDF from URL via Playwright |
| GET | `/common/download-pdf/{name}` | Download a generated PDF |
| POST | `/common/cir-search` | CIR ingredient text search |
| POST | `/common/segnalazione` | Save a user bug report/ticket to the MongoDB `segnalazioni` collection |
| GET | `/health`, `/ping` | Health check endpoints |

Docs are available at `/docs` (Swagger) and `/redoc`.

## Environment Variables

Configured via a `.env` file (loaded with `python-dotenv`):

- `ADMIN_USER` - MongoDB admin username
- `ADMIN_PASSWORD` - MongoDB admin password
- `MONGO_HOST` - MongoDB host
- `MONGO_PORT` - MongoDB port
- `DATABASE_URL` - PostgreSQL connection string
- `SUPABASE_URL` - Supabase project URL (e.g. `https://xxx.supabase.co`)
- `SUPABASE_SECRET_KEY` - Supabase service role key (used for auth proxying)

## Development

### Setup

```bash
uv sync              # Install dependencies
playwright install   # Install browser binaries for PDF generation
```

### Running the API

```bash
uv run python -m pif_compiler.main
# or
uv run uvicorn pif_compiler.main:app --reload --host 0.0.0.0 --port 8000
```

### Key conventions

- Services in `services/` handle external API calls and data extraction
- Models in `classes/models.py` use Pydantic `@model_validator` and `@classmethod` builders for construction from raw API data
- Workflow classes in `classes/main_workflow.py` handle the Order (DB + raw JSON) and Project (enriched) layers
- Order processing runs as a FastAPI `BackgroundTasks` callback (on-demand, not polled)
- The `orchestrator` pattern (see `srv_echa.py`) handles: validate input -> check local cache -> fetch from external source -> store locally -> return
- `Ingredient.ingredient_builder(cas)` calls scraping functions directly (`pubchem_dap`, `cosing_entry`, `orchestrator`)
- `Ingredient.save()` upserts to both MongoDB and PostgreSQL; `Ingredient.from_cas()` retrieves via the PostgreSQL index -> MongoDB
- `Ingredient.get_or_create(cas, force=False)` is the main entry point: checks cache freshness (365 days), scrapes if needed.
- Passing `force=True` to `get_or_create` bypasses the cache entirely and re-scrapes
- All modules use the shared logger from `common_log.get_logger()`
- API routes define Pydantic request/response models inline in each route file

### `db_utils.py` functions

**Core:**
- `db_connect(db_name, collection_name)` - MongoDB collection accessor
- `postgres_connect()` - PostgreSQL connection

**Ingredients:**
- `upsert_ingrediente(cas, mongo_id, dap, cosing, tox)` - Upsert ingredient in PostgreSQL
- `get_ingrediente_by_cas(cas)` - Get ingredient row by CAS
- `get_ingrediente_id_by_cas(cas)` - Get PostgreSQL ID by CAS (for the lineage FK)
- `get_all_ingredienti()` - List all ingredients from PostgreSQL

**Clients / Compilers:**
- `upsert_cliente(nome_cliente)` - Upsert client, returns `id_cliente`
- `upsert_compilatore(nome_compilatore)` - Upsert compiler, returns `id_compilatore`
- `get_all_clienti()` - List all clients from PostgreSQL
- `get_all_compilatori()` - List all compilers from PostgreSQL
- `delete_cliente(nome_cliente)` - Delete client if no linked orders, returns None if blocked

**Orders:**
- `insert_ordine(uuid_ordine, id_cliente=None, id_compilatore=None)` - Insert new order, returns `id_ordine`
- `get_ordine_by_id(id_ordine)` - Get full order row
- `get_oldest_pending_order()` - Get oldest order with stato=RICEVUTO
- `aggiorna_stato_ordine(id_ordine, nuovo_stato)` - Update order status
- `update_ordine_cliente(id_ordine, id_cliente)` - Set client on order
- `update_ordine_compilatore(id_ordine, id_compilatore)` - Set compiler on order
- `update_ordine_progetto(id_ordine, uuid_progetto)` - Set project UUID on order
- `update_ordine_note(id_ordine, note)` - Set note on order
- `reset_ordine_per_retry(id_ordine)` - Reset ERRORE order to RICEVUTO
- `get_all_ordini()` - List all orders with JOINs to `clienti`/`compilatori`/`stati_ordini`
- `delete_ordine(id_ordine)` - Delete order + related data (lineage, progetti, MongoDB docs)

**Projects:**
- `get_preset_id_by_name(preset_name)` - Get preset FK by name
- `insert_progetto(mongo_id, id_preset)` - Insert project, returns `id`
- `insert_ingredient_lineage(id_progetto, id_ingrediente)` - Insert project-ingredient join

**Logging:**
- `log_ricerche(cas, target, esito)` - Log search history

### Streamlit UI

Entry point: `streamlit/app.py` (multi-page app via `st.navigation`). Run with:

```bash
streamlit run streamlit/app.py
```

The API must be running on `localhost:8000`.

#### Shared modules

- `streamlit/functions.py` - single source of truth for all shared logic:
  - Constants: `API_BASE`, `AUTH_BASE`, `CAS_PATTERN`, `STATUS_MAP`, `WATER_INCI`
  - Auth: `do_login`, `do_refresh`, `do_logout`, `check_auth`, `_auth_headers`, `_fetch_user_info`
  - Cookie persistence: `get_cookie_manager()`, `_COOKIE_RT="pif_rt"`, `_COOKIE_MAX_AGE=7d` - uses the `extra-streamlit-components` CookieManager. Only the `refresh_token` is stored in the browser cookie. `check_auth` restores the session from the cookie automatically on new browser sessions.
  - All API wrappers: `fetch_ingredient`, `fetch_orders`, `fetch_order_detail`, `download_excel`, `download_sources`, `send_segnalazione`, etc.
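For illustration, the CAS validation and AQUA auto-detection used by the order form presumably look something like the sketch below. The exact `CAS_PATTERN` regex and the contents of `WATER_INCI` are assumptions; only the CAS digit-group format (2-7 digits, 2 digits, 1 check digit) is a general property of CAS numbers.

```python
import re

# Assumed shapes of the constants in streamlit/functions.py (hypothetical values).
CAS_PATTERN = re.compile(r"^\d{2,7}-\d{2}-\d$")  # e.g. "50-00-0"
WATER_INCI = {"AQUA", "WATER", "AQUA (WATER)"}

def is_valid_cas(cas: str) -> bool:
    """Syntactic CAS check (format only, not the check digit)."""
    return bool(CAS_PATTERN.match(cas.strip()))

def is_water_inci(inci: str) -> bool:
    """AQUA auto-detection for ingredient table rows."""
    return inci.strip().upper() in WATER_INCI
```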
  - Order helpers: `validate_order`, `build_order_payload`, `make_empty_ingredient_df`, `is_water_inci`
  - ECHA extractors: `extract_tox_info_values`, `extract_acute_values`, `extract_repeated_values`
- `streamlit/functions_ui.py` - UI-level helpers:
  - `search_cas_inci(input, type)` - DuckDB query on `streamlit/data.csv`
  - `search_cir(input_text)` - DuckDB query on `streamlit/cir-reports.csv`, returns `list[tuple[name, inci, url]]`
  - `show_login_page()` - login form calling `do_login`
  - `display_orderData(order_data)` - renders order detail

#### Pages (`streamlit/pages/`)

- `ingredients_page.py` - ingredient search by CAS, displays DAP/COSING/tox data, PDF source download
- `order_page.py` - order creation form: client, preset, ingredient table (INCI/CAS/%, AQUA auto-detection), submit -> POST `/orders/create`
- `list_orders.py` - order list with filters; detail view; retry/download/delete actions
- `exposition_page.py` - exposure preset CRUD
- `settings_page.py` - custom tox indicators, client management, ingredient inventory
- `echa.py` - legacy ECHA direct search
- `ticket.py` - bug report form -> POST `/common/segnalazione`

#### Auth flow

1. `get_cookie_manager()` is called at the very top of `app.py`, before `check_auth`, to render the JS component
2. On login: tokens saved to `session_state` + `refresh_token` saved to the `pif_rt` cookie
3. On a new session (tab reopen): `check_auth` reads the cookie -> calls `do_refresh` -> restores the session automatically
4. On logout: `session_state` cleared + cookie deleted
5. `_fetch_user_info()` is called after login/restore -> saves `user_name`, `user_email`, `user_id` to `session_state`
6. Selected CAS/INCI saved to `session_state.selected_cas` / `session_state.selected_inci` for cross-page navigation

### Important domain concepts

- **CAS number**: Chemical Abstracts Service identifier (e.g., "50-00-0")
- **INCI**: International Nomenclature of Cosmetic Ingredients
- **NOAEL**: No Observed Adverse Effect Level (preferred toxicity indicator)
- **LOAEL**: Lowest Observed Adverse Effect Level
- **LD50**: Lethal Dose 50%
- **DAP**: Dermal Absorption Percentage
- **SED**: Systemic Exposure Dosage
- **MoS**: Margin of Safety
- **PIF**: Product Information File (EU cosmetic regulation requirement)
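As a worked illustration of the 3-tier indicator ranking and the MoS formula described under "Core workflow" and "Excel export", here is a self-contained sketch. The dataclass and its field names are hypothetical stand-ins for the real `ToxIndicator`/`Toxicity` models; only the priority rules (type, then route, then lowest value), the safety factors, and the MoS formula `Valore / (SED con DAP * Fattore)` mirror this document.

```python
from dataclasses import dataclass

# Priority tables from the 3-tier ranking (field names are hypothetical).
TYPE_RANK = {"NOAEL": 4, "LOAEL": 3, "LD50": 1}
ROUTE_RANK = {"dermal": 3, "oral": 2, "inhalation": 1}  # any other route -> 0
SAFETY_FACTOR = {"LD50": 10, "LOAEL": 3, "NOAEL": 1}

@dataclass
class Indicator:
    kind: str     # "NOAEL" | "LOAEL" | "LD50"
    route: str    # "dermal" | "oral" | "inhalation" | ...
    value: float  # mg/kg bw/day

def best_indicator(indicators: list[Indicator]) -> Indicator:
    """Most conservative indicator: best type, then best route, then lowest value."""
    return min(
        indicators,
        key=lambda i: (-TYPE_RANK.get(i.kind, 0), -ROUTE_RANK.get(i.route, 0), i.value),
    )

def margin_of_safety(sed_with_dap: float, ind: Indicator) -> float:
    """MoS = tox value / (SED con DAP * safety factor), as in the MoS sheet formula."""
    return ind.value / (sed_with_dap * SAFETY_FACTOR[ind.kind])
```

Sorting by negated type and route ranks, then by raw value, reproduces the tie-breaking order in one `min()` call.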