Skip to main content

Overview

The persistence layer provides concrete implementations of the repository interfaces for both CSV file storage and a PostgreSQL database.

CSV Repositories

MovieCsvRepository

Persists movies to CSV files with thread-safe operations.

Class Definition

import csv
import threading
from typing import Optional
from domain.models.movie import Movie
from domain.repositories.movie_repository import MovieRepository

class MovieCsvRepository(MovieRepository):
    def __init__(self):
        os.makedirs(os.path.dirname(MOVIES_CSV), exist_ok=True)
        if not os.path.exists(MOVIES_CSV):
            with open(MOVIES_CSV, "w", newline="", encoding="utf-8") as f:
                writer = csv.writer(f)
                writer.writerow(MOVIE_HEADERS)
Source: infrastructure/persistence/csv/repositories/movie_csv_repository.py:12-21
File: data/movies.csv
Headers: ["id", "imdb_id", "title", "year", "rating", "duration_minutes", "metascore"]

Methods

save
def save(self, movie: Movie) -> Movie
movie
Movie
required
Movie to save. An ID is auto-assigned if the movie's id is None.
return
Movie
Movie object with assigned ID.
Source: infrastructure/persistence/csv/repositories/movie_csv_repository.py:34-54
Thread Safety: Uses movie_lock for atomic write operations.
find_by_imdb_id
def find_by_imdb_id(self, imdb_id: str) -> Optional[Movie]
imdb_id
str
required
IMDb ID to search for (e.g., tt0111161).
return
Optional[Movie]
Movie if found, otherwise None.
Source: infrastructure/persistence/csv/repositories/movie_csv_repository.py:56-76
Performance: Performs a linear search through the entire file. Consider indexing for large datasets.

ActorCsvRepository

Persists actors to CSV files.

Class Definition

from domain.models.actor import Actor
from domain.repositories.actor_repository import ActorRepository

class ActorCsvRepository(ActorRepository):
    def __init__(self):
        os.makedirs(os.path.dirname(ACTORS_CSV), exist_ok=True)
        if not os.path.exists(ACTORS_CSV):
            with open(ACTORS_CSV, "w", newline="", encoding="utf-8") as f:
                writer = csv.writer(f)
                writer.writerow(ACTOR_HEADERS)
Source: infrastructure/persistence/csv/repositories/actor_csv_repository.py:14-23
File: data/actors.csv
Headers: ["id", "name"]

Methods

save
def save(self, actor: Actor) -> Actor
actor
Actor
required
Actor to save. An ID is auto-assigned if the actor's id is None.
return
Actor
Actor with assigned ID.
Source: infrastructure/persistence/csv/repositories/actor_csv_repository.py:36-49
find_by_name
def find_by_name(self, name: str) -> Optional[Actor]
name
str
required
Actor name to search for (case-insensitive).
return
Optional[Actor]
Actor if found, otherwise None.
Source: infrastructure/persistence/csv/repositories/actor_csv_repository.py:51-61

MovieActorCsvRepository

Persists movie-actor relationships to CSV.

Class Definition

from domain.models.movie_actor import MovieActor
from domain.repositories.movie_actor_repository import MovieActorRepository

class MovieActorCsvRepository(MovieActorRepository):
    def __init__(self):
        os.makedirs(os.path.dirname(MOVIE_ACTOR_CSV), exist_ok=True)
        if not os.path.exists(MOVIE_ACTOR_CSV):
            with open(MOVIE_ACTOR_CSV, "w", newline="", encoding="utf-8") as f:
                writer = csv.writer(f)
                writer.writerow(MOVIE_ACTOR_HEADERS)
Source: infrastructure/persistence/csv/repositories/movie_actor_csv_repository.py:12-21
File: data/movie_actor.csv
Headers: ["movie_id", "actor_id"]

Methods

save
def save(self, relation: MovieActor) -> None
relation
MovieActor
required
Single relationship to save.
Source: infrastructure/persistence/csv/repositories/movie_actor_csv_repository.py:23-30
save_many
def save_many(self, relations: List[MovieActor]) -> None
relations
List[MovieActor]
required
List of relationships to save in bulk.
Source: infrastructure/persistence/csv/repositories/movie_actor_csv_repository.py:32-40
Uses writerows() to write all relationships in a single bulk operation.

PostgreSQL Repositories

MoviePostgresRepository

Persists movies to a PostgreSQL database.

Class Definition

from domain.models.movie import Movie
from domain.repositories.movie_repository import MovieRepository
from psycopg2 import DatabaseError

class MoviePostgresRepository(MovieRepository):
    def __init__(self, conn):
        self.conn = conn
Source: infrastructure/persistence/postgres/repositories/movie_postgres_repository.py:9-14

Methods

save
Saves the movie by calling the upsert_movie() database function, which inserts or updates the row.
def save(self, movie: Movie) -> Movie
movie
Movie
required
Movie to insert or update.
return
Movie
Movie with database-assigned ID.
Source: infrastructure/persistence/postgres/repositories/movie_postgres_repository.py:16-41
Database Procedure:
SELECT * FROM upsert_movie(
    imdb_id, title, year, rating, duration_minutes, metascore
)
find_by_imdb_id
def find_by_imdb_id(self, imdb_id: str) -> Optional[Movie]
imdb_id
str
required
IMDb ID to search for.
return
Optional[Movie]
Movie if found, otherwise None.
Source: infrastructure/persistence/postgres/repositories/movie_postgres_repository.py:43-66
Query:
SELECT id, imdb_id, title, year, rating, duration_minutes, metascore 
FROM movies 
WHERE imdb_id = %s

ActorPostgresRepository

Persists actors to PostgreSQL.

Class Definition

from domain.models.actor import Actor
from domain.repositories.actor_repository import ActorRepository

class ActorPostgresRepository(ActorRepository):
    def __init__(self, conn):
        self.conn = conn
Source: infrastructure/persistence/postgres/repositories/actor_postgres_repository.py:9-11

Methods

save
def save(self, actor: Actor) -> Actor
actor
Actor
required
Actor to insert or update.
return
Actor
Actor with database-assigned ID.
Source: infrastructure/persistence/postgres/repositories/actor_postgres_repository.py:13-26
Database Procedure:
SELECT * FROM upsert_actor(name)
find_by_name
def find_by_name(self, name: str) -> Optional[Actor]
name
str
required
Actor name to search for.
return
Optional[Actor]
Actor if found, otherwise None.
Source: infrastructure/persistence/postgres/repositories/actor_postgres_repository.py:28-42

MovieActorPostgresRepository

Persists movie-actor relationships to PostgreSQL.

Class Definition

from domain.models.movie_actor import MovieActor
from domain.repositories.movie_actor_repository import MovieActorRepository

class MovieActorPostgresRepository(MovieActorRepository):
    def __init__(self, conn):
        self.conn = conn
Source: infrastructure/persistence/postgres/repositories/movie_actor_postgres_repository.py:10-15

Methods

save
def save(self, relation: MovieActor) -> None
relation
MovieActor
required
Single relationship to save.
Source: infrastructure/persistence/postgres/repositories/movie_actor_postgres_repository.py:17-29
Database Procedure:
SELECT * FROM upsert_movie_actor(movie_id, actor_id)
save_many
def save_many(self, relations: List[MovieActor]) -> None
relations
List[MovieActor]
required
List of relationships to save in bulk.
Source: infrastructure/persistence/postgres/repositories/movie_actor_postgres_repository.py:31-49

Thread Safety

CSV Repositories

Use a threading lock to serialize file operations across threads:
movie_lock = threading.Lock()

def save(self, movie: Movie) -> Movie:
    with movie_lock:
        # ... atomic file operations
Source: infrastructure/persistence/csv/repositories/movie_csv_repository.py:10,39

PostgreSQL Repositories

Connection pooling is recommended for concurrent access (return each connection with pool.putconn(conn) when you are done with it):
from psycopg2.pool import ThreadedConnectionPool

pool = ThreadedConnectionPool(
    minconn=1,
    maxconn=10,
    host="localhost",
    database="imdb_scraper",
    user="user",
    password="password"
)

conn = pool.getconn()
repo = MoviePostgresRepository(conn)

Error Handling

CSV Errors

try:
    saved_movie = movie_repo.save(movie)
except IOError as e:
    logger.error(f"File write error: {e}")
except Exception as e:
    logger.error(f"Unexpected error: {e}")

PostgreSQL Errors

from psycopg2 import DatabaseError

try:
    saved_movie = movie_repo.save(movie)
except DatabaseError as e:
    logger.error(f"Database error: {e}")
    conn.rollback()
    raise
Source: infrastructure/persistence/postgres/repositories/movie_postgres_repository.py:38-41

Usage Example

CSV Storage

from infrastructure.persistence.csv.repositories import (
    MovieCsvRepository,
    ActorCsvRepository,
    MovieActorCsvRepository
)
from domain.models.movie import Movie
from domain.models.actor import Actor
from domain.models.movie_actor import MovieActor

# Initialize repositories
movie_repo = MovieCsvRepository()
actor_repo = ActorCsvRepository()
relation_repo = MovieActorCsvRepository()

# Save movie
movie = Movie(
    id=None,
    imdb_id="tt0111161",
    title="The Shawshank Redemption",
    year=1994,
    rating=9.3,
    duration_minutes=142,
    metascore=82,
    actors=[]
)
saved_movie = movie_repo.save(movie)
print(f"Saved movie with ID: {saved_movie.id}")

# Save actors
actor = Actor(id=None, name="Tim Robbins")
saved_actor = actor_repo.save(actor)

# Save relationship
relation = MovieActor(
    movie_id=saved_movie.id,
    actor_id=saved_actor.id
)
relation_repo.save(relation)

PostgreSQL Storage

import psycopg2
from infrastructure.persistence.postgres.repositories import (
    MoviePostgresRepository,
    ActorPostgresRepository,
    MovieActorPostgresRepository
)

# Connect to database
conn = psycopg2.connect(
    host="localhost",
    database="imdb_scraper",
    user="user",
    password="password"
)

# Initialize repositories
movie_repo = MoviePostgresRepository(conn)
actor_repo = ActorPostgresRepository(conn)
relation_repo = MovieActorPostgresRepository(conn)

# Save movie (same API as CSV)
saved_movie = movie_repo.save(movie)
conn.commit()

# Close connection
conn.close()

Build docs developers (and LLMs) love