Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ protected TCompressedElasticFramedTransport(
protected void readFrame() throws TTransportException {
underlying.readAll(i32buf, 0, 4);
int size = TFramedTransport.decodeFrameSize(i32buf);
checkFrameSize(size);
validateFrame(size);
readBuffer.fill(underlying, size);
RpcStat.readCompressedBytes.addAndGet(size);
try {
Expand All @@ -69,6 +69,7 @@ public void flush() throws TTransportException {
writeCompressBuffer.resizeIfNecessary(maxCompressedLength);
int compressedLength =
compress(writeBuffer.getBuffer(), 0, length, writeCompressBuffer.getBuffer(), 0);
checkWriteFrameSize(compressedLength);
RpcStat.writeCompressedBytes.addAndGet(compressedLength);
TFramedTransport.encodeFrameSize(compressedLength, i32buf);
underlying.write(i32buf, 0, 4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,11 @@ public int read(byte[] buf, int off, int len) throws TTransportException {
protected void readFrame() throws TTransportException {
underlying.readAll(i32buf, 0, 4);
int size = TFramedTransport.decodeFrameSize(i32buf);
checkFrameSize(size);
validateFrame(size);
readBuffer.fill(underlying, size);
}

protected void checkFrameSize(int size) throws TTransportException {
protected void validateFrame(int size) throws TTransportException {
final int HTTP_GET_SIGNATURE = 0x47455420; // "GET "
final int HTTP_POST_SIGNATURE = 0x504F5354; // "POST"
final int TLS_MIN_VERSION = 0x160300;
Expand All @@ -196,8 +196,6 @@ protected void checkFrameSize(int size) throws TTransportException {
error = FrameError.TLS_REQUEST;
} else if (size < 0) {
error = FrameError.NEGATIVE_FRAME_SIZE;
} else if (size > thriftMaxFrameSize) {
error = FrameError.FRAME_SIZE_EXCEEDED;
}
}

Expand Down Expand Up @@ -241,9 +239,26 @@ void throwException(int size, String remoteInfo, int maxSize) throws TTransportE
}
}

protected void checkWriteFrameSize(int size) throws TTransportException {
if (size <= thriftMaxFrameSize) {
return;
}
SocketAddress remoteAddress = null;
if (underlying instanceof TSocket) {
remoteAddress = ((TSocket) underlying).getSocket().getRemoteSocketAddress();
}
String remoteInfo = (remoteAddress == null) ? "" : " to " + remoteAddress;
String message =
String.format(
FrameError.FRAME_SIZE_EXCEEDED.messageFormat, size, thriftMaxFrameSize, remoteInfo);
close();
throw new TTransportException(TTransportException.CORRUPTED_DATA, message);
}

@Override
public void flush() throws TTransportException {
int length = writeBuffer.getPos();
checkWriteFrameSize(length);
TFramedTransport.encodeFrameSize(length, i32buf);
underlying.write(i32buf, 0, 4);
underlying.write(writeBuffer.getBuffer(), 0, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public enum TSStatusCode {
INTERNAL_SERVER_ERROR(305),
DISPATCH_ERROR(306),
LICENSE_ERROR(307),
THRIFT_FRAME_OVERSIZE(308),

// Client,
REDIRECTION_RECOMMEND(400),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public enum OperationType {
DEALLOCATE_PREPARED_STATEMENT("deallocatePreparedStatement"),
GET_EARLIEST_TIMESLOTS("getEarliestTimeslots"),
GENERATE_DATA_PARTITION_TABLE("generateDataPartitionTable"),
CHECK_DATA_PARTITION_TABLE_STATUS("checkDataPartitionTableStatus");
CHECK_DATA_PARTITION_TABLE_STATUS("checkDataPartitionTableStatus"),
DISPATCH_FRAGMENT_INSTANCE("dispatchFragmentInstance");
private final String name;

OperationType(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.Preconditions;
Expand All @@ -77,7 +78,10 @@
import java.util.stream.Collectors;

import static com.google.common.util.concurrent.Futures.immediateFuture;
import static org.apache.iotdb.db.protocol.thrift.OperationType.DISPATCH_FRAGMENT_INSTANCE;
import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException;
import static org.apache.iotdb.rpc.TSStatusCode.THRIFT_FRAME_OVERSIZE;

public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {

Expand Down Expand Up @@ -459,7 +463,7 @@
return this.localhostIpAddr.equals(endPoint.getIp()) && localhostInternalPort == endPoint.port;
}

private void dispatchRemoteHelper(final FragmentInstance instance, final TEndPoint endPoint)

Check warning on line 466 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 93 to 64, Complexity from 20 to 14, Nesting Level from 5 to 2, Number of Variables from 10 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ2vxNiaqg995FU64HAN&open=AZ2vxNiaqg995FU64HAN&pullRequest=17536
throws FragmentInstanceDispatchException,
TException,
ClientManagerException,
Expand Down Expand Up @@ -549,6 +553,17 @@
TSStatusCode.EXECUTE_STATEMENT_ERROR,
String.format("unknown read type [%s]", instance.getType())));
}
} catch (TException e) {
Throwable rootCause = ExceptionUtils.getRootCause(e);
if (rootCause instanceof TTransportException
&& ((TTransportException) rootCause).getType() == TTransportException.CORRUPTED_DATA) {
// Don't set DISPATCH_ERROR status to avoid retry if dispatch failed because of thrift frame
// is oversize
throw new FragmentInstanceDispatchException(
onNpeOrUnexpectedException(
rootCause, DISPATCH_FRAGMENT_INSTANCE, THRIFT_FRAME_OVERSIZE));
}
throw e;
}
}

Expand Down Expand Up @@ -591,7 +606,7 @@
}
}

private void dispatchLocally(final FragmentInstance instance)

Check warning on line 609 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 80 to 64, Complexity from 18 to 14, Nesting Level from 4 to 2, Number of Variables from 9 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ2vxNiaqg995FU64HAO&open=AZ2vxNiaqg995FU64HAO&pullRequest=17536
throws FragmentInstanceDispatchException {
// deserialize ConsensusGroupId
ConsensusGroupId groupId = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private ErrorHandlingUtils() {}
private static final String ERROR_OPERATION_LOG = "Status code: {}, operation: {} failed";

public static TSStatus onNpeOrUnexpectedException(
Exception e, String operation, TSStatusCode statusCode) {
Throwable e, String operation, TSStatusCode statusCode) {
String message = String.format("[%s] Exception occurred: %s failed. ", statusCode, operation);
if (e instanceof IOException || e instanceof NullPointerException) {
LOGGER.error(ERROR_OPERATION_LOG, statusCode, operation, e);
Expand All @@ -84,7 +84,7 @@ public static TSStatus onNpeOrUnexpectedException(
}

public static TSStatus onNpeOrUnexpectedException(
Exception e, OperationType operation, TSStatusCode statusCode) {
Throwable e, OperationType operation, TSStatusCode statusCode) {
return onNpeOrUnexpectedException(e, operation.getName(), statusCode);
}

Expand Down
Loading