The following benchmarks are based on processing three months of data (January-March 2022), roughly 10 million taxi trip records.
## Execution Time Breakdown

| Operation | Time (seconds) | Percentage | Description |
|---|---|---|---|
| Init objects | 0.005 | 0.01% | Creating YellowTaxiData instance and date ranges |
| Importing data | 7.456 | 14.1% | Downloading and reading parquet files from remote URLs |
| Cleaning data | 5.638 | 10.6% | Removing duplicates, nulls, and invalid records |
| Adding columns | 30.626 | 57.8% | Computing derived datetime columns (year, month, week) |
| Week metrics | 1.228 | 2.3% | Aggregating statistics by week |
| Month metrics | 7.939 | 15.0% | Aggregating statistics by month and rate code |
| Formatting | 0.001 | 0.0% | Rounding and resetting indexes |
| Exporting | 0.095 | 0.2% | Writing CSV and Excel files |
| **Total** | **52.988** | **100%** | Full pipeline execution |
### Raw Output

```text
Init objects ...
*** 0.00547895899999995 seconds ***
Importing data ...
*** 7.456109084 seconds ***
Cleaning data ...
*** 5.637706417 seconds ***
Adding more columns ...
*** 30.625991959 seconds ***
Generating week metrics ...
*** 1.227824708 seconds ***
Generating month metrics ...
*** 7.938639625 seconds ***
Formatting results ...
*** 0.0005992090000006556 seconds ***
Exporting results ...
*** 0.09505266699999737 seconds ***
Execution time: 52.987694084 seconds
```
## Bottleneck Analysis

### Primary Bottleneck: Adding Columns (58% of total time)

The `add_more_columns()` method is the slowest operation in the pipeline:
```python
def add_more_columns(self):
    self.data['year_month'] = self.data['tpep_dropoff_datetime'].dt.strftime('%Y-%m')
    self.data['year_dt'] = self.data['tpep_dropoff_datetime'].dt.year.astype(str)
    self.data['week_dt'] = self.data['tpep_dropoff_datetime'].dt.isocalendar().week.astype(str).str.zfill(3)
    self.data['year_week'] = self.data['year_dt'].str.cat(self.data['week_dt'], sep='-')
    self.data['year_month_day'] = self.data['tpep_dropoff_datetime'].dt.strftime('%Y-%m-%d')
```
Why it's slow:
- Multiple datetime accessor calls (`.dt.strftime()`, `.dt.year`, `.dt.isocalendar()`)
- String conversion and formatting operations on every row
- Operates on millions of rows
- `.isocalendar()` is particularly expensive because it materializes a full DataFrame of year/week/day components
### Secondary Bottleneck: Month Metrics (15% of total time)

The `generate_month_metrics()` method performs multiple groupby operations:
```python
def generate_month_metrics(self):
    # Loops through 3 rate code categories;
    # each iteration performs filtering and a groupby aggregation
    for rc_id in rate_code_id_dict.keys():
        df = df.groupby(['year_month', 'day_type']).agg(...)
```
Why it’s moderately slow:
- Three separate groupby operations
- Data filtering and copying for each rate code
- Multiple aggregation functions per group
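One way to cut this cost is to make the rate code itself a groupby key, so the three filtered passes collapse into a single aggregation. A minimal sketch on made-up sample data (the column values and aggregation names here are illustrative, not the pipeline's actual metrics):

```python
import pandas as pd

# Hypothetical sample mirroring the pipeline's shape: one row per trip,
# with a precomputed year_month, a day_type, and a rate code.
df = pd.DataFrame({
    'year_month': ['2022-01', '2022-01', '2022-02', '2022-02'],
    'day_type':   ['weekday', 'weekend', 'weekday', 'weekday'],
    'RatecodeID': [1, 1, 2, 1],
    'total_amount': [10.0, 20.0, 30.0, 40.0],
})

# Single groupby over all rate codes at once, instead of filtering
# and aggregating once per rate code in a Python loop.
metrics = df.groupby(['RatecodeID', 'year_month', 'day_type']).agg(
    trips=('total_amount', 'size'),
    revenue=('total_amount', 'sum'),
).reset_index()
print(metrics)
```

Splitting the result back out per rate code afterwards (if the report format requires it) is a cheap boolean filter on the already-aggregated frame.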
## Optimization Techniques Used

### 1. PyArrow Engine for Parquet

The code uses `engine='pyarrow'` when reading parquet files:

```python
pd.read_parquet(path=url, engine='pyarrow')
```
Benefits:
- Fast read performance from Arrow's optimized parquet reader
- Better memory efficiency
- Optimized columnar data access
### 2. Pandas Indexing

A multi-column index is set up during import (main.py:35-36):

```python
self.data.set_index(['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'RatecodeID'],
                    inplace=True, drop=False)
```
Benefits:
- Faster filtering operations
- Optimized groupby performance
- Efficient lookups
### 3. Column Filtering

Only the necessary columns are kept after import (main.py:33-34):

```python
self.data = self.data[['tpep_pickup_datetime', 'tpep_dropoff_datetime',
                       'passenger_count', 'trip_distance', 'RatecodeID', 'total_amount']]
```

The same effect can be achieved at read time via `read_parquet`'s `columns=` parameter, which avoids ever materializing the unused columns.
Benefits:
- Reduced memory footprint
- Faster operations on smaller DataFrames
- Less data to serialize/deserialize
### 4. Vectorized Operations

All data cleaning uses pandas vectorized operations instead of Python loops:

```python
# Vectorized filtering - processes all rows at once
self.data = self.data[
    (self.data['tpep_pickup_datetime'] >= self.start_date) &
    (self.data['tpep_dropoff_datetime'] <= self.end_date)
]
```
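As a self-contained illustration of the same pattern (with made-up timestamps standing in for real trip records):

```python
import pandas as pd

# Toy frame with three trips; only one falls entirely inside the window.
data = pd.DataFrame({
    'tpep_pickup_datetime': pd.to_datetime(
        ['2021-12-31 23:00', '2022-01-05 08:00', '2022-02-10 09:00']),
    'tpep_dropoff_datetime': pd.to_datetime(
        ['2022-01-01 00:10', '2022-01-05 08:30', '2022-04-01 01:00']),
})
start_date = pd.Timestamp('2022-01-01')
end_date = pd.Timestamp('2022-03-31 23:59:59')

# Boolean masks evaluate over whole columns at once; no Python loop.
filtered = data[
    (data['tpep_pickup_datetime'] >= start_date) &
    (data['tpep_dropoff_datetime'] <= end_date)
]
print(len(filtered))
```

The first trip is dropped because it was picked up in 2021, the third because it was dropped off in April; only the middle row survives.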
## Memory Considerations

### Current Dataset (3 months)
- Records: ~10 million rows
- Columns: 6 core columns + 5 derived columns
- Estimated memory: 1-2 GB in memory
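These figures are estimates; actual usage can be checked directly with `DataFrame.memory_usage(deep=True)`. A sketch on a toy frame (the same call works on the full 10M-row dataset):

```python
import pandas as pd

# Small stand-in frame with the pipeline's numeric columns.
df = pd.DataFrame({
    'passenger_count': [1, 2, 1],
    'trip_distance': [1.2, 3.4, 0.5],
    'RatecodeID': [1, 1, 2],
})

# deep=True accounts for the actual bytes behind object/string columns,
# not just the pointer arrays.
total_bytes = df.memory_usage(deep=True).sum()
print(f"{total_bytes / 1024:.2f} KiB")
```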
### For Larger Date Ranges

When processing more than 6 months of data, consider a chunking strategy:

```python
# Process data in monthly chunks
for month_url in self.urls_list:
    chunk = pd.read_parquet(month_url)
    # Process chunk
    # Aggregate results
    # Release memory
    del chunk
```
Memory scaling estimates:
- 6 months: ~4 GB
- 1 year: ~8 GB
- 2 years: ~16 GB
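Beyond chunking, memory per row can be reduced by downcasting dtypes, a technique not used in the current pipeline but worth considering: `RatecodeID` has only a handful of distinct values and fits a categorical, and the measurement columns rarely need 64-bit floats. A sketch on synthetic data:

```python
import pandas as pd

# Synthetic rows standing in for real trips.
df = pd.DataFrame({
    'RatecodeID': [1, 1, 2, 1] * 250,
    'trip_distance': [1.2, 3.4, 0.5, 7.8] * 250,
    'total_amount': [10.0, 20.0, 30.0, 40.0] * 250,
})

before = df.memory_usage(deep=True).sum()

# Categorical for low-cardinality IDs, float32 for measurements.
df['RatecodeID'] = df['RatecodeID'].astype('category')
df['trip_distance'] = df['trip_distance'].astype('float32')
df['total_amount'] = df['total_amount'].astype('float32')

after = df.memory_usage(deep=True).sum()
print(before, after)
```

The trade-off is that float32 loses precision on large monetary sums, so downcast measurement columns only when the aggregates tolerate it.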
## Optimization Opportunities

### 1. Optimize Column Addition (High Impact)

Potential speedup: 40-50% of total execution time.

The current approach creates columns one at a time:

```python
# Current (slow)
self.data['year_dt'] = self.data['tpep_dropoff_datetime'].dt.year.astype(str)
self.data['week_dt'] = self.data['tpep_dropoff_datetime'].dt.isocalendar().week.astype(str).str.zfill(3)
```
An optimized approach extracts the datetime components in a single pass:

```python
# Optimized - reuse the datetime column and skip intermediate strings
dt = self.data['tpep_dropoff_datetime']
self.data['year_month'] = dt.dt.to_period('M').astype(str)
self.data['year_week'] = dt.dt.strftime('%Y-%U')  # faster than isocalendar()
```
Key improvements:
- Use `strftime('%Y-%U')` instead of `isocalendar()` for week numbers (note that `%U` weeks are Sunday-based, so labels will not match ISO week numbers exactly)
- Extract datetime components in a single pass
- Avoid intermediate string conversions where possible
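The `%U` caveat is worth verifying before switching: `%U` numbers weeks from the first Sunday of the year, while `isocalendar()` uses ISO-8601 weeks, so the two schemes disagree around year boundaries. A quick check on a date where they differ:

```python
import pandas as pd

# 2022-01-01 is a Saturday: ISO says week 52 of 2021,
# while %U says week 00 of 2022 (no Sunday has occurred yet).
ts = pd.Series(pd.to_datetime(['2022-01-01']))

strftime_week = ts.dt.strftime('%Y-%U').iloc[0]
iso = ts.dt.isocalendar().iloc[0]
iso_week = f"{iso['year']}-{str(iso['week']).zfill(2)}"

print(strftime_week, iso_week)
```

If downstream consumers depend on ISO week numbers, keep `isocalendar()` (or compute it once and reuse the result) rather than silently changing the labelling.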
### 2. Parallel Processing (Medium Impact)

Potential speedup: 20-30% with multi-core processing.

Import the monthly parquet files in parallel (downloads are I/O-bound, so threads overlap well):

```python
from concurrent.futures import ThreadPoolExecutor

def import_data_parallel(self):
    with ThreadPoolExecutor(max_workers=4) as executor:
        dataframes_list = list(executor.map(pd.read_parquet, self.urls_list))
    self.data = pd.concat(dataframes_list, ignore_index=True)
```
### 3. Use Dask for Very Large Datasets (High Impact for 1+ Years)

For datasets exceeding available RAM, consider Dask:

```python
import dask.dataframe as dd

# Process larger-than-memory datasets
ddf = dd.read_parquet(self.urls_list)
result = ddf.groupby('year_week').agg({...}).compute()
```
Benefits:
- Out-of-core processing (handles data larger than RAM)
- Automatic parallelization
- Pandas-like API
### 4. Cache Cleaned Data

Save the cleaned data to avoid reprocessing on every run:

```python
import os

# On subsequent runs, load from cache if available
if os.path.exists('cleaned_data.parquet'):
    self.data = pd.read_parquet('cleaned_data.parquet')
else:
    self.import_data()
    self.clean_data()
    # After cleaning, save to parquet for next time
    self.data.to_parquet('cleaned_data.parquet')
```
## For Faster Development Iterations

- Use a single month during development:

  ```python
  yellow_taxi_data = YellowTaxiData(start_date='2022-01-01', end_date='2022-01-31')
  ```

- Profile specific operations with `time.perf_counter()`:

  ```python
  start = time.perf_counter()
  # Your code here
  print(f"Took {time.perf_counter() - start:.3f}s")
  ```

- Use `line_profiler` for detailed analysis:

  ```shell
  pip install line_profiler
  kernprof -l -v main.py
  ```
## For Production Processing
- Use PyArrow for all parquet operations (already implemented)
- Process data in chunks for datasets larger than 6 months
- Run on machines with sufficient RAM (16GB+ recommended for 1 year of data)
- Consider cloud processing for multi-year analyses (AWS EMR, Google Dataproc)
## Monitoring

The current implementation prints timing output for each stage. To capture it:

```shell
python main.py | tee performance.log
```

This writes a log file with all timing information, useful for tracking performance over time.
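If the log format stays stable, the per-stage timings can be pulled back out of such a log with a small regex. A sketch, with the log lines inlined here for illustration:

```python
import re

# Two stages from the raw output above, as they would appear in the log.
log = """Importing data ...
*** 7.456109084 seconds ***
Cleaning data ...
*** 5.637706417 seconds ***"""

# Pair each "<stage> ..." line with the "*** N seconds ***" line after it.
pairs = re.findall(
    r'^(.+?) \.\.\.\n\*\*\* ([\d.]+) seconds \*\*\*', log, flags=re.MULTILINE)
timings = {stage: float(sec) for stage, sec in pairs}
print(timings)
```

Collecting these dicts run over run (e.g. into a CSV keyed by date) makes regressions in any single stage easy to spot.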
## Next Steps
For information about testing the performance-optimized code, see Testing.