Spaces:
Running
Running
File size: 6,900 Bytes
c3e4914 b08bc0e af9f47e c3e4914 b08bc0e c3e4914 b08bc0e c3e4914 af9f47e c3e4914 af9f47e c3e4914 419e73d c3e4914 b08bc0e c3e4914 af9f47e c3e4914 af9f47e c3e4914 af9f47e c3e4914 b08bc0e f95013c b08bc0e f95013c b08bc0e f95013c b08bc0e f95013c b08bc0e f95013c b08bc0e f95013c b08bc0e f95013c b08bc0e f95013c b08bc0e c3e4914 b08bc0e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 | import io
import json
import os
import random
import tempfile
import uuid
from datetime import datetime, timezone
from pathlib import Path
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.staticfiles import StaticFiles
from PIL import Image, ImageOps
from .model import load_detector, predict_image
from .screenshot import preprocess
from .video import sample_frames
MAX_IMAGE_SIZE_MB = 50
MAX_VIDEO_SIZE_MB = 300
N_VIDEO_FRAMES = 5
IMAGE_TYPES = {"image/jpeg", "image/jpg", "image/png", "image/webp"}
VIDEO_TYPES = {"video/mp4", "video/quicktime", "video/webm", "video/x-matroska"}
HF_REPORT_REPO = os.environ.get("HF_REPORT_REPO", "ComplexDataLab/openfake-reports")
HF_TOKEN = os.environ.get("HF_TOKEN")
app = FastAPI(title="Deepfake Detector")
@app.on_event("startup")
def warmup():
load_detector()
def _predict_with_preprocess(image: Image.Image) -> dict:
"""Run the screenshot-aware prediction pipeline on a single image.
Returns a dict with p_fake, the preprocessing status, and the crop boxes
in the EXIF-rotated coordinate frame so the frontend can overlay them on
the user-visible image.
"""
# Apply EXIF rotation up front so crop_box coords and image_size are in
# the same frame as the browser-rendered image.
image = ImageOps.exif_transpose(image)
width, height = image.size
result = preprocess(image)
crop_box = None
if result.crop_box is not None:
boxes = result.crop_box if isinstance(result.crop_box, list) else [result.crop_box]
crop_box = [list(b) for b in boxes]
base = {
"preprocess_status": result.status,
"image_size": [width, height],
"crop_box": crop_box,
}
if result.status == "cropped":
crops = result.image if isinstance(result.image, list) else [result.image]
probs = [predict_image(c) for c in crops]
p_fake = sum(probs) / len(probs)
return {**base, "p_fake": p_fake, "n_crops": len(crops)}
if result.status == "text_only":
raw_p_fake = predict_image(image)
# The detector is unreliable on pure-text screenshots and tends to
# flag them as AI-generated. If it leans "AI", soften to uncertain;
# if it leans "real", keep the score.
if raw_p_fake > 0.5:
p_fake = random.uniform(0.4, 0.6)
else:
p_fake = raw_p_fake
return {**base, "p_fake": p_fake, "raw_p_fake": raw_p_fake}
p_fake = predict_image(image)
return {**base, "p_fake": p_fake}
@app.post("/api/predict")
async def predict(file: UploadFile = File(...)):
content_type = (file.content_type or "").lower()
raw = await file.read()
size_mb = len(raw) / (1024 * 1024)
if content_type in IMAGE_TYPES:
if size_mb > MAX_IMAGE_SIZE_MB:
raise HTTPException(413, f"Image exceeds {MAX_IMAGE_SIZE_MB} MB")
try:
image = Image.open(io.BytesIO(raw))
except Exception:
raise HTTPException(400, "Invalid image")
pred = _predict_with_preprocess(image)
p_fake = pred["p_fake"]
return {
"media_type": "image",
"p_fake": p_fake,
"reliability": 1.0 - p_fake,
"n_frames": 1,
**{k: v for k, v in pred.items() if k != "p_fake"},
}
if content_type in VIDEO_TYPES:
if size_mb > MAX_VIDEO_SIZE_MB:
raise HTTPException(413, f"Video exceeds {MAX_VIDEO_SIZE_MB} MB")
suffix = Path(file.filename or "video.mp4").suffix or ".mp4"
with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
tmp.write(raw)
tmp_path = tmp.name
try:
frames = sample_frames(tmp_path, N_VIDEO_FRAMES)
except ValueError as e:
raise HTTPException(400, str(e))
finally:
try:
Path(tmp_path).unlink(missing_ok=True)
except Exception:
pass
probs = [predict_image(f) for f in frames]
p_fake = sum(probs) / len(probs)
return {
"media_type": "video",
"p_fake": p_fake,
"reliability": 1.0 - p_fake,
"n_frames": len(frames),
"frame_probs": probs,
}
raise HTTPException(415, f"Unsupported media type: {content_type}")
@app.post("/api/report")
async def report(
file: UploadFile = File(...),
is_real: str = Form(...),
reason: str = Form(...),
reason_other: str = Form(""),
reason_details: str = Form(""),
comment: str = Form(""),
p_fake: float = Form(...),
consent: str = Form(...),
):
"""Save an error report (form answers + media file) to a Hugging Face dataset repo."""
if consent != "true":
raise HTTPException(400, "Consent to save the file is required.")
if not HF_TOKEN:
raise HTTPException(
503, "Reporting is not configured (missing HF_TOKEN)."
)
# Read the uploaded file
raw = await file.read()
content_type = (file.content_type or "").lower()
if content_type not in IMAGE_TYPES | VIDEO_TYPES:
raise HTTPException(415, "Unsupported file type for reporting.")
# Build report payload
ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S")
short_id = uuid.uuid4().hex[:8]
folder_name = f"{ts}_{short_id}"
report_data = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"is_real": is_real,
"reason": reason,
"reason_other": reason_other if reason == "other" else "",
"reason_details": reason_details,
"comment": comment,
"p_fake": p_fake,
"content_type": content_type,
"original_filename": file.filename or "unknown",
}
# Write to a temp directory then upload to HF
with tempfile.TemporaryDirectory() as tmpdir:
report_dir = Path(tmpdir) / folder_name
report_dir.mkdir()
# Save report JSON
(report_dir / "report.json").write_text(
json.dumps(report_data, indent=2, ensure_ascii=False)
)
# Save media file with original extension
ext = Path(file.filename or "file.bin").suffix or ".bin"
(report_dir / f"media{ext}").write_bytes(raw)
# Upload to HF dataset repo
try:
from huggingface_hub import HfApi
api = HfApi(token=HF_TOKEN)
api.upload_folder(
folder_path=str(report_dir),
path_in_repo=f"reports/{folder_name}",
repo_id=HF_REPORT_REPO,
repo_type="dataset",
)
except Exception as e:
raise HTTPException(500, f"Failed to upload report: {e}")
return {"status": "ok"}
static_dir = Path(__file__).parent / "static"
app.mount("/", StaticFiles(directory=str(static_dir), html=True), name="static")
|