一、Druid版本概述
Druid是一款高性能、分布式的开源数据存储系统,具有实时OLAP分析、高容错性、可扩展性等特点。在外部数据源上提供实时查询和聚合操作。Druid由三个关键组件组成:数据处理层、查询处理层和存储层。
Druid版本从原来的0.x版本经过多年发展,逐渐到2.x版本以及最新的版本。针对每个版本的不同特点、优化点,进行详细介绍。
二、Druid版本分析
1、druid 0.x版本
druid 0.x版本是最初发布的Druid版本,该版本主要关注实时OLAP分析的场景。它具有优秀的在线聚合能力,建立在一个可扩展性良好的主节点设计之上。但是,在大数据量场景下,该版本面临许多内存占用和GC(Garbage Collection)等性能问题,响应时间很慢、吞吐量较低、支持的查询语言和功能较少。
下面是使用Flume将数据导入Druid的示例代码:
$('#captcha').bind('input propertychange', function(e) {
if ($('#captcha').val().length == 4) {
grecaptcha.ready(function() {
grecaptcha.execute('reCAPTCHA_site_key', {action: '/path/to/submit'}).then(function(token) {
$('#form').submit();
});
});
}
});
2、druid 1.x版本
druid 1.x版本在0.x版本的基础上作出了一系列的改进和优化。该版本对Druid的查询性能和管理能力有了大量提升。同时,该版本也对原来的设计进一步优化,减少了内存占用和GC等问题,整个系统支持跨DC(Data Center)的浏览和计算,能够处理更大规模的数据,并支持很多的查询语言和功能。
下面是druid 1.x版本的代码示例,展示如何使用Druid的查询API进行插入和查询操作:
//插入数据
public static void insertData(List<Object> events) {
try {
GeneratorBatchInputRowParser inputRowParser = new GeneratorBatchInputRowParser(new String[]{});
FirehoseFactory firehoseFactory = new InMemoryTestFirehoseFactory(inputRowParser, Supplier<Sequence<>> generateSequence = newLongSequence(1));
BatchKafkaIndexTask task = new BatchKafkaIndexTask(
null,
"dataSource",
firehoseFactory,
null,
new KafkaIOConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
new NoopFilter(),
false,
null,
false,
true,
new Period("30m")
),
null
);
task.run();
} catch (Exception e) {
e.printStackTrace();
}
}
//查询数据
public static void queryData() {
try {
DruidDataSource dataSource = (DruidDataSource) DruidDataSourceFactory.createDataSource(properties);
QueryRunner runner = new QueryRunner(dataSource);
String querySql = "SELECT COUNT(*) FROM dataSource LIMIT 10";
Object[] args = {querySql};
long count = (long) runner.query(querySql, new ScalarHandler(1), args);
System.out.println(count);
} catch (Exception e) {
e.printStackTrace();
}
}
3、druid 2.x版本
druid 2.x版本是从1.x版本分支出来的新版本,该版本主要是对存储层进行了重构和优化。Druid 2.0最重要的特性是完全重新设计的存储层,它比1.x更强大和更快速,支持更多的数据读写操作。新的架构使得Druid更加灵活,可以灵活的根据需求进行扩容、升级、性能优化等操作。
下面是druid 2.x版本的代码示例,展示如何使用Druid的Java API进行数据导入和查询操作:
//导入数据
public static void importData() {
try {
IndexSpec indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(), CompressionStrategy.LZ4, CompressionStrategy.LZ4);
IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withMetrics(new LongSumAggregatorFactory("total_value", "value"))
.withDimensions(new StringDimensionSchema("region"), new StringDimensionSchema("user"))
.withRollup(false).build();
IncrementalIndex incrementalIndex = new OnheapIncrementalIndex.Builder().setIndexSchema(indexSchema).setMaxRowCount(10000).build();
try (final InputStream input = new FileInputStream(new File("/path/to/file"))) {
new CSVIngester(
indexSpec,
granularitySpec,
DataLoader.DEFAULT_FACTORY,
CsvInputSource.fromInputStream(() -> input, "/path/to/file"),
incrementalIndex,
0
).run();
}
QueryableIndex index = IndexMerger.persist(incrementalIndex, indexSpec, new File("/path/to/store"));
QueryableIndexStorageAdapter adaptor = new QueryableIndexStorageAdapter(index);
try (final CloseableIterator
results = adapt.query(
new SelectorDimFilter("region", "Russia", null), //过滤条件
new String[]{"total_value"} //需要查询的字段
)) {
while (results.hasNext()) {
final Row row = results.next();
final List
dimensionValues = row.getDimension("user");
final long metricValue = row.getLongMetric("total_value");
System.out.println(dimensionValues + ": " + metricValue);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
三、Druid版本总结
Druid版本从0.x到最新的版本,逐步完善了Druid的各个方面的特性和性能。每个版本有着自己的特点和优势,针对不同场景进行了优化和改进。在实际应用中,我们需要根据自己的需求选择相应的版本,并灵活使用Druid的各种API实现对数据的导入和查询操作。