Skip to content

Commit da08818

Browse files
committed
BOOKKEEPER-62: Bookie can not start when encountering corrupted records (breed via ivank)
git-svn-id: https://svn.apache.org/repos/asf/zookeeper/bookkeeper/trunk@1207997 13f79535-47bb-0310-9956-ffa450edef68
1 parent 5048677 commit da08818

File tree

3 files changed

+124
-27
lines changed

3 files changed

+124
-27
lines changed

CHANGES.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ BUGFIXES:
8686

8787
BOOKKEEPER-125: log4j still used in some places (ivank)
8888

89+
BOOKKEEPER-62: Bookie can not start when encountering corrupted records (breed via ivank)
90+
8991
hedwig-server/
9092

9193
BOOKKEEPER-43: NullPointException when releasing topic (Sijie Guo via breed)

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -411,34 +411,39 @@ private void extractLedgersFromEntryLogs() throws IOException {
411411
long pos = LOGFILE_HEADER_SIZE;
412412
ConcurrentHashMap<Long, Boolean> entryLogLedgers = new ConcurrentHashMap<Long, Boolean>();
413413
// Read through the entry log file and extract the ledger ID's.
414-
while (true) {
415-
// Check if we've finished reading the entry log file.
416-
if (pos >= bc.size()) {
417-
break;
418-
}
419-
if (bc.read(sizeBuff, pos) != sizeBuff.capacity()) {
420-
throw new IOException("Short read from entrylog " + entryLogId);
421-
}
422-
pos += 4;
423-
sizeBuff.flip();
424-
int entrySize = sizeBuff.getInt();
425-
if (entrySize > 1024 * 1024) {
426-
LOG.error("Sanity check failed for entry size of " + entrySize + " at location " + pos + " in "
427-
+ entryLogId);
428-
}
429-
byte data[] = new byte[entrySize];
430-
ByteBuffer buff = ByteBuffer.wrap(data);
431-
int rc = bc.read(buff, pos);
432-
if (rc != data.length) {
433-
throw new IOException("Short read for entryLog " + entryLogId + "@" + pos + "(" + rc + "!="
434-
+ data.length + ")");
414+
try {
415+
while (true) {
416+
// Check if we've finished reading the entry log file.
417+
if (pos >= bc.size()) {
418+
break;
419+
}
420+
if (bc.read(sizeBuff, pos) != sizeBuff.capacity()) {
421+
throw new IOException("Short read from entrylog " + entryLogId);
422+
}
423+
pos += 4;
424+
sizeBuff.flip();
425+
int entrySize = sizeBuff.getInt();
426+
if (entrySize > 1024 * 1024) {
427+
LOG.error("Sanity check failed for entry size of " + entrySize + " at location " + pos + " in "
428+
+ entryLogId);
429+
}
430+
byte data[] = new byte[entrySize];
431+
ByteBuffer buff = ByteBuffer.wrap(data);
432+
int rc = bc.read(buff, pos);
433+
if (rc != data.length) {
434+
throw new IOException("Short read for entryLog " + entryLogId + "@" + pos + "(" + rc + "!="
435+
+ data.length + ")");
436+
}
437+
buff.flip();
438+
long ledgerId = buff.getLong();
439+
entryLogLedgers.put(ledgerId, true);
440+
// Advance position to the next entry and clear sizeBuff.
441+
pos += entrySize;
442+
sizeBuff.clear();
435443
}
436-
buff.flip();
437-
long ledgerId = buff.getLong();
438-
entryLogLedgers.put(ledgerId, true);
439-
// Advance position to the next entry and clear sizeBuff.
440-
pos += entrySize;
441-
sizeBuff.clear();
444+
} catch(IOException e) {
445+
LOG.info("Premature exception when processing " + entryLogId +
446+
"recovery will take care of the problem", e);
442447
}
443448
LOG.info("Retrieved all ledgers that comprise entryLogId: " + entryLogId + ", values: " + entryLogLedgers);
444449
entryLogs2LedgersMap.put(entryLogId, entryLogLedgers);
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/**
2+
*
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
*
20+
*/
21+
package org.apache.bookkeeper.bookie;
22+
23+
import java.io.File;
24+
import java.io.IOException;
25+
import java.io.RandomAccessFile;
26+
import java.lang.reflect.Field;
27+
import java.nio.ByteBuffer;
28+
import java.util.Map;
29+
30+
import junit.framework.TestCase;
31+
32+
import org.apache.bookkeeper.conf.ServerConfiguration;
33+
import org.junit.After;
34+
import org.junit.Before;
35+
import org.junit.Test;
36+
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
40+
public class EntryLogTest extends TestCase {
41+
static Logger LOG = LoggerFactory.getLogger(EntryLogTest.class);
42+
43+
@Before
44+
public void setUp() throws Exception {
45+
}
46+
47+
@Test
48+
public void testCorruptEntryLog() throws IOException, SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException {
49+
File tmpDir = File.createTempFile("bkTest", ".dir");
50+
tmpDir.delete();
51+
tmpDir.mkdir();
52+
ServerConfiguration conf = new ServerConfiguration();
53+
conf.setLedgerDirNames(new String[] {tmpDir.toString()});
54+
// create some entries
55+
EntryLogger logger = new EntryLogger(conf, null);
56+
logger.addEntry(1, generateEntry(1, 1));
57+
logger.addEntry(3, generateEntry(3, 1));
58+
logger.addEntry(2, generateEntry(2, 1));
59+
logger.flush();
60+
// now lets truncate the file to corrupt the last entry, which simulates a partial write
61+
File f = new File(tmpDir, "0.log");
62+
RandomAccessFile raf = new RandomAccessFile(f, "rw");
63+
raf.setLength(raf.length()-10);
64+
raf.close();
65+
// now see which ledgers are in the log
66+
logger = new EntryLogger(conf, null);
67+
Field entryLogs2LedgersMapField = logger.getClass().getDeclaredField("entryLogs2LedgersMap");
68+
entryLogs2LedgersMapField.setAccessible(true);
69+
@SuppressWarnings("unchecked")
70+
Map<Long, Map<Long, Boolean>> ledgersMap = (Map<Long, Map<Long, Boolean>>) entryLogs2LedgersMapField.get(logger);
71+
LOG.info("LedgersMap.get(0) {}", ledgersMap.get(0L));
72+
assertNotNull(ledgersMap.get(0L).get(1L));
73+
assertNull(ledgersMap.get(0L).get(2L));
74+
assertNotNull(ledgersMap.get(0L).get(3L));
75+
}
76+
77+
private ByteBuffer generateEntry(long ledger, long entry) {
78+
ByteBuffer bb = ByteBuffer.wrap(new byte[64]);
79+
bb.putLong(ledger);
80+
bb.putLong(entry);
81+
bb.put(("ledger"+ledger).getBytes());
82+
bb.flip();
83+
return bb;
84+
}
85+
86+
@After
87+
public void tearDown() throws Exception {
88+
}
89+
90+
}

0 commit comments

Comments
 (0)