Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,20 @@ private void commitUnPartitionedTable() {
}

private void commitPartitionedTable() {
if (!rewritePartitions.isEmpty()) {
LOG.info(
"Altering {} Hive partitions for table {}, partitions {}",
rewritePartitions.size(),
table.id(),
partitionsSummary(rewritePartitions.values()));
}
if (!newPartitions.isEmpty()) {
LOG.info(
"Creating {} Hive partitions for table {}, partitions {}",
newPartitions.size(),
table.id(),
partitionsSummary(newPartitions.values()));
}
try {
transactionalHMSClient.run(
c -> {
Expand All @@ -346,10 +360,30 @@ private void commitPartitionedTable() {
return 0;
});
} catch (TException | InterruptedException e) {
LOG.warn(
"Failed to commit Hive partition operations for table {}: alter {}, create {}",
table.id(),
rewritePartitions.size(),
newPartitions.size(),
e);
throw new RuntimeException(e);
}
}

private static String partitionsSummary(java.util.Collection<Partition> partitions) {
int max = 5;
return partitions.stream()
.limit(max)
.map(
p ->
"Partition(values: ["
+ Joiner.on("/").join(p.getValues())
+ "], location: "
+ p.getSd().getLocation()
+ ")")
.collect(Collectors.joining(", ", "[", partitions.size() > max ? ", ...]" : "]"));
}

private void checkDataFileInSameLocation(String partitionLocation, List<DataFile> files) {
Path partitionPath = new Path(partitionLocation);
for (DataFile df : files) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void commit() {
commitPartitionedTable();
}
} catch (Exception e) {
LOG.warn("Commit operation to HMS failed.", e);
LOG.warn("Commit Hive partition operations to HMS failed for table {}.", table.id(), e);
}
}

Expand Down Expand Up @@ -455,6 +455,12 @@ private StructLikeMap<Partition> filterNewPartitionNonExists(

private void commitPartitionedTable() {
if (!partitionToDelete.isEmpty()) {
LOG.info(
"Dropping {} Hive partitions for table {}, txId {}, partitions {}",
partitionToDelete.size(),
table.id(),
txId,
partitionsSummary(partitionToDelete.values()));
for (Partition p : partitionToDelete.values()) {
try {
transactionClient.run(
Expand All @@ -469,22 +475,48 @@ private void commitPartitionedTable() {
return 0;
});
} catch (NoSuchObjectException e) {
LOG.warn("try to delete hive partition {} but partition not exist.", p);
LOG.warn(
"Tried to drop Hive partition for table {} but partition does not exist: {}",
table.id(),
partitionToString(p));
} catch (TException | InterruptedException e) {
LOG.warn(
"Failed to drop Hive partition for table {}, partition {}",
table.id(),
partitionToString(p),
e);
throw new RuntimeException(e);
}
}
}

if (!partitionToCreate.isEmpty()) {
LOG.info(
"Creating {} Hive partitions for table {}, txId {}, partitions {}",
partitionToCreate.size(),
table.id(),
txId,
partitionsSummary(partitionToCreate.values()));
try {
transactionClient.run(c -> c.addPartitions(Lists.newArrayList(partitionToCreate.values())));
} catch (TException | InterruptedException e) {
LOG.warn(
"Failed to create {} Hive partitions for table {}, partitions {}",
partitionToCreate.size(),
table.id(),
partitionsSummary(partitionToCreate.values()),
e);
throw new RuntimeException(e);
}
}

if (!partitionToAlter.isEmpty()) {
LOG.info(
"Altering {} Hive partitions for table {}, txId {}, partitions {}",
partitionToAlter.size(),
table.id(),
txId,
partitionsSummary(partitionToAlter.values()));
try {
transactionClient.run(
c -> {
Expand All @@ -501,11 +533,25 @@ private void commitPartitionedTable() {
return null;
});
} catch (TException | InterruptedException e) {
LOG.warn(
"Failed to alter {} Hive partitions for table {}, partitions {}",
partitionToAlter.size(),
table.id(),
partitionsSummary(partitionToAlter.values()),
e);
throw new RuntimeException(e);
}
}
}

private static String partitionsSummary(java.util.Collection<Partition> partitions) {
int max = 5;
return partitions.stream()
.limit(max)
.map(UpdateHiveFiles::partitionToString)
.collect(Collectors.joining(", ", "[", partitions.size() > max ? ", ...]" : "]"));
}

private void generateUnpartitionTableLocation() {
if (this.addFiles.isEmpty()) {
unpartitionTableLocation = createUnpartitionEmptyLocationForHive();
Expand Down Expand Up @@ -611,7 +657,7 @@ private boolean isPathEquals(String pathA, String pathB) {
return path1.equals(path2);
}

private String partitionToString(Partition p) {
private static String partitionToString(Partition p) {
return "Partition(values: ["
+ Joiner.on("/").join(p.getValues())
+ "], location: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,17 +225,20 @@ public static List<String> getHivePartitionLocations(
public static void alterPartition(
HMSClientPool hiveClient, TableIdentifier tableIdentifier, String partition, String newPath)
throws IOException {
String oldLocation = null;
try {
LOG.info(
"alter table {} hive partition {} to new location {}",
tableIdentifier,
partition,
newPath);
Partition oldPartition =
hiveClient.run(
client ->
client.getPartition(
tableIdentifier.getDatabase(), tableIdentifier.getTableName(), partition));
oldLocation = oldPartition.getSd().getLocation();
LOG.info(
"Altering Hive partition location for table {}, partition {}, location {} -> {}",
tableIdentifier,
partition,
oldLocation,
newPath);
Partition newPartition = new Partition(oldPartition);
newPartition.getSd().setLocation(newPath);
hiveClient.run(
Expand All @@ -255,7 +258,20 @@ public static void alterPartition(
}
return null;
});
LOG.info(
"Altered Hive partition location for table {}, partition {}, location {} -> {}",
tableIdentifier,
partition,
oldLocation,
newPath);
} catch (Exception e) {
LOG.warn(
"Failed to alter Hive partition location for table {}, partition {}, location {} -> {}",
tableIdentifier,
partition,
oldLocation,
newPath,
e);
throw new IOException(e);
}
}
Expand All @@ -282,18 +298,40 @@ public static void createPartitionIfAbsent(
partition =
newPartition(
hiveTable, partitionValues, partitionLocation, dataFiles, accessTimestamp);
LOG.info(
"Creating Hive partition for table {}, partition values {}, location {}",
mixedTable.id(),
partitionValues,
partitionLocation);
client.addPartition(partition);
LOG.info(
"Created Hive partition for table {}, partition values {}, location {}",
mixedTable.id(),
partitionValues,
partitionLocation);
return partition;
}
});
} catch (Exception e) {
LOG.warn(
"Failed to create Hive partition for table {}, partition values {}, location {}",
mixedTable.id(),
partitionValues,
partitionLocation,
e);
throw new RuntimeException(e);
}
}

public static void dropPartition(
HMSClientPool hmsClient, MixedTable mixedTable, Partition hivePartition) {
String location = hivePartition.getSd() == null ? null : hivePartition.getSd().getLocation();
try {
LOG.info(
"Dropping Hive partition for table {}, partition values {}, location {}",
mixedTable.id(),
hivePartition.getValues(),
location);
hmsClient.run(
client -> {
PartitionDropOptions options =
Expand All @@ -308,7 +346,18 @@ public static void dropPartition(
hivePartition.getValues(),
options);
});
LOG.info(
"Dropped Hive partition for table {}, partition values {}, location {}",
mixedTable.id(),
hivePartition.getValues(),
location);
} catch (TException | InterruptedException e) {
LOG.warn(
"Failed to drop Hive partition for table {}, partition values {}, location {}",
mixedTable.id(),
hivePartition.getValues(),
location,
e);
throw new RuntimeException(e);
}
}
Expand All @@ -320,6 +369,13 @@ public static void updatePartitionLocation(
String newLocation,
List<DataFile> dataFiles,
int accessTimestamp) {
String oldLocation = hivePartition.getSd() == null ? null : hivePartition.getSd().getLocation();
LOG.info(
"Updating Hive partition location for table {}, partition values {}, location {} -> {}",
mixedTable.id(),
hivePartition.getValues(),
oldLocation,
newLocation);
dropPartition(hmsClient, mixedTable, hivePartition);
createPartitionIfAbsent(
hmsClient, mixedTable, hivePartition.getValues(), newLocation, dataFiles, accessTimestamp);
Expand Down
Loading