遇到一个很有趣的问题 或者是 bug

来源:34-3 基于mysql的乐观锁机制实现

城中城

2022-06-28

from datetime import datetime
import threading
import time
from random import randint

from peewee import *
from playhouse.shortcuts import ReconnectMixin
from playhouse.pool import PooledMySQLDatabase
from inventory_srv.settings import settings


R = threading.Lock()


class ReconnectMySQLDatabase(ReconnectMixin, PooledMySQLDatabase):
    pass

db = ReconnectMySQLDatabase("mxshop_inventory_srv", host="192.168.57.129",  port=3306, user="root", password="56248123")


# 删除 物理删除和逻辑删除 - 物理删除 - 假设你把某个用户数据 - 用户购买记录, 用户的收藏记录, 用户浏览记录啊
# 通过 save 方法做了修改 如何确保只修改 update_time 值而不是修改 add_time
class BaseModel(Model):
    add_time = DateTimeField(default=datetime.now, verbose_name="添加时间")
    update_time = DateTimeField(default=datetime.now, verbose_name="更新时间")
    is_deleted = BooleanField(default=False, verbose_name="是否删除")

    def save(self, *args, **kwargs):
        # 判断这是一个新添加的数据还是更新的数据
        if self._pk is not None:
            # 这是一个新数据
            self.update_time = datetime.now()
        return super().save(*args, **kwargs)

    @classmethod
    def delete(cls, permanently=False):   # permanently 表示是否永久删除
        if permanently:
            return super().delete()
        else:
            return super().update(is_deleted=True)

    def delete_instance(self, permanently=False, recursive=False, delete_nullable=False):
        if permanently:
            return self.delete(permanently).where(self._pk_expr()).execute()
        else:
            self.is_deleted = True
            self.save()

    @classmethod
    def select(cls, *fields):
        return super().select(*fields).where(cls.is_deleted==False)

    class Meta:
        database = settings.DB


# class Stock(BaseModel):
#     # 仓库表
#     name = CharField(verbose_name="仓库名")
#     address = CharField(verbose_name="仓库地址")


class Inventory(BaseModel):
    # 商品的库存表
    # stock = PrimaryKeyField(Stock)
    goods = IntegerField(verbose_name="商品id", unique=True)      # unique:在此列上创建唯一索引
    stocks = IntegerField(verbose_name="库存数量", default=0)
    version = IntegerField(verbose_name="版本号", default=0)       # 分布式锁的一种 乐观锁


def sell():
    # 多线程下的并发带来的数据不一致的问题
    goods_list = [(421, 99), (422, 20), (423, 30)]
    with settings.DB.atomic() as txn:
        # 超卖
        for goods_id, num in goods_list:
            # 查询库存
            R.acquire()     # 获取锁 负载均衡
            goods_inv = Inventory.get(Inventory.goods==goods_id)
            print(f"商品{goods_id} 售出 {num} 件")
            import time
            from random import randint
            time.sleep(randint(1, 3))
            if goods_inv.stocks < num:
                print(f"商品: {goods_id} 库存不足")
                txn.rollback()
                R.release()  # 释放锁
                break
            else:
                # 让数据库根据自己当前的值更新数据, 这个语句能不能处理并发的问题
                query = Inventory.update(stocks=Inventory.stocks-num).where(Inventory.goods==goods_id)
                ok = query.execute()
                if ok:
                    print("更新成功")
                else:
                    print("更新失败")
            R.release()     # 释放锁
            time.sleep(1)


def sell2():
    # 演示基于数据库的乐观锁机制
    goods_list = [(421, 10), (422, 20), (423, 30)]

    with settings.DB.atomic() as txn:
        # 超卖
        for goods_id, num in goods_list:
            # 查询库存
            goods_inv = Inventory.get(Inventory.goods == goods_id)
            print(f"当前的版本号:{goods_inv.version}")
            print(f"商品{goods_id} 售出 {num}件")

            time.sleep(randint(1, 3))
            if goods_inv.stocks < num:
                print(f"商品:{goods_id} 库存不足")
                txn.rollback()
            else:
                # 让数据库根据自己当前的值更新数据, 这个语句能不能处理并发的问题
                # 我当时查询数据的时候版本号是goods_inv.version
                query = Inventory.update(stocks=Inventory.stocks - num, version=Inventory.version + 1).where(
                    Inventory.goods == goods_id, Inventory.version == goods_inv.version)
                ok = query.execute()
                if ok:
                    print("更新成功")
                    break
                else:
                    print("更新失败")


def sell3():
    # 演示基于数据库的乐观锁机制
    goods_list = [(421, 10), (422, 20), (423, 30)]
    while True:
        if text(goods_list):
            break

def text(goods_list):
    with settings.DB.atomic() as txn:
        # 超卖
        for goods_id, num in goods_list:
            # 查询库存
            goods_inv = Inventory.get(Inventory.goods == goods_id)
            print(f"当前的版本号:{goods_inv.version}")
            print(f"商品{goods_id} 售出 {num}件")

            time.sleep(randint(1, 3))
            if goods_inv.stocks < num:
                print(f"商品:{goods_id} 库存不足")
                txn.rollback()
                return True
            else:
                # 让数据库根据自己当前的值更新数据, 这个语句能不能处理并发的问题
                #我当时查询数据的时候版本号是goods_inv.version
                query = Inventory.update(stocks=Inventory.stocks - num, version=Inventory.version + 1).where(Inventory.goods == goods_id, Inventory.version==goods_inv.version)
                ok = query.execute()
                if ok:
                    print("更新成功")
                else:
                    # a = Inventory.get(Inventory.goods == goods_id)
                    a = Inventory.select().where(Inventory.goods == goods_id)[0]
                    print("更新失败")
                    return False
        return True


if __name__ == '__main__':
    import threading

    t1 = threading.Thread(target=sell2)
    t2 = threading.Thread(target=sell2)
    t1.start()
    t2.start()

    t1.join()
    t2.join()

# <__main__.ReconnectMySQLDatabase object at 0x000002160DCE7EE0>
# <__main__.ReconnectMySQLDatabase object at 0x000002160DCE7EE0>
# <__main__.ReconnectMySQLDatabase object at 0x000002160DCE7EE0>

# <inventory_srv.settings.settings.ReconnectMysqlDatabase object at 0x000001D5498128E0>
# <inventory_srv.settings.settings.ReconnectMysqlDatabase object at 0x000001D5498128E0>
# <inventory_srv.settings.settings.ReconnectMysqlDatabase object at 0x000001D5498128E0>

起因是因为我想偷懒 有一个 setting.DB 在连接数据库了 就不想在demo上重写 连接数据库的操作
但是这一个偷懒 导致出现一个bug

老师 和 其他 同学能不能解答我的疑问
你们可以 运行看看
t1 = threading.Thread(target=sell2)
t2 = threading.Thread(target=sell2)
会发现 怎么线程 2 读取到数据居然不是最新数据
究竟是怎么一回事 有人解答一下吗(与settings.DB.atomic()有关)

当然 为此 我 又新创建了一个函数来解决 这个问题
你们可以试一试 把代码替换成这个
t1 = threading.Thread(target=sell3)
t2 = threading.Thread(target=sell3)

写回答

1回答

bobby

2022-06-28

你需要了解一下数据库的隔离级别了,什么是可重复读,网上找资料好好研究一下可重复读隔离级别的作用,这个是常见面试题

0
0

Go+Python打造电商系统 自研微服务框架 抓紧高薪机遇

快速转型Go工程师,成为具备双语言后端能力的开发者

510 学习 · 530 问题

查看课程