应用场景
我们的开发的项目中,可能会遇到这样的业务场景:某些数据表需要高并发地读写。例如:WMS系统的库存表,需要频繁读写库存数据;会员系统中会员的消费余额,也同样有这个需要。
面对这样的场景,如果我们仅是设计一个关系数据库表,然后常规的方法完全依赖数据库来读写数据。在高并发的场景下,磁盘的I/O性能会显著降低,数据的一致性也很难保证。这时候,我们似乎需要做点什么优化。
很多人想到是用缓存,是的,缓存是一个很好解决办法。我今天介绍一下我惯用的方案——LiteDB内存表(Memory Table)。从本质上来说内存表,也是缓存的一种。我认为用LiteDB在大规模数据操作上会更加方便和优雅。
为什么是LiteDB?
LiteDB是一款基于.NET平台的NoSQL文档存储数据库,以其轻量级和高效性著称。它支持无服务器架构,提供类似于MongoDB的简单API,并且完全由C#编写,适用于.NET 4.5及更高版本。LiteDB的核心优势在于其单一数据文件存储模式,类似于SQLite,同时支持数据文件加密和完整的事务支持,确保数据的安全性和一致性。
上面提到的SQLite,其实也是一个不错的选项。考虑到我们做的是.NET应用,而LiteDB是血统纯正的.NET数据库,两者的搭配的非常和谐完美。
项目实例
基于内存表设计的架构模型:
1、从Nuget将LiteDB引入项目工程
2、关系数据创建库存表
建表SQL语句:
CREATE TABLE "public"."stock_info" (
"id" int4 NOT NULL GENERATED BY DEFAULT AS IDENTITY,
"sku_no" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
"stock_qty" int4 NOT NULL,
"warehouse_code" varchar(32) COLLATE "pg_catalog"."default" NOT NULL
)
3、定义POCO类
关系数据库的POCO类:
public class StockInfo
{
public int Id { get; set; }
public string SkuNo { get; set; }
public int StockQty { get; set; }
public string WarehouseCode { get; set; }
}
内存表的POCO类(继承了上面的StockInfo
)
public class CacheStock : StockInfo
{
public int OldId { get; set; } // Copy from StockInfo.Id
public bool IsModified { get; set; }
}
4、封装数据操作服务类
internal class StockInfoService
{
private readonly ILiteCollection<CacheStock> _table;
private readonly ILiteDatabase _memDB;
const string TABLE_NAME = "litedb_stock_info";
private readonly StockInfoDao _dao; // 关系数据库的 Data Access Object
public StockInfoService(StockInfoDao dao)
{
_dao = dao;
_memDB = new LiteDatabase(new MemoryStream());
// 获取内存表对象并定义索引
_table = _memDB.GetCollection<CacheStock>(TABLE_NAME);
_table.EnsureIndex(x => x.Id, true);
_table.EnsureIndex(x => x.OldId);
_table.EnsureIndex(x => x.SkuNo);
_table.EnsureIndex(x => x.WarehouseCode);
// 初始化
InitData();
}
public ILiteCollection<CacheStock> Table => _table;
// 初始化数据(数据写入内存表)
private void InitData(){
var total = dao.Count();
const int size = 2000;
int n = 0;
while (total > size * n)
{
var stockList = dao.Query().Take(size).Skip(n * size).ToList<CacheStock>();
stockList.ForEach(x => { x.StockId = x.Id; });
_table.InsertBulk(stockList);
n++;
}
}
// 库存查询
public List<CacheStock> GetStockList(string warehouse,string sku,int page=0,int pageSize = 0)
{
var query = _table.Query();
if (!string.IsNullOrEmpty(warehouse)) query.Where(x => x.WarehouseCode == warehouse);
if (!string.IsNullOrEmpty(sku)) query.Where(x => x.SkuNo == sku);
if (pageSize>0){
if(page<=0) page = 1;
query.Skip((page - 1) * pageSize).Limit(pageSize);
}
return query.ToList();
}
public List<CacheStock> GetStock(string warehouse,string sku){
return _table.FindOne(x => x.SkuNo == sku && x.WarehouseCode == warehouse);
}
private bool Increase(CacheStock stock, int qty)
{
stock.StockQty += qty;
stock.IsModified = true;
return _table.Upsert(stock);
}
// 库存增加
public bool Increase(string sku, string warehouse,int qty)
{
var stock = _GetStockForUpdate(sku, warehouse);
return Increase(stock, qty);
}
private bool Decrease(CacheStock stock, int qty)
{
stock.StockQty -= qty;
stock.IsModified = true;
return _table.Upsert(stock);
}
// 库存减少
public bool Decrease(string sku, string warehouse,int qty)
{
var stock = _GetStockForUpdate(sku, warehouse);
return Decrease(stock, qty);
}
private CacheStock _GetStockForUpdate(string sku, string warehouse)
{
var stock = _table.FindOne(x => x.SkuNo == sku && x.WarehouseCode == warehouse);
stock ??= new CacheStock()
{
Id = 0,
SkuNo = sku,
WarehouseCode = warehouse,
Batch = batch
};
stock.IsModified = true;
return stock;
}
// 数据写入关系数据库
public boot Flush()
{
if (!HasDirty()) return false;
var all = _table.Find(x => x.IsModified == true).ToList();
_table.UpdateMany(x => new CacheStock() { IsModified = false }, x => x.IsModified == true);
var addList = all.FindAll(x => x.OldId == 0);
var insertList = addList.ConvertAll<StockInfo>(item => item);
var updateList = all.FindAll(x => x.OldId > 0);
foreach (var stock in updateList)
{
_dao.Update()
.Set(x => x.StockQty, stock.StockQty)
.Where(x => x.Id == stock.OldId)
.ExecuteAffrows() > 0;
}
for (int i = 0; i < addList.Count; i++)
{
var stock = insertList[i];
var id = (int)_dao.CreateWithIdentity(stock);
if (id > 0) addList[i].OldId = id;
}
if(addList.Any())_table.Update(addList);
return true;
}
public bool HasDirty() => _table.Exists(x => x.IsModified == true);
}
5、定时作业
最后一步,我们要做的就是用定时作业,讲内存表变更的数据同步写入关系数据,进行数据的持久化。实际上只需要调用方法StockInfoService.Flush
即可。这里的代码就留给大家自己去实现吧!