
If you've ever found yourself manually downloading CSV files, copying data between spreadsheets, reformatting columns, and then pasting results into a report — you already have a data pipeline. It's just a painfully manual one.

A data pipeline is any process that moves data from point A to point B, transforming it along the way. The moment you automate that process — pulling data on a schedule, cleaning it without human intervention, and loading it where it needs to go — you free up hours of your week and eliminate an entire category of human error.

Python is the best tool for the job. It's free, readable, and has an enormous ecosystem of libraries built specifically for data work. In this guide, I'll walk you through building a complete automated data pipeline from scratch — even if you've never written one before.

What You'll Need

Before we dive in, make sure you have Python 3.8+ installed. Then install the libraries we'll use throughout this tutorial:

pip install pandas requests schedule openpyxl

Here's what each one does:

  • pandas — the Swiss army knife for data manipulation: reading files, filtering rows, merging datasets, and writing output.
  • requests — makes HTTP requests to pull data from APIs.
  • schedule — a lightweight Python library for scheduling tasks to run at regular intervals.
  • openpyxl — lets pandas read and write Excel files (.xlsx).

We'll also use sqlite3, logging, and json — all of which come built into Python, so no extra installation needed.

1. Extract — Pulling Data From Sources

Every pipeline starts with extraction: getting data from wherever it lives. The two most common sources I deal with in client projects are APIs and flat files (CSV, Excel, JSON). Let's handle both.

Pulling Data From an API

Most modern services expose a REST API you can query for data. Here's how to pull JSON data from an API and convert it into a pandas DataFrame:

import requests
import pandas as pd

def extract_from_api(url, headers=None, params=None):
    """Pull data from a REST API and return a DataFrame."""
    response = requests.get(url, headers=headers, params=params, timeout=30)
    response.raise_for_status()  # Raise an error for bad status codes

    data = response.json()

    # Many APIs wrap results in a key like "results" or "data"
    if isinstance(data, dict) and "results" in data:
        data = data["results"]

    return pd.DataFrame(data)

# Example: fetch user data from a public API
df_users = extract_from_api("https://jsonplaceholder.typicode.com/users")
print(df_users.head())

The raise_for_status() call is important — it throws an exception if the API returns a 4xx or 5xx error, so your pipeline fails loudly instead of silently processing garbage data.
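One wrinkle to watch for: many APIs return results a page at a time. The exact parameters differ between services, so treat this as a sketch. It assumes a page and per_page query parameter pair and a "results" key; adapt both to whatever API you're calling:

import requests
import pandas as pd

def extract_all_pages(url, page_size=100):
    """Pull every page from a paginated API and return a single DataFrame."""
    all_records = []
    page = 1
    while True:
        # "page" and "per_page" are assumed parameter names; check your API's docs
        response = requests.get(
            url, params={"page": page, "per_page": page_size}, timeout=30
        )
        response.raise_for_status()
        batch = response.json().get("results", [])
        if not batch:
            break  # No more pages left
        all_records.extend(batch)
        page += 1
    return pd.DataFrame(all_records)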

Reading From CSV and Excel Files

For local files, pandas makes extraction trivial:

import pandas as pd

# CSV file
df_csv = pd.read_csv("data/sales_report.csv")

# Excel file (requires openpyxl)
df_excel = pd.read_excel("data/inventory.xlsx", sheet_name="Sheet1")

# JSON file
df_json = pd.read_json("data/products.json")

# You can even read directly from a URL
df_remote = pd.read_csv("https://example.com/data/export.csv")

Pro tip: If you see garbled characters when reading CSV files from clients, specify the encoding parameter explicitly. Try encoding="utf-8" first, then encoding="latin-1" as a fallback.
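If you want that fallback baked into your pipeline rather than remembered at 2 AM, a small helper does the trick (read_client_csv is just an illustrative name):

import pandas as pd

def read_client_csv(path):
    """Read a CSV as UTF-8, falling back to Latin-1 for legacy exports."""
    try:
        return pd.read_csv(path, encoding="utf-8")
    except UnicodeDecodeError:
        return pd.read_csv(path, encoding="latin-1")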

2. Transform — Cleaning and Shaping the Data

Raw data is almost never ready to use. Column names are inconsistent, there are missing values, date formats are all over the place, and you need to join data from multiple sources. This is where pandas really shines.

Common Cleaning Operations

import pandas as pd

def transform_data(df):
    """Clean and transform raw data."""

    # Standardize column names: lowercase, replace spaces with underscores
    df.columns = df.columns.str.lower().str.replace(" ", "_")

    # Remove duplicate rows
    df = df.drop_duplicates()

    # Drop rows where critical columns are missing
    df = df.dropna(subset=["email", "order_total"])

    # Fill non-critical missing values with defaults
    df["notes"] = df["notes"].fillna("N/A")

    # Convert data types
    df["order_date"] = pd.to_datetime(df["order_date"], errors="coerce")
    df["order_total"] = pd.to_numeric(df["order_total"], errors="coerce")

    # Strip whitespace from string columns
    string_cols = df.select_dtypes(include="object").columns
    df[string_cols] = df[string_cols].apply(lambda col: col.str.strip())

    # Filter: only keep orders from the last 90 days
    cutoff = pd.Timestamp.now() - pd.Timedelta(days=90)
    df = df[df["order_date"] >= cutoff]

    return df

Merging Data From Multiple Sources

Often, useful insights come from combining datasets. For example, enriching order data with customer information:

# Merge orders with customer details
df_orders = pd.read_csv("data/orders.csv")
df_customers = pd.read_csv("data/customers.csv")

df_enriched = pd.merge(
    df_orders,
    df_customers,
    on="customer_id",      # The shared key
    how="left"             # Keep all orders, even if no customer match
)

# Add calculated columns
df_enriched["revenue_per_unit"] = (
    df_enriched["order_total"] / df_enriched["quantity"]
)

# Group and aggregate
summary = df_enriched.groupby("region").agg(
    total_revenue=("order_total", "sum"),
    avg_order=("order_total", "mean"),
    order_count=("order_id", "count")
).reset_index()

print(summary)

This is the kind of transformation that replaces 30 minutes of manual spreadsheet work with three seconds of code execution.

3. Load — Writing Data to Its Destination

Once your data is clean and shaped, you need to send it somewhere useful. Here are the most common destinations I build into client pipelines.

Writing to CSV and Excel

# Write to CSV
df.to_csv("output/cleaned_data.csv", index=False)

# Write to Excel with multiple sheets
with pd.ExcelWriter("output/report.xlsx", engine="openpyxl") as writer:
    summary.to_excel(writer, sheet_name="Summary", index=False)
    df_enriched.to_excel(writer, sheet_name="Detail", index=False)
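If you also want light formatting, pandas exposes the underlying openpyxl worksheet objects through writer.sheets, so you can tweak things like column widths before the file is saved. A minimal sketch, using a rough longest-value heuristic:

from openpyxl.utils import get_column_letter

with pd.ExcelWriter("output/report.xlsx", engine="openpyxl") as writer:
    summary.to_excel(writer, sheet_name="Summary", index=False)
    ws = writer.sheets["Summary"]
    # Widen each column to roughly fit its longest value
    for i, col in enumerate(summary.columns, start=1):
        max_len = max(summary[col].astype(str).str.len().max(), len(str(col)))
        ws.column_dimensions[get_column_letter(i)].width = max_len + 2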

Loading Into a SQLite Database

For anything beyond a simple export, I recommend loading data into a database. SQLite is built into Python and requires zero setup — perfect for local pipelines and prototyping:

import sqlite3
import pandas as pd

def load_to_database(df, table_name, db_path="pipeline.db"):
    """Load a DataFrame into a SQLite database."""
    conn = sqlite3.connect(db_path)

    # "replace" drops and recreates the table each run
    # Use "append" if you want to add rows to an existing table
    df.to_sql(table_name, conn, if_exists="replace", index=False)

    # Verify the load
    row_count = pd.read_sql(
        f"SELECT COUNT(*) as cnt FROM {table_name}", conn
    ).iloc[0]["cnt"]

    print(f"Loaded {row_count} rows into '{table_name}'")

    conn.close()

# Usage
load_to_database(df_enriched, "orders")
load_to_database(summary, "order_summary")

Once the data is in SQLite, you can query it with standard SQL — which makes it easy to build dashboards, run ad-hoc analysis, or serve it to other tools.
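For example, a quick ad-hoc query against the summary table we just loaded:

import sqlite3
import pandas as pd

conn = sqlite3.connect("pipeline.db")
top_regions = pd.read_sql(
    "SELECT region, total_revenue FROM order_summary "
    "ORDER BY total_revenue DESC LIMIT 5",
    conn,
)
conn.close()
print(top_regions)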

Bonus: Writing to Google Sheets

If your stakeholders live in Google Sheets, you can push data there programmatically using the gspread library:

pip install gspread google-auth

import gspread
from google.oauth2.service_account import Credentials
import pandas as pd

def load_to_google_sheets(df, spreadsheet_name, worksheet_name):
    """Push a DataFrame to a Google Sheet."""
    scopes = ["https://www.googleapis.com/auth/spreadsheets"]
    creds = Credentials.from_service_account_file(
        "credentials.json", scopes=scopes
    )
    client = gspread.authorize(creds)

    sheet = client.open(spreadsheet_name).worksheet(worksheet_name)

    # Clear existing data and write fresh
    sheet.clear()
    sheet.update(
        [df.columns.values.tolist()] + df.values.tolist()
    )

    print(f"Updated '{worksheet_name}' in '{spreadsheet_name}'")

Pro tip: When loading to Google Sheets, always clear and rewrite rather than appending. It prevents data duplication and keeps the sheet in a predictable state.

4. Schedule — Running Your Pipeline on Autopilot

A pipeline that you have to remember to run manually isn't really automated. Let's fix that.

Option A: The schedule Library (Python-Native)

The schedule library is the simplest way to run Python functions on a timer. It's great for lightweight pipelines that run on a server or always-on machine:

import schedule
import time

def run_pipeline():
    """Execute the full ETL pipeline."""
    print(f"Pipeline started at {time.strftime('%Y-%m-%d %H:%M:%S')}")

    # Extract
    df_raw = extract_from_api("https://api.example.com/sales")

    # Transform
    df_clean = transform_data(df_raw)

    # Load
    load_to_database(df_clean, "daily_sales")
    df_clean.to_csv("output/daily_sales.csv", index=False)

    print("Pipeline completed successfully.")

# Schedule the pipeline
schedule.every().day.at("06:00").do(run_pipeline)
schedule.every().monday.at("09:00").do(run_pipeline)  # Extra Monday run

# Keep the script running
print("Scheduler is running. Press Ctrl+C to stop.")
while True:
    schedule.run_pending()
    time.sleep(60)  # Check every 60 seconds

The schedule library is readable and intuitive. You can schedule by minutes, hours, days, or specific weekdays — all in plain English.
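A few more patterns, just to show the range:

schedule.every(15).minutes.do(run_pipeline)            # Every 15 minutes
schedule.every(4).hours.do(run_pipeline)               # Every 4 hours
schedule.every().friday.at("17:00").do(run_pipeline)   # Fridays at 5 PM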

Option B: Cron (Linux/Mac) or Task Scheduler (Windows)

For production pipelines, I usually prefer using the operating system's built-in scheduler. It's more reliable because it doesn't depend on a Python process staying alive.

On Linux or Mac, use cron. Open your crontab with crontab -e and add:

# Run the pipeline every day at 6:00 AM
0 6 * * * /usr/bin/python3 /home/user/pipeline/main.py >> /home/user/pipeline/logs/cron.log 2>&1

# Run every Monday at 9:00 AM
0 9 * * 1 /usr/bin/python3 /home/user/pipeline/main.py >> /home/user/pipeline/logs/cron.log 2>&1

On Windows, use Task Scheduler: create a new task, set the trigger to your desired schedule, and set the action to run python.exe with your script path as the argument.

5. Error Handling and Logging — Building a Pipeline You Can Trust

A pipeline that runs silently is a pipeline that fails silently. You need two things: robust error handling so one bad API call doesn't crash the whole run, and logging so you can see exactly what happened after the fact.

Setting Up Logging

import logging

# Configure logging — writes to both console and file
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("pipeline.log"),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

Wrapping Steps in Try/Except

Never let a single failure take down the entire pipeline. Wrap each major step so you know exactly where things went wrong:

def run_pipeline():
    logger.info("Pipeline started.")

    # --- Extract ---
    try:
        df_raw = extract_from_api("https://api.example.com/sales")
        logger.info(f"Extracted {len(df_raw)} rows from API.")
    except requests.exceptions.RequestException as e:
        logger.error(f"Extraction failed: {e}")
        return  # Stop the pipeline — no data to process

    # --- Transform ---
    try:
        df_clean = transform_data(df_raw)
        logger.info(f"Transformed data: {len(df_clean)} rows after cleaning.")
    except Exception as e:
        logger.error(f"Transformation failed: {e}")
        return

    # --- Load ---
    try:
        load_to_database(df_clean, "daily_sales")
        df_clean.to_csv("output/daily_sales.csv", index=False)
        logger.info("Data loaded successfully.")
    except Exception as e:
        logger.error(f"Load failed: {e}")
        return

    logger.info("Pipeline completed successfully.")

With this structure, when something goes wrong at 3 AM, you open pipeline.log and immediately see which step failed and why. No guesswork.

Pro tip: For critical pipelines, add email or Slack notifications on failure. A simple SMTP call or webhook in the except block can save you from discovering a broken pipeline three days too late.
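As a sketch of the webhook approach, here's what a Slack notification helper could look like (the webhook URL is a placeholder you'd generate in Slack, and notify_failure is an illustrative name you'd call from the except blocks above):

import requests

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"  # Placeholder

def notify_failure(step, error):
    """Post a short failure message to Slack via an incoming webhook."""
    try:
        requests.post(
            SLACK_WEBHOOK_URL,
            json={"text": f"Pipeline failed at {step}: {error}"},
            timeout=10,
        )
    except requests.exceptions.RequestException:
        # Never let the notification itself crash the pipeline
        pass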

Complete End-to-End Example

Let's put everything together into a single, runnable pipeline script. This example pulls user data from a public API, cleans it, and loads it into both a CSV file and a SQLite database — with full logging and error handling.

"""
data_pipeline.py
A complete automated data pipeline with Extract, Transform, Load, and Scheduling.
"""

import os
import requests
import pandas as pd
import sqlite3
import logging
import schedule
import time
from datetime import datetime

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("pipeline.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


# --- Extract ---
def extract(url):
    """Pull data from an API and return a DataFrame."""
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    data = response.json()
    logger.info(f"Extracted {len(data)} records from {url}")
    return pd.DataFrame(data)


# --- Transform ---
def transform(df):
    """Clean and enrich the raw data."""
    # Standardize column names
    df.columns = df.columns.str.lower().str.replace(" ", "_")

    # Drop duplicates
    df = df.drop_duplicates(subset=["id"])

    # Keep only the columns we need
    cols_to_keep = ["id", "name", "username", "email", "phone"]
    df = df[[c for c in cols_to_keep if c in df.columns]]

    # Clean email: lowercase and strip whitespace
    if "email" in df.columns:
        df["email"] = df["email"].str.lower().str.strip()

    # Add metadata
    df["loaded_at"] = datetime.now().isoformat()

    logger.info(f"Transformed data: {len(df)} rows, {len(df.columns)} columns")
    return df


# --- Load ---
def load_csv(df, filepath):
    """Write DataFrame to a CSV file, creating the output folder if needed."""
    os.makedirs(os.path.dirname(filepath) or ".", exist_ok=True)
    df.to_csv(filepath, index=False)
    logger.info(f"Saved {len(df)} rows to {filepath}")


def load_sqlite(df, table_name, db_path="pipeline.db"):
    """Write DataFrame to a SQLite database."""
    conn = sqlite3.connect(db_path)
    df.to_sql(table_name, conn, if_exists="replace", index=False)
    count = conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
    conn.close()
    logger.info(f"Loaded {count} rows into '{table_name}' ({db_path})")


# --- Pipeline ---
def run_pipeline():
    """Execute the full ETL pipeline."""
    logger.info("=" * 50)
    logger.info("Pipeline run started.")

    # Extract
    try:
        df_raw = extract("https://jsonplaceholder.typicode.com/users")
    except requests.exceptions.RequestException as e:
        logger.error(f"Extraction failed: {e}")
        return

    # Transform
    try:
        df_clean = transform(df_raw)
    except Exception as e:
        logger.error(f"Transformation failed: {e}")
        return

    # Load
    try:
        load_csv(df_clean, "output/users_clean.csv")
        load_sqlite(df_clean, "users")
    except Exception as e:
        logger.error(f"Load failed: {e}")
        return

    logger.info("Pipeline run completed successfully.")
    logger.info("=" * 50)


# --- Entry Point ---
if __name__ == "__main__":
    # Run once immediately
    run_pipeline()

    # Then schedule for daily runs
    schedule.every().day.at("06:00").do(run_pipeline)

    logger.info("Scheduler active. Press Ctrl+C to stop.")
    while True:
        schedule.run_pending()
        time.sleep(60)

Save this as data_pipeline.py, run it with python data_pipeline.py, and you have a working automated pipeline. It will execute once immediately, then again every day at 6 AM.

Tips for Taking This to Production

The script above is a solid starting point. Here's how to harden it for real-world use:

  • Use virtual environments. Always run your pipeline in a venv to isolate dependencies. Create one with python -m venv venv and activate it before installing packages. This prevents version conflicts when you have multiple projects on the same machine.
  • Store configuration separately. Never hardcode API URLs, database paths, or credentials in your script. Use a config.json or .env file and load values with json.load() or the python-dotenv library. This makes it easy to switch between dev and production environments without modifying code.
  • Add data validation checkpoints. After each step, verify the data looks right. Check that row counts are within expected ranges, that required columns exist, and that key values are non-null. Fail fast if something looks off — it's better to stop the pipeline than to load bad data.
  • Implement retry logic. API calls fail. Networks time out. Use a library like tenacity or write a simple retry decorator to automatically retry failed requests 2-3 times with a short delay between attempts (see the sketch after this list).
  • Monitor and alert. At minimum, check your pipeline.log daily. Better yet, send yourself a summary email or Slack message after each run. For critical pipelines, set up alerts that fire when a run fails or when data volumes look unusual.
  • Version control your pipeline. Treat your pipeline code like any other software project. Keep it in Git, write meaningful commit messages, and tag releases. When something breaks in production, you want to be able to compare against the last known working version.
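Here's a minimal sketch of that retry decorator (with_retries is an illustrative name; tune the exception types, attempt count, and delay for your sources):

import time
import functools
import requests

def with_retries(times=3, delay=5):
    """Retry a function on request errors, waiting `delay` seconds between attempts."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, times + 1):
                try:
                    return func(*args, **kwargs)
                except requests.exceptions.RequestException:
                    if attempt == times:
                        raise  # Out of attempts, let the pipeline's error handling take over
                    time.sleep(delay)
        return wrapper
    return decorator

@with_retries(times=3, delay=5)
def extract_with_retries(url):
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return response.json()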

The difference between a script on your laptop and a production pipeline is reliability. Logging, error handling, config management, and monitoring are what make the difference.

Need Help Building Your Pipeline?

If you've got data scattered across APIs, spreadsheets, and databases — and you want it flowing automatically into clean, usable reports — I can help. I've built Python data pipelines for businesses on Upwork that pull from CRMs, e-commerce platforms, ad networks, and internal tools, delivering clean data to dashboards and stakeholders on autopilot.

Whether you need a simple CSV-to-database ETL or a multi-source pipeline with scheduling and monitoring, I'll build it, document it, and hand it off so your team can maintain it.

Get in touch for a free consultation, or hire me on Upwork to get started right away.

