Scenario A2: Async App → Sync Legacy Library

Target Audience: Web developers, async framework users Difficulty: Intermediate Keywords: FastAPI, Django, aiohttp, legacy, database, psycopg2, sqlite3


📋 The Problem

You’re building a modern async web application (FastAPI, aiohttp, etc.) but need to integrate with:

  • Legacy synchronous database drivers (psycopg2, MySQLdb, sqlite3)

  • Sync third-party libraries without async support

  • CPU-intensive blocking operations

  • Legacy business logic code

The Challenge: Calling sync blocking code from async context blocks the event loop, freezing your entire application.


🔴 Without SmartAsync

Naive Approach (WRONG!)

from fastapi import FastAPI
import sqlite3

app = FastAPI()

@app.get("/users")
async def get_users():
    # ⚠️ BLOCKS EVENT LOOP!
    conn = sqlite3.connect("app.db")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
    users = cursor.fetchall()
    conn.close()
    return {"users": users}

Problem: sqlite3.connect() and cursor.execute() are blocking I/O operations that freeze the event loop. No other requests can be processed!

Manual Threading (VERBOSE!)

from fastapi import FastAPI
import asyncio
import sqlite3

app = FastAPI()

def _query_db():
    """Blocking database query."""
    conn = sqlite3.connect("app.db")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
    users = cursor.fetchall()
    conn.close()
    return [{"id": u[0], "name": u[1]} for u in users]

@app.get("/users")
async def get_users():
    # Manual threading - verbose and repetitive
    users = await asyncio.to_thread(_query_db)
    return {"users": users}

# Must create wrapper function for EVERY database call!
def _insert_user(name: str, email: str):
    conn = sqlite3.connect("app.db")
    cursor = conn.cursor()
    cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", (name, email))
    conn.commit()
    conn.close()
    return cursor.rowcount

@app.post("/users")
async def create_user(name: str, email: str):
    rowcount = await asyncio.to_thread(_insert_user, name, email)
    return {"created": rowcount}

Problems:

  • ❌ Boilerplate for every database method

  • ❌ Wrapper functions clutter codebase

  • ❌ Hard to maintain

  • ❌ Error handling duplicated


🟢 With SmartAsync

Clean Solution

from fastapi import FastAPI
from smartasync import smartasync
import sqlite3
from typing import List, Dict, Any

class DatabaseManager:
    """Sync database with async-friendly interface."""

    def __init__(self, db_path: str):
        self.db_path = db_path

    @smartasync
    def query(self, sql: str, params: tuple = ()) -> List[Dict[str, Any]]:
        """Execute query - auto-threaded in async context."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()

        try:
            cursor.execute(sql, params)
            rows = cursor.fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()

    @smartasync
    def execute(self, sql: str, params: tuple = ()) -> int:
        """Execute write operation - auto-threaded in async context."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        try:
            cursor.execute(sql, params)
            conn.commit()
            return cursor.rowcount
        finally:
            conn.close()

# FastAPI application
app = FastAPI()
db = DatabaseManager("app.db")

@app.get("/users")
async def get_users():
    """Async endpoint - database call is auto-threaded."""
    users = await db.query("SELECT * FROM users")
    return {"users": users}

@app.post("/users")
async def create_user(name: str, email: str):
    """Async endpoint with database write."""
    rowcount = await db.execute(
        "INSERT INTO users (name, email) VALUES (?, ?)",
        (name, email)
    )
    return {"created": rowcount}

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    """Parameterized query."""
    users = await db.query(
        "SELECT * FROM users WHERE id = ?",
        (user_id,)
    )
    if not users:
        return {"error": "User not found"}, 404
    return {"user": users[0]}

Benefits:

  • ✅ No boilerplate wrapper functions

  • ✅ Clean class-based API

  • ✅ Automatic threading in async context

  • ✅ Event loop never blocked

  • ✅ Can still use in sync CLI tools (no await needed)


💡 Complete Example: FastAPI + Legacy Database

Full Application

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from smartasync import smartasync
import sqlite3
from typing import List, Dict, Any, Optional
from contextlib import contextmanager

class User(BaseModel):
    """User model."""
    name: str
    email: str

class UserResponse(BaseModel):
    """User response model."""
    id: int
    name: str
    email: str

class DatabaseManager:
    """Production-ready database manager."""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Initialize database schema."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.commit()
        conn.close()

    @contextmanager
    def _get_connection(self):
        """Context manager for database connections."""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
        finally:
            conn.close()

    @smartasync
    def get_all_users(self) -> List[Dict[str, Any]]:
        """Get all users."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT id, name, email FROM users ORDER BY id")
            return [dict(row) for row in cursor.fetchall()]

    @smartasync
    def get_user_by_id(self, user_id: int) -> Optional[Dict[str, Any]]:
        """Get user by ID."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT id, name, email FROM users WHERE id = ?",
                (user_id,)
            )
            row = cursor.fetchone()
            return dict(row) if row else None

    @smartasync
    def create_user(self, name: str, email: str) -> int:
        """Create new user, return ID."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute(
                    "INSERT INTO users (name, email) VALUES (?, ?)",
                    (name, email)
                )
                conn.commit()
                return cursor.lastrowid
            except sqlite3.IntegrityError:
                raise ValueError(f"Email {email} already exists")

    @smartasync
    def update_user(self, user_id: int, name: str, email: str) -> bool:
        """Update user, return success."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "UPDATE users SET name = ?, email = ? WHERE id = ?",
                (name, email, user_id)
            )
            conn.commit()
            return cursor.rowcount > 0

    @smartasync
    def delete_user(self, user_id: int) -> bool:
        """Delete user, return success."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
            conn.commit()
            return cursor.rowcount > 0

# FastAPI application
app = FastAPI(title="User API with Legacy DB")
db = DatabaseManager("users.db")

@app.get("/users", response_model=List[UserResponse])
async def list_users():
    """List all users."""
    users = await db.get_all_users()
    return users

@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(user_id: int):
    """Get specific user."""
    user = await db.get_user_by_id(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(user: User):
    """Create new user."""
    try:
        user_id = await db.create_user(user.name, user.email)
        created_user = await db.get_user_by_id(user_id)
        return created_user
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.put("/users/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user: User):
    """Update user."""
    success = await db.update_user(user_id, user.name, user.email)
    if not success:
        raise HTTPException(status_code=404, detail="User not found")
    updated_user = await db.get_user_by_id(user_id)
    return updated_user

@app.delete("/users/{user_id}", status_code=204)
async def delete_user(user_id: int):
    """Delete user."""
    success = await db.delete_user(user_id)
    if not success:
        raise HTTPException(status_code=404, detail="User not found")
    return None

# Can also use in CLI tools!
if __name__ == "__main__":
    # Sync usage for admin scripts
    print("Creating test users...")
    db.create_user("Alice", "alice@example.com")
    db.create_user("Bob", "bob@example.com")

    print("\nAll users:")
    for user in db.get_all_users():
        print(f"  - {user['name']} <{user['email']}>")

Run the API

# Install dependencies
pip install fastapi uvicorn smartasync

# Run server
uvicorn main:app --reload

# Test endpoints
curl http://localhost:8000/users
curl -X POST http://localhost:8000/users \
     -H "Content-Type: application/json" \
     -d '{"name": "Charlie", "email": "charlie@example.com"}'

⚠️ Important Considerations

1. Connection Pooling

For production, consider connection pooling:

from queue import Queue
import sqlite3

class PooledDatabaseManager:
    """Database manager with connection pool."""

    def __init__(self, db_path: str, pool_size: int = 5):
        self.db_path = db_path
        self.pool = Queue(maxsize=pool_size)

        # Initialize pool
        for _ in range(pool_size):
            conn = sqlite3.connect(db_path, check_same_thread=False)
            self.pool.put(conn)

    @contextmanager
    def _get_connection(self):
        """Get connection from pool."""
        conn = self.pool.get()
        try:
            yield conn
        finally:
            self.pool.put(conn)

    @smartasync
    def query(self, sql: str, params: tuple = ()) -> List[Dict]:
        """Query using pooled connection."""
        with self._get_connection() as conn:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute(sql, params)
            return [dict(row) for row in cursor.fetchall()]

2. Transaction Management

For complex transactions:

@smartasync
def transfer_funds(self, from_id: int, to_id: int, amount: float):
    """Atomic transaction."""
    with self._get_connection() as conn:
        try:
            cursor = conn.cursor()

            # Debit
            cursor.execute(
                "UPDATE accounts SET balance = balance - ? WHERE id = ?",
                (amount, from_id)
            )

            # Credit
            cursor.execute(
                "UPDATE accounts SET balance = balance + ? WHERE id = ?",
                (amount, to_id)
            )

            conn.commit()
            return True
        except Exception as e:
            conn.rollback()
            raise

3. Thread Safety

SQLite check_same_thread=False may be needed:

conn = sqlite3.connect(db_path, check_same_thread=False)

But be careful: SQLite itself is thread-safe, but Python’s wrapper has restrictions.

4. Performance

For high-load scenarios:

  • Consider async-native drivers when available (asyncpg for PostgreSQL)

  • Use connection pooling

  • Monitor thread pool size (asyncio.to_thread uses default pool)



📚 Database Driver Compatibility

Driver

Type

SmartAsync Support

Notes

sqlite3

Sync

✅ Yes

Built-in, works great

psycopg2

Sync

✅ Yes

PostgreSQL sync driver

psycopg (v3)

Both

⚠️ Use native async

Has async support

MySQLdb

Sync

✅ Yes

MySQL sync driver

PyMySQL

Sync

✅ Yes

Pure Python MySQL

asyncpg

Async

⚠️ Already async

No need for SmartAsync

motor

Async

⚠️ Already async

MongoDB async driver

Rule of Thumb: If the driver is sync-only, SmartAsync helps. If it has native async support, use that directly.


Next Steps: