Overview
The persistence layer provides concrete implementations of the repository interfaces for both CSV file storage and a PostgreSQL database.
CSV Repositories
MovieCsvRepository
Persists movies to CSV files with thread-safe operations.
Class Definition
import csv
import threading
from typing import Optional
from domain.models.movie import Movie
from domain.repositories.movie_repository import MovieRepository
class MovieCsvRepository(MovieRepository):
def __init__(self):
os.makedirs(os.path.dirname(MOVIES_CSV), exist_ok=True)
if not os.path.exists(MOVIES_CSV):
with open(MOVIES_CSV, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(MOVIE_HEADERS)
Source: infrastructure/persistence/csv/repositories/movie_csv_repository.py:12-21
File: data/movies.csv
Headers: ["id", "imdb_id", "title", "year", "rating", "duration_minutes", "metascore"]
Methods
save
def save(self, movie: Movie) -> Movie
Movie to save. ID auto-assigned if None.
Movie object with assigned ID.
Source: infrastructure/persistence/csv/repositories/movie_csv_repository.py:34-54
Thread Safety: Uses movie_lock for atomic write operations.
find_by_imdb_id
def find_by_imdb_id(self, imdb_id: str) -> Optional[Movie]
IMDb ID to search for (e.g., tt0111161).
Movie if found, otherwise None.
Source: infrastructure/persistence/csv/repositories/movie_csv_repository.py:56-76
Performance: Performs a linear search through the entire file on every call. Consider adding an index for large datasets.
ActorCsvRepository
Persists actors to CSV files.
Class Definition
from domain.models.actor import Actor
from domain.repositories.actor_repository import ActorRepository
class ActorCsvRepository(ActorRepository):
def __init__(self):
os.makedirs(os.path.dirname(ACTORS_CSV), exist_ok=True)
if not os.path.exists(ACTORS_CSV):
with open(ACTORS_CSV, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(ACTOR_HEADERS)
Source: infrastructure/persistence/csv/repositories/actor_csv_repository.py:14-23
File: data/actors.csv
Headers: ["id", "name"]
Methods
save
def save(self, actor: Actor) -> Actor
Actor to save. ID auto-assigned if None.
Actor object with assigned ID.
Source: infrastructure/persistence/csv/repositories/actor_csv_repository.py:36-49
find_by_name
def find_by_name(self, name: str) -> Optional[Actor]
Actor name to search for (case-insensitive).
Actor if found, otherwise None.
Source: infrastructure/persistence/csv/repositories/actor_csv_repository.py:51-61
MovieActorCsvRepository
Persists movie-actor relationships to CSV.
Class Definition
from domain.models.movie_actor import MovieActor
from domain.repositories.movie_actor_repository import MovieActorRepository
class MovieActorCsvRepository(MovieActorRepository):
def __init__(self):
os.makedirs(os.path.dirname(MOVIE_ACTOR_CSV), exist_ok=True)
if not os.path.exists(MOVIE_ACTOR_CSV):
with open(MOVIE_ACTOR_CSV, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(MOVIE_ACTOR_HEADERS)
Source: infrastructure/persistence/csv/repositories/movie_actor_csv_repository.py:12-21
File: data/movie_actor.csv
Headers: ["movie_id", "actor_id"]
Methods
save
def save(self, relation: MovieActor) -> None
Single relationship to save.
Source: infrastructure/persistence/csv/repositories/movie_actor_csv_repository.py:23-30
save_many
def save_many(self, relations: List[MovieActor]) -> None
List of relationships to save in bulk.
Source: infrastructure/persistence/csv/repositories/movie_actor_csv_repository.py:32-40
Uses writerows() for efficient bulk inserts.
PostgreSQL Repositories
MoviePostgresRepository
Persists movies to PostgreSQL database.
Class Definition
from domain.models.movie import Movie
from domain.repositories.movie_repository import MovieRepository
from psycopg2 import DatabaseError
class MoviePostgresRepository(MovieRepository):
def __init__(self, conn):
self.conn = conn
Source: infrastructure/persistence/postgres/repositories/movie_postgres_repository.py:9-14
Methods
save
Saves the movie by calling the upsert_movie stored function (invoked with SELECT, as shown below).
def save(self, movie: Movie) -> Movie
Movie to insert or update.
Movie with database-assigned ID.
Source: infrastructure/persistence/postgres/repositories/movie_postgres_repository.py:16-41
Database Procedure:
SELECT * FROM upsert_movie(
imdb_id, title, year, rating, duration_minutes, metascore
)
find_by_imdb_id
def find_by_imdb_id(self, imdb_id: str) -> Optional[Movie]
IMDb ID to search for (e.g., tt0111161).
Movie if found, otherwise None.
Source: infrastructure/persistence/postgres/repositories/movie_postgres_repository.py:43-66
Query:
SELECT id, imdb_id, title, year, rating, duration_minutes, metascore
FROM movies
WHERE imdb_id = %s
ActorPostgresRepository
Persists actors to PostgreSQL.
Class Definition
from domain.models.actor import Actor
from domain.repositories.actor_repository import ActorRepository
class ActorPostgresRepository(ActorRepository):
def __init__(self, conn):
self.conn = conn
Source: infrastructure/persistence/postgres/repositories/actor_postgres_repository.py:9-11
Methods
save
def save(self, actor: Actor) -> Actor
Actor to insert or update.
Actor with database-assigned ID.
Source: infrastructure/persistence/postgres/repositories/actor_postgres_repository.py:13-26
Database Procedure:
SELECT * FROM upsert_actor(name)
find_by_name
def find_by_name(self, name: str) -> Optional[Actor]
Actor name to search for.
Actor if found, otherwise None.
Source: infrastructure/persistence/postgres/repositories/actor_postgres_repository.py:28-42
MovieActorPostgresRepository
Persists movie-actor relationships to PostgreSQL.
Class Definition
from domain.models.movie_actor import MovieActor
from domain.repositories.movie_actor_repository import MovieActorRepository
class MovieActorPostgresRepository(MovieActorRepository):
def __init__(self, conn):
self.conn = conn
Source: infrastructure/persistence/postgres/repositories/movie_actor_postgres_repository.py:10-15
Methods
save
def save(self, relation: MovieActor) -> None
Single relationship to save.
Source: infrastructure/persistence/postgres/repositories/movie_actor_postgres_repository.py:17-29
Database Procedure:
SELECT * FROM upsert_movie_actor(movie_id, actor_id)
save_many
def save_many(self, relations: List[MovieActor]) -> None
List of relationships to save in bulk.
Source: infrastructure/persistence/postgres/repositories/movie_actor_postgres_repository.py:31-49
Thread Safety
CSV Repositories
The CSV repositories use threading locks to make their file operations atomic, for example:
movie_lock = threading.Lock()
def save(self, movie: Movie) -> Movie:
with movie_lock:
# ... atomic file operations
Source: infrastructure/persistence/csv/repositories/movie_csv_repository.py:10,39
PostgreSQL Repositories
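Connection pooling is recommended for concurrent access. Give each thread its own connection from the pool, and return it with pool.putconn(conn) when finished: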
from psycopg2.pool import ThreadedConnectionPool
pool = ThreadedConnectionPool(
minconn=1,
maxconn=10,
host="localhost",
database="imdb_scraper",
user="user",
password="password"
)
conn = pool.getconn()
repo = MoviePostgresRepository(conn)
Error Handling
CSV Errors
try:
saved_movie = movie_repo.save(movie)
except IOError as e:
logger.error(f"File write error: {e}")
except Exception as e:
logger.error(f"Unexpected error: {e}")
PostgreSQL Errors
from psycopg2 import DatabaseError
try:
saved_movie = movie_repo.save(movie)
except DatabaseError as e:
logger.error(f"Database error: {e}")
conn.rollback()
raise
Source: infrastructure/persistence/postgres/repositories/movie_postgres_repository.py:38-41
Usage Example
CSV Storage
from infrastructure.persistence.csv.repositories import (
MovieCsvRepository,
ActorCsvRepository,
MovieActorCsvRepository
)
from domain.models.movie import Movie
from domain.models.actor import Actor
from domain.models.movie_actor import MovieActor
# Initialize repositories
movie_repo = MovieCsvRepository()
actor_repo = ActorCsvRepository()
relation_repo = MovieActorCsvRepository()
# Save movie
movie = Movie(
id=None,
imdb_id="tt0111161",
title="The Shawshank Redemption",
year=1994,
rating=9.3,
duration_minutes=142,
metascore=82,
actors=[]
)
saved_movie = movie_repo.save(movie)
print(f"Saved movie with ID: {saved_movie.id}")
# Save actors
actor = Actor(id=None, name="Tim Robbins")
saved_actor = actor_repo.save(actor)
# Save relationship
relation = MovieActor(
movie_id=saved_movie.id,
actor_id=saved_actor.id
)
relation_repo.save(relation)
PostgreSQL Storage
import psycopg2
from infrastructure.persistence.postgres.repositories import (
MoviePostgresRepository,
ActorPostgresRepository,
MovieActorPostgresRepository
)
# Connect to database
conn = psycopg2.connect(
host="localhost",
database="imdb_scraper",
user="user",
password="password"
)
# Initialize repositories
movie_repo = MoviePostgresRepository(conn)
actor_repo = ActorPostgresRepository(conn)
relation_repo = MovieActorPostgresRepository(conn)
# Save movie (same API as CSV)
saved_movie = movie_repo.save(movie)
conn.commit()
# Close connection
conn.close()