HBASE-28951 use unique name of temp recovered edits for each worker #7075

Open: wants to merge 1 commit into base: master
@@ -55,9 +55,14 @@ public AbstractRecoveredEditsOutputSink(WALSplitter walSplitter,
/** Returns a writer that wraps a {@link WALProvider.Writer} and its Path. Caller should close. */
protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, byte[] region,
long seqId) throws IOException {
// If multiple worker are splitting a WAL at a same time, both should use unique file name to
// avoid conflict
long workerStartCode =
Contributor:
Is startCode enough to make a unique directory? Different region servers could have the same start code?

Contributor Author:
Yeah, I missed this point; this won't work.
I was worried that using the whole RS server name could hit the filename length limit. Do you share that concern?

walSplitter.rsServices != null ? walSplitter.rsServices.getServerName().getStartCode() : 0L;
Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId,
walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.getTmpDirName(),
walSplitter.conf);
walSplitter.conf, workerStartCode);

if (walSplitter.walFS.exists(regionEditsPath)) {
LOG.warn("Found old edits file. It could be the "
+ "result of a previous failed split attempt. Deleting " + regionEditsPath + ", length="
@@ -111,6 +116,11 @@ protected Path closeRecoveredEditsWriterAndFinalizeEdits(RecoveredEditsWriter ed
// TestHLogSplit#testThreading is an example.
if (walSplitter.walFS.exists(editsWriter.path)) {
if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
// We only rename editsWriter if dst does not exist, still if rename fails and dst exist
// with equal or more entries, we can delete the editsWriter file.
// It happens if two RS was splitting the same WAL and both tried to rename at the same
// time. See HBASE-28951 for more details.
if (deleteTmpIfDstHasNoLessEntries(editsWriter, dst)) return dst;
final String errorMsg =
"Failed renaming recovered edits " + editsWriter.path + " to " + dst;
updateStatusWithMsg(errorMsg);
@@ -130,6 +140,18 @@ protected Path closeRecoveredEditsWriterAndFinalizeEdits(RecoveredEditsWriter ed
return dst;
}

private boolean deleteTmpIfDstHasNoLessEntries(RecoveredEditsWriter editsWriter, Path dst)
throws IOException {
if (walSplitter.walFS.exists(dst) && !isDstHasFewerEntries(editsWriter, dst)) {
if (!walSplitter.walFS.delete(editsWriter.path, false)) {
LOG.warn("Failed deleting of {}", editsWriter.path);
throw new IOException("Failed deleting of " + editsWriter.path);
}
return true;
}
return false;
}

private boolean closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
List<IOException> thrown) {
try {
@@ -188,18 +210,7 @@ void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) {
// delete the one with fewer wal entries
private void deleteOneWithFewerEntries(RecoveredEditsWriter editsWriter, Path dst)
throws IOException {
long dstMinLogSeqNum = -1L;
try (WALStreamReader reader =
walSplitter.getWalFactory().createStreamReader(walSplitter.walFS, dst)) {
WAL.Entry entry = reader.next();
if (entry != null) {
dstMinLogSeqNum = entry.getKey().getSequenceId();
}
} catch (EOFException e) {
LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst,
e);
}
if (editsWriter.minLogSeqNum < dstMinLogSeqNum) {
if (isDstHasFewerEntries(editsWriter, dst)) {
LOG.warn("Found existing old edits file. It could be the result of a previous failed"
+ " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
+ walSplitter.walFS.getFileStatus(dst).getLen());
@@ -218,6 +229,22 @@ private void deleteOneWithFewerEntries(RecoveredEditsWriter editsWriter, Path ds
}
}

private boolean isDstHasFewerEntries(RecoveredEditsWriter editsWriter, Path dst)
throws IOException {
long dstMinLogSeqNum = -1L;
try (WALStreamReader reader =
walSplitter.getWalFactory().createStreamReader(walSplitter.walFS, dst)) {
WAL.Entry entry = reader.next();
if (entry != null) {
dstMinLogSeqNum = entry.getKey().getSequenceId();
}
} catch (EOFException e) {
LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst,
e);
}
return editsWriter.minLogSeqNum < dstMinLogSeqNum;
}

/**
* Private data structure that wraps a {@link WALProvider.Writer} and its Path, also collecting
* statistics about the data written to this output.
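
The rename fallback in this file can be illustrated outside HBase. The following is only a minimal sketch under stated assumptions, not the patch itself: it uses `java.nio.file` on a local directory instead of the WAL `FileSystem`, it models an edits file as a text file whose first line is its lowest sequence id, and the names `RenameRaceSketch`, `firstSeqId`, `dstHasFewerEntries` and `finalizeEdits` are hypothetical. `Files.move` without `REPLACE_EXISTING` stands in for a rename that fails because another worker has already created `dst`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

final class RenameRaceSketch {
  /** Reads the first sequence id of a file; -1 if the file is empty (like dstMinLogSeqNum = -1L). */
  static long firstSeqId(Path file) throws IOException {
    List<String> lines = Files.readAllLines(file);
    return lines.isEmpty() ? -1L : Long.parseLong(lines.get(0).trim());
  }

  /** Same comparison as isDstHasFewerEntries: a lower starting sequence id in tmp means dst has fewer. */
  static boolean dstHasFewerEntries(long tmpMinSeqId, Path dst) throws IOException {
    return tmpMinSeqId < firstSeqId(dst);
  }

  /** Moves tmp to dst; if dst already exists and is at least as complete, drops tmp instead. */
  static Path finalizeEdits(Path tmp, long tmpMinSeqId, Path dst) throws IOException {
    try {
      Files.move(tmp, dst); // fails if another worker already produced dst
      return dst;
    } catch (FileAlreadyExistsException e) {
      if (!dstHasFewerEntries(tmpMinSeqId, dst)) {
        Files.delete(tmp); // dst is good enough, so the temp copy is redundant
        return dst;
      }
      throw new IOException("Failed renaming recovered edits " + tmp + " to " + dst, e);
    }
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("recovered-edits-sketch");
    Path dst = dir.resolve("0000000000000000005");
    Path tmp = dir.resolve("0000000000000000005-wal-1.temp");
    // Another worker finished first and wrote the same entries.
    Files.write(dst, "5\n6\n".getBytes(StandardCharsets.UTF_8));
    Files.write(tmp, "5\n6\n".getBytes(StandardCharsets.UTF_8));
    Path result = finalizeEdits(tmp, 5L, dst);
    System.out.println("finalized as " + result + ", tmp still exists: " + Files.exists(tmp));
  }
}
```

In this sketch the losing worker treats the failed rename as success only when the existing destination starts at the same or a lower sequence id; otherwise the failure is still reported, as in the diff.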
@@ -161,7 +161,8 @@ public static void moveWAL(FileSystem fs, Path p, Path targetDir) throws IOExcep
*/
@SuppressWarnings("deprecation")
static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long seqId,
String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException {
String fileNameBeingSplit, String tmpDirName, Configuration conf, long workerStartCode)
throws IOException {
FileSystem walFS = CommonFSUtils.getWALFileSystem(conf);
Path tableDir = CommonFSUtils.getWALTableDir(conf, tableName);
String encodedRegionNameStr = Bytes.toString(encodedRegionName);
@@ -193,7 +194,8 @@ static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionNam
// Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
// region's replayRecoveredEdits will not delete it
String fileName = formatRecoveredEditsFileName(seqId);
fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
fileName =
getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit + "-" + workerStartCode);
return new Path(dir, fileName);
}
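
The review question about start codes and the author's worry about filename length can both be checked with a small standalone sketch. Everything here is an assumption made for illustration rather than HBase code: the class `TmpEditsNameSketch` and its methods are hypothetical, the `.temp` suffix and the 19-digit zero padding are assumed stand-ins for `getTmpRecoveredEditsFileName` and `formatRecoveredEditsFileName`, the `host,port,startcode` worker id is an assumed server name layout, and 255 bytes is a commonly configured per-component limit, not a value taken from this PR.

```java
import java.nio.charset.StandardCharsets;

final class TmpEditsNameSketch {
  // Assumed values for illustration only; not taken from the diff.
  static final String TMP_SUFFIX = ".temp";
  static final int MAX_COMPONENT_BYTES = 255;

  /** Mirrors the naming shape in the patch: seqId-walName-workerId plus a temp suffix. */
  static String tmpName(long seqId, String walName, String workerId) {
    return String.format("%019d", seqId) + "-" + walName + "-" + workerId + TMP_SUFFIX;
  }

  /** True if the name fits in one path component under the assumed limit. */
  static boolean fitsInOneComponent(String name) {
    return name.getBytes(StandardCharsets.UTF_8).length <= MAX_COMPONENT_BYTES;
  }

  public static void main(String[] args) {
    String wal = "rs-a.example.org%2C16020%2C1700000000000.1700000001234";
    long sharedStartCode = 1700000000000L;

    // Two different servers that happen to share a start code get the same temp name.
    String workerA = tmpName(1L, wal, Long.toString(sharedStartCode));
    String workerB = tmpName(1L, wal, Long.toString(sharedStartCode));
    System.out.println("start code only, names collide: " + workerA.equals(workerB));

    // Using host, port and start code keeps the names distinct, at the cost of length.
    String fullA = tmpName(1L, wal, "rs-a.example.org,16020," + sharedStartCode);
    String fullB = tmpName(1L, wal, "rs-b.example.org,16020," + sharedStartCode);
    System.out.println("full server name, names collide: " + fullA.equals(fullB));
    System.out.println("full name length in bytes: "
        + fullA.getBytes(StandardCharsets.UTF_8).length
        + ", fits: " + fitsInOneComponent(fullA));
  }
}
```

Whether the longer name is safe in practice depends on the real WAL file name lengths and the limit configured on the target filesystem, which this sketch does not know.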

@@ -403,7 +403,7 @@ public void testOldRecoveredEditsFileSidelined() throws IOException {
private Path createRecoveredEditsPathForRegion() throws IOException {
byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1,
FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
FILENAME_BEING_SPLIT, TMPDIRNAME, conf, 0L);
return p;
}
