行莫
行莫
发布于 2025-11-19 / 2 阅读
0
0

PyMySQL 数据库操作详解

PyMySQL 数据库操作详解

目录

  1. PyMySQL 简介
  2. 安装与环境准备
  3. 基本连接操作
  4. 数据库基本操作
  5. 事务处理
  6. 连接池管理
  7. 错误处理与异常
  8. 最佳实践
  9. 完整示例项目

PyMySQL 简介

PyMySQL 是一个纯 Python 实现的 MySQL 客户端库,完全兼容 MySQLdb(MySQLdb 是 Python 2 时代的库)。PyMySQL 的主要特点:

  • 纯 Python 实现:无需编译,跨平台支持
  • 兼容性好:兼容 MySQLdb 的 API
  • 支持 Python 3:完全支持 Python 3.x
  • 轻量级:代码简洁,易于使用
  • 活跃维护:持续更新,社区活跃

安装与环境准备

安装 PyMySQL

使用 pip 安装:

pip install pymysql

或者使用 conda:

conda install -c conda-forge pymysql

准备测试数据库

在开始之前,确保你已经有一个可用的 MySQL 数据库。如果没有,可以:

  1. 安装 MySQL 服务器
  2. 创建测试数据库和用户

以下是创建测试环境的 SQL 脚本:

-- 创建数据库
CREATE DATABASE IF NOT EXISTS testdb CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

-- 创建用户(可选)
CREATE USER IF NOT EXISTS 'testuser'@'localhost' IDENTIFIED BY 'testpass123';

-- 授权
GRANT ALL PRIVILEGES ON testdb.* TO 'testuser'@'localhost';
FLUSH PRIVILEGES;

-- 使用数据库
USE testdb;

基本连接操作

1. 建立数据库连接

最基本的连接方式:

import pymysql

# 建立连接
connection = pymysql.connect(
    host='localhost',      # 数据库主机地址
    user='testuser',       # 数据库用户名
    password='testpass123', # 数据库密码
    database='testdb',     # 数据库名称
    port=3306,             # 端口号,默认 3306
    charset='utf8mb4'      # 字符集
)

print("数据库连接成功!")

# 关闭连接
connection.close()

2. 使用字典游标

默认情况下,查询结果返回元组。使用字典游标可以返回字典格式的结果:

import pymysql

connection = pymysql.connect(
    host='localhost',
    user='testuser',
    password='testpass123',
    database='testdb',
    charset='utf8mb4'
)

# 创建字典游标
cursor = connection.cursor(pymysql.cursors.DictCursor)

# 执行查询
cursor.execute("SELECT VERSION()")
result = cursor.fetchone()

print(f"MySQL 版本: {result['VERSION()']}")

cursor.close()
connection.close()

3. 使用上下文管理器(推荐)

使用 with 语句可以自动管理连接的关闭:

import pymysql

# 使用上下文管理器自动关闭连接
with pymysql.connect(
    host='localhost',
    user='testuser',
    password='testpass123',
    database='testdb',
    charset='utf8mb4'
) as connection:
    with connection.cursor(pymysql.cursors.DictCursor) as cursor:
        cursor.execute("SELECT VERSION()")
        result = cursor.fetchone()
        print(f"MySQL 版本: {result['VERSION()']}")

数据库基本操作

1. 创建表

import pymysql

def create_table():
    """创建用户表"""
    connection = pymysql.connect(
        host='localhost',
        user='testuser',
        password='testpass123',
        database='testdb',
        charset='utf8mb4'
    )
    
    try:
        with connection.cursor() as cursor:
            # 创建用户表
            sql = """
            CREATE TABLE IF NOT EXISTS users (
                id INT AUTO_INCREMENT PRIMARY KEY,
                username VARCHAR(50) NOT NULL UNIQUE,
                email VARCHAR(100) NOT NULL,
                age INT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                INDEX idx_username (username),
                INDEX idx_email (email)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            """
            cursor.execute(sql)
            connection.commit()
            print("表创建成功!")
    finally:
        connection.close()

if __name__ == '__main__':
    create_table()

2. 插入数据

单条插入

import pymysql
from datetime import datetime

def insert_user(username, email, age):
    """插入单个用户"""
    connection = pymysql.connect(
        host='localhost',
        user='testuser',
        password='testpass123',
        database='testdb',
        charset='utf8mb4'
    )
    
    try:
        with connection.cursor() as cursor:
            sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
            cursor.execute(sql, (username, email, age))
            connection.commit()
            print(f"用户 {username} 插入成功,ID: {cursor.lastrowid}")
            return cursor.lastrowid
    except pymysql.IntegrityError as e:
        print(f"插入失败:用户名或邮箱已存在 - {e}")
        connection.rollback()
    except Exception as e:
        print(f"插入失败:{e}")
        connection.rollback()
    finally:
        connection.close()

if __name__ == '__main__':
    insert_user('zhangsan', 'zhangsan@example.com', 25)
    insert_user('lisi', 'lisi@example.com', 30)
    insert_user('wangwu', 'wangwu@example.com', 28)

批量插入

import pymysql

def insert_users_batch(users_data):
    """批量插入用户"""
    connection = pymysql.connect(
        host='localhost',
        user='testuser',
        password='testpass123',
        database='testdb',
        charset='utf8mb4'
    )
    
    try:
        with connection.cursor() as cursor:
            sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
            # executemany 用于批量插入
            affected_rows = cursor.executemany(sql, users_data)
            connection.commit()
            print(f"批量插入成功,共插入 {affected_rows} 条记录")
    except Exception as e:
        print(f"批量插入失败:{e}")
        connection.rollback()
    finally:
        connection.close()

if __name__ == '__main__':
    users = [
        ('user1', 'user1@example.com', 20),
        ('user2', 'user2@example.com', 22),
        ('user3', 'user3@example.com', 24),
        ('user4', 'user4@example.com', 26),
        ('user5', 'user5@example.com', 28),
    ]
    insert_users_batch(users)

3. 查询数据

查询单条记录

import pymysql

def get_user_by_id(user_id):
    """根据 ID 查询用户"""
    connection = pymysql.connect(
        host='localhost',
        user='testuser',
        password='testpass123',
        database='testdb',
        charset='utf8mb4'
    )
    
    try:
        with connection.cursor(pymysql.cursors.DictCursor) as cursor:
            sql = "SELECT * FROM users WHERE id = %s"
            cursor.execute(sql, (user_id,))
            result = cursor.fetchone()
            
            if result:
                print(f"用户信息:{result}")
                return result
            else:
                print(f"未找到 ID 为 {user_id} 的用户")
                return None
    finally:
        connection.close()

if __name__ == '__main__':
    get_user_by_id(1)

查询多条记录

import pymysql

def get_all_users():
    """查询所有用户"""
    connection = pymysql.connect(
        host='localhost',
        user='testuser',
        password='testpass123',
        database='testdb',
        charset='utf8mb4'
    )
    
    try:
        with connection.cursor(pymysql.cursors.DictCursor) as cursor:
            sql = "SELECT * FROM users ORDER BY id"
            cursor.execute(sql)
            results = cursor.fetchall()
            
            print(f"共查询到 {len(results)} 条记录:")
            for row in results:
                print(f"ID: {row['id']}, 用户名: {row['username']}, "
                      f"邮箱: {row['email']}, 年龄: {row['age']}")
            
            return results
    finally:
        connection.close()

if __name__ == '__main__':
    get_all_users()

条件查询

import pymysql

def search_users_by_age(min_age, max_age):
    """根据年龄范围查询用户"""
    connection = pymysql.connect(
        host='localhost',
        user='testuser',
        password='testpass123',
        database='testdb',
        charset='utf8mb4'
    )
    
    try:
        with connection.cursor(pymysql.cursors.DictCursor) as cursor:
            sql = "SELECT * FROM users WHERE age BETWEEN %s AND %s ORDER BY age"
            cursor.execute(sql, (min_age, max_age))
            results = cursor.fetchall()
            
            print(f"年龄在 {min_age}{max_age} 之间的用户:")
            for row in results:
                print(f"用户名: {row['username']}, 年龄: {row['age']}")
            
            return results
    finally:
        connection.close()

if __name__ == '__main__':
    search_users_by_age(20, 25)

分页查询

import pymysql

def get_users_paginated(page=1, page_size=10):
    """分页查询用户"""
    connection = pymysql.connect(
        host='localhost',
        user='testuser',
        password='testpass123',
        database='testdb',
        charset='utf8mb4'
    )
    
    try:
        with connection.cursor(pymysql.cursors.DictCursor) as cursor:
            # 计算偏移量
            offset = (page - 1) * page_size
            
            # 查询总数
            count_sql = "SELECT COUNT(*) as total FROM users"
            cursor.execute(count_sql)
            total = cursor.fetchone()['total']
            
            # 查询分页数据
            sql = "SELECT * FROM users ORDER BY id LIMIT %s OFFSET %s"
            cursor.execute(sql, (page_size, offset))
            results = cursor.fetchall()
            
            print(f"第 {page} 页,每页 {page_size} 条,共 {total} 条记录:")
            for row in results:
                print(f"ID: {row['id']}, 用户名: {row['username']}")
            
            return {
                'data': results,
                'total': total,
                'page': page,
                'page_size': page_size,
                'total_pages': (total + page_size - 1) // page_size
            }
    finally:
        connection.close()

if __name__ == '__main__':
    get_users_paginated(page=1, page_size=5)

4. 更新数据

import pymysql

def update_user(user_id, username=None, email=None, age=None):
    """更新用户信息"""
    connection = pymysql.connect(
        host='localhost',
        user='testuser',
        password='testpass123',
        database='testdb',
        charset='utf8mb4'
    )
    
    try:
        with connection.cursor() as cursor:
            # 构建动态更新 SQL
            updates = []
            params = []
            
            if username:
                updates.append("username = %s")
                params.append(username)
            if email:
                updates.append("email = %s")
                params.append(email)
            if age is not None:
                updates.append("age = %s")
                params.append(age)
            
            if not updates:
                print("没有要更新的字段")
                return False
            
            sql = f"UPDATE users SET {', '.join(updates)} WHERE id = %s"
            params.append(user_id)
            
            affected_rows = cursor.execute(sql, tuple(params))
            connection.commit()
            
            if affected_rows > 0:
                print(f"用户 ID {user_id} 更新成功")
                return True
            else:
                print(f"未找到 ID 为 {user_id} 的用户")
                return False
    except Exception as e:
        print(f"更新失败:{e}")
        connection.rollback()
        return False
    finally:
        connection.close()

if __name__ == '__main__':
    update_user(1, age=26)
    update_user(1, email='newemail@example.com')

5. 删除数据

import pymysql

def delete_user(user_id):
    """删除用户"""
    connection = pymysql.connect(
        host='localhost',
        user='testuser',
        password='testpass123',
        database='testdb',
        charset='utf8mb4'
    )
    
    try:
        with connection.cursor() as cursor:
            sql = "DELETE FROM users WHERE id = %s"
            affected_rows = cursor.execute(sql, (user_id,))
            connection.commit()
            
            if affected_rows > 0:
                print(f"用户 ID {user_id} 删除成功")
                return True
            else:
                print(f"未找到 ID 为 {user_id} 的用户")
                return False
    except Exception as e:
        print(f"删除失败:{e}")
        connection.rollback()
        return False
    finally:
        connection.close()

if __name__ == '__main__':
    delete_user(5)

事务处理

事务是数据库操作的重要概念,确保数据的一致性。PyMySQL 支持事务操作:

import pymysql

def transfer_money(from_user_id, to_user_id, amount):
    """转账操作(演示事务)"""
    connection = pymysql.connect(
        host='localhost',
        user='testuser',
        password='testpass123',
        database='testdb',
        charset='utf8mb4'
    )
    
    try:
        # 开始事务
        connection.begin()
        
        with connection.cursor() as cursor:
            # 检查转出账户余额(假设有余额字段)
            cursor.execute("SELECT id FROM users WHERE id = %s", (from_user_id,))
            if not cursor.fetchone():
                raise Exception(f"转出账户 {from_user_id} 不存在")
            
            # 检查转入账户
            cursor.execute("SELECT id FROM users WHERE id = %s", (to_user_id,))
            if not cursor.fetchone():
                raise Exception(f"转入账户 {to_user_id} 不存在")
            
            # 执行转账操作(这里只是示例,实际需要更新余额字段)
            # 假设有 balance 字段
            # cursor.execute("UPDATE users SET balance = balance - %s WHERE id = %s", 
            #                (amount, from_user_id))
            # cursor.execute("UPDATE users SET balance = balance + %s WHERE id = %s", 
            #                (amount, to_user_id))
            
            print(f"从用户 {from_user_id} 转账 {amount} 到用户 {to_user_id}")
        
        # 提交事务
        connection.commit()
        print("转账成功!")
        return True
        
    except Exception as e:
        # 回滚事务
        connection.rollback()
        print(f"转账失败,已回滚:{e}")
        return False
    finally:
        connection.close()

if __name__ == '__main__':
    transfer_money(1, 2, 100)

连接池管理

对于高并发应用,使用连接池可以提高性能。PyMySQL 本身不提供连接池,但可以使用第三方库如 DBUtils

安装 DBUtils

pip install DBUtils

使用连接池

from dbutils.pooled_db import PooledDB
import pymysql

# 创建连接池
pool = PooledDB(
    creator=pymysql,  # 使用 PyMySQL 作为数据库驱动
    maxconnections=10,  # 连接池最大连接数
    mincached=2,  # 初始化时创建的连接数
    maxcached=5,  # 连接池中空闲连接的最大数
    maxshared=3,  # 共享连接的最大数
    blocking=True,  # 连接池满时是否阻塞等待
    maxusage=None,  # 一个连接最多被使用的次数
    setsession=[],  # 开始会话前执行的命令列表
    ping=1,  # ping MySQL 服务器检查服务是否可用
    host='localhost',
    user='testuser',
    password='testpass123',
    database='testdb',
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor
)

def get_user_with_pool(user_id):
    """使用连接池查询用户"""
    # 从连接池获取连接
    connection = pool.connection()
    
    try:
        with connection.cursor() as cursor:
            sql = "SELECT * FROM users WHERE id = %s"
            cursor.execute(sql, (user_id,))
            result = cursor.fetchone()
            return result
    finally:
        # 将连接归还到连接池
        connection.close()

if __name__ == '__main__':
    user = get_user_with_pool(1)
    print(user)

错误处理与异常

PyMySQL 定义了多种异常类型,需要正确处理:

import pymysql

def safe_query(sql, params=None):
    """安全的查询函数,包含完整的错误处理"""
    connection = None
    try:
        connection = pymysql.connect(
            host='localhost',
            user='testuser',
            password='testpass123',
            database='testdb',
            charset='utf8mb4'
        )
        
        with connection.cursor(pymysql.cursors.DictCursor) as cursor:
            cursor.execute(sql, params or ())
            return cursor.fetchall()
            
    except pymysql.OperationalError as e:
        print(f"操作错误:{e}")
        # 可能是连接问题、表不存在等
        return None
    except pymysql.ProgrammingError as e:
        print(f"编程错误:{e}")
        # SQL 语法错误、参数错误等
        return None
    except pymysql.IntegrityError as e:
        print(f"完整性错误:{e}")
        # 违反唯一约束、外键约束等
        return None
    except pymysql.DataError as e:
        print(f"数据错误:{e}")
        # 数据类型错误、数据过长等
        return None
    except pymysql.InternalError as e:
        print(f"内部错误:{e}")
        # MySQL 内部错误
        return None
    except pymysql.NotSupportedError as e:
        print(f"不支持的操作:{e}")
        return None
    except Exception as e:
        print(f"未知错误:{e}")
        return None
    finally:
        if connection:
            connection.close()

if __name__ == '__main__':
    # 测试各种错误情况
    safe_query("SELECT * FROM nonexistent_table")
    safe_query("SELECT * FROM users WHERE id = %s", (999,))

最佳实践

1. 使用配置类管理数据库连接

import pymysql
from dataclasses import dataclass

@dataclass
class DatabaseConfig:
    """数据库配置类"""
    host: str = 'localhost'
    user: str = 'testuser'
    password: str = 'testpass123'
    database: str = 'testdb'
    port: int = 3306
    charset: str = 'utf8mb4'
    
    def get_connection(self):
        """获取数据库连接"""
        return pymysql.connect(
            host=self.host,
            user=self.user,
            password=self.password,
            database=self.database,
            port=self.port,
            charset=self.charset
        )

# 使用配置类
config = DatabaseConfig()
connection = config.get_connection()

2. 创建数据库操作类

import pymysql
from typing import Optional, List, Dict, Any

class UserDAO:
    """用户数据访问对象"""
    
    def __init__(self, config: DatabaseConfig):
        self.config = config
    
    def get_connection(self):
        """获取数据库连接"""
        return self.config.get_connection()
    
    def create_user(self, username: str, email: str, age: int) -> Optional[int]:
        """创建用户"""
        connection = self.get_connection()
        try:
            with connection.cursor() as cursor:
                sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
                cursor.execute(sql, (username, email, age))
                connection.commit()
                return cursor.lastrowid
        except pymysql.IntegrityError:
            connection.rollback()
            return None
        finally:
            connection.close()
    
    def get_user_by_id(self, user_id: int) -> Optional[Dict[str, Any]]:
        """根据 ID 获取用户"""
        connection = self.get_connection()
        try:
            with connection.cursor(pymysql.cursors.DictCursor) as cursor:
                sql = "SELECT * FROM users WHERE id = %s"
                cursor.execute(sql, (user_id,))
                return cursor.fetchone()
        finally:
            connection.close()
    
    def get_all_users(self) -> List[Dict[str, Any]]:
        """获取所有用户"""
        connection = self.get_connection()
        try:
            with connection.cursor(pymysql.cursors.DictCursor) as cursor:
                sql = "SELECT * FROM users ORDER BY id"
                cursor.execute(sql)
                return cursor.fetchall()
        finally:
            connection.close()
    
    def update_user(self, user_id: int, **kwargs) -> bool:
        """更新用户信息"""
        if not kwargs:
            return False
        
        connection = self.get_connection()
        try:
            with connection.cursor() as cursor:
                updates = [f"{key} = %s" for key in kwargs.keys()]
                sql = f"UPDATE users SET {', '.join(updates)} WHERE id = %s"
                params = list(kwargs.values()) + [user_id]
                cursor.execute(sql, tuple(params))
                connection.commit()
                return cursor.rowcount > 0
        except Exception as e:
            connection.rollback()
            print(f"更新失败:{e}")
            return False
        finally:
            connection.close()
    
    def delete_user(self, user_id: int) -> bool:
        """删除用户"""
        connection = self.get_connection()
        try:
            with connection.cursor() as cursor:
                sql = "DELETE FROM users WHERE id = %s"
                cursor.execute(sql, (user_id,))
                connection.commit()
                return cursor.rowcount > 0
        except Exception as e:
            connection.rollback()
            print(f"删除失败:{e}")
            return False
        finally:
            connection.close()

# 使用示例
if __name__ == '__main__':
    config = DatabaseConfig()
    dao = UserDAO(config)
    
    # 创建用户
    user_id = dao.create_user('testuser', 'test@example.com', 25)
    print(f"创建用户,ID: {user_id}")
    
    # 查询用户
    user = dao.get_user_by_id(user_id)
    print(f"查询用户: {user}")
    
    # 更新用户
    dao.update_user(user_id, age=26)
    
    # 删除用户
    dao.delete_user(user_id)

3. 使用上下文管理器封装

import pymysql
from contextlib import contextmanager

class DatabaseManager:
    """数据库管理器"""
    
    def __init__(self, config: DatabaseConfig):
        self.config = config
    
    @contextmanager
    def get_cursor(self, dict_cursor=False):
        """获取游标的上下文管理器"""
        connection = self.config.get_connection()
        try:
            cursor_class = pymysql.cursors.DictCursor if dict_cursor else pymysql.cursors.Cursor
            with connection.cursor(cursor_class) as cursor:
                yield cursor, connection
                connection.commit()
        except Exception:
            connection.rollback()
            raise
        finally:
            connection.close()

# 使用示例
if __name__ == '__main__':
    config = DatabaseConfig()
    db = DatabaseManager(config)
    
    with db.get_cursor(dict_cursor=True) as (cursor, conn):
        cursor.execute("SELECT * FROM users LIMIT 5")
        results = cursor.fetchall()
        for row in results:
            print(row)

完整示例项目

以下是一个完整的示例项目,包含所有常用功能:

"""
PyMySQL 完整示例项目
包含数据库连接、CRUD 操作、事务处理等
"""

import pymysql
from pymysql.cursors import DictCursor
from typing import Optional, List, Dict, Any
from contextlib import contextmanager
from dataclasses import dataclass
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class DatabaseConfig:
    """数据库配置"""
    host: str = 'localhost'
    user: str = 'testuser'
    password: str = 'testpass123'
    database: str = 'testdb'
    port: int = 3306
    charset: str = 'utf8mb4'


class DatabaseManager:
    """数据库管理器"""
    
    def __init__(self, config: DatabaseConfig):
        self.config = config
    
    def get_connection(self):
        """获取数据库连接"""
        return pymysql.connect(
            host=self.config.host,
            user=self.config.user,
            password=self.config.password,
            database=self.config.database,
            port=self.config.port,
            charset=self.config.charset
        )
    
    @contextmanager
    def get_cursor(self, dict_cursor: bool = False):
        """获取游标的上下文管理器"""
        connection = self.get_connection()
        try:
            cursor_class = DictCursor if dict_cursor else pymysql.cursors.Cursor
            with connection.cursor(cursor_class) as cursor:
                yield cursor, connection
                connection.commit()
        except Exception as e:
            connection.rollback()
            logger.error(f"数据库操作失败: {e}")
            raise
        finally:
            connection.close()


class UserService:
    """用户服务类"""
    
    def __init__(self, db_manager: DatabaseManager):
        self.db = db_manager
    
    def create_table(self):
        """创建用户表"""
        with self.db.get_cursor() as (cursor, conn):
            sql = """
            CREATE TABLE IF NOT EXISTS users (
                id INT AUTO_INCREMENT PRIMARY KEY,
                username VARCHAR(50) NOT NULL UNIQUE,
                email VARCHAR(100) NOT NULL,
                age INT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                INDEX idx_username (username),
                INDEX idx_email (email)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
            """
            cursor.execute(sql)
            logger.info("用户表创建成功")
    
    def create_user(self, username: str, email: str, age: int) -> Optional[int]:
        """创建用户"""
        try:
            with self.db.get_cursor() as (cursor, conn):
                sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
                cursor.execute(sql, (username, email, age))
                user_id = cursor.lastrowid
                logger.info(f"用户创建成功,ID: {user_id}")
                return user_id
        except pymysql.IntegrityError as e:
            logger.error(f"用户创建失败:用户名或邮箱已存在 - {e}")
            return None
    
    def get_user_by_id(self, user_id: int) -> Optional[Dict[str, Any]]:
        """根据 ID 获取用户"""
        with self.db.get_cursor(dict_cursor=True) as (cursor, conn):
            sql = "SELECT * FROM users WHERE id = %s"
            cursor.execute(sql, (user_id,))
            return cursor.fetchone()
    
    def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]:
        """根据用户名获取用户"""
        with self.db.get_cursor(dict_cursor=True) as (cursor, conn):
            sql = "SELECT * FROM users WHERE username = %s"
            cursor.execute(sql, (username,))
            return cursor.fetchone()
    
    def get_all_users(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """获取所有用户"""
        with self.db.get_cursor(dict_cursor=True) as (cursor, conn):
            sql = "SELECT * FROM users ORDER BY id"
            if limit:
                sql += f" LIMIT {limit}"
            cursor.execute(sql)
            return cursor.fetchall()
    
    def update_user(self, user_id: int, **kwargs) -> bool:
        """更新用户信息"""
        if not kwargs:
            return False
        
        try:
            with self.db.get_cursor() as (cursor, conn):
                updates = [f"{key} = %s" for key in kwargs.keys()]
                sql = f"UPDATE users SET {', '.join(updates)} WHERE id = %s"
                params = list(kwargs.values()) + [user_id]
                cursor.execute(sql, tuple(params))
                if cursor.rowcount > 0:
                    logger.info(f"用户 {user_id} 更新成功")
                    return True
                else:
                    logger.warning(f"用户 {user_id} 不存在")
                    return False
        except Exception as e:
            logger.error(f"更新用户失败: {e}")
            return False
    
    def delete_user(self, user_id: int) -> bool:
        """删除用户"""
        try:
            with self.db.get_cursor() as (cursor, conn):
                sql = "DELETE FROM users WHERE id = %s"
                cursor.execute(sql, (user_id,))
                if cursor.rowcount > 0:
                    logger.info(f"用户 {user_id} 删除成功")
                    return True
                else:
                    logger.warning(f"用户 {user_id} 不存在")
                    return False
        except Exception as e:
            logger.error(f"删除用户失败: {e}")
            return False
    
    def search_users(self, keyword: str) -> List[Dict[str, Any]]:
        """搜索用户(按用户名或邮箱)"""
        with self.db.get_cursor(dict_cursor=True) as (cursor, conn):
            sql = """
            SELECT * FROM users 
            WHERE username LIKE %s OR email LIKE %s 
            ORDER BY id
            """
            pattern = f"%{keyword}%"
            cursor.execute(sql, (pattern, pattern))
            return cursor.fetchall()
    
    def get_users_by_age_range(self, min_age: int, max_age: int) -> List[Dict[str, Any]]:
        """根据年龄范围查询用户"""
        with self.db.get_cursor(dict_cursor=True) as (cursor, conn):
            sql = "SELECT * FROM users WHERE age BETWEEN %s AND %s ORDER BY age"
            cursor.execute(sql, (min_age, max_age))
            return cursor.fetchall()


def main():
    """主函数 - 演示所有功能"""
    # 初始化配置和数据库管理器
    config = DatabaseConfig()
    db_manager = DatabaseManager(config)
    user_service = UserService(db_manager)
    
    # 创建表
    print("=" * 50)
    print("1. 创建用户表")
    print("=" * 50)
    user_service.create_table()
    
    # 创建用户
    print("\n" + "=" * 50)
    print("2. 创建用户")
    print("=" * 50)
    user_ids = []
    users_data = [
        ('alice', 'alice@example.com', 25),
        ('bob', 'bob@example.com', 30),
        ('charlie', 'charlie@example.com', 28),
        ('david', 'david@example.com', 32),
        ('eve', 'eve@example.com', 26),
    ]
    
    for username, email, age in users_data:
        user_id = user_service.create_user(username, email, age)
        if user_id:
            user_ids.append(user_id)
    
    # 查询所有用户
    print("\n" + "=" * 50)
    print("3. 查询所有用户")
    print("=" * 50)
    all_users = user_service.get_all_users()
    for user in all_users:
        print(f"ID: {user['id']}, 用户名: {user['username']}, "
              f"邮箱: {user['email']}, 年龄: {user['age']}")
    
    # 根据 ID 查询用户
    print("\n" + "=" * 50)
    print("4. 根据 ID 查询用户")
    print("=" * 50)
    if user_ids:
        user = user_service.get_user_by_id(user_ids[0])
        if user:
            print(f"用户信息: {user}")
    
    # 更新用户
    print("\n" + "=" * 50)
    print("5. 更新用户信息")
    print("=" * 50)
    if user_ids:
        user_service.update_user(user_ids[0], age=27, email='newemail@example.com')
        updated_user = user_service.get_user_by_id(user_ids[0])
        print(f"更新后的用户信息: {updated_user}")
    
    # 搜索用户
    print("\n" + "=" * 50)
    print("6. 搜索用户(关键词: 'alice')")
    print("=" * 50)
    results = user_service.search_users('alice')
    for user in results:
        print(f"找到用户: {user['username']} - {user['email']}")
    
    # 年龄范围查询
    print("\n" + "=" * 50)
    print("7. 年龄范围查询(25-30岁)")
    print("=" * 50)
    age_users = user_service.get_users_by_age_range(25, 30)
    for user in age_users:
        print(f"用户: {user['username']}, 年龄: {user['age']}")
    
    # 删除用户
    print("\n" + "=" * 50)
    print("8. 删除用户")
    print("=" * 50)
    if len(user_ids) > 1:
        user_service.delete_user(user_ids[-1])
        print(f"已删除用户 ID: {user_ids[-1]}")
    
    # 最终查询
    print("\n" + "=" * 50)
    print("9. 最终用户列表")
    print("=" * 50)
    final_users = user_service.get_all_users()
    print(f"剩余用户数量: {len(final_users)}")
    for user in final_users:
        print(f"ID: {user['id']}, 用户名: {user['username']}")


if __name__ == '__main__':
    main()

总结

本文详细介绍了 PyMySQL 的使用方法,包括:

  1. 基本连接:如何建立和管理数据库连接
  2. CRUD 操作:创建、读取、更新、删除数据
  3. 事务处理:确保数据一致性
  4. 连接池:提高应用性能
  5. 错误处理:正确处理各种异常情况
  6. 最佳实践:代码组织和架构设计

关键要点

  • 始终使用参数化查询:防止 SQL 注入攻击
  • 使用上下文管理器:确保连接正确关闭
  • 处理异常:捕获并处理各种数据库异常
  • 使用连接池:在高并发场景下提高性能
  • 代码组织:使用 DAO 模式或服务类组织代码

下一步学习

  • 学习 SQLAlchemy ORM 框架
  • 了解数据库索引优化
  • 学习数据库备份和恢复
  • 掌握数据库性能调优

希望这篇文章能帮助你掌握 PyMySQL 的使用!


评论