应对高并发的业务场景,用LiteDB做C#应用的内存表

时间:10/03/2024 17:28:25   作者:ChenReal    阅读:31

应用场景

我们的开发的项目中,可能会遇到这样的业务场景:某些数据表需要高并发地读写。例如:WMS系统的库存表,需要频繁读写库存数据;会员系统中会员的消费余额,也同样有这个需要。

面对这样的场景,如果我们仅是设计一个关系数据库表,然后常规的方法完全依赖数据库来读写数据。在高并发的场景下,磁盘的I/O性能会显著降低,数据的一致性也很难保证。这时候,我们似乎需要做点什么优化。

很多人想到是用缓存,是的,缓存是一个很好解决办法。我今天介绍一下我惯用的方案——LiteDB内存表(Memory Table)。从本质上来说内存表,也是缓存的一种。我认为用LiteDB在大规模数据操作上会更加方便和优雅。

为什么是LiteDB?

LiteDB是一款基于.NET平台的NoSQL文档存储数据库,以其轻量级和高效性著称。它支持无服务器架构,提供类似于MongoDB的简单API,并且完全由C#编写,适用于.NET 4.5及更高版本。LiteDB的核心优势在于其单一数据文件存储模式,类似于SQLite,同时支持数据文件加密和完整的事务支持,确保数据的安全性和一致性。

上面提到的SQLite,其实也是一个不错的选项。考虑到我们做的是.NET应用,而LiteDB是血统纯正的.NET数据库,两者的搭配的非常和谐完美。

项目实例

基于内存表设计的架构模型:

image.png

1、从Nuget将LiteDB引入项目工程

image.png

2、关系数据创建库存表

建表SQL语句:

CREATE TABLE "public"."stock_info" (
  "id" int4 NOT NULL GENERATED BY DEFAULT AS IDENTITY,
  "sku_no" varchar(32) COLLATE "pg_catalog"."default" NOT NULL,
  "stock_qty" int4 NOT NULL,
  "warehouse_code" varchar(32) COLLATE "pg_catalog"."default" NOT NULL
)
3、定义POCO类

关系数据库的POCO类:

public class StockInfo  
{  
    public int Id { get; set; }  
    public string SkuNo { get; set; } 
    public int StockQty { get; set; }
    public string WarehouseCode { get; set; } 
}

内存表的POCO类(继承了上面的StockInfo

public class CacheStock : StockInfo  
{  
    public int OldId { get; set; }  // Copy from StockInfo.Id
    public bool IsModified { get; set; }
}
4、封装数据操作服务类
internal class StockInfoService  
{
    private readonly ILiteCollection<CacheStock> _table;  
    private readonly ILiteDatabase _memDB;  
    const string TABLE_NAME = "litedb_stock_info";  
    private readonly StockInfoDao _dao;  // 关系数据库的 Data Access Object
    public StockInfoService(StockInfoDao dao)  
    {  
        _dao = dao;
        _memDB = new LiteDatabase(new MemoryStream()); 

        // 获取内存表对象并定义索引
        _table = _memDB.GetCollection<CacheStock>(TABLE_NAME);  
        _table.EnsureIndex(x => x.Id, true);  
        _table.EnsureIndex(x => x.OldId);
        _table.EnsureIndex(x => x.SkuNo);  
        _table.EnsureIndex(x => x.WarehouseCode); 

    // 初始化
    InitData();
    }  

    public ILiteCollection<CacheStock> Table => _table;  

    // 初始化数据(数据写入内存表)
    private void InitData(){
        var total = dao.Count();  
        const int size = 2000;  
        int n = 0;  
        while (total > size * n)  
        {  
            var stockList = dao.Query().Take(size).Skip(n * size).ToList<CacheStock>();  
            stockList.ForEach(x => { x.StockId = x.Id; });  
            _table.InsertBulk(stockList);  
            n++;  
        }
    }

    // 库存查询
    public List<CacheStock> GetStockList(string warehouse,string sku,int page=0,int pageSize = 0)  
    {  
        var query = _table.Query();  
        if (!string.IsNullOrEmpty(warehouse)) query.Where(x => x.WarehouseCode == warehouse);
        if (!string.IsNullOrEmpty(sku)) query.Where(x => x.SkuNo == sku);
        if (pageSize>0){
            if(page<=0) page = 1;
            query.Skip((page - 1) * pageSize).Limit(pageSize); 
        }
        return query.ToList();  
    }

    public List<CacheStock> GetStock(string warehouse,string sku){
        return _table.FindOne(x => x.SkuNo == sku && x.WarehouseCode == warehouse);
    }

    private bool Increase(CacheStock stock, int qty)  
    {
        stock.StockQty += qty;  
        stock.IsModified = true;   
        return _table.Upsert(stock);  
    }  

    // 库存增加
    public bool Increase(string sku, string warehouse,int qty)  
    {  
        var stock = _GetStockForUpdate(sku, warehouse);  
        return Increase(stock, qty);  
    }  

    private bool Decrease(CacheStock stock, int qty)  
    {  
        stock.StockQty -= qty;  
        stock.IsModified = true;   
        return _table.Upsert(stock);   
    }  

    // 库存减少
    public bool Decrease(string sku, string warehouse,int qty)  
    {  
        var stock = _GetStockForUpdate(sku, warehouse);  
        return Decrease(stock, qty);  
    }  

    private CacheStock _GetStockForUpdate(string sku, string warehouse)  
    {  
        var stock = _table.FindOne(x => x.SkuNo == sku && x.WarehouseCode == warehouse);  
        stock ??= new CacheStock()  
        {  
            Id = 0,  
            SkuNo = sku,  
            WarehouseCode = warehouse,  
            Batch = batch 
        };  
        stock.IsModified = true;  
        return stock;  
    }  

    // 数据写入关系数据库
    public boot Flush()  
    {
        if (!HasDirty()) return false;   
        var all = _table.Find(x => x.IsModified == true).ToList();  
        _table.UpdateMany(x => new CacheStock() { IsModified = false }, x => x.IsModified == true);  
        var addList = all.FindAll(x => x.OldId == 0);  
        var insertList = addList.ConvertAll<StockInfo>(item => item);
        var updateList = all.FindAll(x => x.OldId > 0);  
        foreach (var stock in updateList)  
        {  
            _dao.Update()  
                .Set(x => x.StockQty, stock.StockQty)  
                .Where(x => x.Id == stock.OldId)  
                .ExecuteAffrows() > 0;  
        }  
        for (int i = 0; i < addList.Count; i++)  
        {  
            var stock = insertList[i]; 
            var id = (int)_dao.CreateWithIdentity(stock);  
            if (id > 0) addList[i].OldId = id;  
        }   
        if(addList.Any())_table.Update(addList);  
        return true;  
    }  

    public bool HasDirty() => _table.Exists(x => x.IsModified == true);  
}
5、定时作业

最后一步,我们要做的就是用定时作业,讲内存表变更的数据同步写入关系数据,进行数据的持久化。实际上只需要调用方法StockInfoService.Flush即可。这里的代码就留给大家自己去实现吧!

 

评论
0/200