查看原文
其他

Sanic教程: 5.数据库使用

howie6879 老胡的储物柜 2022-06-01

介绍中说的很明白, Sanic 是一个可以使用 async/await 语法编写项目的异步非阻塞框架,既然是异步框架,那么在使用过程中用到的第三方包也最好是异步的,比如http请求,最好就使用 aihttp而非 requests,对于数据库的连接,也是同样如此,下面我将用代码的形式来说明下如何在Sanic中连接数据库。

操作Mysql

对于mysql数据库的异步操作,我只在一些脚本中用过,用的是aiomysql,其中官方文档中讲得很清楚,也支持结合 sqlalchemy编写 ORM,然后aiomysql提供了自己编写的异步引擎。

  1. from aiomysql.sa import create_engine

  2. # 这个才是关键

下面我编写一个具体的例子来用异步语句操作下数据库,首先建立如下目录:

  1. aio_mysql

  2. ├── demo.py

  3. ├── model.py

  4. └── requirements.txt

建立表:

  1. create database test_mysql;


  2. CREATE TABLE user

  3. (

  4.  id        INT AUTO_INCREMENT

  5.    PRIMARY KEY,

  6.  user_name VARCHAR(16) NOT NULL,

  7.  pwd       VARCHAR(32) NOT NULL,

  8.  real_name VARCHAR(6)  NOT NULL

  9. );

一切准备就绪,下面编写代码:

  1. # script: model.py

  2. import sqlalchemy as sa


  3. metadata = sa.MetaData()


  4. user = sa.Table(

  5.    'user',

  6.    metadata,

  7.    sa.Column('id', sa.Integer, autoincrement=True, primary_key=True),

  8.    sa.Column('user_name', sa.String(16), nullable=False),

  9.    sa.Column('pwd', sa.String(32), nullable=False),

  10.    sa.Column('real_name', sa.String(6), nullable=False),

  11. )


  12. # script: demo.py

  13. import asyncio


  14. from aiomysql.sa import create_engine


  15. from model import user,metadata



  16. async def go(loop):

  17.    """

  18.    aiomysql项目地址:https://github.com/aio-libs/aiomysql

  19.    :param loop:

  20.    :return:

  21.    """

  22.    engine = await create_engine(user='root', db='test_mysql',

  23.                                 host='127.0.0.1', password='123456', loop=loop)

  24.    async with engine.acquire() as conn:

  25.        await conn.execute(user.insert().values(user_name='user_name01', pwd='123456', real_name='real_name01'))

  26.        await conn.execute('commit')


  27.        async for row in conn.execute(user.select()):

  28.            print(row.user_name, row.pwd)


  29.    engine.close()

  30.    await engine.wait_closed()



  31. loop = asyncio.get_event_loop()

  32. loop.run_until_complete(go(loop))

运行 python demo.py,会看到如下输出:

  1. user_name01 123456

很简单吧,具体示例见aio_mysql,如果你比较喜欢类似SQLAlchemy的操作方式,这里推荐一个异步ORM,gino。

操作MongoDB

我业余写的一个项目,基本用的就是 MongoDB来储存数据,对于异步操作 MongoDB,目前Python主要用的是motor,使用起来依旧很简单,但是结合具体功能,就有不同的需求,最后就会形成各种各样的连接方案,这里我主要分享下自己是如何使用的,目录如下所示:

  1. aio_mongo

  2. ├── demo.py

  3. └── requirements.txt

MongoDB是一个基于分布式文件存储的数据库,它介于关系数据库和非关系数据库之间,所以它使用起来也是比较灵活的,打开 demo.py

  1. #!/usr/bin/env python

  2. import os


  3. from functools import wraps


  4. from motor.motor_asyncio import AsyncIOMotorClient


  5. MONGODB = dict(

  6.    MONGO_HOST=os.getenv('MONGO_HOST', ""),

  7.    MONGO_PORT=os.getenv('MONGO_PORT', 27017),

  8.    MONGO_USERNAME=os.getenv('MONGO_USERNAME', ""),

  9.    MONGO_PASSWORD=os.getenv('MONGO_PASSWORD', ""),

  10.    DATABASE='test_mongodb',

  11. )


  12. class MotorBaseOld:

  13.    """

  14.    默认实现了一个db只创建一次,缺点是更换集合麻烦

  15.    """

  16.    _db = None

  17.    MONGODB = MONGODB


  18.    def client(self, db):

  19.        # motor

  20.        self.motor_uri = 'mongodb://{account}{host}:{port}/{database}'.format(

  21.            account='{username}:{password}@'.format(

  22.                username=self.MONGODB['MONGO_USERNAME'],

  23.                password=self.MONGODB['MONGO_PASSWORD']) if self.MONGODB['MONGO_USERNAME'] else '',

  24.            host=self.MONGODB['MONGO_HOST'] if self.MONGODB['MONGO_HOST'] else 'localhost',

  25.            port=self.MONGODB['MONGO_PORT'] if self.MONGODB['MONGO_PORT'] else 27017,

  26.            database=db)

  27.        return AsyncIOMotorClient(self.motor_uri)


  28.    @property

  29.    def db(self):

  30.        if self._db is None:

  31.            self._db = self.client(self.MONGODB['DATABASE'])[self.MONGODB['DATABASE']]


  32.        return self._db

我最开始,使用的是这种方式来连接 MongoDB,上面代码保证了集合中的db被_db维护,保证只会创建一次,如果你项目中不会随意更改集合的话,也没什么大问题,如果不是,我推荐使用下面这样的连接方式,可以自由地更换集合与db:

  1. def singleton(cls):

  2.    """

  3.    用装饰器实现的实例 不明白装饰器可见附录 装饰器:https://github.com/howie6879/Sanic-For-Pythoneer/blob/master/docs/part2/%E9%99%84%E5%BD%95%EF%BC%9A%E5%85%B3%E4%BA%8E%E8%A3%85%E9%A5%B0%E5%99%A8.md

  4.    :param cls: cls

  5.    :return: instance

  6.    """

  7.    _instances = {}


  8.    @wraps(cls)

  9.    def instance(*args, **kw):

  10.        if cls not in _instances:

  11.            _instances[cls] = cls(*args, **kw)

  12.        return _instances[cls]


  13.    return instance


  14. @singleton

  15. class MotorBase:

  16.    """

  17.    更改mongodb连接方式 单例模式下支持多库操作

  18.    About motor's doc: https://github.com/mongodb/motor

  19.    """

  20.    _db = {}

  21.    _collection = {}

  22.    MONGODB = MONGODB


  23.    def __init__(self):

  24.        self.motor_uri = ''


  25.    def client(self, db):

  26.        # motor

  27.        self.motor_uri = 'mongodb://{account}{host}:{port}/{database}'.format(

  28.            account='{username}:{password}@'.format(

  29.                username=self.MONGODB['MONGO_USERNAME'],

  30.                password=self.MONGODB['MONGO_PASSWORD']) if self.MONGODB['MONGO_USERNAME'] else '',

  31.            host=self.MONGODB['MONGO_HOST'] if self.MONGODB['MONGO_HOST'] else 'localhost',

  32.            port=self.MONGODB['MONGO_PORT'] if self.MONGODB['MONGO_PORT'] else 27017,

  33.            database=db)

  34.        return AsyncIOMotorClient(self.motor_uri)


  35.    def get_db(self, db=MONGODB['DATABASE']):

  36.        """

  37.        获取一个db实例

  38.        :param db: database name

  39.        :return: the motor db instance

  40.        """

  41.        if db not in self._db:

  42.            self._db[db] = self.client(db)[db]


  43.        return self._db[db]


  44.    def get_collection(self, db_name, collection):

  45.        """

  46.        获取一个集合实例

  47.        :param db_name: database name

  48.        :param collection: collection name

  49.        :return: the motor collection instance

  50.        """

  51.        collection_key = db_name + collection

  52.        if collection_key not in self._collection:

  53.            self._collection[collection_key] = self.get_db(db_name)[collection]


  54.        return self._collection[collection_key]

为了避免重复创建MotorBase实例,可以实现一个单例模式来保证资源的有效利用,具体代码以及运行demo见aio_mongo

操作Redis

对于Redis的异步操作,我选用的是 asyncio_redis,你大可不必非要使用这个,或许其他的库实现地更好,我只是用这个举个例子,建立如下目录:

  1. aio_redis

  2. ├── demo.py

  3. └── requirements.txt

建立一个redis连接池:

  1. #!/usr/bin/env python

  2. import os

  3. import asyncio_redis


  4. REDIS_DICT = dict(

  5.    IS_CACHE=True,

  6.    REDIS_ENDPOINT=os.getenv('REDIS_ENDPOINT', "localhost"),

  7.    REDIS_PORT=os.getenv('REDIS_PORT', 6379),

  8.    REDIS_PASSWORD=os.getenv('REDIS_PASSWORD', None),

  9.    DB=0,

  10.    POOLSIZE=10,

  11. )



  12. class RedisSession:

  13.    """

  14.    建立redis连接池

  15.    """

  16.    _pool = None


  17.    async def get_redis_pool(self):

  18.        if not self._pool:

  19.            self._pool = await asyncio_redis.Pool.create(

  20.                host=str(REDIS_DICT.get('REDIS_ENDPOINT', "localhost")), port=int(REDIS_DICT.get('REDIS_PORT', 6379)),

  21.                poolsize=int(REDIS_DICT.get('POOLSIZE', 10)), password=REDIS_DICT.get('REDIS_PASSWORD', None),

  22.                db=REDIS_DICT.get('DB', None)

  23.            )


  24.        return self._pool

具体见aio_redis,使用起来很简单,不做多叙述。

说明

如果你使用其它类型的数据库,其实使用方式也是类似。 本章代码地址,见demo05


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存