The backend is a multi-threaded Python HTTP server that serves static files, provides REST API endpoints, and streams content from Google Drive. Built with standard library components, it requires no external web framework.
if self.path == "/api/bootstrap": refresh_cache_if_changed() with cache_lock: raw_bytes = bootstrap_cache_json_bytes or _payload_to_bytes(_empty_courses_payload()) gzip_bytes = bootstrap_cache_json_gzip_bytes or None self._send_json_bytes(200, raw_bytes, gzip_bytes) return
The bootstrap endpoint returns pre-serialized and pre-compressed JSON from memory. No JSON encoding happens per-request, ensuring sub-millisecond response times.
def _ref_matches(ref_value, idx, candidates): normalized = str(ref_value or "").strip().lower() if not normalized: return False if normalized == str(idx).lower(): return True # Match by index for candidate in candidates: c = str(candidate or "").strip().lower() if c and normalized == c: return True # Match by ID, name, or slug return False
if self.path == "/api/progress": try: if os.path.exists(PROGRESS_FILE): with open(PROGRESS_FILE, "r", encoding="utf-8") as f: data = json.load(f) else: data = {} except: data = {} self._send_json(200, data) return
From server.py:218-244 (POST):
if self.path == "/api/progress": content_length = int(self.headers.get("Content-Length", 0)) if content_length <= 0 or content_length > MAX_PROGRESS_BYTES: self._send_json(413, {"error": "payload_too_large"}) return post_data = self.rfile.read(content_length) try: # Validar que es JSON válido parsed = json.loads(post_data.decode("utf-8")) if not isinstance(parsed, dict): raise ValueError("progress payload must be a JSON object") # Guardar en archivo progress_dir = os.path.dirname(PROGRESS_FILE) if progress_dir: os.makedirs(progress_dir, exist_ok=True) with open(PROGRESS_FILE, "wb") as f: f.write(post_data) self._send_json(200, {"status": "saved"}) except Exception as e: self._send_json(400, {"error": str(e)}) return
Progress is stored as a raw JSON file (progress.json) with a 2MB size limit to prevent abuse. The server validates JSON structure but doesn’t interpret the content.
For browsers/devices with codec issues, the server can remux video through FFmpeg. From server.py:947-1122:
if self.path.startswith("/api/video-compatible/"): file_id = unquote(self.path[len("/api/video-compatible/"):]) ffmpeg_executable = _get_ffmpeg_executable() if not ffmpeg_executable: self._safe_send_error(503, "ffmpeg_not_available") return # Route through local /drive/files endpoint for auth source_url = f"http://127.0.0.1:{PORT}/drive/files/{file_id}" compat_force_reencode = os.environ.get("PLATZI_COMPAT_FORCE_REENCODE", "0").strip() == "1" ffmpeg_cmd = [ ffmpeg_executable, "-hide_banner", "-loglevel", "error", "-fflags", "+genpts+discardcorrupt", "-avoid_negative_ts", "make_zero", "-i", source_url, "-map", "0:v:0", "-map", "0:a?", ] if compat_force_reencode: ffmpeg_cmd.extend(["-c:v", "libx264", "-preset", "veryfast"]) else: ffmpeg_cmd.extend(["-c:v", "copy"]) # Stream copy (fast) ffmpeg_cmd.extend([ "-c:a", "aac", "-ar", "48000", "-af", "aresample=async=1:first_pts=0", "-movflags", "+frag_keyframe+empty_moov+default_base_moof", "-f", "mp4", "-", ]) process = subprocess.Popen( ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, ) self.send_response(200) self.send_header("Content-Type", "video/mp4") self.send_header("Cache-Control", "no-store, max-age=0") self.send_header("Accept-Ranges", "none") # Can't seek in live transcode self.end_headers() while True: chunk = process.stdout.read(1024 * 512) if not chunk: break self.wfile.write(chunk) total_bytes += len(chunk)
FFmpeg mode is not seekable (Accept-Ranges: none) because the stream is transcoded on-the-fly. Use this only when direct streaming fails due to codec issues.
def refresh_cache_if_changed():
    """Reload the cache if courses_cache.json changed on disk.

    Double-checked locking: a cheap snapshot comparison under ``cache_lock``
    first, then the expensive ``init_cache()`` guarded by
    ``cache_reload_lock`` with the condition re-checked, so concurrent
    request threads trigger at most one reload per change.
    """
    current_cache_file = resolve_cache_file_path()
    if not os.path.exists(current_cache_file):
        # No cache file yet; nothing to reload.
        return
    try:
        current_mtime = os.path.getmtime(current_cache_file)
    except OSError:
        # File vanished between exists() and getmtime(); treat as no change.
        return
    # Snapshot shared state under the lock, compare outside it.
    with cache_lock:
        previous_mtime = cache_mtime
        previous_cache_file = cache_file_path
    if previous_cache_file != current_cache_file:
        # The cache *source path* changed (not just its contents).
        with cache_reload_lock:
            with cache_lock:
                # Re-check under the reload lock: another thread may have
                # already reloaded while we waited.
                if cache_file_path != current_cache_file:
                    print("[INFO] Cambio de origen de caché detectado, recargando...")
                    init_cache()
        return
    if previous_mtime is not None and current_mtime <= previous_mtime:
        # Fast path: mtime unchanged, nothing to do.
        return
    with cache_reload_lock:
        with cache_lock:
            latest_mtime = cache_mtime
            # Second check: a concurrent reload may have caught up already.
            if latest_mtime is not None and current_mtime <= latest_mtime:
                return
            print("[INFO] Detectado cambio en courses_cache.json, recargando caché...")
            init_cache()
Every API request calls refresh_cache_if_changed() to detect file updates. This enables hot-reloading when you rebuild the cache with rebuild_cache_drive.py.
def _get_session(self): # Shared session avoids cold-start latency on each per-request thread. if self._shared_session is None: with self._shared_session_lock: if self._shared_session is None: session = AuthorizedSession(self.creds) session.headers.update({"Accept-Encoding": "identity"}) self._shared_session = session return self._shared_session
requests.Session is mostly thread-safe for reads, but modifying headers isn’t. We set headers once during initialization and reuse the session across threads.
def _safe_send_error(self, code, message): try: self.send_error(code, message) except OSError as error: if not self._is_client_disconnect_error(error): raise