并发改良,还没有加入aof

来源:13-14 实现核心Database(二)

weixin_慕沐2155417

2022-09-17

老师源码中的Handle函数是监听channel,拿到一个payload,exec完,写回一个结果到客户端,然后再取channel中下一个payload,再exec,再写回客户端。
我做了一个测试,在execSet中睡眠了一秒,然后在tcp测试工具中发送了一百个*3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n 请求
结果是每一个请求的回复都隔了一秒,也就是100个请求等一百个回复等了100秒。

func execSet(db *DB, args [][]byte) resp.Reply {
	time.Sleep(time.Second) //睡眠一秒
	key := string(args[0])
	value := args[1]
	entity := &database.DataEntity{
		Data: value,
	}
	db.PutEntity(key, entity)
	return &reply.OkReply{}
}
//老师源码
func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) {
	if h.closing.Get() {
		// closing handler refuse new connection
		_ = conn.Close()
	}

	client := connection.NewConn(conn)
	h.activeConn.Store(client, 1)

	ch := parser.ParseStream(conn)
	for payload := range ch {
		if payload.Err != nil {
			if payload.Err == io.EOF ||
				payload.Err == io.ErrUnexpectedEOF ||
				strings.Contains(payload.Err.Error(), "use of closed network connection") {
				// connection closed
				h.closeClient(client)
				logger.Info("connection closed: " + client.RemoteAddr().String())
				return
			}
			// protocol err
			errReply := reply.MakeErrReply(payload.Err.Error())
			err := client.Write(errReply.ToBytes())
			if err != nil {
				h.closeClient(client)
				logger.Info("connection closed: " + client.RemoteAddr().String())
				return
			}
			continue
		}
		if payload.Data == nil {
			logger.Error("empty payload")
			continue
		}
		r, ok := payload.Data.(*reply.MultiBulkReply)
		if !ok {
			logger.Error("require multi bulk reply")
			continue
		}
		result := h.db.Exec(client, r.Args)
		if result != nil {
			_ = client.Write(result.ToBytes())
		} else {
			_ = client.Write(unknownErrReplyBytes)
		}
	}
}

我的改动是,handle函数在遍历channel的时候,每拿到一个payload,开一个协程去做exec(当然如果协程数量太多,可以设置一个协程数量最大值,用队列去分配),结果是100个set请求,收到100个回复只需要不到两秒钟的时间

func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) {

	//1 判断处理器是否处于关闭状态,如果是则拒绝新的连接
	if h.closing.Get() {
		// closing handler refuse new connection
		conn.Close()
	}
	//2 将原始连接包装成redis连接,并将新连接加入连接字典
	println(conn.RemoteAddr())
	client := connection.NewConn(conn)
	h.activeConn.Store(client, 1)

	//3 将redis连接送给解析器,并监听数据通道

	ch := parser.ParseStream(conn)

	for payload := range ch {
		go func() {
			h.exec(payload, client)
		}()
	}

}
//将原来的exec代码片段抽出来做一个函数,当然需要把代码中的continue语句都删掉,因为已经不在循环里
func (h *RespHandler) exec(payload *parser.Payload, client *connection.Connection) {

	if payload.Err != nil {
		if payload.Err == io.EOF ||
			payload.Err == io.ErrUnexpectedEOF ||
			strings.Contains(payload.Err.Error(), "use of closed network connection") {
			// connection closed
			h.closeClient(client)
			logger.Info("connection closed: " + client.RemoteAddr().String())
			return
		}
		// protocol err
		errReply := reply.MakeErrReply(payload.Err.Error())
		err := client.Write(errReply.ToBytes())
		if err != nil {
			h.closeClient(client)
			logger.Info("connection closed: " + client.RemoteAddr().String())
			return
		}

	}
	if payload.Data == nil {
		logger.Error("empty payload")

	}
	r, ok := payload.Data.(*reply.MultiBulkReply)
	if !ok {
		logger.Error("require multi bulk reply")

	}
	result := h.mdb.Exec(client, r.Args)
	if result != nil {
		client.Write(result.ToBytes())
	} else {
		client.Write(unknownErrReplyBytes)
	}
}

老师你觉得这样可行吗?

写回答

3回答

weixin_慕沐2155417

提问者

2022-12-19

你这单个客户端的命令不按顺序请求会出问题呀。例如:先发送 set 命令,在发送 get 命令。你这样子并发可能会导致 get 命令先于 set 执行,而得到错误的结果。并发应该是多个客户端之间并发,单个客户端的命令必须要按序执行。

0
0

Moody

2022-09-18

有些情况下是有意义的,不过你一开始测试的时候是用了单个客户端测试的,所以是串行执行的。如果多个客户端同时请求的话,应该是并行的。
原生的redis就是纯单线程串行的,从效果上来说还是可以的。
0
0

weixin_慕沐2155417

提问者

2022-09-17

而且,底层的dict用了sync.Map,岂不是更能体现多协程并发访问的安全性问题?老师源码中,永远只有一个协程在操作这个Map

0
0

深入Go底层原理,重写Redis中间件实战

深入Go语言原理+Go重写Redis

474 学习 · 172 问题

查看课程