If you've got a CSV of leads and you're trying to clean it up before a campaign, you usually hit the same wall:

  • You need more than a regex check. You want real verification signals.
  • You can't just fire 50 threads at an API and hope it works. Quotas and rate limits are real.
  • You need outputs that your team can act on: keep, suppress, review, or delete.

This guide shows a practical bulk pipeline in Python using the Bulk Email Checker API. You'll read a CSV, verify each address, respect throttling, retry safely, and export a clean file with clear decision fields.

Before you start: validation vs verification

Validation checks format. Verification checks deliverability signals like MX records, mailbox behavior, disposable detection, role account detection, and more. For list hygiene, verification is the part that saves you money and protects sender reputation.

Know the API limits so your script doesn't get cut off

The real-time endpoint is straightforward, but it's not a firehose. Bulk Email Checker documents an hourly request quota per plan and a 10 requests per second rate limit for the API version shown in the docs. Build your pipeline around those limits, not around wishful thinking.

  • Endpoint pattern: https://api.bulkemailchecker.com/?key=YOUR_KEY&email=email@example.com
  • Optional XML response: add &xml=true

If you're validating very large lists on a schedule, use the product plan intended for bulk throughput. For smaller batches, the approach below works well as long as you throttle and queue.

Pipeline design

You're going to build a simple, durable flow:

  1. Input: CSV with an email column
  2. Normalize: trim, lowercase, dedupe
  3. Verify: call the API with rate limiting
  4. Decide: map API flags to keep/suppress/review
  5. Export: a new CSV with status columns

What you should store from each result

The API response includes fields that are perfect for bulk decisioning. These are the ones that usually matter most in a list workflow:

  • success and failed for the outcome
  • roleAccount to flag addresses like sales@, info@, support@
  • disposable to catch temporary inboxes
  • free to flag consumer mail providers, if your policy treats them differently
  • acceptAll for catch-all domains
  • email for the normalized address you actually checked

A practical decision table

Here's a baseline rule set you can tune to match your business. The key is that you make decisions consistently.

Condition Action Why
failed == true or success == false Suppress Avoid bounces and protect sender reputation
disposable == true Suppress Disposable inboxes rarely convert and can skew metrics
acceptAll == true Review Catch-all domains need extra signals or a follow-up strategy
roleAccount == true Review or route Role inboxes can be valid but often don't behave like a person
Everything else Keep Good candidate for outreach

Python bulk verifier: CSV in, verified CSV out

This script is designed for real life:

  • Dedupes and normalizes
  • Uses a thread pool for throughput
  • Throttles requests to stay under the rate limit
  • Retries with exponential backoff on transient failures
  • Writes a clean output CSV
import csv
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import quote_plus

import requests

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://api.bulkemailchecker.com/"
EMAIL_COLUMN = "email"  # change if your column name differs

# Rate limiting: docs show 10 requests per second. Stay under it.
MAX_RPS = 8
MIN_INTERVAL = 1.0 / MAX_RPS

# Concurrency: keep this modest so you don't spike retries.
WORKERS = 8

# Retry policy
MAX_RETRIES = 4
BACKOFF_BASE_SECONDS = 0.6

_lock = threading.Lock()
_last_request_ts = 0.0

def throttle():
    global _last_request_ts
    with _lock:
        now = time.time()
        wait = MIN_INTERVAL - (now - _last_request_ts)
        if wait > 0:
            time.sleep(wait)
        _last_request_ts = time.time()

def verify_email(email: str) -> dict:
    # Normalize and escape
    email_norm = email.strip().lower()
    if not email_norm:
        return {"email": email, "success": False, "failed": True, "error": "empty"}

    # API pattern from docs:
    # https://api.bulkemailchecker.com/?key=YOUR_KEY&email=email@example.com
    url = f"{BASE_URL}?key={quote_plus(API_KEY)}&email={quote_plus(email_norm)}"

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            throttle()
            r = requests.get(url, timeout=20)
            if r.status_code in (429, 500, 502, 503, 504):
                raise RuntimeError(f"transient_status:{r.status_code}")

            r.raise_for_status()
            data = r.json()

            # Guarantee a few fields exist for downstream logic
            data.setdefault("email", email_norm)
            data.setdefault("success", False)
            data.setdefault("failed", not bool(data.get("success")))
            return data

        except Exception as e:
            if attempt == MAX_RETRIES:
                return {
                    "email": email_norm,
                    "success": False,
                    "failed": True,
                    "error": str(e),
                }
            sleep_for = BACKOFF_BASE_SECONDS * (2 ** (attempt - 1))
            time.sleep(sleep_for)

def decision(row: dict) -> str:
    if row.get("failed") or not row.get("success"):
        return "suppress"
    if row.get("disposable"):
        return "suppress"
    if row.get("acceptAll"):
        return "review"
    if row.get("roleAccount"):
        return "review"
    return "keep"

def run(input_csv: str, output_csv: str):
    # Read input rows
    with open(input_csv, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        rows = list(reader)

    # Normalize and dedupe
    seen = set()
    emails = []
    for r in rows:
        e = (r.get(EMAIL_COLUMN) or "").strip().lower()
        if not e or e in seen:
            continue
        seen.add(e)
        emails.append(e)

    total = len(emails)
    print(f"Queued {total} unique emails")

    results = []
    started = time.time()

    with ThreadPoolExecutor(max_workers=WORKERS) as pool:
        futures = {pool.submit(verify_email, e): e for e in emails}
        done = 0
        for fut in as_completed(futures):
            res = fut.result()
            res["decision"] = decision(res)
            results.append(res)

            done += 1
            if done % 50 == 0 or done == total:
                elapsed = time.time() - started
                rps = done / elapsed if elapsed > 0 else 0
                print(f"{done}/{total} complete - {rps:.2f} verified/sec")

    # Write output
    fieldnames = [
        "email",
        "success",
        "failed",
        "disposable",
        "roleAccount",
        "free",
        "acceptAll",
        "decision",
        "error",
    ]
    with open(output_csv, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for r in results:
            writer.writerow({k: r.get(k, "") for k in fieldnames})

    print(f"Done in {time.time() - started:.1f}s. Output: {output_csv}")

if __name__ == "__main__":
    run("input.csv", "verified_output.csv")

Tips to make this pipeline more reliable

  • Keep your RPS under the documented limit. Your results will be faster overall when you avoid retry storms.
  • Log errors and keep going. For bulk jobs, you want a complete output file even if some rows fail.
  • Separate suppress vs review. Catch-all and role accounts can be legitimate, but they should not be treated as clean leads by default.
  • Store the raw response. If you save JSON results, you can re-run decisioning rules without re-verifying.

FAQ

Should I verify the same address more than once?

Not usually. Dedupe first, store results, and re-check only when you have a reason (like a long time gap or a high-value lead).

What about SMTP verification inside my own code?

In practice, direct SMTP probing is fragile. Many servers throttle, tarp it, or block probing behavior. Using an API that already accounts for these edge cases is simpler and more stable.

Can I export a file that my ESP can import?

Yes. Most ESPs can ingest a CSV with a single email column, plus optional tags. Use the decision column to drive your segmentation or suppression list.

Sources

Next step

If you want to test a small sample first, use the Free Email Checker. When you're ready to integrate, the API docs show the exact response fields your pipeline can consume.

99.7% Accuracy Guarantee

Stop Bouncing. Start Converting.

Millions of emails verified daily. Industry-leading SMTP validation engine.