English 简体中文 繁體中文 한국 사람 日本語 Deutsch русский بالعربية TÜRKÇE português คนไทย french
查看: 3|回复: 0

在Python后端项目FastApi中使用MongoDB进行数据处理

[复制链接]
查看: 3|回复: 0

在Python后端项目FastApi中使用MongoDB进行数据处理

[复制链接]
查看: 3|回复: 0

244

主题

0

回帖

742

积分

高级会员

积分
742
Zelp8J

244

主题

0

回帖

742

积分

高级会员

积分
742
7 天前 | 显示全部楼层 |阅读模式
我在前面随笔《在SqlSugar的开发框架中增加对低代码EAV模型(实体-属性-值)的WebAPI实现支持》中介绍了对于EAV数据存储的时候,我们把变化字段的数据记录存储在MongoDB数据库里面,这样除了支持动态化字段变化外,也更好的支持对字段不同类型的查询处理,之前随笔介绍的是基于C#操作MongoDB数据库的处理,由于Python后端FastApi项目的设计初衷是可以平滑更换 SqlSugar项目的Web API的,因此会涉及到在Python项目中对MongoDB的相关操作。本篇随笔先对Python环境中操作MongoDB数据库进行相关的介绍。
1、Python环境中操作MongoDB数据库的准备

如果需要了解相关MongoDB数据库的相关信息和C#开发MongoDB的内容,可以参考我的随笔《MongoDB数据库内容》,里面包含我对这个主题的相关介绍。
具体可以进一步参考官网里面的介绍。
https://www.mongodb.com/zh-cn/docs/
https://www.mongodb.com/zh-cn/docs/drivers/csharp/current/
https://www.mongodb.com/zh-cn/docs/languages/python/
https://www.mongodb.com/zh-cn/docs/drivers/motor/
如果我们需要下载安装,可以根据不同的操作系统下载对应的社区版本安装文件即可。
https://www.mongodb.com/zh-cn/products/self-managed/community-edition
安装完成后,可以使用管理工具进行MongoDB数据库的连接和创建,如下所示可以根据不同的需要创建不同的集合。

关系型数据库和MongoDB的数据库,它们的相关概念,对比关系图如下所示。

我们安装MongoDB数据库后,它是驻留在Window的服务里面,创建服务并顺利启动成功后,然后就可以在系统的服务列表里查看到了,我们确认把它设置为自动启动的Windows服务即可。

由于我们希望使用异步来操作MongoDB数据库,推荐使用motor驱动来操作它。Motor是一个异步mongodb driver,支持异步读写mongodb。
具体使用我们可以参考官网的介绍:https://www.mongodb.com/zh-cn/docs/drivers/motor/
使用前,我们需要再我们FastAPI项目中安装MongoDB和Motor的依赖模块。就是pymonogo和motor
我的requirement.txt文件中包含下面两个

 我们如果没有初始化安装,通过pip进行安装即可。
pip install pymongo motor
使用它的基础代码如下所示。
import motor.motor_asyncioclient = motor.motor_asyncio.AsyncIOMotorClient()或者client = motor.motor_asyncio.AsyncIOMotorClient("mongodb://localhost:27017")或者client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://username:password@localhost:27017/dbname')
根据MongoDB官网的说明,MongoDB的适用场景如下:
1)网站实时数据:MongoDB非常适合实时的插入,更新与查询,并具备网站实时数据存储所需的复制及高度伸缩性。
2)数据缓存:由于性能很高,MongoDB也适合作为信息基础设施的缓存层。在系统重启之后,由MongoDB搭建的持久化缓存层可以避免下层的数据源过载。
3)大尺寸、低价值数据存储:使用传统的关系型数据库存储一些数据时可能会比较昂贵,在此之前,很多时候程序员往往会选择传统的文件进行存储。
4)高伸缩性场景:MongoDB非常适合由数十或数百台服务器组成的数据库。MongoDB的路线图中已经包含对MapReduce引擎的内置支持。
5)对象或JSON数据存储:MongoDB的BSON数据格式非常适合文档化格式的存储及查询。
MongoDB数据库支持常规的增删改查等操作,其中它的 find方法很强大,可以组合很多条件查询的方式,如下所示:
db.collection.find({ "key" : value })    查找key=value的数据db.collection.find({ "key" : { $gt: value } })    key > valuedb.collection.find({ "key" : { $lt: value } })    key < valuedb.collection.find({ "key" : { $gte: value } })    key >= valuedb.collection.find({ "key" : { $lte: value } })    key <= valuedb.collection.find({ "key" : { $gt: value1 , $lt: value2 } })    value1 < key <value2db.collection.find({ "key" : { $ne: value } })    key <> valuedb.collection.find({ "key" : { $mod : [ 10 , 1 ] } })    取模运算,条件相当于key % 10 == 1 即key除以10余数为1的db.collection.find({ "key" : { $nin: [ 1, 2, 3 ] } })    不属于,条件相当于key的值不属于[ 1, 2, 3 ]中任何一个db.collection.find({ "key" : { $in: [ 1, 2, 3 ] } })    属于,条件相当于key等于[ 1, 2, 3 ]中任何一个db.collection.find({ "key" : { $size: 1 } })    $size 数量、尺寸,条件相当于key的值的数量是1(key必须是数组,一个值的情况不能算是数量为1的数组)db.collection.find({ "key" : { $exists : true|false } })    $exists 字段存在,true返回存在字段key的数据,false返回不存在字度key的数据db.collection.find({ "key": /^val.*val$/i })    正则,类似like;“i”忽略大小写,“m”支持多行db.collection.find({ $or : [{a : 1}, {b : 2} ] })    $or或 (注意:MongoDB 1.5.3后版本可用),符合条件a=1的或者符合条件b=2的数据都会查询出来db.collection.find({ "key": value , $or : [{ a : 1 } , { b : 2 }] })    符合条件key=value ,同时符合其他两个条件中任意一个的数据db.collection.find({ "key.subkey" :value })    内嵌对象中的值匹配,注意:"key.subkey"必须加引号db.collection.find({ "key": { $not : /^val.*val$/i } })    这是一个与其他查询条件组合使用的操作符,不会单独使用。上述查询条件得到的结果集加上$not之后就能获得相反的集合。
比较符号说明如下:
符  号含  义示  例$lt小于{'age': {'$lt': 20}}$gt大于{'age': {'$gt': 20}}$lte小于等于{'age': {'$lte': 20}}$gte大于等于{'age': {'$gte': 20}}$ne不等于{'age': {'$ne': 20}}$in在范围内{'age': {'$in': [20, 23]}}$nin不在范围内{'age': {'$nin': [20, 23]}}
另外,还可以进行正则匹配查询。例如,查询名字以 M 开头的学生数据,示例如下:
results = collection.find({'name': {'$regex': '^M.*'}})
这里使用 $regex 来指定正则匹配,^M.* 代表以 M 开头的正则表达式。
多条件查询  $and $or
# and查询db.collection.find({         $and :  [                { "age" :  {$gt : 10 }} ,                { "gender" :  "man" }          ]})#or查询db.collection.find({          $or : [                    {"age" :  {$gt : 10 }},                    { "gender" :  "man"}         ]})#and查询 和 or查询db.inventory.find( {    $and : [        { $or : [ { price : 0.99 }, { price : 1.99 } ] },        { $or : [ { sale : true }, { qty : { $lt : 20 } } ] }    ]} )
关于这些操作的更详细用法,可以在 MongoDB 官方文档找到: https://docs.mongodb.com/manual/reference/operator/query/
当然还有插入更新的处理语句也是很特别的。
db.student.insert({name:'student1',subject:['arts','music']})
特别是更新操作需要说明一下,支持常规的$set方法(修改)、$unset方法(移除指定的键),还有原子级的$inc方法(数值增减),$rename方法(重命名字段名称)等等,
db.users.update({"_id" : ObjectId("51826852c75fdd1d8b805801")},  {"$set" : {"hobby" :["swimming","basketball"]}} )db.users.update({"_id" : ObjectId("51826852c75fdd1d8b805801")},{"$unset" : {"hobby" :1 }} )db.posts.update({"_id" : ObjectId("5180f1a991c22a72028238e4")}, {"$inc":{"pageviews":1}})db.students.update( { _id: 1 }, { $rename: { 'nickname': 'alias', 'cell': 'mobile' } }
upsert是一种特殊的更新操作,不是一个操作符。(upsert = up[date]+[in]sert),也就是如果存在则更新,否则就写入一条新的记录操作。这个参数是个布尔类型,默认是false。
db.users.update({age :25}, {$inc :{"age" :3}}, true)
另外,Update可以对Json的集合进行处理,如果对于subject对象是一个集合的话,插入或更新其中的字段使用下面的语句
db.student.update({name:'student5'},{$set:{subject:['music']}},{upsert:true});
如果是记录已经存在,我们可以使用索引数值进行更新其中集合里面的数据,如下所示。
db.student.update({name:'student3'},{$set:{'subject.0':'arts'}});
 
2、在FastAPI项目中使用pymongo 和motor 操作MongoDB

上面我们大致介绍了一些基础的MongoDB数据库信息。
之前随笔《Python 开发环境的准备以及一些常用类库模块的安装》介绍过我们的FastAPI配置信息,存储在.env文件中的,在FastAPI项目启动的时候,根据需要进行读取加载到对象里面。
如对于配置信息的处理,我们还可以引入 python-dotenv 和  pydantic_settings 来统一管理配置参数。我们的项目.env文件部分配置如下。

 最后我们通过pydantic_settings的配置处理,获得相关的配置对象信息,如下。

其中的MONGO_URL 就是我们具体使用的MogoDB数据库连接信息了。
为了和数据库连接一样方便使用MongoDB的连接,我们可以通过在一个独立的文件中声明获得MongoDB的数据库和集合对象,并且通过缓存的方式提高使用效率。
from motor.motor_asyncio import AsyncIOMotorClientfrom motor.motor_asyncio import AsyncIOMotorCollectionfrom functools import lru_cachefrom core.config import settingsclass MongoClientManager:    def __init__(self, uri: str = settings.MONGO_URL):        """初始化 MongoDB 客户端管理器        Args:            uri = mongodb://{MONGO_HOST}:{MONGO_PORT}/{MONGO_DB_NAME}"""        self._client = AsyncIOMotorClient(uri)        self._db = self._client.get_default_database()  # 使用 URI 中指定的 dbname    @property    def db(self):        return self._db    def close(self):        self._client.close()# 创建全局 Mongo 管理器实例@lru_cache()def get_mongo_client() -> MongoClientManager:    return MongoClientManager()@lru_cache()def get_collection(entity_model: str) -> AsyncIOMotorCollection:    """    获取指定名称的集合(collection),类型是 BsonDocument(等价于 Python dict)    MongoDB 的集合通常在首次写入数据时自动创建,因此你只需要获取对应集合即可,不需要手动“创建”    """    db = get_mongo_client().db    return db[entity_model]
这样,我们项目启动的时候,自动加载配置文件,并在这里初始化MongoDBClient的异步对象和集合缓存对象。
有了这些,我们为了方便,还需要对MongoDB数据库的操作进行一些的封装处理,以提高我们对接口的使用遍历,毕竟我们前面介绍到了MongoDB支持非常复杂的查询和处理,我们往往只需要一些特殊的接口即可,因此封装接口有利于我们对接口的使用便利性。
我们在项目的utils目录中增加一个辅助类mongo_helper.py,用来封装MongoDB的相关操作的。
我们截取部分代码,如下所示。
class MongoAsyncHelper:    """基于 motor 实现MongoDb异步操作,支持:    insert_one / insert_many    find_one / find_many    update_one / update_many    delete_one / delete_many    复杂条件、分页、排序、模糊查询等"""    def __init__(self, collection: AsyncIOMotorCollection):        self.collection = collection    async def insert_one(self, data: Dict[str, Any]):        """插入单条数据        examples:            helper = MongoAsyncHelper(collection)            await helper.insert_one({                "name": "John",                "age": 25,                "created_at": datetime.utcnow()}            )        args:            data: 待插入的数据        """        return await self.collection.insert_one(data)    async def insert_many(self, data_list: List[Dict[str, Any]]):        """插入多条数据        examples:            helper = MongoAsyncHelper(collection)            await helper.insert_many([                {"name": "John", "age": 25, "created_at": datetime.utcnow()},                {"name": "Mary", "age": 30, "created_at": datetime.utcnow()}            ])        args:            data_list: 待插入的数据列表        """        return await self.collection.insert_many(data_list)    async def find_by_id(self, id: str) -> Optional[Dict[str, Any]]:        """根据 _id 查询单条数据"""        return await self.find_one({"_id": id})    async def update_by_id(self, id: str, update_data: Dict[str, Any]):        """根据 _id 更新单条数据"""        return await self.update_one({"_id": id}, {"$set": update_data})    async def delete_by_id(self, id: str):        """根据 _id 删除单条数据"""        return await self.delete_one({"_id": id})      ...............
例如,我们在前面介绍的EAV处理中,获取MongoDB数据库的指定实体类型(对应表)的所有集合处理,在Python的数据处理层代码中如下所示。
    async def mongo_get_all(        self,        db: AsyncSession,        entitytype_id: str,        parentid: str = None,        sorting: str = None,    ) -> List[dict]:        """获取实体类型的所有记录        :param db: 数据库会话        :param entitytype_id: 实体类型ID        :param parentid: 父ID        :param sorting: 排序条件,格式:name asc 或 name asc,age desc        """        items = []        try:            entityType: EntityType = await self.__get_entity_type(db, entitytype_id)            self.logger.info(f"获取实体类型:{entitytype_id}, {entityType.name}")            if not entityType:                return items            # 连接 MongoDB            collection = get_collection(entityType.entitymodel)            mongo_helper = MongoAsyncHelper(collection)            # 构建查询条件            filter = {}            if parentid is not None:                filter["ParentId"] = parentid            sort_by = parse_sort_string(sorting)            # print(sort_by)            items = await mongo_helper.find_all(filter, sort_by)            return items        except Exception as e:            self.logger.error(f"处理失败:{str(e)}")            return items
首先通过缓存接口get_collection获得对应实体类型的集合,然后传递集合到辅助类 MongoAsyncHelper后构建MongoDB辅助类,就可以利用其接口进行相关的MongoDB数据库操作处理了。
最后通过类似的处理,结合数据库操作和MongoDB数据库操作,我们把EAV的接口服务迁移到了Python中FastAPI项目中了。

最后完成的FastAPI项目的EAV相关接口如下所示。

折叠相关模块显示如下所示。

 最后切换到Winform项目上,调整接入的Web API数据源,同样获得一样的界面效果即可。
产品数据表。

订单数据表

 
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

244

主题

0

回帖

742

积分

高级会员

积分
742

QQ|智能设备 | 粤ICP备2024353841号-1

GMT+8, 2025-5-1 16:21 , Processed in 1.519659 second(s), 24 queries .

Powered by 智能设备

©2025