Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,9 @@ public String toString(boolean lineBreaks) {
// which may happen when the remote is too old.
for (ApiKeys apiKey : ApiKeys.clientApis()) {
if (!apiKeysText.containsKey(apiKey.id)) {
StringBuilder bld = new StringBuilder();
bld.append(apiKey.name).append("(").
append(apiKey.id).append("): ").append("UNSUPPORTED");
apiKeysText.put(apiKey.id, bld.toString());
String bld = apiKey.name + "(" +
apiKey.id + "): " + "UNSUPPORTED";
apiKeysText.put(apiKey.id, bld);
}
}
String separator = lineBreaks ? ",\n\t" : ", ";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,12 @@ CreatableTopic convertToCreatableTopic() {

@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(name=").append(name).
append(", numPartitions=").append(numPartitions.map(String::valueOf).orElse("default")).
append(", replicationFactor=").append(replicationFactor.map(String::valueOf).orElse("default")).
append(", replicasAssignments=").append(replicasAssignments).
append(", configs=").append(configs).
append(")");
return bld.toString();
return "(name=" + name +
", numPartitions=" + numPartitions.map(String::valueOf).orElse("default") +
", replicationFactor=" + replicationFactor.map(String::valueOf).orElse("default") +
", replicasAssignments=" + replicasAssignments +
", configs=" + configs +
")";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public enum MemberState {
Expand Down Expand Up @@ -120,7 +121,7 @@ public enum MemberState {

RECONCILING.previousValidStates = Arrays.asList(STABLE, JOINING, ACKNOWLEDGING, RECONCILING);

ACKNOWLEDGING.previousValidStates = Arrays.asList(RECONCILING);
ACKNOWLEDGING.previousValidStates = Collections.singletonList(RECONCILING);

FATAL.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING, ACKNOWLEDGING,
PREPARE_LEAVING, LEAVING, UNSUBSCRIBED);
Expand All @@ -133,11 +134,11 @@ public enum MemberState {
PREPARE_LEAVING.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING,
ACKNOWLEDGING, UNSUBSCRIBED);

LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
LEAVING.previousValidStates = Collections.singletonList(PREPARE_LEAVING);

UNSUBSCRIBED.previousValidStates = Arrays.asList(PREPARE_LEAVING, LEAVING, FENCED);

STALE.previousValidStates = Arrays.asList(LEAVING);
STALE.previousValidStates = Collections.singletonList(LEAVING);
}

private List<MemberState> previousValidStates;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -855,13 +855,12 @@ private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar
}

int firstInFlightSequence = transactionManager.firstInFlightSequence(first.topicPartition);
if (firstInFlightSequence != RecordBatch.NO_SEQUENCE && first.hasSequence()
&& first.baseSequence() != firstInFlightSequence)
// If the queued batch already has an assigned sequence, then it is being retried.
// In this case, we wait until the next immediate batch is ready and drain that.
// We only move on when the next in line batch is complete (either successfully or due to
// a fatal broker error). This effectively reduces our in flight request count to 1.
return true;
// If the queued batch already has an assigned sequence, then it is being retried.
// In this case, we wait until the next immediate batch is ready and drain that.
// We only move on when the next in line batch is complete (either successfully or due to
// a fatal broker error). This effectively reduces our in flight request count to 1.
return firstInFlightSequence != RecordBatch.NO_SEQUENCE && first.hasSequence()
&& first.baseSequence() != firstInFlightSequence;
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ static void writeHeader(DataOutputStream out, long offset, int size) throws IOEx

private static final class DataLogInputStream implements LogInputStream<AbstractLegacyRecordBatch> {
private final InputStream stream;
protected final int maxMessageSize;
private final int maxMessageSize;
private final ByteBuffer offsetAndSizeBuffer;

DataLogInputStream(InputStream stream, int maxMessageSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,11 @@ public LogDirInfo(Errors error, Map<TopicPartition, ReplicaInfo> replicaInfos) {

@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("(error=")
.append(error)
.append(", replicas=")
.append(replicaInfos)
.append(")");
return builder.toString();
return "(error=" +
error +
", replicas=" +
replicaInfos +
")";
}
}

Expand All @@ -126,15 +124,13 @@ public ReplicaInfo(long size, long offsetLag, boolean isFuture) {

@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("(size=")
.append(size)
.append(", offsetLag=")
.append(offsetLag)
.append(", isFuture=")
.append(isFuture)
.append(")");
return builder.toString();
return "(size=" +
size +
", offsetLag=" +
offsetLag +
", isFuture=" +
isFuture +
")";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,20 +316,18 @@ public FetchRequest build(short version) {

@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(type=FetchRequest").
append(", replicaId=").append(replicaId).
append(", maxWait=").append(maxWait).
append(", minBytes=").append(minBytes).
append(", maxBytes=").append(maxBytes).
append(", fetchData=").append(toFetch).
append(", isolationLevel=").append(isolationLevel).
append(", removed=").append(removed.stream().map(TopicIdPartition::toString).collect(Collectors.joining(", "))).
append(", replaced=").append(replaced.stream().map(TopicIdPartition::toString).collect(Collectors.joining(", "))).
append(", metadata=").append(metadata).
append(", rackId=").append(rackId).
append(")");
return bld.toString();
return "(type=FetchRequest" +
", replicaId=" + replicaId +
", maxWait=" + maxWait +
", minBytes=" + minBytes +
", maxBytes=" + maxBytes +
", fetchData=" + toFetch +
", isolationLevel=" + isolationLevel +
", removed=" + removed.stream().map(TopicIdPartition::toString).collect(Collectors.joining(", ")) +
", replaced=" + replaced.stream().map(TopicIdPartition::toString).collect(Collectors.joining(", ")) +
", metadata=" + metadata +
", rackId=" + rackId +
")";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,14 @@ private static Map<String, LeaderAndIsrTopicState> groupByTopic(List<LeaderAndIs

@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(type=LeaderAndIsRequest")
.append(", controllerId=").append(controllerId)
.append(", controllerEpoch=").append(controllerEpoch)
.append(", brokerEpoch=").append(brokerEpoch)
.append(", partitionStates=").append(partitionStates)
.append(", topicIds=").append(topicIds)
.append(", liveLeaders=(").append(liveLeaders.stream().map(Node::toString).collect(Collectors.joining(", "))).append(")")
.append(")");
return bld.toString();
return "(type=LeaderAndIsRequest" +
", controllerId=" + controllerId +
", controllerEpoch=" + controllerEpoch +
", brokerEpoch=" + brokerEpoch +
", partitionStates=" + partitionStates +
", topicIds=" + topicIds +
", liveLeaders=(" + liveLeaders.stream().map(Node::toString).collect(Collectors.joining(", ")) + ")" +
")";

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,12 @@ private ProduceRequest build(short version, boolean validate) {

@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(type=ProduceRequest")
.append(", acks=").append(data.acks())
.append(", timeout=").append(data.timeoutMs())
.append(", partitionRecords=(").append(data.topicData().stream().flatMap(d -> d.partitionData().stream()).collect(Collectors.toList()))
.append("), transactionalId='").append(data.transactionalId() != null ? data.transactionalId() : "")
.append("'");
return bld.toString();
return "(type=ProduceRequest" +
", acks=" + data.acks() +
", timeout=" + data.timeoutMs() +
", partitionRecords=(" + data.topicData().stream().flatMap(d -> d.partitionData().stream()).collect(Collectors.toList()) +
"), transactionalId='" + (data.transactionalId() != null ? data.transactionalId() : "") +
"'";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,13 @@ public StopReplicaRequest build(short version) {

@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(type=StopReplicaRequest").
append(", controllerId=").append(controllerId).
append(", controllerEpoch=").append(controllerEpoch).
append(", brokerEpoch=").append(brokerEpoch).
append(", deletePartitions=").append(deletePartitions).
append(", topicStates=").append(topicStates.stream().map(StopReplicaTopicState::toString).collect(Collectors.joining(","))).
append(")");
return bld.toString();
return "(type=StopReplicaRequest" +
", controllerId=" + controllerId +
", controllerEpoch=" + controllerEpoch +
", brokerEpoch=" + brokerEpoch +
", deletePartitions=" + deletePartitions +
", topicStates=" + topicStates.stream().map(StopReplicaTopicState::toString).collect(Collectors.joining(",")) +
")";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,15 @@ private static Map<String, UpdateMetadataTopicState> groupByTopic(Map<String, Uu

@Override
public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("(type: UpdateMetadataRequest=").
append(", controllerId=").append(controllerId).
append(", controllerEpoch=").append(controllerEpoch).
append(", kraftController=").append(kraftController).
append(", type=").append(updateType).
append(", brokerEpoch=").append(brokerEpoch).
append(", partitionStates=").append(partitionStates).
append(", liveBrokers=").append(liveBrokers.stream().map(UpdateMetadataBroker::toString).collect(Collectors.joining(", "))).
append(")");
return bld.toString();
return "(type: UpdateMetadataRequest=" +
", controllerId=" + controllerId +
", controllerEpoch=" + controllerEpoch +
", kraftController=" + kraftController +
", type=" + updateType +
", brokerEpoch=" + brokerEpoch +
", partitionStates=" + partitionStates +
", liveBrokers=" + liveBrokers.stream().map(UpdateMetadataBroker::toString).collect(Collectors.joining(", ")) +
")";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,17 +136,15 @@ public Key resolveKey(JsonWebSignature jws, List<JsonWebStructure> nestingContex
if (refreshingHttpsJwks.maybeExpediteRefresh(keyId))
log.debug("Refreshing JWKs from {} as no suitable verification key for JWS w/ header {} was found in {}", refreshingHttpsJwks.getLocation(), jws.getHeaders().getFullHeaderAsJsonString(), jwks);

StringBuilder sb = new StringBuilder();
sb.append("Unable to find a suitable verification key for JWS w/ header ").append(jws.getHeaders().getFullHeaderAsJsonString());
sb.append(" from JWKs ").append(jwks).append(" obtained from ").append(
refreshingHttpsJwks.getLocation());
throw new UnresolvableKeyException(sb.toString());
String sb = "Unable to find a suitable verification key for JWS w/ header " + jws.getHeaders().getFullHeaderAsJsonString() +
" from JWKs " + jwks + " obtained from " +
refreshingHttpsJwks.getLocation();
throw new UnresolvableKeyException(sb);
} catch (JoseException | IOException e) {
StringBuilder sb = new StringBuilder();
sb.append("Unable to find a suitable verification key for JWS w/ header ").append(jws.getHeaders().getFullHeaderAsJsonString());
sb.append(" due to an unexpected exception (").append(e).append(") while obtaining or using keys from JWKS endpoint at ").append(
refreshingHttpsJwks.getLocation());
throw new UnresolvableKeyException(sb.toString(), e);
String sb = "Unable to find a suitable verification key for JWS w/ header " + jws.getHeaders().getFullHeaderAsJsonString() +
" due to an unexpected exception (" + e + ") while obtaining or using keys from JWKS endpoint at " +
refreshingHttpsJwks.getLocation();
throw new UnresolvableKeyException(sb, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public static OAuthBearerValidationResult validateScope(OAuthBearerToken token,
if (!tokenScope.contains(requiredScopeElement))
return OAuthBearerValidationResult.newFailure(String.format(
"The provided scope (%s) was missing a required scope (%s). All required scope elements: %s",
String.valueOf(tokenScope), requiredScopeElement, requiredScope),
tokenScope, requiredScopeElement, requiredScope),
requiredScope.toString(), null);
}
return OAuthBearerValidationResult.newSuccess();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,7 @@ public boolean shouldBeRebuilt(Map<String, Object> nextConfigs) {
if (truststore != null && truststore.modified()) {
return true;
}
if (keystore != null && keystore.modified()) {
return true;
}
return false;
return keystore != null && keystore.modified();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private String escapeLiteralBackReferences(final String unescaped, final int num
final StringBuilder sb = new StringBuilder(value.length() + 1);
final int groupStart = backRefMatcher.start(1);

sb.append(value.substring(0, groupStart - 1));
sb.append(value, 0, groupStart - 1);
sb.append("\\");
sb.append(value.substring(groupStart - 1));
value = sb.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testParseAndValidateAddressesWithReverseLookup() {

// With lookup of example.com, either one or two addresses are expected depending on
// whether ipv4 and ipv6 are enabled
List<InetSocketAddress> validatedAddresses = checkWithLookup(asList("example.com:10000"));
List<InetSocketAddress> validatedAddresses = checkWithLookup(Collections.singletonList("example.com:10000"));
assertFalse(validatedAddresses.isEmpty(), "Unexpected addresses " + validatedAddresses);
List<String> validatedHostNames = validatedAddresses.stream().map(InetSocketAddress::getHostName)
.collect(Collectors.toList());
Expand Down
12 changes: 6 additions & 6 deletions clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1140,8 +1140,8 @@ public void testTopicMetadataOnUpdatePartitionLeadership() {
new Metadata.LeaderIdAndEpoch(
Optional.of(2),
Optional.of(3)
)),
Arrays.asList(node1)
)),
Collections.singletonList(node1)
);
assertEquals(2, metadata.fetch().partitionsForTopic(topic).size());
assertEquals(1, metadata.fetch().partition(tp0).leader().id());
Expand All @@ -1161,20 +1161,20 @@ public void testUpdatePartitionLeadership() {
// topic2 has 1 partition: tp21
String topic1 = "topic1";
TopicPartition tp11 = new TopicPartition(topic1, 0);
PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, tp11, Optional.of(1), Optional.of(100), Arrays.asList(1, 2), Arrays.asList(1, 2), Arrays.asList(3));
PartitionMetadata part1Metadata = new PartitionMetadata(Errors.NONE, tp11, Optional.of(1), Optional.of(100), Arrays.asList(1, 2), Arrays.asList(1, 2), Collections.singletonList(3));
Uuid topic1Id = Uuid.randomUuid();
TopicPartition tp12 = new TopicPartition(topic1, 1);
PartitionMetadata part12Metadata = new PartitionMetadata(Errors.NONE, tp12, Optional.of(2), Optional.of(200), Arrays.asList(2, 3), Arrays.asList(2, 3), Arrays.asList(1));
PartitionMetadata part12Metadata = new PartitionMetadata(Errors.NONE, tp12, Optional.of(2), Optional.of(200), Arrays.asList(2, 3), Arrays.asList(2, 3), Collections.singletonList(1));

String topic2 = "topic2";
TopicPartition tp21 = new TopicPartition(topic2, 0);
PartitionMetadata part2Metadata = new PartitionMetadata(Errors.NONE, tp21, Optional.of(2), Optional.of(200), Arrays.asList(2, 3), Arrays.asList(2, 3), Arrays.asList(1));
PartitionMetadata part2Metadata = new PartitionMetadata(Errors.NONE, tp21, Optional.of(2), Optional.of(200), Arrays.asList(2, 3), Arrays.asList(2, 3), Collections.singletonList(1));
Uuid topic2Id = Uuid.randomUuid();

Set<String> internalTopics = Collections.singleton(Topic.GROUP_METADATA_TOPIC_NAME);
TopicPartition internalPart = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0);
Uuid internalTopicId = Uuid.randomUuid();
PartitionMetadata internalTopicMetadata = new PartitionMetadata(Errors.NONE, internalPart, Optional.of(2), Optional.of(200), Arrays.asList(2, 3), Arrays.asList(2, 3), Arrays.asList(1));
PartitionMetadata internalTopicMetadata = new PartitionMetadata(Errors.NONE, internalPart, Optional.of(2), Optional.of(200), Arrays.asList(2, 3), Arrays.asList(2, 3), Collections.singletonList(1));

Map<String, Uuid> topicIds = new HashMap<>();
topicIds.put(topic1, topic1Id);
Expand Down
Loading