3535import org .apache .pulsar .common .policies .data .ClusterData ;
3636import org .apache .pulsar .common .policies .data .TenantInfoImpl ;
3737import org .apache .pulsar .common .policies .data .TopicType ;
38+ import org .apache .pulsar .metadata .api .extended .SessionEvent ;
3839import org .apache .pulsar .metadata .impl .ZKMetadataStore ;
3940import org .apache .pulsar .tests .TestRetrySupport ;
4041import org .apache .pulsar .zookeeper .LocalBookkeeperEnsemble ;
4142import org .apache .pulsar .zookeeper .ZookeeperServerTest ;
4243import org .apache .zookeeper .ClientCnxn ;
4344import org .apache .zookeeper .ZooKeeper ;
45+ import org .awaitility .Awaitility ;
4446import org .awaitility .reflect .WhiteboxImpl ;
4547
4648@ Slf4j
@@ -65,6 +67,7 @@ public abstract class CanReconnectZKClientPulsarServiceBaseTest extends TestRetr
6567 protected PulsarAdmin admin ;
6668 protected PulsarClient client ;
6769 protected ZooKeeper localZkOfBroker ;
70+ protected volatile SessionEvent sessionEvent ;
6871 protected Object localMetaDataStoreClientCnx ;
6972 protected final AtomicBoolean connectionTerminationThreadKeepRunning = new AtomicBoolean ();
7073 private volatile Thread connectionTerminationThread ;
@@ -87,6 +90,10 @@ protected void startBrokers() throws Exception {
8790 broker = pulsar .getBrokerService ();
8891 ZKMetadataStore zkMetadataStore = (ZKMetadataStore ) pulsar .getLocalMetadataStore ();
8992 localZkOfBroker = zkMetadataStore .getZkClient ();
93+ zkMetadataStore .registerSessionListener (n -> {
94+ log .info ("Received session event: {}" , n );
95+ sessionEvent = n ;
96+ });
9097 ClientCnxn cnxn = WhiteboxImpl .getInternalState (localZkOfBroker , "cnxn" );
9198 Object sendThread = WhiteboxImpl .getInternalState (cnxn , "sendThread" );
9299 localMetaDataStoreClientCnx = WhiteboxImpl .getInternalState (sendThread , "clientCnxnSocket" );
@@ -157,6 +164,7 @@ protected void stopLocalMetadataStoreConnectionTermination() throws InterruptedE
157164 connectionTerminationThread .join ();
158165 connectionTerminationThread = null ;
159166 }
167+ Awaitility .await ().until (() -> SessionEvent .Reconnected .equals (sessionEvent ));
160168 }
161169
162170 protected void createDefaultTenantsAndClustersAndNamespace () throws Exception {
0 commit comments