diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java index 5b9c81ec58b9..62abc28e4702 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java @@ -44,7 +44,7 @@ protected TCompressedElasticFramedTransport( protected void readFrame() throws TTransportException { underlying.readAll(i32buf, 0, 4); int size = TFramedTransport.decodeFrameSize(i32buf); - checkFrameSize(size); + validateFrame(size); readBuffer.fill(underlying, size); RpcStat.readCompressedBytes.addAndGet(size); try { @@ -69,6 +69,7 @@ public void flush() throws TTransportException { writeCompressBuffer.resizeIfNecessary(maxCompressedLength); int compressedLength = compress(writeBuffer.getBuffer(), 0, length, writeCompressBuffer.getBuffer(), 0); + checkWriteFrameSize(compressedLength); RpcStat.writeCompressedBytes.addAndGet(compressedLength); TFramedTransport.encodeFrameSize(compressedLength, i32buf); underlying.write(i32buf, 0, 4); diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java index 31e0f0b69606..f1d46dbb4344 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java @@ -174,11 +174,11 @@ public int read(byte[] buf, int off, int len) throws TTransportException { protected void readFrame() throws TTransportException { underlying.readAll(i32buf, 0, 4); int size = TFramedTransport.decodeFrameSize(i32buf); - checkFrameSize(size); + validateFrame(size); readBuffer.fill(underlying, size); } - protected void checkFrameSize(int size) throws TTransportException { + protected void validateFrame(int size) throws TTransportException { final int HTTP_GET_SIGNATURE = 0x47455420; // "GET " final int HTTP_POST_SIGNATURE = 0x504F5354; // "POST" final int TLS_MIN_VERSION = 0x160300; @@ -196,8 +196,6 @@ protected void checkFrameSize(int size) throws TTransportException { error = FrameError.TLS_REQUEST; } else if (size < 0) { error = FrameError.NEGATIVE_FRAME_SIZE; - } else if (size > thriftMaxFrameSize) { - error = FrameError.FRAME_SIZE_EXCEEDED; } } @@ -241,9 +239,26 @@ void throwException(int size, String remoteInfo, int maxSize) throws TTransportE } } + protected void checkWriteFrameSize(int size) throws TTransportException { + if (size <= thriftMaxFrameSize) { + return; + } + SocketAddress remoteAddress = null; + if (underlying instanceof TSocket) { + remoteAddress = ((TSocket) underlying).getSocket().getRemoteSocketAddress(); + } + String remoteInfo = (remoteAddress == null) ? "" : " to " + remoteAddress; + String message = + String.format( + FrameError.FRAME_SIZE_EXCEEDED.messageFormat, size, thriftMaxFrameSize, remoteInfo); + close(); + throw new TTransportException(TTransportException.CORRUPTED_DATA, message); + } + @Override public void flush() throws TTransportException { int length = writeBuffer.getPos(); + checkWriteFrameSize(length); TFramedTransport.encodeFrameSize(length, i32buf); underlying.write(i32buf, 0, 4); underlying.write(writeBuffer.getBuffer(), 0, length); diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index f9e750f4012b..9fb0bb2f4649 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -51,6 +51,7 @@ public enum TSStatusCode { INTERNAL_SERVER_ERROR(305), DISPATCH_ERROR(306), LICENSE_ERROR(307), + THRIFT_FRAME_OVERSIZE(308), // Client, REDIRECTION_RECOMMEND(400), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java index 881e823ef2d6..94b5dcadecbc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/OperationType.java @@ -58,7 +58,8 @@ public enum OperationType { DEALLOCATE_PREPARED_STATEMENT("deallocatePreparedStatement"), GET_EARLIEST_TIMESLOTS("getEarliestTimeslots"), GENERATE_DATA_PARTITION_TABLE("generateDataPartitionTable"), - CHECK_DATA_PARTITION_TABLE_STATUS("checkDataPartitionTableStatus"); + CHECK_DATA_PARTITION_TABLE_STATUS("checkDataPartitionTableStatus"), + DISPATCH_FRAGMENT_INSTANCE("dispatchFragmentInstance"); private final String name; OperationType(String name) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java index c52f8f94eb2a..6000a2b8e293 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java @@ -59,6 +59,7 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; import org.apache.tsfile.external.commons.lang3.exception.ExceptionUtils; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.Preconditions; @@ -77,7 +78,10 @@ import java.util.stream.Collectors; import static com.google.common.util.concurrent.Futures.immediateFuture; +import static org.apache.iotdb.db.protocol.thrift.OperationType.DISPATCH_FRAGMENT_INSTANCE; import static org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException; +import static org.apache.iotdb.rpc.TSStatusCode.THRIFT_FRAME_OVERSIZE; public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher { @@ -549,6 +553,17 @@ private void dispatchRemoteHelper(final FragmentInstance instance, final TEndPoi TSStatusCode.EXECUTE_STATEMENT_ERROR, String.format("unknown read type [%s]", instance.getType()))); } + } catch (TException e) { + Throwable rootCause = ExceptionUtils.getRootCause(e); + if (rootCause instanceof TTransportException + && ((TTransportException) rootCause).getType() == TTransportException.CORRUPTED_DATA) { + // Don't set DISPATCH_ERROR status to avoid retry if dispatch failed because of thrift frame + // is oversize + throw new FragmentInstanceDispatchException( + onNpeOrUnexpectedException( + rootCause, DISPATCH_FRAGMENT_INSTANCE, THRIFT_FRAME_OVERSIZE)); + } + throw e; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java index 34d2e9bf82cb..df24a0187af2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java @@ -63,7 +63,7 @@ private ErrorHandlingUtils() {} private static final String ERROR_OPERATION_LOG = "Status code: {}, operation: {} failed"; public static TSStatus onNpeOrUnexpectedException( - Exception e, String operation, TSStatusCode statusCode) { + Throwable e, String operation, TSStatusCode statusCode) { String message = String.format("[%s] Exception occurred: %s failed. ", statusCode, operation); if (e instanceof IOException || e instanceof NullPointerException) { LOGGER.error(ERROR_OPERATION_LOG, statusCode, operation, e); @@ -84,7 +84,7 @@ public static TSStatus onNpeOrUnexpectedException( } public static TSStatus onNpeOrUnexpectedException( - Exception e, OperationType operation, TSStatusCode statusCode) { + Throwable e, OperationType operation, TSStatusCode statusCode) { return onNpeOrUnexpectedException(e, operation.getName(), statusCode); }