PyMySQL 数据库操作详解
目录
PyMySQL 简介
PyMySQL 是一个纯 Python 实现的 MySQL 客户端库,完全兼容 MySQLdb(MySQLdb 是 Python 2 时代的库)。PyMySQL 的主要特点:
- 纯 Python 实现:无需编译,跨平台支持
- 兼容性好:兼容 MySQLdb 的 API
- 支持 Python 3:完全支持 Python 3.x
- 轻量级:代码简洁,易于使用
- 活跃维护:持续更新,社区活跃
安装与环境准备
安装 PyMySQL
使用 pip 安装:
pip install pymysql
或者使用 conda:
conda install -c conda-forge pymysql
准备测试数据库
在开始之前,确保你已经有一个可用的 MySQL 数据库。如果没有,可以:
- 安装 MySQL 服务器
- 创建测试数据库和用户
以下是创建测试环境的 SQL 脚本:
-- 创建数据库
CREATE DATABASE IF NOT EXISTS testdb CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
-- 创建用户(可选)
CREATE USER IF NOT EXISTS 'testuser'@'localhost' IDENTIFIED BY 'testpass123';
-- 授权
GRANT ALL PRIVILEGES ON testdb.* TO 'testuser'@'localhost';
FLUSH PRIVILEGES;
-- 使用数据库
USE testdb;
基本连接操作
1. 建立数据库连接
最基本的连接方式:
import pymysql
# 建立连接
connection = pymysql.connect(
host='localhost', # 数据库主机地址
user='testuser', # 数据库用户名
password='testpass123', # 数据库密码
database='testdb', # 数据库名称
port=3306, # 端口号,默认 3306
charset='utf8mb4' # 字符集
)
print("数据库连接成功!")
# 关闭连接
connection.close()
2. 使用字典游标
默认情况下,查询结果返回元组。使用字典游标可以返回字典格式的结果:
import pymysql
connection = pymysql.connect(
host='localhost',
user='testuser',
password='testpass123',
database='testdb',
charset='utf8mb4'
)
# 创建字典游标
cursor = connection.cursor(pymysql.cursors.DictCursor)
# 执行查询
cursor.execute("SELECT VERSION()")
result = cursor.fetchone()
print(f"MySQL 版本: {result['VERSION()']}")
cursor.close()
connection.close()
3. 使用上下文管理器(推荐)
使用 with 语句可以自动管理连接的关闭:
import pymysql
# 使用上下文管理器自动关闭连接
with pymysql.connect(
host='localhost',
user='testuser',
password='testpass123',
database='testdb',
charset='utf8mb4'
) as connection:
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute("SELECT VERSION()")
result = cursor.fetchone()
print(f"MySQL 版本: {result['VERSION()']}")
数据库基本操作
1. 创建表
import pymysql
def create_table():
"""创建用户表"""
connection = pymysql.connect(
host='localhost',
user='testuser',
password='testpass123',
database='testdb',
charset='utf8mb4'
)
try:
with connection.cursor() as cursor:
# 创建用户表
sql = """
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) NOT NULL UNIQUE,
email VARCHAR(100) NOT NULL,
age INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_username (username),
INDEX idx_email (email)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
"""
cursor.execute(sql)
connection.commit()
print("表创建成功!")
finally:
connection.close()
if __name__ == '__main__':
create_table()
2. 插入数据
单条插入
import pymysql
from datetime import datetime
def insert_user(username, email, age):
"""插入单个用户"""
connection = pymysql.connect(
host='localhost',
user='testuser',
password='testpass123',
database='testdb',
charset='utf8mb4'
)
try:
with connection.cursor() as cursor:
sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
cursor.execute(sql, (username, email, age))
connection.commit()
print(f"用户 {username} 插入成功,ID: {cursor.lastrowid}")
return cursor.lastrowid
except pymysql.IntegrityError as e:
print(f"插入失败:用户名或邮箱已存在 - {e}")
connection.rollback()
except Exception as e:
print(f"插入失败:{e}")
connection.rollback()
finally:
connection.close()
if __name__ == '__main__':
insert_user('zhangsan', 'zhangsan@example.com', 25)
insert_user('lisi', 'lisi@example.com', 30)
insert_user('wangwu', 'wangwu@example.com', 28)
批量插入
import pymysql
def insert_users_batch(users_data):
"""批量插入用户"""
connection = pymysql.connect(
host='localhost',
user='testuser',
password='testpass123',
database='testdb',
charset='utf8mb4'
)
try:
with connection.cursor() as cursor:
sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
# executemany 用于批量插入
affected_rows = cursor.executemany(sql, users_data)
connection.commit()
print(f"批量插入成功,共插入 {affected_rows} 条记录")
except Exception as e:
print(f"批量插入失败:{e}")
connection.rollback()
finally:
connection.close()
if __name__ == '__main__':
users = [
('user1', 'user1@example.com', 20),
('user2', 'user2@example.com', 22),
('user3', 'user3@example.com', 24),
('user4', 'user4@example.com', 26),
('user5', 'user5@example.com', 28),
]
insert_users_batch(users)
3. 查询数据
查询单条记录
import pymysql
def get_user_by_id(user_id):
"""根据 ID 查询用户"""
connection = pymysql.connect(
host='localhost',
user='testuser',
password='testpass123',
database='testdb',
charset='utf8mb4'
)
try:
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
sql = "SELECT * FROM users WHERE id = %s"
cursor.execute(sql, (user_id,))
result = cursor.fetchone()
if result:
print(f"用户信息:{result}")
return result
else:
print(f"未找到 ID 为 {user_id} 的用户")
return None
finally:
connection.close()
if __name__ == '__main__':
get_user_by_id(1)
查询多条记录
import pymysql
def get_all_users():
"""查询所有用户"""
connection = pymysql.connect(
host='localhost',
user='testuser',
password='testpass123',
database='testdb',
charset='utf8mb4'
)
try:
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
sql = "SELECT * FROM users ORDER BY id"
cursor.execute(sql)
results = cursor.fetchall()
print(f"共查询到 {len(results)} 条记录:")
for row in results:
print(f"ID: {row['id']}, 用户名: {row['username']}, "
f"邮箱: {row['email']}, 年龄: {row['age']}")
return results
finally:
connection.close()
if __name__ == '__main__':
get_all_users()
条件查询
import pymysql
def search_users_by_age(min_age, max_age):
"""根据年龄范围查询用户"""
connection = pymysql.connect(
host='localhost',
user='testuser',
password='testpass123',
database='testdb',
charset='utf8mb4'
)
try:
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
sql = "SELECT * FROM users WHERE age BETWEEN %s AND %s ORDER BY age"
cursor.execute(sql, (min_age, max_age))
results = cursor.fetchall()
print(f"年龄在 {min_age} 到 {max_age} 之间的用户:")
for row in results:
print(f"用户名: {row['username']}, 年龄: {row['age']}")
return results
finally:
connection.close()
if __name__ == '__main__':
search_users_by_age(20, 25)
分页查询
import pymysql
def get_users_paginated(page=1, page_size=10):
"""分页查询用户"""
connection = pymysql.connect(
host='localhost',
user='testuser',
password='testpass123',
database='testdb',
charset='utf8mb4'
)
try:
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
# 计算偏移量
offset = (page - 1) * page_size
# 查询总数
count_sql = "SELECT COUNT(*) as total FROM users"
cursor.execute(count_sql)
total = cursor.fetchone()['total']
# 查询分页数据
sql = "SELECT * FROM users ORDER BY id LIMIT %s OFFSET %s"
cursor.execute(sql, (page_size, offset))
results = cursor.fetchall()
print(f"第 {page} 页,每页 {page_size} 条,共 {total} 条记录:")
for row in results:
print(f"ID: {row['id']}, 用户名: {row['username']}")
return {
'data': results,
'total': total,
'page': page,
'page_size': page_size,
'total_pages': (total + page_size - 1) // page_size
}
finally:
connection.close()
if __name__ == '__main__':
get_users_paginated(page=1, page_size=5)
4. 更新数据
import pymysql
def update_user(user_id, username=None, email=None, age=None):
"""更新用户信息"""
connection = pymysql.connect(
host='localhost',
user='testuser',
password='testpass123',
database='testdb',
charset='utf8mb4'
)
try:
with connection.cursor() as cursor:
# 构建动态更新 SQL
updates = []
params = []
if username:
updates.append("username = %s")
params.append(username)
if email:
updates.append("email = %s")
params.append(email)
if age is not None:
updates.append("age = %s")
params.append(age)
if not updates:
print("没有要更新的字段")
return False
sql = f"UPDATE users SET {', '.join(updates)} WHERE id = %s"
params.append(user_id)
affected_rows = cursor.execute(sql, tuple(params))
connection.commit()
if affected_rows > 0:
print(f"用户 ID {user_id} 更新成功")
return True
else:
print(f"未找到 ID 为 {user_id} 的用户")
return False
except Exception as e:
print(f"更新失败:{e}")
connection.rollback()
return False
finally:
connection.close()
if __name__ == '__main__':
update_user(1, age=26)
update_user(1, email='newemail@example.com')
5. 删除数据
import pymysql
def delete_user(user_id):
"""删除用户"""
connection = pymysql.connect(
host='localhost',
user='testuser',
password='testpass123',
database='testdb',
charset='utf8mb4'
)
try:
with connection.cursor() as cursor:
sql = "DELETE FROM users WHERE id = %s"
affected_rows = cursor.execute(sql, (user_id,))
connection.commit()
if affected_rows > 0:
print(f"用户 ID {user_id} 删除成功")
return True
else:
print(f"未找到 ID 为 {user_id} 的用户")
return False
except Exception as e:
print(f"删除失败:{e}")
connection.rollback()
return False
finally:
connection.close()
if __name__ == '__main__':
delete_user(5)
事务处理
事务是数据库操作的重要概念,确保数据的一致性。PyMySQL 支持事务操作:
import pymysql
def transfer_money(from_user_id, to_user_id, amount):
"""转账操作(演示事务)"""
connection = pymysql.connect(
host='localhost',
user='testuser',
password='testpass123',
database='testdb',
charset='utf8mb4'
)
try:
# 开始事务
connection.begin()
with connection.cursor() as cursor:
# 检查转出账户余额(假设有余额字段)
cursor.execute("SELECT id FROM users WHERE id = %s", (from_user_id,))
if not cursor.fetchone():
raise Exception(f"转出账户 {from_user_id} 不存在")
# 检查转入账户
cursor.execute("SELECT id FROM users WHERE id = %s", (to_user_id,))
if not cursor.fetchone():
raise Exception(f"转入账户 {to_user_id} 不存在")
# 执行转账操作(这里只是示例,实际需要更新余额字段)
# 假设有 balance 字段
# cursor.execute("UPDATE users SET balance = balance - %s WHERE id = %s",
# (amount, from_user_id))
# cursor.execute("UPDATE users SET balance = balance + %s WHERE id = %s",
# (amount, to_user_id))
print(f"从用户 {from_user_id} 转账 {amount} 到用户 {to_user_id}")
# 提交事务
connection.commit()
print("转账成功!")
return True
except Exception as e:
# 回滚事务
connection.rollback()
print(f"转账失败,已回滚:{e}")
return False
finally:
connection.close()
if __name__ == '__main__':
transfer_money(1, 2, 100)
连接池管理
对于高并发应用,使用连接池可以提高性能。PyMySQL 本身不提供连接池,但可以使用第三方库如 DBUtils:
安装 DBUtils
pip install DBUtils
使用连接池
from dbutils.pooled_db import PooledDB
import pymysql
# 创建连接池
pool = PooledDB(
creator=pymysql, # 使用 PyMySQL 作为数据库驱动
maxconnections=10, # 连接池最大连接数
mincached=2, # 初始化时创建的连接数
maxcached=5, # 连接池中空闲连接的最大数
maxshared=3, # 共享连接的最大数
blocking=True, # 连接池满时是否阻塞等待
maxusage=None, # 一个连接最多被使用的次数
setsession=[], # 开始会话前执行的命令列表
ping=1, # ping MySQL 服务器检查服务是否可用
host='localhost',
user='testuser',
password='testpass123',
database='testdb',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
def get_user_with_pool(user_id):
"""使用连接池查询用户"""
# 从连接池获取连接
connection = pool.connection()
try:
with connection.cursor() as cursor:
sql = "SELECT * FROM users WHERE id = %s"
cursor.execute(sql, (user_id,))
result = cursor.fetchone()
return result
finally:
# 将连接归还到连接池
connection.close()
if __name__ == '__main__':
user = get_user_with_pool(1)
print(user)
错误处理与异常
PyMySQL 定义了多种异常类型,需要正确处理:
import pymysql
def safe_query(sql, params=None):
"""安全的查询函数,包含完整的错误处理"""
connection = None
try:
connection = pymysql.connect(
host='localhost',
user='testuser',
password='testpass123',
database='testdb',
charset='utf8mb4'
)
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute(sql, params or ())
return cursor.fetchall()
except pymysql.OperationalError as e:
print(f"操作错误:{e}")
# 可能是连接问题、表不存在等
return None
except pymysql.ProgrammingError as e:
print(f"编程错误:{e}")
# SQL 语法错误、参数错误等
return None
except pymysql.IntegrityError as e:
print(f"完整性错误:{e}")
# 违反唯一约束、外键约束等
return None
except pymysql.DataError as e:
print(f"数据错误:{e}")
# 数据类型错误、数据过长等
return None
except pymysql.InternalError as e:
print(f"内部错误:{e}")
# MySQL 内部错误
return None
except pymysql.NotSupportedError as e:
print(f"不支持的操作:{e}")
return None
except Exception as e:
print(f"未知错误:{e}")
return None
finally:
if connection:
connection.close()
if __name__ == '__main__':
# 测试各种错误情况
safe_query("SELECT * FROM nonexistent_table")
safe_query("SELECT * FROM users WHERE id = %s", (999,))
最佳实践
1. 使用配置类管理数据库连接
import pymysql
from dataclasses import dataclass
@dataclass
class DatabaseConfig:
"""数据库配置类"""
host: str = 'localhost'
user: str = 'testuser'
password: str = 'testpass123'
database: str = 'testdb'
port: int = 3306
charset: str = 'utf8mb4'
def get_connection(self):
"""获取数据库连接"""
return pymysql.connect(
host=self.host,
user=self.user,
password=self.password,
database=self.database,
port=self.port,
charset=self.charset
)
# 使用配置类
config = DatabaseConfig()
connection = config.get_connection()
2. 创建数据库操作类
import pymysql
from typing import Optional, List, Dict, Any
class UserDAO:
"""用户数据访问对象"""
def __init__(self, config: DatabaseConfig):
self.config = config
def get_connection(self):
"""获取数据库连接"""
return self.config.get_connection()
def create_user(self, username: str, email: str, age: int) -> Optional[int]:
"""创建用户"""
connection = self.get_connection()
try:
with connection.cursor() as cursor:
sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
cursor.execute(sql, (username, email, age))
connection.commit()
return cursor.lastrowid
except pymysql.IntegrityError:
connection.rollback()
return None
finally:
connection.close()
def get_user_by_id(self, user_id: int) -> Optional[Dict[str, Any]]:
"""根据 ID 获取用户"""
connection = self.get_connection()
try:
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
sql = "SELECT * FROM users WHERE id = %s"
cursor.execute(sql, (user_id,))
return cursor.fetchone()
finally:
connection.close()
def get_all_users(self) -> List[Dict[str, Any]]:
"""获取所有用户"""
connection = self.get_connection()
try:
with connection.cursor(pymysql.cursors.DictCursor) as cursor:
sql = "SELECT * FROM users ORDER BY id"
cursor.execute(sql)
return cursor.fetchall()
finally:
connection.close()
def update_user(self, user_id: int, **kwargs) -> bool:
"""更新用户信息"""
if not kwargs:
return False
connection = self.get_connection()
try:
with connection.cursor() as cursor:
updates = [f"{key} = %s" for key in kwargs.keys()]
sql = f"UPDATE users SET {', '.join(updates)} WHERE id = %s"
params = list(kwargs.values()) + [user_id]
cursor.execute(sql, tuple(params))
connection.commit()
return cursor.rowcount > 0
except Exception as e:
connection.rollback()
print(f"更新失败:{e}")
return False
finally:
connection.close()
def delete_user(self, user_id: int) -> bool:
"""删除用户"""
connection = self.get_connection()
try:
with connection.cursor() as cursor:
sql = "DELETE FROM users WHERE id = %s"
cursor.execute(sql, (user_id,))
connection.commit()
return cursor.rowcount > 0
except Exception as e:
connection.rollback()
print(f"删除失败:{e}")
return False
finally:
connection.close()
# 使用示例
if __name__ == '__main__':
config = DatabaseConfig()
dao = UserDAO(config)
# 创建用户
user_id = dao.create_user('testuser', 'test@example.com', 25)
print(f"创建用户,ID: {user_id}")
# 查询用户
user = dao.get_user_by_id(user_id)
print(f"查询用户: {user}")
# 更新用户
dao.update_user(user_id, age=26)
# 删除用户
dao.delete_user(user_id)
3. 使用上下文管理器封装
import pymysql
from contextlib import contextmanager
class DatabaseManager:
"""数据库管理器"""
def __init__(self, config: DatabaseConfig):
self.config = config
@contextmanager
def get_cursor(self, dict_cursor=False):
"""获取游标的上下文管理器"""
connection = self.config.get_connection()
try:
cursor_class = pymysql.cursors.DictCursor if dict_cursor else pymysql.cursors.Cursor
with connection.cursor(cursor_class) as cursor:
yield cursor, connection
connection.commit()
except Exception:
connection.rollback()
raise
finally:
connection.close()
# 使用示例
if __name__ == '__main__':
config = DatabaseConfig()
db = DatabaseManager(config)
with db.get_cursor(dict_cursor=True) as (cursor, conn):
cursor.execute("SELECT * FROM users LIMIT 5")
results = cursor.fetchall()
for row in results:
print(row)
完整示例项目
以下是一个完整的示例项目,包含所有常用功能:
"""
PyMySQL 完整示例项目
包含数据库连接、CRUD 操作、事务处理等
"""
import pymysql
from pymysql.cursors import DictCursor
from typing import Optional, List, Dict, Any
from contextlib import contextmanager
from dataclasses import dataclass
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class DatabaseConfig:
"""数据库配置"""
host: str = 'localhost'
user: str = 'testuser'
password: str = 'testpass123'
database: str = 'testdb'
port: int = 3306
charset: str = 'utf8mb4'
class DatabaseManager:
"""数据库管理器"""
def __init__(self, config: DatabaseConfig):
self.config = config
def get_connection(self):
"""获取数据库连接"""
return pymysql.connect(
host=self.config.host,
user=self.config.user,
password=self.config.password,
database=self.config.database,
port=self.config.port,
charset=self.config.charset
)
@contextmanager
def get_cursor(self, dict_cursor: bool = False):
"""获取游标的上下文管理器"""
connection = self.get_connection()
try:
cursor_class = DictCursor if dict_cursor else pymysql.cursors.Cursor
with connection.cursor(cursor_class) as cursor:
yield cursor, connection
connection.commit()
except Exception as e:
connection.rollback()
logger.error(f"数据库操作失败: {e}")
raise
finally:
connection.close()
class UserService:
"""用户服务类"""
def __init__(self, db_manager: DatabaseManager):
self.db = db_manager
def create_table(self):
"""创建用户表"""
with self.db.get_cursor() as (cursor, conn):
sql = """
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) NOT NULL UNIQUE,
email VARCHAR(100) NOT NULL,
age INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_username (username),
INDEX idx_email (email)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
"""
cursor.execute(sql)
logger.info("用户表创建成功")
def create_user(self, username: str, email: str, age: int) -> Optional[int]:
"""创建用户"""
try:
with self.db.get_cursor() as (cursor, conn):
sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
cursor.execute(sql, (username, email, age))
user_id = cursor.lastrowid
logger.info(f"用户创建成功,ID: {user_id}")
return user_id
except pymysql.IntegrityError as e:
logger.error(f"用户创建失败:用户名或邮箱已存在 - {e}")
return None
def get_user_by_id(self, user_id: int) -> Optional[Dict[str, Any]]:
"""根据 ID 获取用户"""
with self.db.get_cursor(dict_cursor=True) as (cursor, conn):
sql = "SELECT * FROM users WHERE id = %s"
cursor.execute(sql, (user_id,))
return cursor.fetchone()
def get_user_by_username(self, username: str) -> Optional[Dict[str, Any]]:
"""根据用户名获取用户"""
with self.db.get_cursor(dict_cursor=True) as (cursor, conn):
sql = "SELECT * FROM users WHERE username = %s"
cursor.execute(sql, (username,))
return cursor.fetchone()
def get_all_users(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
"""获取所有用户"""
with self.db.get_cursor(dict_cursor=True) as (cursor, conn):
sql = "SELECT * FROM users ORDER BY id"
if limit:
sql += f" LIMIT {limit}"
cursor.execute(sql)
return cursor.fetchall()
def update_user(self, user_id: int, **kwargs) -> bool:
"""更新用户信息"""
if not kwargs:
return False
try:
with self.db.get_cursor() as (cursor, conn):
updates = [f"{key} = %s" for key in kwargs.keys()]
sql = f"UPDATE users SET {', '.join(updates)} WHERE id = %s"
params = list(kwargs.values()) + [user_id]
cursor.execute(sql, tuple(params))
if cursor.rowcount > 0:
logger.info(f"用户 {user_id} 更新成功")
return True
else:
logger.warning(f"用户 {user_id} 不存在")
return False
except Exception as e:
logger.error(f"更新用户失败: {e}")
return False
def delete_user(self, user_id: int) -> bool:
"""删除用户"""
try:
with self.db.get_cursor() as (cursor, conn):
sql = "DELETE FROM users WHERE id = %s"
cursor.execute(sql, (user_id,))
if cursor.rowcount > 0:
logger.info(f"用户 {user_id} 删除成功")
return True
else:
logger.warning(f"用户 {user_id} 不存在")
return False
except Exception as e:
logger.error(f"删除用户失败: {e}")
return False
def search_users(self, keyword: str) -> List[Dict[str, Any]]:
"""搜索用户(按用户名或邮箱)"""
with self.db.get_cursor(dict_cursor=True) as (cursor, conn):
sql = """
SELECT * FROM users
WHERE username LIKE %s OR email LIKE %s
ORDER BY id
"""
pattern = f"%{keyword}%"
cursor.execute(sql, (pattern, pattern))
return cursor.fetchall()
def get_users_by_age_range(self, min_age: int, max_age: int) -> List[Dict[str, Any]]:
"""根据年龄范围查询用户"""
with self.db.get_cursor(dict_cursor=True) as (cursor, conn):
sql = "SELECT * FROM users WHERE age BETWEEN %s AND %s ORDER BY age"
cursor.execute(sql, (min_age, max_age))
return cursor.fetchall()
def main():
"""主函数 - 演示所有功能"""
# 初始化配置和数据库管理器
config = DatabaseConfig()
db_manager = DatabaseManager(config)
user_service = UserService(db_manager)
# 创建表
print("=" * 50)
print("1. 创建用户表")
print("=" * 50)
user_service.create_table()
# 创建用户
print("\n" + "=" * 50)
print("2. 创建用户")
print("=" * 50)
user_ids = []
users_data = [
('alice', 'alice@example.com', 25),
('bob', 'bob@example.com', 30),
('charlie', 'charlie@example.com', 28),
('david', 'david@example.com', 32),
('eve', 'eve@example.com', 26),
]
for username, email, age in users_data:
user_id = user_service.create_user(username, email, age)
if user_id:
user_ids.append(user_id)
# 查询所有用户
print("\n" + "=" * 50)
print("3. 查询所有用户")
print("=" * 50)
all_users = user_service.get_all_users()
for user in all_users:
print(f"ID: {user['id']}, 用户名: {user['username']}, "
f"邮箱: {user['email']}, 年龄: {user['age']}")
# 根据 ID 查询用户
print("\n" + "=" * 50)
print("4. 根据 ID 查询用户")
print("=" * 50)
if user_ids:
user = user_service.get_user_by_id(user_ids[0])
if user:
print(f"用户信息: {user}")
# 更新用户
print("\n" + "=" * 50)
print("5. 更新用户信息")
print("=" * 50)
if user_ids:
user_service.update_user(user_ids[0], age=27, email='newemail@example.com')
updated_user = user_service.get_user_by_id(user_ids[0])
print(f"更新后的用户信息: {updated_user}")
# 搜索用户
print("\n" + "=" * 50)
print("6. 搜索用户(关键词: 'alice')")
print("=" * 50)
results = user_service.search_users('alice')
for user in results:
print(f"找到用户: {user['username']} - {user['email']}")
# 年龄范围查询
print("\n" + "=" * 50)
print("7. 年龄范围查询(25-30岁)")
print("=" * 50)
age_users = user_service.get_users_by_age_range(25, 30)
for user in age_users:
print(f"用户: {user['username']}, 年龄: {user['age']}")
# 删除用户
print("\n" + "=" * 50)
print("8. 删除用户")
print("=" * 50)
if len(user_ids) > 1:
user_service.delete_user(user_ids[-1])
print(f"已删除用户 ID: {user_ids[-1]}")
# 最终查询
print("\n" + "=" * 50)
print("9. 最终用户列表")
print("=" * 50)
final_users = user_service.get_all_users()
print(f"剩余用户数量: {len(final_users)}")
for user in final_users:
print(f"ID: {user['id']}, 用户名: {user['username']}")
if __name__ == '__main__':
main()
总结
本文详细介绍了 PyMySQL 的使用方法,包括:
- 基本连接:如何建立和管理数据库连接
- CRUD 操作:创建、读取、更新、删除数据
- 事务处理:确保数据一致性
- 连接池:提高应用性能
- 错误处理:正确处理各种异常情况
- 最佳实践:代码组织和架构设计
关键要点
- 始终使用参数化查询:防止 SQL 注入攻击
- 使用上下文管理器:确保连接正确关闭
- 处理异常:捕获并处理各种数据库异常
- 使用连接池:在高并发场景下提高性能
- 代码组织:使用 DAO 模式或服务类组织代码
下一步学习
- 学习 SQLAlchemy ORM 框架
- 了解数据库索引优化
- 学习数据库备份和恢复
- 掌握数据库性能调优
希望这篇文章能帮助你掌握 PyMySQL 的使用!