If you've got a CSV of leads and you're trying to clean it up before a campaign, you usually hit the same wall:
- You need more than a regex check. You want real verification signals.
- You can't just fire 50 threads at an API and hope it works. Quotas and rate limits are real.
- You need outputs that your team can act on: keep, suppress, review, or delete.
This guide shows a practical bulk pipeline in Python using the Bulk Email Checker API. You'll read a CSV, verify each address, respect throttling, retry safely, and export a clean file with clear decision fields.
Before you start: validation vs verification
Validation checks format. Verification checks deliverability signals like MX records, mailbox behavior, disposable detection, role account detection, and more. For list hygiene, verification is the part that saves you money and protects sender reputation.
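To make the difference concrete, here's a minimal format-only check. The regex is a deliberately simplified illustration, not a full RFC 5321 grammar, and note what it cannot tell you: whether the domain has MX records or whether the mailbox accepts mail.

```python
import re

# Format-only "validation": accepts anything shaped like local@domain.tld.
# This says nothing about whether the mailbox exists or accepts mail.
EMAIL_FORMAT = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

def is_valid_format(email: str) -> bool:
    return bool(EMAIL_FORMAT.match(email.strip()))

# Both of these pass format validation; only verification could tell you
# that the second domain doesn't exist.
print(is_valid_format("jane@example.com"))          # True
print(is_valid_format("jane@no-such-domain.test"))  # True: format != deliverability
print(is_valid_format("not-an-email"))              # False
```

Verification is where the regex stops helping, which is why the rest of this guide leans on an API.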
Know the API limits so your script doesn't get cut off
The real-time endpoint is straightforward, but it's not a firehose. Bulk Email Checker documents an hourly request quota per plan and a 10 requests per second rate limit for the API version shown in the docs. Build your pipeline around those limits, not around wishful thinking.
- Endpoint pattern: `https://api.bulkemailchecker.com/?key=YOUR_KEY&email=email@example.com`
- Optional XML response: add `&xml=true`
If you're validating very large lists on a schedule, use the product plan intended for bulk throughput. For smaller batches, the approach below works well as long as you throttle and queue.
Pipeline design
You're going to build a simple, durable flow:
- Input: CSV with an email column
- Normalize: trim, lowercase, dedupe
- Verify: call the API with rate limiting
- Decide: map API flags to keep/suppress/review
- Export: a new CSV with status columns
What you should store from each result
The API response includes fields that are perfect for bulk decisioning. These are the ones that usually matter most in a list workflow:
- `success` and `failed` for the outcome
- `roleAccount` to flag addresses like `sales@`, `info@`, `support@`
- `disposable` to catch temporary inboxes
- `free` to flag consumer mail providers, if your policy treats them differently
- `acceptAll` for catch-all domains
- `email` for the normalized address you actually checked
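Assuming a JSON payload containing the fields above, a small helper can flatten each response into a row your pipeline can store. The sample payload is illustrative, not an exact API response, so missing keys are handled defensively:

```python
# Hypothetical response shape based on the fields listed above; the real
# payload may include more keys, so treat anything missing defensively.
FIELDS = ("email", "success", "failed", "roleAccount", "disposable", "free", "acceptAll")

def slim(response: dict) -> dict:
    # Keep only the fields decisioning needs, with safe defaults.
    return {k: response.get(k, "") for k in FIELDS}

sample = {
    "email": "sales@example.com",
    "success": True,
    "failed": False,
    "roleAccount": True,
    "disposable": False,
    "free": False,
    "acceptAll": False,
}

row = slim(sample)
print(row["roleAccount"])  # True
```

Storing a consistent row shape now makes the decision table below trivial to apply.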
A practical decision table
Here's a baseline rule set you can tune to match your business. The key is that you make decisions consistently.
| Condition | Action | Why |
|---|---|---|
| `failed == true` or `success == false` | Suppress | Avoid bounces and protect sender reputation |
| `disposable == true` | Suppress | Disposable inboxes rarely convert and can skew metrics |
| `acceptAll == true` | Review | Catch-all domains need extra signals or a follow-up strategy |
| `roleAccount == true` | Review or route | Role inboxes can be valid but often don't behave like a person |
| Everything else | Keep | Good candidate for outreach |
Python bulk verifier: CSV in, verified CSV out
This script is designed for real life:
- Dedupes and normalizes
- Uses a thread pool for throughput
- Throttles requests to stay under the rate limit
- Retries with exponential backoff on transient failures
- Writes a clean output CSV
```python
import csv
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import quote_plus

import requests

API_KEY = "YOUR_API_KEY"
BASE_URL = "https://api.bulkemailchecker.com/"
EMAIL_COLUMN = "email"  # change if your column name differs

# Rate limiting: docs show 10 requests per second. Stay under it.
MAX_RPS = 8
MIN_INTERVAL = 1.0 / MAX_RPS

# Concurrency: keep this modest so you don't spike retries.
WORKERS = 8

# Retry policy
MAX_RETRIES = 4
BACKOFF_BASE_SECONDS = 0.6

_lock = threading.Lock()
_last_request_ts = 0.0


def throttle():
    global _last_request_ts
    with _lock:
        now = time.time()
        wait = MIN_INTERVAL - (now - _last_request_ts)
        if wait > 0:
            time.sleep(wait)
        _last_request_ts = time.time()


def verify_email(email: str) -> dict:
    # Normalize and escape
    email_norm = email.strip().lower()
    if not email_norm:
        return {"email": email, "success": False, "failed": True, "error": "empty"}

    # API pattern from docs:
    # https://api.bulkemailchecker.com/?key=YOUR_KEY&email=email@example.com
    url = f"{BASE_URL}?key={quote_plus(API_KEY)}&email={quote_plus(email_norm)}"

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            throttle()
            r = requests.get(url, timeout=20)
            if r.status_code in (429, 500, 502, 503, 504):
                raise RuntimeError(f"transient_status:{r.status_code}")
            r.raise_for_status()
            data = r.json()
            # Guarantee a few fields exist for downstream logic
            data.setdefault("email", email_norm)
            data.setdefault("success", False)
            data.setdefault("failed", not bool(data.get("success")))
            return data
        except Exception as e:
            if attempt == MAX_RETRIES:
                return {
                    "email": email_norm,
                    "success": False,
                    "failed": True,
                    "error": str(e),
                }
            sleep_for = BACKOFF_BASE_SECONDS * (2 ** (attempt - 1))
            time.sleep(sleep_for)


def decision(row: dict) -> str:
    if row.get("failed") or not row.get("success"):
        return "suppress"
    if row.get("disposable"):
        return "suppress"
    if row.get("acceptAll"):
        return "review"
    if row.get("roleAccount"):
        return "review"
    return "keep"


def run(input_csv: str, output_csv: str):
    # Read input rows
    with open(input_csv, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        rows = list(reader)

    # Normalize and dedupe
    seen = set()
    emails = []
    for r in rows:
        e = (r.get(EMAIL_COLUMN) or "").strip().lower()
        if not e or e in seen:
            continue
        seen.add(e)
        emails.append(e)

    total = len(emails)
    print(f"Queued {total} unique emails")

    results = []
    started = time.time()
    with ThreadPoolExecutor(max_workers=WORKERS) as pool:
        futures = {pool.submit(verify_email, e): e for e in emails}
        done = 0
        for fut in as_completed(futures):
            res = fut.result()
            res["decision"] = decision(res)
            results.append(res)
            done += 1
            if done % 50 == 0 or done == total:
                elapsed = time.time() - started
                rps = done / elapsed if elapsed > 0 else 0
                print(f"{done}/{total} complete - {rps:.2f} verified/sec")

    # Write output
    fieldnames = [
        "email",
        "success",
        "failed",
        "disposable",
        "roleAccount",
        "free",
        "acceptAll",
        "decision",
        "error",
    ]
    with open(output_csv, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for r in results:
            writer.writerow({k: r.get(k, "") for k in fieldnames})

    print(f"Done in {time.time() - started:.1f}s. Output: {output_csv}")


if __name__ == "__main__":
    run("input.csv", "verified_output.csv")
```
Tips to make this pipeline more reliable
- Keep your RPS under the documented limit. Your results will be faster overall when you avoid retry storms.
- Log errors and keep going. For bulk jobs, you want a complete output file even if some rows fail.
- Separate suppress vs review. Catch-all and role accounts can be legitimate, but they should not be treated as clean leads by default.
- Store the raw response. If you save JSON results, you can re-run decisioning rules without re-verifying.
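The last tip is worth sketching. If you persist raw results as JSON Lines, tightening your rules later costs zero verifications. The filename and the stricter rule below are examples, not recommendations:

```python
import json

def save_raw(results, path="raw_results.jsonl"):
    # One JSON object per line: easy to append, easy to stream back.
    with open(path, "w", encoding="utf-8") as f:
        for r in results:
            f.write(json.dumps(r) + "\n")

def rerun_decisions(path, decide):
    # Re-apply a decision function to stored results, no API calls needed.
    with open(path, encoding="utf-8") as f:
        for line in f:
            r = json.loads(line)
            r["decision"] = decide(r)
            yield r

# Example: a stricter policy that also suppresses free-mail addresses.
def stricter(r):
    if not r.get("success") or r.get("free"):
        return "suppress"
    return "keep"

save_raw([{"email": "jane@example.com", "success": True, "free": True}])
print(next(rerun_decisions("raw_results.jsonl", stricter))["decision"])  # suppress
```

This is the cheapest insurance a bulk pipeline can buy: the verification spend happens once, and policy changes become a local re-run.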
FAQ
Should I verify the same address more than once?
Not usually. Dedupe first, store results, and re-check only when you have a reason (like a long time gap or a high-value lead).
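A staleness check keeps this honest. This sketch assumes you store an ISO timestamp alongside each result; the 90-day window is an arbitrary example to tune to your send cadence, not anything the API mandates:

```python
from datetime import datetime, timedelta
from typing import Optional

# Re-verify only when the cached result is older than MAX_AGE.
MAX_AGE = timedelta(days=90)

def needs_recheck(last_checked_iso: str, now: Optional[datetime] = None) -> bool:
    """Return True when the stored verification has aged out."""
    now = now or datetime.utcnow()
    last_checked = datetime.fromisoformat(last_checked_iso)
    return (now - last_checked) > MAX_AGE
```

Pair this with stored raw results so a new campaign only spends verifications on addresses that have actually aged out.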
What about SMTP verification inside my own code?
In practice, direct SMTP probing is fragile. Many servers throttle, tarpit, or block probing behavior. Using an API that already accounts for these edge cases is simpler and more stable.
Can I export a file that my ESP can import?
Yes. Most ESPs can ingest a CSV with a single email column, plus optional tags. Use the decision column to drive your segmentation or suppression list.
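As a sketch, you can split the verified output into an ESP-ready import file and a suppression file, driven by the `decision` column. Column names match the script above; the output filenames are examples:

```python
import csv

def split_for_esp(verified_csv, keep_csv="esp_import.csv", suppress_csv="suppression.csv"):
    # Read the verified output and bucket rows by decision.
    with open(verified_csv, newline="", encoding="utf-8") as f:
        rows = list(csv.DictReader(f))
    keep = [r for r in rows if r.get("decision") == "keep"]
    suppress = [r for r in rows if r.get("decision") == "suppress"]

    # Write single-column CSVs, which most ESPs can ingest directly.
    for path, subset in ((keep_csv, keep), (suppress_csv, suppress)):
        with open(path, "w", newline="", encoding="utf-8") as f:
            w = csv.DictWriter(f, fieldnames=["email"])
            w.writeheader()
            for r in subset:
                w.writerow({"email": r["email"]})
    return len(keep), len(suppress)
```

Rows marked `review` land in neither file, so they stay out of sends until someone looks at them, which is the point.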
Sources
- Bulk Email Checker API documentation
- Python csv module documentation
- Python concurrent.futures documentation
- Python urllib.parse.quote_plus documentation
- Requests quickstart documentation
- RFC 5321 (SMTP)
Next step
If you want to test a small sample first, use the Free Email Checker. When you're ready to integrate, the API docs show the exact response fields your pipeline can consume.