并发改良,还没有加入aof
来源:13-14 实现核心Database(二)

weixin_慕沐2155417
2022-09-17
老师源码中的Handle函数是监听channel,拿到一个payload,exec完,写回一个结果到客户端,然后再取channel中下一个payload,再exec,再写回客户端。
我做了一个测试,在execSet中睡眠了一秒,然后在tcp测试工具中发送了一百个*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n 请求
结果是每一个请求的回复都隔了一秒,也就是100个请求等一百个回复等了100秒。
func execSet(db *DB, args [][]byte) resp.Reply {
time.Sleep(time.Second) //睡眠一秒
key := string(args[0])
value := args[1]
entity := &database.DataEntity{
Data: value,
}
db.PutEntity(key, entity)
return &reply.OkReply{}
}
//老师源码
func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) {
if h.closing.Get() {
// closing handler refuse new connection
_ = conn.Close()
}
client := connection.NewConn(conn)
h.activeConn.Store(client, 1)
ch := parser.ParseStream(conn)
for payload := range ch {
if payload.Err != nil {
if payload.Err == io.EOF ||
payload.Err == io.ErrUnexpectedEOF ||
strings.Contains(payload.Err.Error(), "use of closed network connection") {
// connection closed
h.closeClient(client)
logger.Info("connection closed: " + client.RemoteAddr().String())
return
}
// protocol err
errReply := reply.MakeErrReply(payload.Err.Error())
err := client.Write(errReply.ToBytes())
if err != nil {
h.closeClient(client)
logger.Info("connection closed: " + client.RemoteAddr().String())
return
}
continue
}
if payload.Data == nil {
logger.Error("empty payload")
continue
}
r, ok := payload.Data.(*reply.MultiBulkReply)
if !ok {
logger.Error("require multi bulk reply")
continue
}
result := h.db.Exec(client, r.Args)
if result != nil {
_ = client.Write(result.ToBytes())
} else {
_ = client.Write(unknownErrReplyBytes)
}
}
}
我的改动是,handle函数在遍历channel的时候,每拿到一个payload,开一个协程去做exec(当然如果协程数量太多,可以设置一个协程数量最大值,用队列去分配),结果是100个set请求,收到100个回复只需要不到两秒钟的时间
func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) {
//1 判断处理器是否处于关闭状态,如果是则拒绝新的连接
if h.closing.Get() {
// closing handler refuse new connection
conn.Close()
}
//2 将原始连接包装成redis连接,并将新连接加入连接字典
println(conn.RemoteAddr())
client := connection.NewConn(conn)
h.activeConn.Store(client, 1)
//3 将redis连接送给解析器,并监听数据通道
ch := parser.ParseStream(conn)
for payload := range ch {
go func() {
h.exec(payload, client)
}()
}
}
//将原来的exec代码片段抽出来做一个函数,当然需要把代码中的continue语句都删掉,因为已经不在循环里
func (h *RespHandler) exec(payload *parser.Payload, client *connection.Connection) {
if payload.Err != nil {
if payload.Err == io.EOF ||
payload.Err == io.ErrUnexpectedEOF ||
strings.Contains(payload.Err.Error(), "use of closed network connection") {
// connection closed
h.closeClient(client)
logger.Info("connection closed: " + client.RemoteAddr().String())
return
}
// protocol err
errReply := reply.MakeErrReply(payload.Err.Error())
err := client.Write(errReply.ToBytes())
if err != nil {
h.closeClient(client)
logger.Info("connection closed: " + client.RemoteAddr().String())
return
}
}
if payload.Data == nil {
logger.Error("empty payload")
}
r, ok := payload.Data.(*reply.MultiBulkReply)
if !ok {
logger.Error("require multi bulk reply")
}
result := h.mdb.Exec(client, r.Args)
if result != nil {
client.Write(result.ToBytes())
} else {
client.Write(unknownErrReplyBytes)
}
}
老师你觉得这样可行吗?
写回答
3回答
-
weixin_慕沐2155417
提问者
2022-12-19
你这单个客户端的命令不按顺序请求会出问题呀。例如:先发送 set 命令,在发送 get 命令。你这样子并发可能会导致 get 命令先于 set 执行,而得到错误的结果。并发应该是多个客户端之间并发,单个客户端的命令必须要按序执行。
00 -
Moody
2022-09-18
有些情况下是有意义的,不过你一开始测试的时候是用了单个客户端测试的,所以是串行执行的。如果多个客户端同时请求的话,应该是并行的。
原生的redis就是纯单线程串行的,从效果上来说还是可以的。00 -
weixin_慕沐2155417
提问者
2022-09-17
而且,底层的dict用了sync.Map,岂不是更能体现多协程并发访问的安全性问题?老师源码中,永远只有一个协程在操作这个Map
00
相似问题