Skip to content

Commit 7ecebee

Browse files
hingu-8103wchevreuil
authored andcommitted
HBASE-29569: Implement a built-in TieringValueProvider for parsing the date value from the rowkey (#7593)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
1 parent 92ad502 commit 7ecebee

3 files changed

Lines changed: 501 additions & 0 deletions

File tree

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.regionserver.compactions;
19+
20+
import java.text.ParseException;
21+
import java.text.SimpleDateFormat;
22+
import java.util.regex.Matcher;
23+
import java.util.regex.Pattern;
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.hbase.ExtendedCell;
26+
import org.apache.hadoop.hbase.util.Bytes;
27+
import org.apache.yetus.audience.InterfaceAudience;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
/**
32+
* Provides a tiering value for compactions by extracting and parsing a date from the row key. This
33+
* implementation uses a configurable regex and date format to locate and parse a date substring
34+
* from the row key and returns the parsed epoch time in milliseconds. Configuration properties can
35+
* be set at globally or at table level.
36+
*/
37+
@InterfaceAudience.Private
38+
public class RowKeyDateTieringValueProvider implements CustomTieredCompactor.TieringValueProvider {
39+
private static final Logger LOG = LoggerFactory.getLogger(RowKeyDateTieringValueProvider.class);
40+
public static final String TIERING_KEY_DATE_PATTERN = "TIERING_KEY_DATE_PATTERN";
41+
public static final String TIERING_KEY_DATE_FORMAT = "TIERING_KEY_DATE_FORMAT";
42+
public static final String TIERING_KEY_DATE_GROUP = "TIERING_KEY_DATE_GROUP";
43+
private Pattern rowKeyPattern;
44+
private SimpleDateFormat dateFormat;
45+
private Integer rowKeyRegexExtractGroup;
46+
47+
@Override
48+
public void init(Configuration conf) throws Exception {
49+
// Initialize regex pattern
50+
String regexPatternStr = conf.get(TIERING_KEY_DATE_PATTERN);
51+
if (regexPatternStr == null || regexPatternStr.isEmpty()) {
52+
throw new IllegalArgumentException(
53+
"Configuration property '" + TIERING_KEY_DATE_PATTERN + "' is required");
54+
}
55+
rowKeyPattern = Pattern.compile(regexPatternStr);
56+
57+
// Initialize date format
58+
String dateFormatStr = conf.get(TIERING_KEY_DATE_FORMAT);
59+
if (dateFormatStr == null || dateFormatStr.isEmpty()) {
60+
throw new IllegalArgumentException(
61+
"Configuration property '" + TIERING_KEY_DATE_FORMAT + "' is required");
62+
}
63+
try {
64+
dateFormat = new SimpleDateFormat(dateFormatStr);
65+
dateFormat.setLenient(false);
66+
} catch (Exception e) {
67+
throw new IllegalArgumentException("Invalid date format for Configuration property '"
68+
+ TIERING_KEY_DATE_FORMAT + "': " + dateFormatStr, e);
69+
}
70+
71+
// Initialize regex extract group
72+
String extractGroupStr = conf.get(TIERING_KEY_DATE_GROUP, "0");
73+
try {
74+
rowKeyRegexExtractGroup = Integer.parseInt(extractGroupStr);
75+
} catch (NumberFormatException e) {
76+
throw new IllegalArgumentException(
77+
"Configuration property '" + TIERING_KEY_DATE_GROUP + "' must be a valid integer", e);
78+
}
79+
if (rowKeyRegexExtractGroup < 0) {
80+
throw new IllegalArgumentException(
81+
"Configuration property '" + TIERING_KEY_DATE_GROUP + "' must be non-negative");
82+
}
83+
// Validate extract group exists in pattern
84+
int groupCount = rowKeyPattern.matcher("").groupCount();
85+
if (rowKeyRegexExtractGroup > groupCount) {
86+
throw new IllegalArgumentException(
87+
"Extract group " + rowKeyRegexExtractGroup + " exceeds pattern group count " + groupCount);
88+
}
89+
90+
LOG.info("Initialized RowKeyDateTieringValueProvider with regex='{}', dateFormat='{}' ",
91+
regexPatternStr, dateFormat);
92+
}
93+
94+
@Override
95+
public long getTieringValue(ExtendedCell cell) {
96+
if (rowKeyPattern == null || dateFormat == null || rowKeyRegexExtractGroup == null) {
97+
throw new IllegalStateException("RowKeyDateTieringValueProvider not initialized properly");
98+
}
99+
byte[] rowArray = new byte[cell.getRowLength()];
100+
System.arraycopy(cell.getRowArray(), cell.getRowOffset(), rowArray, 0, cell.getRowLength());
101+
String rowKeyStr;
102+
try {
103+
rowKeyStr = Bytes.toString(rowArray);
104+
// Validate UTF-8 encoding
105+
if (rowKeyStr.contains("\ufffd")) {
106+
LOG.debug("Failed to extract the date from row key due to invalid UTF-8 encoding");
107+
return Long.MAX_VALUE;
108+
}
109+
} catch (Exception e) {
110+
LOG.debug("Failed to convert row key to string", e);
111+
return Long.MAX_VALUE;
112+
}
113+
114+
Matcher matcher = rowKeyPattern.matcher(rowKeyStr);
115+
if (!matcher.find()) {
116+
LOG.debug("Row key '{}' does not match the regex pattern", rowKeyStr);
117+
return Long.MAX_VALUE;
118+
}
119+
120+
String extractedValue = null;
121+
try {
122+
extractedValue = matcher.group(rowKeyRegexExtractGroup);
123+
if (extractedValue == null || extractedValue.isEmpty()) {
124+
LOG.debug("No value extracted from row key '{}' using regex pattern", rowKeyStr);
125+
return Long.MAX_VALUE;
126+
}
127+
return dateFormat.parse(extractedValue).getTime();
128+
} catch (ParseException e) {
129+
LOG.debug("Error parsing date value '{}' extracted from row key '{}'", extractedValue,
130+
rowKeyStr, e);
131+
} catch (Exception e) {
132+
LOG.debug("Unexpected error while parsing date from row key '{}'", rowKeyStr, e);
133+
}
134+
return Long.MAX_VALUE;
135+
}
136+
137+
public Pattern getRowKeyPattern() {
138+
return rowKeyPattern;
139+
}
140+
141+
public SimpleDateFormat getDateFormat() {
142+
return dateFormat;
143+
}
144+
145+
public Integer getRowKeyRegexExtractGroup() {
146+
return rowKeyRegexExtractGroup;
147+
}
148+
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCustomCellTieredCompactor.java

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,17 @@
1919

2020
import static org.apache.hadoop.hbase.regionserver.CustomTieringMultiFileWriter.CUSTOM_TIERING_TIME_RANGE;
2121
import static org.apache.hadoop.hbase.regionserver.compactions.CustomCellTieringValueProvider.TIERING_CELL_QUALIFIER;
22+
import static org.apache.hadoop.hbase.regionserver.compactions.CustomTieredCompactor.TIERING_VALUE_PROVIDER;
23+
import static org.apache.hadoop.hbase.regionserver.compactions.RowKeyDateTieringValueProvider.TIERING_KEY_DATE_FORMAT;
24+
import static org.apache.hadoop.hbase.regionserver.compactions.RowKeyDateTieringValueProvider.TIERING_KEY_DATE_PATTERN;
2225
import static org.junit.Assert.assertEquals;
2326
import static org.junit.Assert.assertNotNull;
2427
import static org.junit.Assert.fail;
2528

2629
import java.io.IOException;
30+
import java.text.SimpleDateFormat;
2731
import java.util.ArrayList;
32+
import java.util.Date;
2833
import java.util.List;
2934
import org.apache.hadoop.hbase.HBaseClassTestRule;
3035
import org.apache.hadoop.hbase.HBaseTestingUtil;
@@ -145,4 +150,175 @@ public void testCustomCellTieredCompactor() throws Exception {
145150
}
146151
});
147152
}
153+
154+
@Test
155+
public void testCustomCellTieredCompactorWithRowKeyDateTieringValue() throws Exception {
156+
// Restart mini cluster with RowKeyDateTieringValueProvider
157+
utility.shutdownMiniCluster();
158+
utility.getConfiguration().set(TIERING_VALUE_PROVIDER,
159+
RowKeyDateTieringValueProvider.class.getName());
160+
utility.startMiniCluster();
161+
162+
ColumnFamilyDescriptorBuilder clmBuilder = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY);
163+
clmBuilder.setValue("hbase.hstore.engine.class", CustomTieredStoreEngine.class.getName());
164+
165+
// Table 1: Date at end with format yyyyMMddHHmmssSSS
166+
TableName table1Name = TableName.valueOf("testTable1");
167+
TableDescriptorBuilder tbl1Builder = TableDescriptorBuilder.newBuilder(table1Name);
168+
tbl1Builder.setColumnFamily(clmBuilder.build());
169+
tbl1Builder.setValue(TIERING_KEY_DATE_PATTERN, "(\\d{17})$");
170+
tbl1Builder.setValue(TIERING_KEY_DATE_FORMAT, "yyyyMMddHHmmssSSS");
171+
utility.getAdmin().createTable(tbl1Builder.build());
172+
utility.waitTableAvailable(table1Name);
173+
174+
// Table 2: Date at beginning with format yyyy-MM-dd HH:mm:ss
175+
TableName table2Name = TableName.valueOf("testTable2");
176+
TableDescriptorBuilder tbl2Builder = TableDescriptorBuilder.newBuilder(table2Name);
177+
tbl2Builder.setColumnFamily(clmBuilder.build());
178+
tbl2Builder.setValue(TIERING_KEY_DATE_PATTERN, "^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})");
179+
tbl2Builder.setValue(TIERING_KEY_DATE_FORMAT, "yyyy-MM-dd HH:mm:ss");
180+
utility.getAdmin().createTable(tbl2Builder.build());
181+
utility.waitTableAvailable(table2Name);
182+
183+
Connection connection = utility.getConnection();
184+
long recordTime = System.currentTimeMillis();
185+
long oldTime = recordTime - (11L * 366L * 24L * 60L * 60L * 1000L);
186+
187+
SimpleDateFormat sdf1 = new SimpleDateFormat("yyyyMMddHHmmssSSS");
188+
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
189+
190+
// Write to Table 1 with date at end
191+
Table table1 = connection.getTable(table1Name);
192+
for (int i = 0; i < 6; i++) {
193+
List<Put> puts = new ArrayList<>(2);
194+
195+
// Old data
196+
String oldDate = sdf1.format(new Date(oldTime));
197+
Put put = new Put(Bytes.toBytes("row_" + i + "_" + oldDate));
198+
put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + i));
199+
puts.add(put);
200+
201+
// Recent data
202+
String recentDate = sdf1.format(new Date(recordTime));
203+
put = new Put(Bytes.toBytes("row_" + (i + 1000) + "_" + recentDate));
204+
put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + (i + 1000)));
205+
puts.add(put);
206+
207+
table1.put(puts);
208+
utility.flush(table1Name);
209+
}
210+
table1.close();
211+
212+
// Write to Table 2 with date at beginning
213+
Table table2 = connection.getTable(table2Name);
214+
for (int i = 0; i < 6; i++) {
215+
List<Put> puts = new ArrayList<>(2);
216+
217+
// Old data
218+
String oldDate = sdf2.format(new Date(oldTime));
219+
Put put = new Put(Bytes.toBytes(oldDate + "_row_" + i));
220+
put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + i));
221+
puts.add(put);
222+
223+
// Recent data
224+
String recentDate = sdf2.format(new Date(recordTime));
225+
put = new Put(Bytes.toBytes(recentDate + "_row_" + (i + 1000)));
226+
put.addColumn(FAMILY, Bytes.toBytes("val"), Bytes.toBytes("v" + (i + 1000)));
227+
puts.add(put);
228+
229+
table2.put(puts);
230+
utility.flush(table2Name);
231+
}
232+
table2.close();
233+
234+
// First compaction for Table 1
235+
long compactionTime1 = System.currentTimeMillis();
236+
utility.getAdmin().majorCompact(table1Name);
237+
Waiter.waitFor(utility.getConfiguration(), 5000,
238+
() -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(table1Name)
239+
> compactionTime1);
240+
241+
assertEquals(1, utility.getNumHFiles(table1Name, FAMILY));
242+
243+
utility.getMiniHBaseCluster().getRegions(table1Name).get(0).getStore(FAMILY).getStorefiles()
244+
.forEach(file -> {
245+
byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
246+
assertNotNull(rangeBytes);
247+
try {
248+
TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
249+
assertEquals(oldTime, timeRangeTracker.getMin());
250+
assertEquals(recordTime, timeRangeTracker.getMax());
251+
} catch (IOException e) {
252+
fail(e.getMessage());
253+
}
254+
});
255+
256+
// Second compaction for Table 1
257+
long secondCompactionTime1 = System.currentTimeMillis();
258+
utility.getAdmin().majorCompact(table1Name);
259+
Waiter.waitFor(utility.getConfiguration(), 5000,
260+
() -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(table1Name)
261+
> secondCompactionTime1);
262+
263+
assertEquals(2, utility.getNumHFiles(table1Name, FAMILY));
264+
265+
utility.getMiniHBaseCluster().getRegions(table1Name).get(0).getStore(FAMILY).getStorefiles()
266+
.forEach(file -> {
267+
byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
268+
assertNotNull(rangeBytes);
269+
try {
270+
TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
271+
assertEquals(timeRangeTracker.getMin(), timeRangeTracker.getMax());
272+
} catch (IOException e) {
273+
fail(e.getMessage());
274+
}
275+
});
276+
277+
// First compaction for Table 2
278+
long compactionTime2 = System.currentTimeMillis();
279+
utility.getAdmin().majorCompact(table2Name);
280+
Waiter.waitFor(utility.getConfiguration(), 5000,
281+
() -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(table2Name)
282+
> compactionTime2);
283+
284+
assertEquals(1, utility.getNumHFiles(table2Name, FAMILY));
285+
286+
utility.getMiniHBaseCluster().getRegions(table2Name).get(0).getStore(FAMILY).getStorefiles()
287+
.forEach(file -> {
288+
byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
289+
assertNotNull(rangeBytes);
290+
try {
291+
TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
292+
// Table 2 uses yyyy-MM-dd HH:mm:ss format, so we need to account for second precision
293+
// The parsed time will be truncated to second precision (no milliseconds)
294+
long expectedOldTime = (oldTime / 1000) * 1000;
295+
long expectedRecentTime = (recordTime / 1000) * 1000;
296+
assertEquals(expectedOldTime, timeRangeTracker.getMin());
297+
assertEquals(expectedRecentTime, timeRangeTracker.getMax());
298+
} catch (IOException e) {
299+
fail(e.getMessage());
300+
}
301+
});
302+
303+
// Second compaction for Table 2
304+
long secondCompactionTime2 = System.currentTimeMillis();
305+
utility.getAdmin().majorCompact(table2Name);
306+
Waiter.waitFor(utility.getConfiguration(), 5000,
307+
() -> utility.getMiniHBaseCluster().getMaster().getLastMajorCompactionTimestamp(table2Name)
308+
> secondCompactionTime2);
309+
310+
assertEquals(2, utility.getNumHFiles(table2Name, FAMILY));
311+
312+
utility.getMiniHBaseCluster().getRegions(table2Name).get(0).getStore(FAMILY).getStorefiles()
313+
.forEach(file -> {
314+
byte[] rangeBytes = file.getMetadataValue(CUSTOM_TIERING_TIME_RANGE);
315+
assertNotNull(rangeBytes);
316+
try {
317+
TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(rangeBytes);
318+
assertEquals(timeRangeTracker.getMin(), timeRangeTracker.getMax());
319+
} catch (IOException e) {
320+
fail(e.getMessage());
321+
}
322+
});
323+
}
148324
}

0 commit comments

Comments
 (0)