如图,这样B不是会覆盖到A的修改吗?这样符合预期吗?

来源:8-2 ZooKeeper数据模式及ZNode节点的特性

是个么有感情的杀手

2023-02-22

图片描述

写回答

1回答

大能老师

2023-02-22

version的概念

该Znode结点的版本号。每一个Znode结点被建立时版本号都为0,每更新一次都会致使版本号加1,即便更新先后Znode存储的值没有变化版本号也会加1。version值能够形象的理解为Znode结点被更新的次数。Znode状态信息中的版本号信息,使得服务端能够对多个客户端对同一个Znode的更新操做作并发控制。整个过程和java中的CAS有点像,是一种乐观锁的并发控制策略,而version值起到了冲突检测的功能。客户端拿到Znode的version信息,并在更新时附上这个version信息,服务端在更新Znode时必须必须比较客户端的version和Znode的实际version,只有这两个version一致时才会进行修改。

数据的更新和 version 的增加操作实际上是原子操作

首先是 client端的:

面代码Zookeeper.setData方法: Client A构建对象发送给Follower A

public Stat setData( final String path, byte data[], int version)
        throws KeeperException, InterruptedException
    {
        final String clientPath = path;
        PathUtils. validatePath(clientPath);

//通过传入的path构造完整serverPath
        final String serverPath = prependChroot(clientPath);

//构造一个Request头
        RequestHeader h = new RequestHeader();
        //设置类型为setData
        h.setType(ZooDefs.OpCode.setData);
        //构造一个SetData请求体
        SetDataRequest request = new SetDataRequest();
        //设置需要修改node的serverPath
        request.setPath(serverPath);
        //设置需要修改的node的data
        request.setData(data);
        //设置需要修改的node的version
        request.setVersion(version);
        
        //构建SetDataResponse对象
        SetDataResponse response = new SetDataResponse();

//提交请求,并等待返回结果
        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
        //如果r.getErr()不能0,则表示有错误,抛出异常
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code. get(r.getErr()),
                    clientPath);
        }
        return response.getStat();
    }

可以看到数据封装到一个请求里:

 //设置需要修改node的serverPath
 request.setPath(serverPath);
 //设置需要修改的node的data
 request.setData(data);
 //设置需要修改的node的version
 request.setVersion(version);

接着请求到 sever 端,如果是 Follower 节点就会转发给 leader 节点。

*setData请求的代码:*

 protected   void  pRequest2Txn(  int  type,  long  zxid, Request request, Record record,  boolean  deserialize)
         throws  KeeperException, IOException, RequestProcessorException
    {
        request.hdr =  new  TxnHeader(request.sessionId , request.cxid, zxid,
                                    zks.getTime(), type);

switch (type) {
            .....
            case OpCode.setData:
                //检查session
                zks.sessionTracker .checkSession(request.sessionId, request.getOwner());
                //将record转成SetDataRequest类型
                SetDataRequest setDataRequest = ( SetDataRequest)record;
                if (deserialize)
                    //将Request.reques数据反序列化成setDataRequest对象
                    ByteBufferInputStream.byteBuffer2Record(request. request, setDataRequest);
                //获取需要需要修改的znode的path
                path = setDataRequest.getPath();
                //获取内存数据中获取path对于的znode信息
                nodeRecord = getRecordForPath( path);
                //检查对 znode是否有写权限
                checkACL( zks, nodeRecord .acl , ZooDefs.Perms.WRITE,
                        request.authInfo);
                //获取客户端设置的版本号
                version = setDataRequest.getVersion();
                //获取节点当前版本号
                int currentVersion = nodeRecord.stat.getVersion();
                //如果客户端设置的版本号不是-1,且不等于当前版本号,则抛出KeeperException.BadVersionException异常
                if (version != -1 && version != currentVersion) {
                    throw new KeeperException .BadVersionException(path);
                }
                //version等于当前版本加1
                version = currentVersion + 1;
                //构建SetDataTxn对象,并赋给request.txn
                request. txn = new SetDataTxn( path, setDataRequest.getData(), version);
                //拷贝nodeRecord
                nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
                //将nodeRecord的当前版本号设置为version
                nodeRecord.stat.setVersion( version);
                //将nodeRecord放入outstandingChanges
                //path和nodeRecord map放入outstandingChangesForPath
                addChangeRecord( nodeRecord);
                break ;
          ......
        }
    }

可以看到数据操作和版本更新是同一个 zxid 进行约束的,也就是同个事务处理。

//获取客户端设置的版本号
version = setDataRequest.getVersion();
//获取节点当前版本号
int currentVersion = nodeRecord.stat.getVersion();
//如果客户端设置的版本号不是-1,且不等于当前版本号,则抛出KeeperException.BadVersionException异常
                
if (version != -1 && version != currentVersion) {
           throw new KeeperException .BadVersionException(path);
}
//version等于当前版本加1
version = currentVersion + 1;

所以,理论上应该是一个操作,既"set data if version is expected"

以上:

//img.mukewang.com/szimg/63f5b9f9097fe26613220768.jpg

0
0

Java分布式架构设计与开发实战

项目贯穿式讲解,真正将理论与实战相结合

325 学习 · 74 问题

查看课程