Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.ObjectTypeUtils;

import org.apache.tsfile.common.constant.TsFileConstant;

Expand All @@ -37,6 +38,9 @@ public class SnapshotFileSet {
TsFileResource.RESOURCE_SUFFIX.replace(".", ""),
ModificationFileV1.FILE_SUFFIX.replace(".", ""),
ModificationFile.FILE_SUFFIX.replace(".", ""),
ObjectTypeUtils.OBJECT_FILE_SUFFIX.replace(".", ""),
ObjectTypeUtils.OBJECT_TEMP_FILE_SUFFIX.replace(".", ""),
ObjectTypeUtils.OBJECT_BACK_FILE_SUFFIX.replace(".", ""),
};

private static final Set<String> DATA_FILE_SUFFIX_SET =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.db.utils.ObjectTypeUtils;

import org.apache.tsfile.external.commons.io.FileUtils;
import org.slf4j.Logger;
Expand All @@ -38,6 +40,8 @@
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -46,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

public class SnapshotLoader {
private Logger LOGGER = LoggerFactory.getLogger(SnapshotLoader.class);
Expand Down Expand Up @@ -231,6 +236,15 @@ private void deleteAllFilesInDataDirs() throws IOException {
timePartitions.addAll(Arrays.asList(files));
}
}

File objectRegionDir =
Paths.get(dataDirPath)
.resolve(IoTDBConstant.OBJECT_FOLDER_NAME)
.resolve(dataRegionId)
.toFile();
if (objectRegionDir.exists()) {
timePartitions.add(objectRegionDir);
}
}

try {
Expand Down Expand Up @@ -312,6 +326,78 @@ private void createLinksFromSnapshotDirToDataDirWithoutLog(File sourceDir)
createLinksFromSnapshotToSourceDir(targetSuffix, files, folderManager);
}
}

File snapshotObjectDir = new File(sourceDir, IoTDBConstant.OBJECT_FOLDER_NAME);
if (snapshotObjectDir.exists()) {
FolderManager objectFolderManager =
new FolderManager(
TierManager.getInstance().getAllObjectFileFolders(),
DirectoryStrategyType.SEQUENCE_STRATEGY);
linkObjectTreeFromSnapshotToObjectDirs(snapshotObjectDir, objectFolderManager);
}
}

private void linkObjectTreeFromSnapshotToObjectDirs(
File sourceObjectRoot, FolderManager folderManager)
throws DiskSpaceInsufficientException, IOException {
Path sourceRootPath = sourceObjectRoot.toPath();
// Process files during traversal to avoid loading all object file paths into memory.
Files.walkFileTree(
sourceRootPath,
new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
throws IOException {
if (!isObjectSnapshotCandidate(file.getFileName().toString())) {
return FileVisitResult.CONTINUE;
}
final Path sourceFile = file;
final Path targetRelPath = sourceRootPath.relativize(file);
try {
folderManager.getNextWithRetry(
currentObjectDir -> {
File targetFile =
new File(currentObjectDir).toPath().resolve(targetRelPath).toFile();
try {
if (!targetFile.getParentFile().exists()
&& !targetFile.getParentFile().mkdirs()) {
throw new IOException(
String.format(
"Cannot create directory %s",
targetFile.getParentFile().getAbsolutePath()));
}
try {
Files.createLink(targetFile.toPath(), sourceFile);
LOGGER.debug("Created hard link from {} to {}", sourceFile, targetFile);
return targetFile;
} catch (IOException e) {
LOGGER.info(
"Cannot create link from {} to {}, fallback to copy",
sourceFile,
targetFile);
}
Files.copy(sourceFile, targetFile.toPath());
return targetFile;
} catch (Exception e) {
LOGGER.warn(
"Failed to process file {} in dir {}: {}",
sourceFile.getFileName(),
currentObjectDir,
e.getMessage(),
e);
throw e;
}
});
} catch (Exception e) {
throw new IOException(
String.format(
"Failed to process object file after retries. Source: %s",
sourceFile.toAbsolutePath()),
e);
}
return FileVisitResult.CONTINUE;
}
});
}

private void createLinksFromSnapshotToSourceDir(
Expand Down Expand Up @@ -470,9 +556,105 @@ private int takeHardLinksFromSnapshotToDataDir(
}
}

File objectSnapshotRoot =
new File(
snapshotFolder.getAbsolutePath() + File.separator + IoTDBConstant.OBJECT_FOLDER_NAME);
if (objectSnapshotRoot.exists()) {
cnt += linkObjectSnapshotTreeToDataDir(objectSnapshotRoot, fileInfoSet);
}

return cnt;
}

private int linkObjectSnapshotTreeToDataDir(File objectSnapshotRoot, Set<String> fileInfoSet)
throws IOException {
final FolderManager folderManager;
try {
folderManager =
new FolderManager(
TierManager.getInstance().getAllObjectFileFolders(),
DirectoryStrategyType.SEQUENCE_STRATEGY);
} catch (DiskSpaceInsufficientException e) {
throw new IOException("Failed to initialize object folder manager", e);
}
Path rootPath = objectSnapshotRoot.toPath();
AtomicInteger cnt = new AtomicInteger(0);
// Process files during traversal to avoid loading all object file paths into memory.
Files.walkFileTree(
rootPath, new ObjectSnapshotLinkFileVisitor(rootPath, fileInfoSet, folderManager, cnt));

return cnt.get();
}

private final class ObjectSnapshotLinkFileVisitor extends SimpleFileVisitor<Path> {
private final Path rootPath;
private final Set<String> fileInfoSet;
private final FolderManager folderManager;
private final AtomicInteger cnt;

private ObjectSnapshotLinkFileVisitor(
Path rootPath, Set<String> fileInfoSet, FolderManager folderManager, AtomicInteger cnt) {
this.rootPath = rootPath;
this.fileInfoSet = fileInfoSet;
this.folderManager = folderManager;
this.cnt = cnt;
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (!isObjectSnapshotCandidate(file.getFileName().toString())) {
return FileVisitResult.CONTINUE;
}
String infoStr = getFileInfoString(file.toFile());
if (!fileInfoSet.contains(infoStr)) {
throw new IOException(
String.format("File %s is not in the log file list", file.toAbsolutePath()));
}
final Path sourceFile = file;
final Path targetRelPath = rootPath.relativize(file);
try {
folderManager.getNextWithRetry(
currentObjectDir -> {
File targetFile = new File(currentObjectDir).toPath().resolve(targetRelPath).toFile();
try {
if (!targetFile.getParentFile().exists() && !targetFile.getParentFile().mkdirs()) {
throw new IOException(
String.format(
"Cannot create directory %s",
targetFile.getParentFile().getAbsolutePath()));
}
try {
Files.createLink(targetFile.toPath(), sourceFile);
LOGGER.debug("Created hard link from {} to {}", sourceFile, targetFile);
return targetFile;
} catch (IOException e) {
LOGGER.info(
"Cannot create link from {} to {}, fallback to copy", sourceFile, targetFile);
}
Files.copy(sourceFile, targetFile.toPath());
return targetFile;
} catch (Exception e) {
LOGGER.warn(
"Failed to process file {} in dir {}: {}",
sourceFile.getFileName(),
currentObjectDir,
e.getMessage(),
e);
throw e;
}
});
} catch (Exception e) {
throw new IOException(
String.format(
"Failed to process object snapshot file after retries. Source: %s",
sourceFile.toAbsolutePath()),
e);
}
cnt.incrementAndGet();
return FileVisitResult.CONTINUE;
}
}

private void createLinksFromSourceToTarget(File targetDir, File[] files, Set<String> fileInfoSet)
throws IOException {
for (File file : files) {
Expand All @@ -492,6 +674,33 @@ private void createLinksFromSourceToTarget(File targetDir, File[] files, Set<Str
}

private String getFileInfoString(File file) {
Path filePath = file.toPath();
int objectDirIndex = -1;
int nameCount = filePath.getNameCount();
for (int i = 0; i < nameCount; i++) {
if (IoTDBConstant.OBJECT_FOLDER_NAME.equals(filePath.getName(i).toString())) {
objectDirIndex = i;
break;
}
}
if (objectDirIndex >= 0 && objectDirIndex < nameCount - 1) {
Path relativeToObject = filePath.subpath(objectDirIndex + 1, nameCount);
String fileName = relativeToObject.getFileName().toString();
Path parentPath = relativeToObject.getParent();
String middlePath = "";
if (parentPath != null) {
List<String> pathElements = new ArrayList<>();
for (Path element : parentPath) {
pathElements.add(element.toString());
}
middlePath = String.join("/", pathElements);
}
return fileName
+ SnapshotLogger.SPLIT_CHAR
+ middlePath
+ SnapshotLogger.SPLIT_CHAR
+ "object";
}
String[] splittedStr = file.getAbsolutePath().split(File.separator.equals("\\") ? "\\\\" : "/");
int length = splittedStr.length;
return splittedStr[length - SnapshotLogger.FILE_NAME_OFFSET]
Expand All @@ -501,6 +710,12 @@ private String getFileInfoString(File file) {
+ splittedStr[length - SnapshotLogger.SEQUENCE_OFFSET];
}

private boolean isObjectSnapshotCandidate(String fileName) {
return fileName.endsWith(ObjectTypeUtils.OBJECT_FILE_SUFFIX)
|| fileName.endsWith(ObjectTypeUtils.OBJECT_TEMP_FILE_SUFFIX)
|| fileName.endsWith(ObjectTypeUtils.OBJECT_BACK_FILE_SUFFIX);
}

public List<File> getSnapshotFileInfo() throws IOException {
File snapshotLogFile = getSnapshotLogFile();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.snapshot;

import org.apache.iotdb.commons.conf.IoTDBConstant;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class SnapshotLogger implements AutoCloseable {
public static final String SNAPSHOT_LOG_NAME = "snapshot.log";
Expand Down Expand Up @@ -56,6 +61,26 @@ public void close() throws Exception {
os.close();
}

public void logObjectRelativePath(Path relativePathFromObjectRoot) throws IOException {
String fileName = relativePathFromObjectRoot.getFileName().toString();
Path parentPath = relativePathFromObjectRoot.getParent();
String middlePath = "";
if (parentPath != null) {
List<String> pathElements = new ArrayList<>();
for (Path element : parentPath) {
pathElements.add(element.toString());
}
middlePath = String.join("/", pathElements);
}
os.write(fileName.getBytes(StandardCharsets.UTF_8));
os.write(SPLIT_CHAR.getBytes(StandardCharsets.UTF_8));
os.write(middlePath.getBytes(StandardCharsets.UTF_8));
os.write(SPLIT_CHAR.getBytes(StandardCharsets.UTF_8));
os.write(IoTDBConstant.OBJECT_FOLDER_NAME.getBytes(StandardCharsets.UTF_8));
os.write("\n".getBytes(StandardCharsets.UTF_8));
os.flush();
}

/**
* Log the logical info for the link file, including its file name, time partition, data region
* id, database name, sequence or not.
Expand Down
Loading