Skip to content

Commit 5583102

Browse files
authored
[fix] [log] Do not print error log if tenant/namespace does not exist when calling get topic metadata (#23291)
1 parent b1c5d96 commit 5583102

File tree

2 files changed

+85
-7
lines changed

2 files changed

+85
-7
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@
6363
import java.util.stream.Collectors;
6464
import javax.naming.AuthenticationException;
6565
import javax.net.ssl.SSLSession;
66+
import javax.ws.rs.WebApplicationException;
67+
import javax.ws.rs.core.Response;
6668
import org.apache.bookkeeper.mledger.AsyncCallbacks;
6769
import org.apache.bookkeeper.mledger.Entry;
6870
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -672,8 +674,6 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
672674
commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError,
673675
ex.getMessage(), requestId);
674676
} else {
675-
log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress,
676-
topicName, ex.getMessage(), ex);
677677
ServerError error = ServerError.ServiceNotReady;
678678
if (ex instanceof MetadataStoreException) {
679679
error = ServerError.MetadataError;
@@ -685,6 +685,14 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
685685
error = ServerError.MetadataError;
686686
}
687687
}
688+
if (error == ServerError.TopicNotFound) {
689+
log.info("Trying to get Partitioned Metadata for a resource not exist"
690+
+ "[{}] {}: {}", remoteAddress,
691+
topicName, ex.getMessage());
692+
} else {
693+
log.warn("Failed to get Partitioned Metadata [{}] {}: {}",
694+
remoteAddress, topicName, ex.getMessage(), ex);
695+
}
688696
commandSender.sendPartitionMetadataResponse(error, ex.getMessage(),
689697
requestId);
690698
}
@@ -702,6 +710,16 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
702710
return null;
703711
}).exceptionally(ex -> {
704712
logAuthException(remoteAddress, "partition-metadata", getPrincipal(), Optional.of(topicName), ex);
713+
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
714+
if (actEx instanceof WebApplicationException restException) {
715+
if (restException.getResponse().getStatus() == Response.Status.NOT_FOUND.getStatusCode()) {
716+
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.MetadataError,
717+
"Tenant or namespace or topic does not exist: " + topicName.getNamespace() ,
718+
requestId));
719+
lookupSemaphore.release();
720+
return null;
721+
}
722+
}
705723
final String msg = "Exception occurred while trying to authorize get Partition Metadata";
706724
writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg,
707725
requestId));
@@ -3663,13 +3681,22 @@ protected void messageReceived() {
36633681
private static void logAuthException(SocketAddress remoteAddress, String operation,
36643682
String principal, Optional<TopicName> topic, Throwable ex) {
36653683
String topicString = topic.map(t -> ", topic=" + t.toString()).orElse("");
3666-
if (ex instanceof AuthenticationException) {
3684+
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
3685+
if (actEx instanceof AuthenticationException) {
36673686
log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}",
3668-
remoteAddress, operation, principal, topicString, ex.getMessage());
3669-
} else {
3670-
log.error("[{}] Error trying to authenticate: operation={}, principal={}{}",
3671-
remoteAddress, operation, principal, topicString, ex);
3687+
remoteAddress, operation, principal, topicString, actEx.getMessage());
3688+
return;
3689+
} else if (actEx instanceof WebApplicationException restException){
3690+
// Do not print error log if users tries to access a not found resource.
3691+
if (restException.getResponse().getStatus() == Response.Status.NOT_FOUND.getStatusCode()) {
3692+
log.info("[{}] Trying to authenticate for a topic which under a namespace not exists: operation={},"
3693+
+ " principal={}{}, reason: {}",
3694+
remoteAddress, operation, principal, topicString, actEx.getMessage());
3695+
return;
3696+
}
36723697
}
3698+
log.error("[{}] Error trying to authenticate: operation={}, principal={}{}",
3699+
remoteAddress, operation, principal, topicString, ex);
36733700
}
36743701

36753702
private static void logNamespaceNameAuthException(SocketAddress remoteAddress, String operation,

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,4 +578,55 @@ public void testGetMetadataIfNotAllowedCreateOfNonPersistentTopic(boolean config
578578
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
579579
});
580580
}
581+
582+
@Test(dataProvider = "topicDomains")
583+
public void testNamespaceNotExist(TopicDomain topicDomain) throws Exception {
584+
int lookupPermitsBefore = getLookupRequestPermits();
585+
final String namespaceNotExist = BrokerTestUtil.newUniqueName("public/ns");
586+
final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.toString() + "://" + namespaceNotExist + "/tp");
587+
PulsarClientImpl[] clientArray = getClientsToTest(false);
588+
for (PulsarClientImpl client : clientArray) {
589+
try {
590+
PartitionedTopicMetadata topicMetadata = client
591+
.getPartitionedTopicMetadata(topicNameStr, true, true)
592+
.join();
593+
log.info("Get topic metadata: {}", topicMetadata.partitions);
594+
fail("Expected a not found ex");
595+
} catch (Exception ex) {
596+
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
597+
assertTrue(unwrapEx instanceof PulsarClientException.BrokerMetadataException ||
598+
unwrapEx instanceof PulsarClientException.TopicDoesNotExistException);
599+
}
600+
}
601+
// Verify: lookup semaphore has been releases.
602+
Awaitility.await().untilAsserted(() -> {
603+
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
604+
});
605+
}
606+
607+
@Test(dataProvider = "topicDomains")
608+
public void testTenantNotExist(TopicDomain topicDomain) throws Exception {
609+
int lookupPermitsBefore = getLookupRequestPermits();
610+
final String tenantNotExist = BrokerTestUtil.newUniqueName("tenant");
611+
final String namespaceNotExist = BrokerTestUtil.newUniqueName(tenantNotExist + "/default");
612+
final String topicNameStr = BrokerTestUtil.newUniqueName(topicDomain.toString() + "://" + namespaceNotExist + "/tp");
613+
PulsarClientImpl[] clientArray = getClientsToTest(false);
614+
for (PulsarClientImpl client : clientArray) {
615+
try {
616+
PartitionedTopicMetadata topicMetadata = client
617+
.getPartitionedTopicMetadata(topicNameStr, true, true)
618+
.join();
619+
log.info("Get topic metadata: {}", topicMetadata.partitions);
620+
fail("Expected a not found ex");
621+
} catch (Exception ex) {
622+
Throwable unwrapEx = FutureUtil.unwrapCompletionException(ex);
623+
assertTrue(unwrapEx instanceof PulsarClientException.BrokerMetadataException ||
624+
unwrapEx instanceof PulsarClientException.TopicDoesNotExistException);
625+
}
626+
}
627+
// Verify: lookup semaphore has been releases.
628+
Awaitility.await().untilAsserted(() -> {
629+
assertEquals(getLookupRequestPermits(), lookupPermitsBefore);
630+
});
631+
}
581632
}

0 commit comments

Comments
 (0)