一.概述
在现代应用程序的架构中,数据库扮演着核心角色,是支撑业务运转的重要基础设施。随着业务规模的扩展和需求的演进,数据库的性能、可用性和安全性也需要随之提升。因此,升级数据库版本成为了一项不可避免的任务。特别是从旧版本向新版本迁移时,新版本往往能够提供更强大的功能、更高的性能优化,以及更完善的安全特性。然而,数据库的升级并非易事,这一过程通常涉及到数据结构的变化、功能的不兼容性以及对生产环境的潜在影响。
在实际操作中,数据库升级的最大挑战在于如何在保障数据完整性的同时,维持业务的连续性。对于许多高可用性要求的业务系统来说,即便是几分钟的停机都可能带来巨大的损失。为了解决这一难题,实现“零停机”数据库迁移成为了关键目标。实现零停机迁移需要克服诸多技术难点,比如数据一致性管理、实时数据同步、并发控制等,这需要一个精细设计的解决方案。
在本文中,我们将详细介绍一种结合“系统双写”和“数据补全”技术的迁移方案,如何确保 Aurora 数据库从 5.7 版本平滑过渡到 8.0 新版本。通过这一方案,既能保证迁移期间数据的完整性,又能做到业务系统无感知迁移,真正实现零停机升级。此外,我们还将分步骤解析迁移方案的具体实现,包括系统架构的调整、双写同步的配置、数据校验与补全的策略以及最终的切换方法,帮助您深入理解这一复杂过程的核心技术点。无论您是希望为现有系统进行版本升级,还是规划未来的数据库架构优化,这一迁移方案都能为您提供切实可行的参考。希望通过本文的介绍,能够帮助您高效完成数据库的升级任务,为业务发展提供坚实的数据支撑。
二.测试环境创建
您可以参照以下步骤创建并测试 Aurora 旧集群
- 创建一个 5.7.mysql_aurora.2.12.4 版本的 Aurora MySQL 的集群
- 创建一个系统为 HAQM linux 2 的 EC2,并且在 Aurora 集群中设置 EC2 conection
- 在集群中创建测试 Database 和 Table
CREATE DATABASE sales_database;
CREATE TABLE transactions (
Transaction_ID INT AUTO_INCREMENT PRIMARY KEY,
TS DATETIME,
Customer_ID INT,
Product_Category VARCHAR(50),
Quantity INT,
Price_per_Unit DECIMAL(10, 2),
Total_Amount DECIMAL(10, 2)
);
- 创建一个 Sample 业务代码持续往 Aurora 中写入数据
import mysql.connector
import random
import datetime
import time
# Connect to the MySQL database
db = mysql.connector.connect(
host="localhost",
user="your_username",
password="your_password",
database="sales_database"
)
# Create a cursor object
cursor = db.cursor(buffered=True) # Set buffered=True to allow accessing the last inserted row
# List of product categories
product_categories = ["Electronics", "clothing", "Books", "Furniture", "Toys"]
# Continuously insert random records
while True:
# Generate random values
customer_id = random.randint(1, 1000)
product_category = random.choice(product_categories)
quantity = random.randint(1, 10)
price_per_unit = round(random.uniform(10.0, 100.0), 2)
total_amount = quantity * price_per_unit
# Insert the record into the transactions table
insert_query = """
INSERT INTO transactions (TS, Customer_ID, Product_Category, Quantity, Price_per_Unit, Total_Amount)
VALUES (%s, %s, %s, %s, %s, %s)
"""
values = (datetime.datetime.now(), customer_id, product_category, quantity, price_per_unit, total_amount)
cursor.execute(insert_query, values)
db.commit() # Commit the changes immediately
# Get the last inserted record
last_record_id = cursor.lastrowid
cursor.execute("SELECT * FROM transactions WHERE Transaction_ID = %s", (last_record_id,))
last_record = cursor.fetchone()
print("Last inserted record:")
print(last_record)
# Wait for 1 second before inserting the next record
time.sleep(1)
# Close the connection when the script is terminated
cursor.close()
db.close()
- 运行 Sample 代码并且验证数据写入
- 当以上步骤完成后,验证数据已经写入测试 Aurora 集群
三.零停机升级方法详述
我们的迁移方案分为以下九个关键步骤:
- 检查并开 binlog->备份源实例>恢复新实例,准备新集群,可以基于旧集群的快照进行恢复,并将新集群升级到目标版本
- 建立逻辑复制(可选),在新旧集群之间进行逻辑同步,减少数据缺失
- 应用代码改造(不上线),修改代码实现双写并忽略新集群的所有异常
- 在不影响当前业务逻辑的前提下,增加一个额外操作:所有数据库操作发送一份到新集群,忽略并记录所有异常避免影响业务
- 如果有自增主键:可以将旧库的 insert 语句返回的 id 作为新集群写入的 parameter;以避免主键冲突
- 停止新旧集群之间逻辑复制(如果有)
- 上线应用,开始双写
- 数据追平和修复,使用工具进行数据校验和修复
- 进行数据一致性校验和确认,确保新旧集群数据一致,业务系统无新增的新集群操作异常的记录。
- 应用改造及上线;将主业务逻辑切换到新集群;监控新集群运行情况,确保业务平稳运行
- 下线旧集群
接下来我们将逐个步骤进行介绍。
步骤一:准备新集群
首先,我们需要在 AWS 上创建一个新的数据库集群。
创建好新的数据库集群之后检查并打开 binlog 以后期进行逻辑复制。
确保 parameter group 中的 binlog_format 不是 OFF,如果是 OFF 可以将值改成 ROW。
接着配置 ‘binlog retention hours’ 配置参数以及在数据库集群上保留二进制日志文件的小时数, 示例将二进制日志文件的保留期设置为 6 天。
如果未指定此设置,则 Aurora MySQL 的默认值为 24 小时(1 天)。
CALL mysql.rds_show_configuration;
CALL mysql.rds_set_configuration('binlog retention hours', 144);
接下来基于该旧集群打快照。
使用旧集群快照在 HAQM RDS 或 HAQM Aurora 中恢复一个同一版本 5.7 版本的新实例。
当新实例创建完成之后,并将其升级到目标版本 8.0 版本。
连接到新集群,并且可以查看到新集群目前的 table 里面的数据更新停止在第 718 行。
步骤二:建立逻辑复制(可选)
为了减少后续的数据补全工作量,您可以考虑在新旧集群之间建立逻辑复制。AWS 数据库服务支持多种复制方案,如基于 binlog 的复制、CDC(增量数据捕获)等。通过复制,可以最大程度地同步新旧集群的数据。本文中将配置新旧集群的 binlog replication 来进行数据同步。
接着在源数据库创建一个用于复制的用户。
CREATE USER 'repl_user'@'IP_address' IDENTIFIED BY 'password';
GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'repl_user'@'IP_address';
GRANT USAGE ON *.* TO 'repl_user'@'IP_address' REQUIRE SSL;
从 Aurora events 和 log 中找到 snapshot 的 binlog 所在的 position。
有了这些数据之后,连接到新集群,并且建立 binlog replication。
CALL mysql.rds_set_external_source (
host_name
, host_port
, replication_user_name
, replication_user_password
, mysql_binary_log_file_name
, mysql_binary_log_file_location
, ssl_encryption
);
CALL mysql.rds_start_replication;
经过一段 replication,查看新集群数据可以看到数据已经同步到第 993 行。
步骤三:应用代码改造(不上线)
在这一步,我们需要修改应用程序代码,实现“系统双写”的功能。具体做法是:
- 在不影响当前业务逻辑的前提下,增加一个额外操作:所有数据库写操作,都需要发送一份到新集群,忽略并记录所有新集群的异常。
- 如果有自增主键,可以将旧库的 insert 语句返回的 id 作为新集群写入的参数,以避免主键冲突。
注意,此时的代码改动还不能上线,只是在本地或测试环境进行。
修改后的 Sample 代码可以参照以下代码:
import mysql.connector
import random
import datetime
import time
# Connect to the old MySQL database
old_db = mysql.connector.connect(
host="localhost",
user="your_username",
password="your_password",
database="old_sales_database"
)
# Connect to the new MySQL database
new_db = mysql.connector.connect(
host="localhost",
user="your_username",
password="your_password",
database="new_sales_database"
)
# Create cursor objects for both databases
old_cursor = old_db.cursor(buffered=True)
new_cursor = new_db.cursor(buffered=True)
# List of product categories
product_categories = ["Electronics", "clothing", "Books", "Furniture", "Toys"]
# Continuously insert random records
while True:
# Generate random values
customer_id = random.randint(1, 1000)
product_category = random.choice(product_categories)
quantity = random.randint(1, 10)
price_per_unit = round(random.uniform(10.0, 100.0), 2)
total_amount = quantity * price_per_unit
# Insert the record into the transactions table of the old database
insert_query = """
INSERT INTO transactions (TS, Customer_ID, Product_Category, Quantity, Price_per_Unit, Total_Amount)
VALUES (%s, %s, %s, %s, %s, %s)
"""
values = (datetime.datetime.now(), customer_id, product_category, quantity, price_per_unit, total_amount)
old_cursor.execute(insert_query, values)
old_db.commit()
# Get the last inserted record ID from the old database
old_last_record_id = old_cursor.lastrowid
# Insert the record into the transactions table of the new database with the same ID
insert_query = """
INSERT INTO transactions (Transaction_ID, TS, Customer_ID, Product_Category, Quantity, Price_per_Unit, Total_Amount)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
values = (old_last_record_id, datetime.datetime.now(), customer_id, product_category, quantity, price_per_unit, total_amount)
# Get the last inserted record from the old database
old_cursor.execute("SELECT * FROM transactions WHERE Transaction_ID = %s", (old_last_record_id,))
old_last_record = old_cursor.fetchone()
print("Last inserted record in old database:")
print(old_last_record)
try:
new_cursor.execute(insert_query, values)
new_db.commit()
# Get the last inserted record from the new database
new_cursor.execute("SELECT * FROM transactions WHERE Transaction_ID = %s", (old_last_record_id,))
new_last_record = new_cursor.fetchone()
print("Last inserted record in new database:")
print(new_last_record)
except Exception as e:
print(f"Error writing to new database: {e}")
# Wait for 1 second before inserting the next record
time.sleep(1)
# Close the connections when the script is terminated
old_cursor.close()
old_db.close()
new_cursor.close()
new_db.close()
步骤四:停止新旧集群之间逻辑复制(如果有)
如果之前建立了逻辑复制,需要在这一步停止复制,确保后续的数据写入能够被完整捕获。
CALL mysql.rds_stop_replication;
步骤五:上线应用,开始双写
将改造后的应用程序代码上线,启用“系统双写”功能。从这一时间点开始,所有的数据库写操作都会同时作用于新旧集群。
从 log 中可以看到,开启双写后,同一个数据会被同时写到新旧两个 cluster 中去。
步骤六:数据追平和修复
经过一段时间的双写后,新旧集群的数据就会出现分歧。我们需要执行以下操作将数据追平:
- 补齐数据:基于旧集群的 binlog,从上线双写的时间点开始,重新执行所有的增删改操作,将数据同步到新集群。
- 根据应用记录的异常日志,对新集群的数据进行修复。
- 对于非异常数据,也需要进行检查和修复,确保数据的完整性和一致性。
为了测试数据补全工作,在文章中我们会删除一些数据并且将一些数据改成错误的数据。
DELETE FROM transactions WHERE transaction_id < 1255 AND transaction_id > 1240;
UPDATE transactions
SET product_category='ERROR'
WHERE transaction_id < 1235 AND transaction_id > 1227;
对比旧集群和新集群的出错的数据:
文章中使用开源工具 pt-table-sync 进行数据一致性检测和修复。
使用以下 command 来打印出来差异的行信息:
pt-table-sync --print --execute h=source_host,u=username,p=password,D=database_name,t=table_name h=target_host,u=username,p=password
验证准确无误之后,去掉 command 中的 –-print 执行数据修正:
pt-table-sync --execute h=source_host,u=username,p=password,D=database_name,t=table_name h=target_host,u=username,p=password
对比新集群修正后的数据,和旧集群数据一致。
步骤七:进行数据一致性校验和确认,确保新旧集群数据一致,业务系统无新增的新集群操作异常的记录
在执行完数据修复操作后,需要使用数据对比工具或自定义脚本,全面验证新旧集群数据的一致性。确保两个集群的数据完全一致,且新集群没有新的异常记录。
pt-table-sync --print --verbose --execute h=source_host,u=username,p=password,D=database_name,t=table_name h=target_host,u=username,p=password
步骤八:应用改造及上线
当新旧集群数据保持一致后,我们可以将应用程序的主业务逻辑切换到新集群。具体做法是:
- 修改应用代码,将所有数据库读写操作切换到新集群。
- 上线改造后的应用程序代码。
- 监控新集群的运行情况,确保业务平稳运行。
步骤九:下线旧集群
最后,在新系统运行稳定一段时间后,可以安全地将旧集群下线,释放相关资源。
四.总结
本文详细介绍了如何通过“系统双写”和“数据补全”技术,实现现代应用中数据库的无缝迁移,特别是从旧版本平滑升级到 AWS 上的新版本数据库。这一方案的核心目标是确保迁移过程中数据的完整性和业务的连续性,从而达到零停机的效果。数据库升级是一个复杂而敏感的过程,因为它不仅涉及数据结构的调整和版本功能的差异,还需要避免对业务系统的影响。
在这个方案中,“系统双写”技术通过改造应用逻辑,使所有数据库操作同时写入旧集群和新集群,确保两边数据保持实时同步,而“数据补全”技术则通过 binlog 等机制,在迁移后期补齐可能遗漏的数据,修复潜在的异常,进一步保障数据的一致性。此外,方案中还包括新集群的初始化准备、逻辑复制(可选)的设置、双写功能的上线、数据追平与校验、以及最终业务逻辑的切换和旧集群的下线等步骤,环环相扣、细致严谨。在实施过程中,通过忽略新集群的异常来避免对现有业务的干扰,同时利用精确的数据校验手段确保迁移的高质量和可靠性。
最终,这一迁移方案不仅能够实现业务系统无感知的平稳过渡,还为未来的数据库架构优化奠定了基础,为企业的长期发展提供了强有力的支撑。如果您对本文有任何意见和建议,请与我们分享!
五.参考资料
本篇作者