
Build Your First ETL Pipeline with PySpark – Dataquest
You’ve learned PySpark basics: RDDs, DataFrames, maybe some SQL queries. You can transform data and run aggregations in notebooks. But here’s the thing: data engineering is about building pipelines that run reliably every single day, handling the messy reality of production data.
Today, we’re building a complete ETL pipeline from scratch. This pipeline will handle the chaos you’ll actually encounter at work: inconsistent date formats, prices with dollar signs, test data that somehow made it to production, and customer IDs that follow different naming conventions.
Here’s the scenario: You just started as a junior data engineer at an online grocery delivery service. Your team lead drops by your desk with a problem. “Hey, we need help. Our daily sales report is a mess. The data comes in as CSVs from three different systems, nothing matches up, and the analyst team is doing everything manually in Excel. Can you build us an ETL pipeline?”
She shows you what she’s dealing with:
- Order files that need standardized date formatting
- Product prices stored as “$12.99” in some files, “12.99” in others
- Customer IDs that are sometimes numbers, sometimes start with “CUST_”
- Random blank rows and test data mixed in (“TEST ORDER – PLEASE IGNORE”)
“Just get it into clean CSV files,” she says. “We’ll worry about performance and Parquet later. We just need something that works.”
Your mission? Build an ETL pipeline that takes this mess and turns it into clean, reliable data the analytics team can actually use. No fancy optimizations needed, just something that runs every morning without breaking.
Setting Up Your First ETL Project
Let’s start with structure. One of the biggest mistakes new data engineers make is jumping straight into writing transformation code without thinking about organization. You end up with a single massive Python file that’s impossible to debug, test, or explain to your team.
We’re going to build this the way professionals do it, but keep it simple enough that you won’t get lost in abstractions.
Project Structure That Makes Sense
Here’s what we’re creating:
grocery_etl/
├── data/
│   ├── raw/          # Your messy input CSVs
│   └── processed/    # Clean output files
├── src/
│   └── etl_pipeline.py
├── main.py
└── requirements.txt
Why this structure? Three reasons:
First, it separates concerns. Your main.py handles orchestration: starting Spark, calling functions, handling errors. Your src/etl_pipeline.py contains all the actual ETL logic. When something breaks, you’ll know exactly where to look.
Second, it mirrors the organizational pattern you’ll use in production pipelines (even though the specifics will differ). Whether you’re deploying to Databricks, AWS EMR, or anywhere else, you’ll separate concerns the same way: orchestration code (main.py), ETL logic (src/etl_pipeline.py), and clear data boundaries. The actual file paths will change (e.g., production uses distributed filesystems like s3://data-lake/raw/ or /mnt/efs/raw/ instead of local folders), but the structure scales.
Third, it keeps your local development organized. Raw data stays raw. Processed data goes to a different folder. This makes debugging easier and mirrors the input/output separation you’ll have in production, just on your local machine.
Ready to start? Get the sample CSV files and project skeleton from our starter repository. You can either:
# Clone the full tutorials repo and navigate to this project
git clone https://github.com/dataquestio/tutorials.git
cd tutorials/pyspark-etl-tutorial
Or download just the pyspark-etl-tutorial folder as a ZIP from the GitHub page.
Getting Started
We’ll build this project in two files:
- src/etl_pipeline.py: All our ETL functions (extract, transform, load)
- main.py: Orchestration logic that calls those functions
Let’s set up the basics. You’ll need Python 3.9+ and Java 17 or 21 installed (Spark 4.0 no longer runs on Java 8 or 11). Note: In production, you’d match your PySpark version to whatever your cluster is running (Databricks, EMR, etc.).
# requirements.txt
pyspark==4.0.1
pandas  # needed by df.toPandas() in the Load step
# main.py - Just the skeleton for now
from pyspark.sql import SparkSession
import logging
import sys
def main():
    # We'll complete this orchestration logic later
    pass

if __name__ == "__main__":
    main()
That’s it for setup. Notice we’re not installing dozens of dependencies or configuring complex build tools. We’re keeping it minimal because the goal is to understand ETL patterns, not fight with tooling.
Optional: Interactive Data Exploration
Before we dive into writing pipeline code, you might want to poke around the data interactively. This is completely optional. If you prefer to jump straight into building, skip to the next section, but if you want to see what you’re up against, fire up the PySpark shell:
pyspark
Now you can explore interactively from the command line:
df = spark.read.csv("data/raw/online_orders.csv", header=True)
# See the data firsthand
df.show(5, truncate=False)
df.printSchema()
df.describe().show()
# Count how many weird values we have
df.filter(df.price.contains("$")).count()
df.filter(df.customer_id.contains("TEST")).count()
This exploration helps you understand what cleaning you’ll need to build into your pipeline. Real data engineers do this all the time: you load a sample, poke around, discover the problems, then write code to fix them systematically.
But interactive exploration is for understanding the data. The actual pipeline needs to be scripted, testable, and able to run without you babysitting it. That’s what we’re building next.
Extract: Getting Data Flexibly
The Extract phase is where most beginner ETL pipelines break. You write code that works perfectly with your test file, then the next day’s data arrives with a slightly different format, and everything crashes.
We’re going to read CSVs the defensive way: assume everything will go wrong, capture the problems, and keep the pipeline running.
Reading Messy CSV Files
Let’s start building src/etl_pipeline.py. We’ll begin with imports and a function to create our Spark session:
# src/etl_pipeline.py
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import logging
# Set up logger for this module
logger = logging.getLogger(__name__)
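def create_spark_session():
    """Create a Spark session for our ETL job"""
    # Spark 4.0 enables ANSI mode by default, so failed casts and date parses
    # raise errors. The cleaning steps rely on getting null back instead.
    return SparkSession.builder \
        .appName("Grocery_Daily_ETL") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.ansi.enabled", "false") \
        .getOrCreate()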
This is a basic local configuration. Real production pipelines need more, such as time zone handling, memory allocation tuned to your cluster, and explicit policies for parsing dates. We’ll cover those in a future tutorial on production deployment. For now, we’re focusing on the pattern.
If you’re new to the logging module, logger.info() emits a message with a timestamp and severity level to whatever handlers you configure (a log file and the console, once we set up main.py). When something breaks, you can check the logs to see exactly what happened. It’s a small habit that saves debugging time.
Now let’s read the data:
def extract_sales_data(spark, input_path):
    """Read sales CSVs with all their real-world messiness"""
    logger.info(f"Reading sales data from {input_path}")
    expected_schema = StructType([
        StructField("order_id", StringType(), True),
        StructField("customer_id", StringType(), True),
        StructField("product_name", StringType(), True),
        StructField("price", StringType(), True),
        StructField("quantity", StringType(), True),
        StructField("order_date", StringType(), True),
        StructField("region", StringType(), True)
    ])
StructType and StructField let you define exactly what columns you expect and what data types they should have. The True at the end means the field can be null. You could let Spark infer the schema automatically, but an explicit schema pins down the column names and types up front, so a bad file can’t silently change them. One caveat: by default Spark’s CSV reader applies the schema by position and ignores the header names, so a surprise column won’t be reported on its own. Setting the reader option enforceSchema=False makes Spark validate the header against the schema.
Notice everything is StringType(). You might think “wait, customer_id has numbers, shouldn’t that be IntegerType?” Here’s the thing: some customer IDs are “12345” and some are “CUST_12345”. If we used IntegerType(), Spark would convert “CUST_12345” to null and we’d lose data.
The strategy is simple: prevent data loss by preserving everything as strings in the Extract phase, then clean and convert in the Transform phase, where we have control over error handling.
Now let’s read the file defensively:
    df = spark.read.csv(
        input_path,
        header=True,
        schema=expected_schema,
        mode="PERMISSIVE"
    )
    total_records = df.count()
    logger.info(f"Found {total_records} total records")
    return df
The PERMISSIVE mode tells Spark to be lenient with malformed data. When it encounters rows that don’t match the schema, it sets unparseable fields to null instead of crashing the entire job. This keeps production pipelines running even when data quality takes a hit. We’ll validate and handle data quality issues in the Transform phase, where we have better control.
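Because a lenient read will not complain about unexpected columns, a cheap pre-flight check on the header row can help. The sketch below uses only the standard library and compares a CSV header against the column names from expected_schema. The helper name check_header and the sample text are hypothetical, not part of the tutorial’s pipeline.

```python
import csv
import io

# Column names copied from expected_schema in extract_sales_data
EXPECTED_COLUMNS = [
    "order_id", "customer_id", "product_name",
    "price", "quantity", "order_date", "region",
]


def check_header(csv_text, expected=EXPECTED_COLUMNS):
    """Return (missing, unexpected) column names for a CSV's header row.

    Hypothetical helper for illustration only.
    """
    header = next(csv.reader(io.StringIO(csv_text)), [])
    header = [name.strip() for name in header]
    missing = [c for c in expected if c not in header]
    unexpected = [c for c in header if c not in expected]
    return missing, unexpected


# A made-up export where someone added a "coupon" column
sample = (
    "order_id,customer_id,product_name,price,quantity,order_date,region,coupon\n"
    "1,CUST_1,Milk,$2.99,1,2024-01-15,West,SAVE5\n"
)
missing, unexpected = check_header(sample)
print("missing:", missing, "unexpected:", unexpected)
```

For a real file you could pass the first line, for example the result of readline() on the opened CSV, and log a warning when either list is non-empty.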
Dealing with Multiple Files
Real data comes from multiple systems. Let’s combine them:
def extract_all_data(spark):
    """Combine data from multiple sources"""
    # Each system exports differently
    online_orders = extract_sales_data(spark, "data/raw/online_orders.csv")
    store_orders = extract_sales_data(spark, "data/raw/store_orders.csv")
    mobile_orders = extract_sales_data(spark, "data/raw/mobile_orders.csv")
    # Union them all together
    all_orders = online_orders.unionByName(store_orders).unionByName(mobile_orders)
    logger.info(f"Combined dataset has {all_orders.count()} orders")
    return all_orders
In production, you’d often use wildcards like "data/raw/online_orders*.csv" to process multiple files at once (like daily exports). Spark reads them all and combines them automatically. We’re keeping it simple here with one file per source.
The .unionByName() method stacks DataFrames vertically, matching columns by name rather than position. This prevents silent data corruption if schemas don’t match perfectly, which is a common issue when combining data from different systems. Since we defined the same schema for all three sources, this works cleanly.
You’ve now built the Extract phase: reading data defensively and combining multiple sources. The data isn’t clean yet, but at least we didn’t lose any of it. That’s what matters in Extract.
Transform: Fixing the Data Issues
This is where the real work happens. You’ve got all your data loaded, with every column preserved as a string. Now we need to turn those messy strings into clean, usable data types.
The Transform phase is where you fix all the problems you discovered during extraction. Each transformation function handles one specific issue, making the code easier to test and debug.
Standardizing Customer IDs
Remember how customer IDs come in two formats? Some are just numbers, some have the “CUST_” prefix. Let’s standardize them:
# src/etl_pipeline.py (continuing in same file)
def clean_customer_id(df):
    """Standardize customer IDs (some are numbers, some are CUST_123 format)"""
    df_cleaned = df.withColumn(
        "customer_id_cleaned",
        when(col("customer_id").startswith("CUST_"), col("customer_id"))
        .when(col("customer_id").rlike("^[0-9]+$"), concat(lit("CUST_"), col("customer_id")))
        .otherwise(col("customer_id"))
    )
    return df_cleaned.drop("customer_id").withColumnRenamed("customer_id_cleaned", "customer_id")
The logic here: if it already starts with “CUST_”, keep it. If it’s just numbers (rlike("^[0-9]+$") checks for that), add the “CUST_” prefix. Everything else stays as-is for now. This gives us a consistent format to work with downstream.
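To reason about edge cases without starting Spark, it can help to mirror the three branches in plain Python. This is only a sketch: normalize_customer_id is a hypothetical helper, and Python’s re.fullmatch is assumed to be close enough to Spark’s rlike("^[0-9]+$") for these inputs.

```python
import re


def normalize_customer_id(raw):
    """Plain-Python mirror of the when/otherwise chain in clean_customer_id."""
    if raw is None:
        return None                      # nulls pass through unchanged
    if raw.startswith("CUST_"):
        return raw                       # already in the target format
    if re.fullmatch(r"[0-9]+", raw):
        return "CUST_" + raw             # bare number: add the prefix
    return raw                           # anything else stays as-is


for value in ["12345", "CUST_12345", "TEST_USER", " 123", None]:
    print(repr(value), "->", repr(normalize_customer_id(value)))
```

Note the " 123" case: leading whitespace means the value is not “just numbers”, so it is left untouched. If your data has padded IDs, trimming before this step would be a reasonable addition.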
Cleaning Price Data
Prices are often messy. Dollar signs, commas, who knows what else:
# src/etl_pipeline.py (continuing in same file)
def clean_price_column(df):
    """Fix the price column"""
    # Remove dollar signs, commas, etc. (keep digits, decimal points, and minus signs)
    df_cleaned = df.withColumn(
        "price_cleaned",
        regexp_replace(col("price"), r"[^0-9.\-]", "")
    )
    # Convert to double. Missing values and failed casts (which return null
    # when ANSI mode is off) both fall back to 0.0 so the flag below catches them.
    df_final = df_cleaned.withColumn(
        "price_decimal",
        coalesce(col("price_cleaned").cast(DoubleType()), lit(0.0))
    )
    # Flag suspicious values for review
    df_flagged = df_final.withColumn(
        "price_quality_flag",
        when(col("price_decimal") == 0.0, "CHECK_ZERO_PRICE")
        .when(col("price_decimal") > 1000, "CHECK_HIGH_PRICE")
        .when(col("price_decimal") < 0, "CHECK_NEGATIVE_PRICE")
        .otherwise("OK")
    )
    bad_price_count = df_flagged.filter(col("price_quality_flag") != "OK").count()
    logger.warning(f"Found {bad_price_count} orders with suspicious prices")
    return df_flagged.drop("price", "price_cleaned")
The regexp_replace function strips out everything that isn’t a digit, a decimal point, or a minus sign. Then we cast the result to a double. The quality flag column helps us track suspicious values without throwing them out. This is important: we’re not perfect at cleaning, so we flag problems for humans to review later.
Note that we’re assuming US price format here (periods as decimal separators). European formats with commas would need different logic, but for this tutorial, we’re keeping it focused on the ETL pattern rather than international number handling.
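The effect of that regular expression, including the US-format assumption, is easy to see on a few strings. The sketch below applies the same pattern with Python’s re module. The clean_price helper is hypothetical, and it returns None for unparseable input so the choice of default stays with the caller.

```python
import re


def clean_price(raw):
    """Strip everything except digits, '.', and '-', then parse as a float.

    Returns None when the remaining text is not a valid number.
    """
    if raw is None:
        return None
    cleaned = re.sub(r"[^0-9.\-]", "", raw)
    try:
        return float(cleaned)
    except ValueError:
        return None


for value in ["$12.99", "1,299.00", "N/A", "-5.00", "€12,99"]:
    print(repr(value), "->", clean_price(value))
```

The last input shows why the US-format assumption matters: a European-style “€12,99” loses its comma and comes out as 1299.0, a hundred times too large. A high-price flag like the one above is one way such values might get noticed.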
Standardizing Dates
Date parsing is one of those things that looks simple but gets complicated fast. Different systems export dates in different formats: some use MM/dd/yyyy, others use dd-MM-yyyy, and the ISO standard is yyyy-MM-dd.
def standardize_dates(df):
    """Parse dates in multiple common formats"""
    # Try each format - coalesce returns the first non-null result
    fmt1 = to_date(col("order_date"), "yyyy-MM-dd")
    fmt2 = to_date(col("order_date"), "MM/dd/yyyy")
    fmt3 = to_date(col("order_date"), "dd-MM-yyyy")
    df_parsed = df.withColumn(
        "order_date_parsed",
        coalesce(fmt1, fmt2, fmt3)
    )
    # Check how many we couldn't parse
    unparsed = df_parsed.filter(col("order_date_parsed").isNull()).count()
    if unparsed > 0:
        logger.warning(f"Could not parse {unparsed} dates")
    return df_parsed.drop("order_date")
We use coalesce() to try each format in order, taking the first one that successfully parses. This handles the most common date format variations you’ll encounter.
Note: This approach works for simple date strings but doesn’t handle datetime strings with times or timezones. For production systems dealing with international data or precise timestamps, you’d need more sophisticated parsing logic. For now, we’re focusing on the core pattern.
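The “first format that parses wins” idea can be sketched in plain Python with datetime.strptime. This is an approximation for building intuition: parse_order_date is a hypothetical helper, and Python’s format codes are assumed to be rough equivalents of the Spark patterns, not an exact match for Spark’s parser.

```python
from datetime import date, datetime

# Python equivalents of yyyy-MM-dd, MM/dd/yyyy, and dd-MM-yyyy, tried in order
FORMATS = ("%Y-%m-%d", "%m/%d/%Y", "%d-%m-%Y")


def parse_order_date(raw):
    """Return the first successful parse, or None if no format matches."""
    if raw is None:
        return None
    for fmt in FORMATS:
        try:
            return datetime.strptime(raw, fmt).date()
        except ValueError:
            continue  # try the next format
    return None


for value in ["2024-01-15", "01/15/2024", "15-01-2024", "13/01/2024", "not a date"]:
    print(repr(value), "->", parse_order_date(value))
```

Two things stand out. Slash-separated dates are always read month-first, so a day-first string like “13/01/2024” fails every format and comes back as None. And a value such as “03/04/2024” parses without complaint as March 4, even if the source system meant April 3, which no amount of parsing logic can detect on its own.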
Removing Test Data
Test data in production is inevitable. Let’s filter it out:
# src/etl_pipeline.py (continuing in same file)
def remove_test_data(df):
    """Remove test orders that somehow made it to production"""
    df_filtered = df.filter(
        ~(upper(col("customer_id")).contains("TEST") |
          upper(col("product_name")).contains("TEST") |
          col("customer_id").isNull() |
          col("order_id").isNull())
    )
    removed_count = df.count() - df_filtered.count()
    logger.info(f"Removed {removed_count} test/invalid orders")
    return df_filtered
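We’re checking for “TEST” in customer IDs and product names, plus filtering out any rows with null order IDs or customer IDs. That tilde (~) means “not”, so we’re keeping everything that doesn’t match these patterns. One side effect to be aware of: a row whose product_name is null makes the whole condition evaluate to null rather than false, and filter() drops rows whose condition is null, so those rows are removed too.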
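How nulls flow through a filter like this can be checked by hand with a small model of SQL’s three-valued logic, where None stands for NULL. The helper names below are hypothetical. The sketch assumes the standard SQL rules that Spark follows: OR with a NULL operand is NULL unless another operand is true, NOT NULL is NULL, and a filter keeps only rows whose condition is exactly true.

```python
def sql_or(*values):
    """SQL OR over True / False / None (None models NULL)."""
    if any(v is True for v in values):
        return True
    if any(v is None for v in values):
        return None
    return False


def sql_not(value):
    """SQL NOT: NULL stays NULL."""
    return None if value is None else not value


def contains_test(text):
    """Models upper(col).contains("TEST"): NULL in, NULL out."""
    return None if text is None else "TEST" in text.upper()


def keep_row(customer_id, product_name, order_id):
    condition = sql_or(
        contains_test(customer_id),
        contains_test(product_name),
        customer_id is None,
        order_id is None,
    )
    # A filter keeps a row only when the predicate is exactly True
    return sql_not(condition) is True


print(keep_row("CUST_1", "Milk", "A1"))                       # ordinary row
print(keep_row("CUST_1", "TEST ORDER - PLEASE IGNORE", "A1"))  # test row
print(keep_row("CUST_1", None, "A1"))                          # null product name
```

If rows with a missing product name should survive, one option is to wrap that one check in coalesce(..., lit(False)) so a null comparison counts as “no match”.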
Handling Duplicates
Sometimes the same order appears multiple times, usually from system retries:
# src/etl_pipeline.py (continuing in same file)
def handle_duplicates(df):
    """Remove duplicate orders (usually from retries)"""
    df_deduped = df.dropDuplicates(["order_id"])
    duplicate_count = df.count() - df_deduped.count()
    if duplicate_count > 0:
        logger.info(f"Removed {duplicate_count} duplicate orders")
    return df_deduped
We keep one row for each order_id and drop the rest. Spark doesn’t guarantee which of the duplicates survives, which is fine when retries produce identical rows. Simple and effective.
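When duplicates can differ from each other, it helps to state explicitly which row wins. The plain-Python sketch below keeps, for each order_id, the row with the earliest order_date. That tie-breaking rule, the dedupe_orders helper, and the sample rows are all assumptions made for illustration. In Spark, the usual counterpart is a row_number() window partitioned by order_id and ordered by the chosen column.

```python
def dedupe_orders(rows, key="order_id", order_by="order_date"):
    """Keep one row per key: the one with the smallest order_by value."""
    best = {}
    for row in rows:
        k = row[key]
        if k not in best or row[order_by] < best[k][order_by]:
            best[k] = row
    return list(best.values())


# Made-up rows: order A1 arrives twice with different dates
rows = [
    {"order_id": "A1", "order_date": "2024-01-16", "region": "West"},
    {"order_id": "A1", "order_date": "2024-01-15", "region": "West"},
    {"order_id": "B2", "order_date": "2024-01-15", "region": "East"},
]
for row in dedupe_orders(rows):
    print(row)
```

ISO-formatted date strings compare correctly as text, which is why the string comparison is enough here.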
Bringing It All Together
Now we chain all these transformations in sequence:
# src/etl_pipeline.py (continuing in same file)
def transform_orders(df):
    """Apply all transformations in sequence"""
    logger.info("Starting data transformation...")
    # Clean each aspect of the data
    df = clean_customer_id(df)
    df = clean_price_column(df)
    df = standardize_dates(df)
    df = remove_test_data(df)
    df = handle_duplicates(df)
    # Cast quantity to integer
    df = df.withColumn(
        "quantity",
        when(col("quantity").isNotNull(), col("quantity").cast(IntegerType()))
        .otherwise(1)
    )
    # Add some useful calculated fields
    df = df.withColumn("total_amount", col("price_decimal") * col("quantity")) \
        .withColumn("processing_date", current_date()) \
        .withColumn("year", year(col("order_date_parsed"))) \
        .withColumn("month", month(col("order_date_parsed")))
    # Rename for clarity
    df = df.withColumnRenamed("order_date_parsed", "order_date") \
        .withColumnRenamed("price_decimal", "unit_price")
    logger.info(f"Transformation complete. Final record count: {df.count()}")
    return df
Each transformation returns a new DataFrame (remember, PySpark DataFrames are immutable), so we reassign the result back to df each time. The order matters here: we clean customer IDs before removing test data because the test removal logic checks for “TEST” in customer IDs. We standardize dates before extracting year and month because those extraction functions need properly parsed dates to work. If you swap the order around, transformations can fail or produce wrong results.
We also add some calculated fields that will be useful for analysis: total_amount (price times quantity), processing_date (when this ETL ran), and time partitions (year and month) for efficient querying later.
The data is now clean, typed correctly, and enriched with useful fields. Time to save it.
Load: Saving Your Work
The Load phase is when we write the cleaned data somewhere useful. We’re using pandas to write the final CSV because it avoids platform-specific issues during local development. In production on a real Spark cluster, you’d use Spark’s native writers for parquet format with partitioning for better performance. For now, we’re focusing on getting the pipeline working reliably across different development environments. You can always swap the output format to parquet once you deploy to a production cluster.
Writing Clean Files
Let’s write our cleaned data out as a single CSV file:
# src/etl_pipeline.py (continuing in same file)
def load_to_csv(spark, df, output_path):
    """Save processed data for downstream use"""
    logger.info(f"Writing {df.count()} records to {output_path}")
    # Convert to pandas for local development ONLY (not suitable for large datasets)
    pandas_df = df.toPandas()
    # Create output directory if needed
    import os
    os.makedirs(output_path, exist_ok=True)
    output_file = f"{output_path}/orders.csv"
    pandas_df.to_csv(output_file, index=False)
    logger.info(f"Successfully wrote {len(pandas_df)} records")
    logger.info(f"Output location: {output_file}")
    return len(pandas_df)
Important: The .toPandas() method collects all distributed data into the driver’s memory. This is dangerous for real production data! If your dataset is larger than your driver’s RAM, your job will crash. We’re using this approach only because:
- Our tutorial dataset is tiny (85 rows)
- It avoids platform-specific Spark/Hadoop setup issues on Windows
- The focus is on learning ETL patterns, not deployment
In production, always use Spark’s native writers (df.write.parquet(), df.write.csv()) even though they require proper cluster configuration. Never use .toPandas() for datasets larger than a few thousand rows or anything you wouldn’t comfortably fit in a single machine’s memory.
Quick Validation with Spark SQL
Before we call it done, let’s verify our data makes sense. This is where Spark SQL comes in handy:
# src/etl_pipeline.py (continuing in same file)
def sanity_check_data(spark, output_path):
    """Quick validation using Spark SQL"""
    # Read the CSV file back
    output_file = f"{output_path}/orders.csv"
    df = spark.read.csv(output_file, header=True, inferSchema=True)
    df.createOrReplaceTempView("orders")
    # Run some quick validation queries
    total_count = spark.sql("SELECT COUNT(*) as total FROM orders").collect()[0]['total']
    logger.info(f"Sanity check - Total orders: {total_count}")
    # Check for any suspicious data that slipped through
    zero_price_count = spark.sql("""
        SELECT COUNT(*) as zero_prices
        FROM orders
        WHERE unit_price = 0
    """).collect()[0]['zero_prices']
    if zero_price_count > 0:
        logger.warning(f"Found {zero_price_count} orders with zero price")
    # Verify date ranges make sense
    date_range = spark.sql("""
        SELECT
            MIN(order_date) as earliest,
            MAX(order_date) as latest
        FROM orders
    """).collect()[0]
    logger.info(f"Date range: {date_range['earliest']} to {date_range['latest']}")
    return True
The createOrReplaceTempView() method lets us query the DataFrame using SQL. This is useful for validation because SQL is often clearer for these kinds of checks than chaining DataFrame operations. We’re checking the record count, looking for zero prices that might indicate cleaning issues, and verifying the date range looks reasonable.
Creating a Summary Report
Your team lead is going to ask, “How’d the ETL go today?” Let’s give her the answer automatically:
# src/etl_pipeline.py (continuing in same file)
def create_summary_report(df):
    """Generate metrics about the ETL run"""
    summary = {
        "total_orders": df.count(),
        "unique_customers": df.select("customer_id").distinct().count(),
        "unique_products": df.select("product_name").distinct().count(),
        "total_revenue": df.agg(sum("total_amount")).collect()[0][0],
        "date_range": f"{df.agg(min('order_date')).collect()[0][0]} to {df.agg(max('order_date')).collect()[0][0]}",
        "regions": df.select("region").distinct().count()
    }
    logger.info("\n=== ETL Summary Report ===")
    for key, value in summary.items():
        logger.info(f"{key}: {value}")
    logger.info("========================\n")
    return summary
This generates a quick summary of what got processed. In a real production system, you might email this summary or post it to Slack so the team knows the pipeline ran successfully.
One note about performance: this summary triggers multiple separate actions on the DataFrame. Each .count() and .distinct().count() scans the data independently, which isn’t optimized. We could compute all these metrics in a single pass, but that’s a topic for a future tutorial on performance optimization. Right now, we’re prioritizing readable code that works.
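To make “a single pass” concrete, here is the idea in plain Python: one loop over the rows updates every metric at once instead of rescanning for each. The summarize helper and the sample rows are hypothetical. In Spark, the counterpart would be a single df.agg(...) call that combines several aggregate expressions.

```python
def summarize(rows):
    """Compute all report metrics in one loop over the rows."""
    customers, products, regions = set(), set(), set()
    total_orders = 0
    total_revenue = 0.0
    earliest = latest = None
    for row in rows:
        total_orders += 1
        customers.add(row["customer_id"])
        products.add(row["product_name"])
        regions.add(row["region"])
        total_revenue += row["total_amount"] or 0.0
        d = row["order_date"]
        if d is not None:
            earliest = d if earliest is None or d < earliest else earliest
            latest = d if latest is None or d > latest else latest
    return {
        "total_orders": total_orders,
        "unique_customers": len(customers),
        "unique_products": len(products),
        "total_revenue": total_revenue,
        "date_range": f"{earliest} to {latest}",
        "regions": len(regions),
    }


# Made-up rows shaped like the cleaned orders
sample_rows = [
    {"customer_id": "CUST_1", "product_name": "Milk", "region": "West",
     "total_amount": 10.0, "order_date": "2024-01-15"},
    {"customer_id": "CUST_2", "product_name": "Eggs", "region": "East",
     "total_amount": 5.5, "order_date": "2024-01-17"},
    {"customer_id": "CUST_1", "product_name": "Eggs", "region": "West",
     "total_amount": 4.5, "order_date": "2024-01-16"},
]
print(summarize(sample_rows))
```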
Putting It All Together
We’ve built all the pieces. Now let’s wire them up into a complete pipeline that runs from start to finish.
Remember how we set up main.py as just a skeleton? Time to fill it in. This file orchestrates everything: starting Spark, calling our ETL functions in order, handling errors, and cleaning up when we’re done.
The Complete Pipeline
# main.py
from pyspark.sql import SparkSession
import logging
import sys
import traceback
from datetime import datetime
import os
# Import our ETL functions
from src.etl_pipeline import *

def setup_logging():
    """Basic logging setup"""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(f'logs/etl_run_{datetime.now().strftime("%Y%m%d")}.log'),
            logging.StreamHandler(sys.stdout)
        ]
    )
    return logging.getLogger(__name__)

def main():
    """Main ETL pipeline"""
    # Create necessary directories
    os.makedirs('logs', exist_ok=True)
    os.makedirs('data/processed/orders', exist_ok=True)
    logger = setup_logging()
    logger.info("Starting Grocery ETL Pipeline")
    # Track runtime
    start_time = datetime.now()
    # Define spark up front so the finally block is safe if session creation fails
    spark = None
    try:
        # Initialize Spark
        spark = create_spark_session()
        logger.info("Spark session created")
        # Extract
        raw_df = extract_all_data(spark)
        logger.info(f"Extracted {raw_df.count()} raw records")
        # Transform
        clean_df = transform_orders(raw_df)
        logger.info(f"Transformed to {clean_df.count()} clean records")
        # Load
        output_path = "data/processed/orders"
        load_to_csv(spark, clean_df, output_path)
        # Sanity check
        sanity_check_data(spark, output_path)
        # Create summary
        summary = create_summary_report(clean_df)
        # Calculate runtime
        runtime = (datetime.now() - start_time).total_seconds()
        logger.info(f"Pipeline completed successfully in {runtime:.2f} seconds")
    except Exception as e:
        logger.error(f"Pipeline failed: {str(e)}")
        logger.error(traceback.format_exc())
        raise
    finally:
        if spark is not None:
            spark.stop()
            logger.info("Spark session closed")

if __name__ == "__main__":
    main()
Let’s walk through what’s happening here.
The setup_logging() function configures logging to write to both a file and the console. The log file gets named with today’s date, so you’ll have a history of every pipeline run. This is invaluable when you’re debugging issues that happened last Tuesday.
The main function wraps everything in a try-except-finally block, which is important for production pipelines. The try block runs your ETL logic. If anything fails, the except block logs the error with a full traceback (that traceback.format_exc() is especially helpful when Spark’s Java stack traces get messy). The finally block ensures we always close the Spark session, even if something crashed.
Notice we’re using relative paths like “data/processed/orders”. This is fine for local development but brittle in production. Real pipelines use environment variables or configuration files for paths. We’ll cover that in a future tutorial on production deployment.
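As a small preview of the environment-variable approach, the sketch below reads the input and output locations from the environment and falls back to the local folders used in this tutorial. The variable names GROCERY_ETL_RAW_DIR and GROCERY_ETL_OUTPUT_DIR and the resolve_paths helper are made up for illustration.

```python
import os


def resolve_paths(env=os.environ):
    """Read pipeline paths from the environment, with local defaults.

    The variable names here are hypothetical.
    """
    return {
        "raw_dir": env.get("GROCERY_ETL_RAW_DIR", "data/raw"),
        "output_path": env.get("GROCERY_ETL_OUTPUT_DIR", "data/processed/orders"),
    }


# Local development: nothing set, so the tutorial's folders are used
print(resolve_paths({}))

# A deployed run could point the same code at different storage
print(resolve_paths({"GROCERY_ETL_RAW_DIR": "s3://data-lake/raw/"}))
```

Passing the mapping in as an argument keeps the function easy to test without touching the real environment.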
Running Your Pipeline
With everything in place, you can run your pipeline with spark-submit:
# Basic run
spark-submit main.py
# With more memory for bigger datasets
spark-submit --driver-memory 4g main.py
# Pass a Spark setting at submit time (here, adaptive query execution)
spark-submit --conf spark.sql.adaptive.enabled=true main.py
The first time you run this, you’ll probably hit some issues, but that’s completely normal. Let’s talk about the most common ones.
Common Issues You’ll Hit
No ETL pipeline works perfectly on the first try. Here are the problems everyone runs into and how to fix them.
Memory Errors
If you see java.lang.OutOfMemoryError, Spark ran out of memory. Since we’re using .toPandas() to write our output, this most commonly happens if your cleaned dataset is too large to fit in the driver’s memory:
# Option 1: Increase driver memory
spark-submit --driver-memory 4g main.py
# Option 2: Sample the data first to verify the pipeline works
df.sample(0.1).toPandas() # Process 10% to test
# Option 3: Switch to Spark's native CSV writer for large data
df.coalesce(1).write.mode("overwrite").option("header", "true").csv(output_path)
For local development with reasonably-sized data, increasing driver memory usually solves the problem. For truly massive datasets, you’d switch back to Spark’s distributed writers.
Schema Mismatches
If you get “cannot resolve column name” errors, your DataFrame doesn’t have the columns you think it does:
# Debug by checking what columns actually exist
df.printSchema()
print(df.columns)
This usually means a transformation dropped or renamed a column, and you forgot to update the downstream code.
Slow Performance
If your pipeline is running but taking forever, don’t worry about optimization yet. That’s a whole separate topic. For now, just get it working. But if it’s really unbearably slow, try caching DataFrames you reference multiple times:
df.cache() # Keep frequently used data in memory
Just remember to call df.unpersist() when you’re done with it to free up memory.
What You’ve Accomplished
You just built a complete ETL pipeline from scratch. Here’s what you learned:
- You can handle messy real-world data. CSV files with dollar signs in prices, mixed date formats, and test records mixed into production data.
- You can structure projects professionally. Separate functions for extract, transform, and load. Logging that helps you debug failures. Error handling that records the full traceback and always shuts Spark down cleanly.
- You know how to run production-style jobs. Code you can deploy with spark-submit that runs on a schedule.
- You can spot and flag data quality issues. Suspicious prices get flagged. Test data gets filtered. Summary reports tell you what was processed.
This is the foundation every data engineer needs. You’re ready to build ETL pipelines for real projects.
What’s Next
This pipeline works, but it’s not optimized. Here’s what comes after you’re comfortable with the basics:
- Performance optimization – Make this pipeline faster by reducing shuffles, tuning partitions, and computing metrics in a single pass.
- Production deployment – Run this on Databricks or EMR. Handle configuration properly, monitor with metrics, and schedule with Airflow.
- Testing and validation – Write tests for your transformations. Add data quality checks. Build confidence that changes won’t break production.
But those are advanced topics. For now, you’ve built something real. Take a break, then find a messy CSV dataset and build an ETL pipeline for it. The best way to learn is by doing, so here’s a concrete exercise to cement what you’ve learned:
- Find any CSV dataset (Kaggle has thousands)
- Build an ETL pipeline for it
- Add handling for three data quality issues you discover
- Output clean parquet files partitioned by a date or category field
- Create a summary report showing what you processed
You now have the foundation every data engineer needs. The next time you see messy data at work, you’ll know exactly how to turn it into something useful.