zk源码—4.会话的实现原理
大纲1.创建会话
(1)客户端的会话状态
(2)服务端的会话创建
(3)会话ID的初始化实现
(4)设置的会话超时时间没生效的原因
2.分桶策略和会话管理
(1)分桶策略和过期队列
(2)会话激活
(3)会话超时检查
(4)会话清理
1.创建会话
(1)客户端的会话状态
(2)服务端的会话创建
(3)会话ID的初始化实现
(4)设置的会话超时时间没生效的原因
会话是zk中最核心的概念之一,客户端与服务端的交互都离不开会话的相关操作。其中包括临时节点的生命周期、客户端请求的顺序、Watcher通知机制等。比如会话关闭时,服务端会自动删除该会话所创建的临时节点。当客户端会话退出,通过Watcher机制可向订阅该事件的客户端发送通知。
(1)客户端的会话状态
当zk客户端与服务端成功建立连接后,就会创建一个会话。在zk客户端的运行过程(会话生命周期)中,会话会经历不同的状态变化。
这些不同的会话状态包括:正在连接(CONNECTING)、已经连接(CONNECTED)、会话关闭(CLOSE)、正在重新连接(RECONNECTING)、已经重新连接(RECONNECTED)等。
如果zk客户端需要与服务端建立连接创建一个会话,那么客户端就必须提供一个使用字符串表示的zk服务端地址列表。
当客户端刚开始创建ZooKeeper对象时,其会话状态就是CONNECTING,之后客户端会根据服务端地址列表中的IP地址分别尝试进行网络连接。如果成功连接上zk服务端,那么客户端的会话状态就会变为CONNECTED。
如果因为网络闪断或者其他原因造成客户端与服务端之间的连接断开,那么zk客户端会自动进行重连操作,同时其会话状态变为CONNECTING,直到重新连接上zk服务端后,客户端的会话状态才变回CONNECTED。
通常 总是在CONNECTING或CONNECTED间切换。如果出现会话超时、权限检查失败、客户端主动退出程序等情况,那么客户端的会话状态就会直接变为CLOSE。
public class CreateSessionDemo {
private final static String CONNECTSTRING = "192.168.1.5:2181";
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
//创建zk
ZooKeeper zooKeeper = new ZooKeeper(CONNECTSTRING, 5000, new Watcher() {
public void process(WatchedEvent watchedEvent) {
//如果当前的连接状态是连接成功, 则通过计数器去控制, 否则进行阻塞, 因为连接是需要时间的
//如果已经获得连接了, 那么状态会是SyncConnected
if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
countDownLatch.countDown();
System.out.println(watchedEvent.getState());
}
//如果数据发生了变化
if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
System.out.println("节点发生了变化, 路径: " + watchedEvent.getPath());
}
}
});
//进行阻塞
countDownLatch.await();
//确定已经获得连接了再进行zk的操作: 增删改查
...
}
}
public class ZooKeeper implements AutoCloseable {
protected final ClientCnxn cnxn;
protected final ZKWatchManager watchManager;//ZKWatchManager实现了ClientWatchManager
...
//1.初始化ZooKeeper对象
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {
...
//创建客户端的Watcher管理器ZKWatchManager
watchManager = defaultWatchManager();
//2.设置会话默认的Watcher,保存在客户端的Watcher管理器ZKWatchManager中
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
//3.构造服务器地址列表管理器StaticHostProvider
hostProvider = aHostProvider;
//4.创建并初始化客户端的网络连接器ClientCnxn + 5.初始化SendThread和EventThread
cnxn = createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly);
//6.启动SendThread和EventThread
cnxn.start();
}
protected ClientCnxn createConnection(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) throws IOException {
return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this, watchManager, clientCnxnSocket, canBeReadOnly);
}
//从配置中获取客户端使用的网络连接配置:使用NIO还是Netty,然后通过反射进行实例化客户端Socket
private ClientCnxnSocket getClientCnxnSocket() throws IOException {
String clientCnxnSocketName = getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName).getDeclaredConstructor(ZKClientConfig.class);
ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor.newInstance(getClientConfig());
return clientCxnSocket;
}
public enum States {
//客户端的会话状态包括
CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
CLOSED, AUTH_FAILED, NOT_CONNECTED;
public boolean isAlive() {
return this != CLOSED && this != AUTH_FAILED;
}
public boolean isConnected() {
return this == CONNECTED || this == CONNECTEDREADONLY;
}
}
...
static class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
protected volatile Watcher defaultWatcher;
...
}
protected ZKWatchManager defaultWatchManager() {
//创建客户端的Watcher管理器ZKWatchManager
return new ZKWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
}
...
}
public class ClientCnxn {
...
volatile States state = States.NOT_CONNECTED;
private final HostProvider hostProvider;
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
...
this.hostProvider = hostProvider;
//5.初始化SendThread和EventThread
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
...
}
//6.启动SendThread和EventThread
public void start() {
sendThread.start();
eventThread.start();
}
class SendThread extends ZooKeeperThread {
private final ClientCnxnSocket clientCnxnSocket;
...
SendThread(ClientCnxnSocket clientCnxnSocket) {
super(makeThreadName("-SendThread()"));
//客户端刚开始创建ZooKeeper对象时,设置其会话状态为CONNECTING
state = States.CONNECTING;
this.clientCnxnSocket = clientCnxnSocket;
//设置为守护线程
setDaemon(true);
}
@Override
public void run() {
...
while (state.isAlive()) {
...
//7.获取其中一个zk服务端的地址
serverAddress = hostProvider.next(1000);
//向zk服务端发起连接请求
startConnect(serverAddress);
...
}
...
}
private void startConnect(InetSocketAddress addr) throws IOException {
...
state = States.CONNECTING;
//8.创建TCP连接
//接下来以ClientCnxnSocketNetty的connect为例
clientCnxnSocket.connect(addr);
}
void onConnected(int _negotiatedSessionTimeout, long _sessionId, byte[] _sessionPasswd, boolean isRO) throws IOException {
...
//和服务端建立连接后的处理
state = (isRO) ? States.CONNECTEDREADONLY : States.CONNECTED;
...
}
...
}
}
public class ClientCnxnSocketNetty extends ClientCnxnSocket {
//向zk服务端发起建立连接的请求
@Override
void connect(InetSocketAddress addr) throws IOException {
...
Bootstrap bootstrap = new Bootstrap().group(eventLoopGroup)
.channel(NettyUtils.nioOrEpollSocketChannel())
.option(ChannelOption.SO_LINGER, -1).option(ChannelOption.TCP_NODELAY, true)
.handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
bootstrap = configureBootstrapAllocator(bootstrap);
bootstrap.validate();
connectFuture = bootstrap.connect(addr);
...
}
private class ZKClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
...
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
...
//与zk服务端建立好连接后的处理,调用父类ClientCnxnSocket的readConnectResult()方法
readConnectResult();
...
}
...
}
...
}
abstract class ClientCnxnSocket {
void readConnectResult() throws IOException {
...
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}
...
}(2)服务端的会话创建
在zk服务端中,使用SessionImpl表示客户端与服务器端连接的会话实体。SessionImpl由三个部分组成:会话ID(sessionID)、会话超时时间(timeout)、会话关闭状态(isClosing)。
一.会话ID
会话ID是一个会话的标识符,当创建一次会话时,zk服务端会自动为其分配一个唯一的ID。
二.会话超时时间
一个会话的超时时间就是指一次会话从发起后到被服务器关闭的时长。设置会话超时时间后,zk服务端会参考设置的超时时间,最终计算一个服务端自己的超时时间。这个超时时间才是真正被zk服务端用于管理用户会话的超时时间。
三.会话关闭状态
会话关闭状态isClosing表示一个会话是否已经关闭。如果zk服务端检查到一个会话已经因为超时等原因失效时,就会将该会话的isClosing标记为关闭,之后就不再对该会话进行操作。
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
...
public static class SessionImpl implements Session {
SessionImpl(long sessionId, int timeout) {
this.sessionId = sessionId;
this.timeout = timeout;
isClosing = false;
}
final long sessionId;//会话ID
final int timeout;//会话超时时间
boolean isClosing;//会话关闭状态
...
}
...
}服务端收到客户端的创建会话请求后,进行会话创建的过程大概分四步:处理ConnectRequest请求、创建会话、请求处理链处理和会话响应。
步骤一:处理ConnectRequest请求
首先由NettyServerCnxn负责接收来自客户端的创建会话请求,然后反序列化出ConnectRequest对象,并完成会话超时时间的协商。
步骤二:创建会话
SessionTrackerImpl的createSession()方法会为该会话分配一个sessionID,并将该sessionID注册到sessionsById和sessionsWithTimeout中,同时通过SessionTrackerImpl的updateSessionExpiry()方法进行会话激活。
步骤三:请求处理链处理
接着调用ZooKeeperServer.firstProcessor的processRequest()方法,让该会话请求会在zk服务端的各个请求处理器之间进行顺序流转。
步骤四:会话响应
最后在请求处理器FinalRequestProcessor的processRequest()方法中进行会话响应。
//由网络连接工厂类监听到客户端的创建会话请求
public class NettyServerCnxnFactory extends ServerCnxnFactory {
class CnxnChannelHandler extends ChannelDuplexHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
...
NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
cnxn.processMessage((ByteBuf) msg);
...
}
...
}
...
}
public class NettyServerCnxn extends ServerCnxn {
private volatile ZooKeeperServer zkServer;
void processMessage(ByteBuf buf) {
...
receiveMessage(buf);
...
}
private void receiveMessage(ByteBuf message) {
...
ZooKeeperServer zks = this.zkServer;
//处理会话连接请求
zks.processConnectRequest(this, bb);
...
}
...
}
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
protected SessionTracker sessionTracker;
...
public synchronized void startup() {
startupWithServerState(State.RUNNING);
}
private void startupWithServerState(State state) {
//创建并启动会话管理器
if (sessionTracker == null) {
createSessionTracker();
}
startSessionTracker();
//初始化请求处理链
setupRequestProcessors();
...
}
protected void createSessionTracker() {
sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(), tickTime, createSessionTrackerServerId, getZooKeeperServerListener());
}
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
//步骤一:处理ConnectRequest请求
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
ConnectRequest connReq = new ConnectRequest();
connReq.deserialize(bia, "connect");
...
//协商会话超时时间
int sessionTimeout = connReq.getTimeOut();
byte passwd[] = connReq.getPasswd();
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
sessionTimeout = maxSessionTimeout;
}
cnxn.setSessionTimeout(sessionTimeout);
...
long sessionId = connReq.getSessionId();
if (sessionId == 0) {
//步骤二:创建会话
long id = createSession(cnxn, passwd, sessionTimeout);
} else {
long clientSessionId = connReq.getSessionId();
if (serverCnxnFactory != null) {
serverCnxnFactory.closeSession(sessionId);
}
if (secureServerCnxnFactory != null) {
secureServerCnxnFactory.closeSession(sessionId);
}
cnxn.setSessionId(sessionId);
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
}
}
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
if (passwd == null) {
passwd = new byte;
}
//通过会话管理器创建会话
long sessionId = sessionTracker.createSession(timeout);
Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd);
ByteBuffer to = ByteBuffer.allocate(4);
to.putInt(timeout);
cnxn.setSessionId(sessionId);
Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
setLocalSessionFlag(si);
//激活会话 + 提交请求到请求处理链进行处理
submitRequest(si);
return sessionId;
}
public void submitRequest(Request si) {
...
//激活会话
touch(si.cnxn);
//步骤三:交给请求处理链进行处理,在FinalRequestProcessor中会进行会话响应
firstProcessor.processRequest(si);
...
}
...
}
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
...
private final AtomicLong nextSessionId = new AtomicLong();
private final ExpiryQueue<SessionImpl> sessionExpiryQueue;
public SessionTrackerImpl(SessionExpirer expirer, ConcurrentMap<Long, Integer> sessionsWithTimeout,
int tickTime, long serverId, ZooKeeperServerListener listener) {
super("SessionTracker", listener);
this.expirer = expirer;
this.sessionExpiryQueue = new ExpiryQueue<SessionImpl>(tickTime);
this.sessionsWithTimeout = sessionsWithTimeout;
//初始化SessionId
this.nextSessionId.set(initializeNextSession(serverId));
for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
addSession(e.getKey(), e.getValue());
}
EphemeralType.validateServerId(serverId);
}
...
public long createSession(int sessionTimeout) {
//为会话分配一个sessionID
long sessionId = nextSessionId.getAndIncrement();
//将sessionID注册到sessionsById和sessionsWithTimeout中
addSession(sessionId, sessionTimeout);
return sessionId;
}
public synchronized boolean addSession(long id, int sessionTimeout) {
sessionsWithTimeout.put(id, sessionTimeout);
boolean added = false;
SessionImpl session = sessionsById.get(id);
if (session == null) {
session = new SessionImpl(id, sessionTimeout);
}
SessionImpl existedSession = sessionsById.putIfAbsent(id, session);
if (existedSession != null) {
session = existedSession;
} else {
added = true;
LOG.debug("Adding session 0x" + Long.toHexString(id));
}
...
updateSessionExpiry(session, sessionTimeout);
return added;
}
private void updateSessionExpiry(SessionImpl s, int timeout) {
...
sessionExpiryQueue.update(s, timeout);
}
...
}
public class FinalRequestProcessor implements RequestProcessor {
...
public void processRequest(Request request) {
...
ServerCnxn cnxn = request.cnxn;
//步骤四:会话响应
cnxn.sendResponse(hdr, rsp, "response");
...
}
...
}
public abstract class ServerCnxn implements Stats, Watcher {
...
public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
try {
baos.write(fourBytes);
bos.writeRecord(h, "header");
if (r != null) {
bos.writeRecord(r, tag);
}
baos.close();
} catch (IOException e) {
LOG.error("Error serializing response");
}
byte b[] = baos.toByteArray();
serverStats().updateClientResponseSize(b.length - 4);
ByteBuffer bb = ByteBuffer.wrap(b);
bb.putInt(b.length - 4).rewind();
sendBuffer(bb);
}
...
}
public class NettyServerCnxn extends ServerCnxn {
...
@Override
public void sendBuffer(ByteBuffer sendBuffer) {
if (sendBuffer == ServerCnxnFactory.closeConn) {
close();
return;
}
channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer)).addListener(onSendBufferDoneListener);
}
...
}(3)会话ID的初始化实现
SessionTracker是zk服务端的会话管理器,zk会话的整个生命周期都离不开SessionTracker的参与。SessionTracker是一个接口类型,规定了会话管理的相关操作行为,具体的会话管理逻辑则由SessionTrackerImpl来完成。
SessionTrackerImpl类实现了SessionTracker接口,其中有四个关键字段:sessionExpiryQueue字段表示的是会话过期队列,用于管理会话自动过期。nextSessionId字段记录了当前生成的会话ID。sessionsById字段用于根据会话ID来管理具体的会话实体。sessionsWithTimeout字段用于根据会话ID管理会话的超时时间。
在SessionTrackerImpl类初始化时,会调用initializeNextSession()方法来生成一个初始化的会话ID。之后在zk的运行过程中,会在该会话ID的基础上为每个会话分配ID。
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
...
private final ExpiryQueue<SessionImpl> sessionExpiryQueue;//过期队列
private final AtomicLong nextSessionId = new AtomicLong();//当前生成的会话ID
ConcurrentHashMap<Long, SessionImpl> sessionsById;//根据会话ID来管理具体的会话实体
ConcurrentMap<Long, Integer> sessionsWithTimeout;//根据不同的会话ID管理每个会话的超时时间
public SessionTrackerImpl(SessionExpirer expirer, ConcurrentMap<Long, Integer> sessionsWithTimeout,
int tickTime, long serverId, ZooKeeperServerListener listener) {
super("SessionTracker", listener);
this.expirer = expirer;
this.sessionExpiryQueue = new ExpiryQueue<SessionImpl>(tickTime);
this.sessionsWithTimeout = sessionsWithTimeout;
//初始化SessionId
this.nextSessionId.set(initializeNextSession(serverId));
for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
addSession(e.getKey(), e.getValue());
}
EphemeralType.validateServerId(serverId);
}
public static long initializeNextSession(long id) {
long nextSid;
nextSid = (Time.currentElapsedTime() << 24) >>> 8;
nextSid = nextSid | (id <<56);
return nextSid;
}
...
}(4)会话清理
当SessionTracker的会话超时检查线程遍历出一些已经过期的会话时,就要进行会话清理了,会话清理的步骤如下:
一.标记会话状态为已关闭
二.发起关闭会话请求
三.收集临时节点
四.添加临时节点的删除请求到事务变更队列
五.删除临时节点
六.移除会话
七.关闭NIOServerCnxn
一.标记会话状态为已关闭
SessionTracker的setSessionClosing()方法会标记会话状态为已关闭,这是因为整个会话清理过程需要一段时间,为了保证在会话清理期间不再处理来自该会话对应的客户端的请求,SessionTracker会首先将该会话的isClosing属性标记为true。
二.发起关闭会话请求
ZooKeeperServer的expire()方法和close()方法会发起关闭会话请求,为了使对该会话的关闭操作在整个服务端集群中都生效,zk使用提交"关闭会话"请求的方式,将请求交给PrepRequestProcessor处理。
三.收集临时节点
PrepRequestProcessor的pRequest2Txn()方法会收集需要清理的临时节点。在zk中,一旦某个会话失效,那么和该会话相关的临时节点也要被清除掉。因此需要首先将服务器上所有和该会话相关的临时节点找出来。
zk的内存数据库会为每个会话都保存一份由该会话维护的临时节点集合。因此在会话清理阶段,只需根据当前即将关闭的会话的sessionID,便可以从zk的内存数据库中获取到该会话的临时节点列表。
四.添加临时节点的删除请求到事务变更队列
将临时节点的删除请求添加到事务变更队列outstandingChanges中。完成该会话相关的临时节点收集后,zk会将这些临时节点逐个转换成节点删除请求,添加到事务变更队列中。
五.删除临时节点
FinalRequestProcessor的processRequest()方法触发删除临时节点。当收集完所有需要删除的临时节点,以及创建了对应的节点删除请求后,便会在FinalRequestProcessor的processRequest()方法中,通过调用ZooKeeperServer的processTxn()方法,调用到ZKDatabase的processTxn()方法,最后调用DataTree的killSession()方法,从而最终删除内存数据库中该会话的所有临时节点。
六.移除会话
在FinalRequestProcessor的processRequest()方法中,会通过调用ZooKeeperServer的processTxn()方法,调用到SessionTracker的removeSession()方法将会话从SessionTracker移除。即从sessionsById、sessionsWithTimeout、sessionExpiryQueue中移除会话。
七.关闭NIOServerCnxn
在FinalRequestProcessor的processRequest()方法中,最后会调用FinalRequestProcessor的closeSession()方法,从NIOServerCnxnFactory的sessionMap中将该会话进行移除。
//时间的二进制表示,41位
10100000110000011110001000100110111110111
//左移24位变成,64位
0100000110000011110001000100110111110111000000000000000000000000
//右移8位变成,64位
0000000001000001100000111100010001001101111101110000000000000000
//假设服务器SID为2,那么左移56位变成
0000001000000000000000000000000000000000000000000000000000000000
//位与运算
0000001001000001100000111100010001001101111101110000000000000000
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页:
[1]