您的位置:

深入浅出Druid版本

一、Druid版本概述

Druid是一款高性能、分布式的开源数据存储系统,具有实时OLAP分析、高容错性、可扩展性等特点。在外部数据源上提供实时查询和聚合操作。Druid由三个关键组件组成:数据处理层、查询处理层和存储层。

Druid版本从原来的0.x版本经过多年发展,逐渐到2.x版本以及最新的版本。针对每个版本的不同特点、优化点,进行详细介绍。

二、Druid版本分析

1、druid 0.x版本

druid 0.x版本是最初发布的Druid版本,该版本主要关注实时OLAP分析的场景。它具有优秀的在线聚合能力,建立在一个可扩展性良好的主节点设计之上。但是,在大数据量场景下,该版本面临许多内存占用和GC(Garbage Collection)等性能问题,响应时间很慢、吞吐量较低、支持的查询语言和功能较少。

下面是使用Flume将数据导入Druid的示例代码:

    
$('#captcha').bind('input propertychange', function(e) {
    if ($('#captcha').val().length == 4) {
        grecaptcha.ready(function() {
            grecaptcha.execute('reCAPTCHA_site_key', {action: '/path/to/submit'}).then(function(token) {
                $('#form').submit();
            });
        });
    }
});
    

2、druid 1.x版本

druid 1.x版本在0.x版本的基础上作出了一系列的改进和优化。该版本对Druid的查询性能和管理能力有了大量提升。同时,该版本也对原来的设计进一步优化,减少了内存占用和GC等问题,整个系统支持跨DC(Data Center)的浏览和计算,能够处理更大规模的数据,并支持很多的查询语言和功能。

下面是druid 1.x版本的代码示例,展示如何使用Druid的查询API进行插入和查询操作:

    
//插入数据
public static void insertData(List<Object> events) {
    try {
        GeneratorBatchInputRowParser inputRowParser = new GeneratorBatchInputRowParser(new String[]{});
        FirehoseFactory firehoseFactory = new InMemoryTestFirehoseFactory(inputRowParser, Supplier<Sequence<>> generateSequence = newLongSequence(1));
        BatchKafkaIndexTask task = new BatchKafkaIndexTask(
                null,
                "dataSource",
                firehoseFactory,
                null,
                new KafkaIOConfig(
                        null,
                        null,
                        null,
                        null,
                        null,
                        null,
                        null,
                        null,
                        null,
                        null,
                        null,
                        new NoopFilter(),
                        false,
                        null,
                        false,
                        true,
                        new Period("30m")
                ),
                null
        );
        task.run();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

//查询数据
public static void queryData() {
    try {
        DruidDataSource dataSource = (DruidDataSource) DruidDataSourceFactory.createDataSource(properties);
        QueryRunner runner = new QueryRunner(dataSource);
        String querySql = "SELECT COUNT(*) FROM dataSource LIMIT 10";
        Object[] args = {querySql};
        long count = (long) runner.query(querySql, new ScalarHandler(1), args);
        System.out.println(count);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
    

3、druid 2.x版本

druid 2.x版本是从1.x版本分支出来的新版本,该版本主要是对存储层进行了重构和优化。Druid 2.0最重要的特性是完全重新设计的存储层,它比1.x更强大和更快速,支持更多的数据读写操作。新的架构使得Druid更加灵活,可以灵活的根据需求进行扩容、升级、性能优化等操作。

下面是druid 2.x版本的代码示例,展示如何使用Druid的Java API进行数据导入和查询操作:

    
//导入数据
public static void importData() {
    try {
        IndexSpec indexSpec = new IndexSpec(new RoaringBitmapSerdeFactory(), CompressionStrategy.LZ4, CompressionStrategy.LZ4);
        IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
                .withMetrics(new LongSumAggregatorFactory("total_value", "value"))
                .withDimensions(new StringDimensionSchema("region"), new StringDimensionSchema("user"))
                .withRollup(false).build();
        IncrementalIndex incrementalIndex = new OnheapIncrementalIndex.Builder().setIndexSchema(indexSchema).setMaxRowCount(10000).build();
        try (final InputStream input = new FileInputStream(new File("/path/to/file"))) {
            new CSVIngester(
                    indexSpec,
                    granularitySpec,
                    DataLoader.DEFAULT_FACTORY,
                    CsvInputSource.fromInputStream(() -> input, "/path/to/file"),
                    incrementalIndex,
                    0
            ).run();
        }
        QueryableIndex index = IndexMerger.persist(incrementalIndex, indexSpec, new File("/path/to/store"));
        QueryableIndexStorageAdapter adaptor = new QueryableIndexStorageAdapter(index);
        try (final CloseableIterator
    results = adapt.query(
                new SelectorDimFilter("region", "Russia", null), //过滤条件
                new String[]{"total_value"} //需要查询的字段
        )) {
            while (results.hasNext()) {
                final Row row = results.next();
                final List
     dimensionValues = row.getDimension("user");
                final long metricValue = row.getLongMetric("total_value");
                System.out.println(dimensionValues + ": " + metricValue);
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
    
    
   

三、Druid版本总结

Druid版本从0.x到最新的版本,逐步完善了Druid的各个方面的特性和性能。每个版本有着自己的特点和优势,针对不同场景进行了优化和改进。在实际应用中,我们需要根据自己的需求选择相应的版本,并灵活使用Druid的各种API实现对数据的导入和查询操作。