203 lines
No EOL
7.1 KiB
Python
203 lines
No EOL
7.1 KiB
Python
from playwright.async_api import async_playwright
|
|
import os
|
|
import zipfile
|
|
import requests
|
|
|
|
from pif_compiler.functions.common_log import get_logger
|
|
|
|
|
|
# Module-level logger obtained from the project's common_log helper; used by the
# functions below for progress, warning and error messages.
log = get_logger()
|
|
|
|
async def generate_pdf(link: str, name: str, output_dir: str = "pdfs") -> bool:
    """
    Generate a PDF from a web page URL using Playwright.

    The page is loaded in headless Chromium with a browser-like context
    (user agent, headers, hidden ``navigator.webdriver``) and printed as an A4
    PDF to ``<output_dir>/<name>.pdf``. If that file already exists nothing is
    rendered.

    Args:
        link: URL of the page to convert to PDF
        name: Name for the generated PDF file (without extension)
        output_dir: Directory the PDF is written to. Defaults to "pdfs",
            the location that was previously hard-coded.

    Returns:
        bool: True if PDF was generated or already exists, False otherwise.
        Any exception raised while rendering is logged and reported as False.
    """
    pdf_path = os.path.join(output_dir, f"{name}.pdf")

    if os.path.exists(pdf_path):
        log.info(f"PDF already exists for {name}, skipping generation.")
        return True

    log.info(f"Generating PDF for {name} from link: {link}")

    try:
        os.makedirs(output_dir, exist_ok=True)

        async with async_playwright() as p:
            # Launch browser with stealth options to bypass WAF
            browser = await p.chromium.launch(
                headless=True,
                args=[
                    '--disable-blink-features=AutomationControlled',
                    '--disable-dev-shm-usage',
                    '--no-sandbox',
                    '--disable-setuid-sandbox',
                ],
            )

            try:
                # Create context with realistic browser fingerprint
                context = await browser.new_context(
                    viewport={'width': 1920, 'height': 1080},
                    user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
                    locale='en-US',
                    timezone_id='Europe/Rome',
                    extra_http_headers={
                        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
                        'Accept-Encoding': 'gzip, deflate, br',
                        'Accept-Language': 'en-US,en;q=0.9,it;q=0.8',
                        'Cache-Control': 'max-age=0',
                        'Sec-Ch-Ua': '"Chromium";v="131", "Not_A Brand";v="24"',
                        'Sec-Ch-Ua-Mobile': '?0',
                        'Sec-Ch-Ua-Platform': '"Windows"',
                        'Sec-Fetch-Dest': 'document',
                        'Sec-Fetch-Mode': 'navigate',
                        'Sec-Fetch-Site': 'none',
                        'Sec-Fetch-User': '?1',
                        'Upgrade-Insecure-Requests': '1',
                    },
                )

                page = await context.new_page()

                # Remove webdriver property to avoid detection
                await page.add_init_script("""
                    Object.defineProperty(navigator, 'webdriver', {
                        get: () => undefined
                    });
                """)

                # Navigate to the page
                await page.goto(link, wait_until='networkidle', timeout=60000)

                # Wait a bit to ensure everything is loaded
                await page.wait_for_timeout(2000)

                # Generate PDF
                await page.pdf(path=pdf_path, format='A4', print_background=True)

                await context.close()
            finally:
                # Always release the browser, also when navigation or
                # printing raised; the outer handler still logs the error.
                await browser.close()

        if os.path.exists(pdf_path):
            log.info(f"PDF generated successfully for {name}")
            return True

        log.error(f"PDF generation failed for {name}: file not found after generation")
        return False

    except Exception as e:
        # Deliberate catch-all: callers rely on a boolean result, not on exceptions.
        log.error(f"Error generating PDF for {name}: {e}", exc_info=True)
        return False


async def generate_project_source_pdfs(project, output_dir: str = "pdfs") -> list:
    """
    Generate the source PDFs for every ingredient of a project:
    - Toxicology: PDF of the best_case reference (naming: CAS_source.pdf)
    - COSING: PDF downloaded via API for each CosingInfo with a reference
      (naming: CAS_cosing.pdf; any further distinct reference of the same
      ingredient is stored as CAS_cosing_<reference>.pdf so it does not
      collide with the first one)

    Ingredients flagged with skip_tox, or lacking a CAS or an ingredient
    object, are skipped. Files already present in output_dir are reused.

    Args:
        project: Project object with enriched ingredients
        output_dir: output directory for the PDFs

    Returns:
        List of the paths of the available PDFs, without duplicates
    """
    os.makedirs(output_dir, exist_ok=True)
    generated = []

    def _record(path):
        # The same file can be reached more than once (e.g. a CAS repeated
        # across ingredients); report each path a single time.
        if path not in generated:
            generated.append(path)

    for pi in project.ingredients:
        if pi.skip_tox or not pi.cas or not pi.ingredient:
            continue

        ing = pi.ingredient

        # --- Tox best_case PDF ---
        best = ing.toxicity.best_case if ing.toxicity else None
        if best and best.ref:
            source_label = best.source or best.toxicity_type or "tox"
            pdf_name = f"{pi.cas}_{source_label}"
            log.info(f"Generazione PDF tox: {pdf_name} da {best.ref}")
            # Pass output_dir through so the file is written where the
            # returned path says it is.
            success = await generate_pdf(best.ref, pdf_name, output_dir)
            if success:
                _record(os.path.join(output_dir, f"{pdf_name}.pdf"))
            else:
                log.warning(f"PDF tox non generato per {pdf_name}")

        # --- COSING PDF ---
        if ing.cosing_info:
            seen_refs = set()
            for cosing in ing.cosing_info:
                if not cosing.reference or cosing.reference in seen_refs:
                    continue
                is_first_ref = not seen_refs
                seen_refs.add(cosing.reference)

                if is_first_ref:
                    pdf_name = f"{pi.cas}_cosing"
                else:
                    # Additional references need their own file name,
                    # otherwise they would be mistaken for the first one.
                    safe_ref = "".join(
                        ch if ch.isalnum() or ch in "-_." else "_"
                        for ch in str(cosing.reference)
                    )
                    pdf_name = f"{pi.cas}_cosing_{safe_ref}"
                pdf_path = os.path.join(output_dir, f"{pdf_name}.pdf")

                if os.path.exists(pdf_path):
                    _record(pdf_path)
                    continue

                log.info(f"Download COSING PDF: {pdf_name} (reference={cosing.reference})")
                # NOTE: cosing_download is a synchronous requests call and
                # blocks the event loop for the duration of the download.
                try:
                    content = cosing_download(cosing.reference)
                except requests.RequestException as exc:
                    # One failed download must not abort the remaining ingredients.
                    log.warning(f"COSING PDF non scaricato per {pdf_name}: {exc}")
                    continue

                if isinstance(content, bytes):
                    with open(pdf_path, 'wb') as f:
                        f.write(content)
                    _record(pdf_path)
                else:
                    # cosing_download reports failures as an error string.
                    log.warning(f"COSING PDF non scaricato per {pdf_name}: {content}")

    log.info(f"Generazione fonti completata: {len(generated)} PDF generati")
    return generated
|
|
|
|
def cosing_download(ref_no: str, timeout: float = 30):
    """
    Download the PDF export of a COSING entry from the EC COSING API.

    Args:
        ref_no: COSING reference number, inserted into the export-pdf URL
        timeout: seconds to wait for the server before giving up

    Returns:
        The response body as bytes when the API answers with HTTP 200,
        otherwise a string starting with "Error: " that describes the
        failure (HTTP status and body, or the network error). Callers
        distinguish the two cases with ``isinstance(result, bytes)``.
    """
    url = f'https://api.tech.ec.europa.eu/cosing20/1.0/api/cosmetics/{ref_no}/export-pdf'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:147.0) Gecko/20100101 Firefox/147.0',
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'it-IT,it;q=0.9',
        'Cache-Control': 'No-Cache',
        'Origin': 'https://ec.europa.eu',
        'Referer': 'https://ec.europa.eu/',
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-site',
    }

    try:
        # Without a timeout requests waits indefinitely on a stalled server.
        response = requests.get(url, headers=headers, timeout=timeout)
    except requests.RequestException as exc:
        # Keep the existing contract: failures are returned as a string,
        # not raised, so one bad reference cannot crash the caller's loop.
        return f"Error: {exc}"

    if response.status_code == 200:
        return response.content
    return f"Error: {response.status_code} - {response.text}"
|
|
|
|
def create_sources_zip(pdf_paths: list, zip_path: str) -> str:
    """
    Create a ZIP archive containing the source PDFs.

    Every existing file is stored flat under its base name. Paths that do
    not exist are skipped with a warning, and a base name is only stored
    once, so repeated paths do not create duplicate archive entries.

    Args:
        pdf_paths: list of the PDF paths to include (None is treated as empty)
        zip_path: path of the output ZIP file; its parent directory is
            created when missing

    Returns:
        Path of the created ZIP file
    """
    zip_dir = os.path.dirname(zip_path)
    if zip_dir:
        os.makedirs(zip_dir, exist_ok=True)

    added = set()
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
        for path in pdf_paths or ():
            arcname = os.path.basename(path)
            if arcname in added:
                # Entries are keyed by base name; writing it again would
                # only produce a duplicate member in the archive.
                continue
            if not os.path.exists(path):
                log.warning(f"File non trovato, escluso dallo ZIP: {path}")
                continue
            zf.write(path, arcname)
            added.add(arcname)

    # Report what was actually archived, not how many paths were requested.
    log.info(f"ZIP creato: {zip_path} ({len(added)} file)")
    return zip_path
|
|
|
|
|
|
|
|
|