shell自动换行:关放大阅读展开代码[root@hadoop162 20221202_1_2_1]# pwd /var/lib/clickhouse/data/gmall2022/dws_trade_cart_add_uu_window/20221202_1_2_1 [root@hadoop162 20221202_1_2_1]# ll total 36 -rw-r-----. 1 clickhouse clickhouse 252 Dec 13 08:31 checksums.txt -rw-r-----. 1 clickhouse clickhouse 103 Dec 13 08:31 columns.txt -rw-r-----. 1 clickhouse clickhouse 1 Dec 13 08:31 count.txt -rw-r-----. 1 clickhouse clickhouse 151 Dec 13 08:31 data.bin -rw-r-----. 1 clickhouse clickhouse 144 Dec 13 08:31 data.mrk3 -rw-r-----. 1 clickhouse clickhouse 10 Dec 13 08:31 default_compression_codec.txt -rw-r-----. 1 clickhouse clickhouse 8 Dec 13 08:31 minmax_stt.idx -rw-r-----. 1 clickhouse clickhouse 4 Dec 13 08:31 partition.dat -rw-r-----. 1 clickhouse clickhouse 16 Dec 13 08:31 primary.idx
主要的文件:
java自动换行:关放大阅读展开代码// 统计5s内的词频 Table result = tEnv.sqlQuery("select " + " date_format(window_start, 'yyyy-MM-dd HH:mm:ss') stt, " + " date_format(window_end, 'yyyy-MM-dd HH:mm:ss') edt, " + " kw keyword, " + " count(*) keyword_count, " + " unix_timestamp()*1000 as ts " + // et as ... "from table(TUMBLE(TABLE ana_table, DESCRIPTOR(et), interval '5' second )) " + "group by kw,window_start,window_end"); // 将Flink中的动态表转化为Stream,才能写入ClickHouse SingleOutputStreamOperator<KeywordBean> map = tEnv.toRetractStream(result, KeywordBean.class) // 过滤到回撤的数据,即出现聚合结果更新,RetractStream会有更新前和更新后概念;这里只需要更新后的数据 // Tuple<Boolean,KeywordBean>,Boolean为false,标识更新前的数据 .filter(w -> w.f0) //过滤掉Boolean标识 .map(w -> w.f1);
order by字段和指定的版本字段实现,ReplacingMergeTree() 填入的参数为版本字段,重复数据保留版本字段值最大的。如果不填版本字段,默认保留最后一条sql自动换行:关放大阅读展开代码create table if not exists dws_traffic_keyword_page_view_window ( stt DateTime COMMENT '窗口起始时间', edt DateTime COMMENT '窗口结束时间', keyword String COMMENT '关键词', keyword_count UInt64 COMMENT '关键词出现频次', ts UInt64 COMMENT '时间戳' ) engine = ReplacingMergeTree(ts) partition by toYYYYMMDD(stt) order by (stt, edt, keyword);
ClickHouse,ElasticSearch,Drios
ReplicatedReplacingMergeTree,ReplacingMergeTree,SummingMergeTreesql自动换行:关放大阅读展开代码-- 包含稀疏索引(其中的最大最小值索引)的建表语句 CREATE TABLE default.mt ( `a` Int32, `b` Int32, `c` Int32, INDEX `idx_c` (c) TYPE minmax GRANULARITY 1 -- 为c字段建立maxmin索引,加速c的查询 ) ENGINE = MergeTree -- 使用的表引擎 PARTITION BY a --分区字段,建表可选 ORDER BY b -- 排序字段 SETTINGS index_granularity=3 -- 主键索引(无则为order by字段)的稀疏索引粒度,默认为8192
plaintext自动换行:关放大阅读展开代码默认为表的主键,没有主键则为order by字段建立稀疏索引,即默认每8192条记录,取一个主键,此处对比稠密索引
sql自动换行:关放大阅读展开代码CREATE TABLE skip_table ( my_key UInt64, my_value UInt64 ) ENGINE MergeTree primary key my_key SETTINGS index_granularity=8192; INSERT INTO skip_table SELECT number, intDiv(number,4096) FROM numbers(100000000);
sql自动换行:关放大阅读展开代码SELECT * FROM skip_table WHERE my_value IN (125, 700) ┌─my_key─┬─my_value─┐ │ 512000 │ 125 │ │ 512001 │ 125 │ │ ... | ... | └────────┴──────────┘ 8192 rows in set. Elapsed: 0.079 sec. Processed 100.00 million rows, 800.10 MB (1.26 billion rows/s., 10.10 GB/s. -- 可以看出进行全表扫描
materialized view,clickhouse会自动维护新表和原表的数据一致projection ,还是实验性的功能,相当于在materialized view基础上,根据SQL语句自动选择索引使用顺序
此处考虑建立跳数索引的方式sql自动换行:关放大阅读展开代码-- 为原表添加名为vix 的跳数索引,set(100)表示这个跳数索引最多存储100个值,索引粒度为2个GRANULARITY ALTER TABLE skip_table ADD INDEX vix my_value TYPE set(100) GRANULARITY 2; -- 使得新添加的跳数索引作用所有新老数据 ALTER TABLE skip_table MATERIALIZE INDEX vix; -- 再次查询 SELECT * FROM skip_table WHERE my_value IN (125, 700) ┌─my_key─┬─my_value─┐ │ 512000 │ 125 │ │ 512001 │ 125 │ │ ... | ... | └────────┴──────────┘ 8192 rows in set. Elapsed: 0.051 sec. Processed 32.77 thousand rows, 360.45 KB (643.75 thousand rows/s., 7.08 MB/s.) -- 明显是范围查询了
sql自动换行:关放大阅读展开代码drop table if exists dws_trade_order_window; create table if not exists dws_trade_order_window ( stt DateTime COMMENT '窗口起始时间', edt DateTime COMMENT '窗口结束时间', order_unique_user_count UInt64 COMMENT '下单独立用户数', order_new_user_count UInt64 COMMENT '下单新用户数', ts UInt64 COMMENT '时间戳' )-- # ReplacingMergeTree 和 ReplacingMergeTree 组合 engine = ReplicatedReplacingMergeTree(ts) partition by toYYYYMMDD(stt) order by (stt, edt);
其他的引擎:
xml自动换行:关放大阅读展开代码-- 默认使用节点90%的内存资源,可以适当调整到70% <max_server_memory_usage_to_ram_ratio>0.9</max_server_memory_usage_to_ram_ratio>
xml自动换行:关放大阅读展开代码// 配置文件修改并发处理能力,每秒处理的条数 ,比如提高到300 <max_concurrent_queries>100</max_concurrent_queries>
sql自动换行:关放大阅读展开代码optimize dws_user_user_login_window; select * from dws_user_user_login_window;
java自动换行:关放大阅读展开代码// 自定义clickhouse sinker 写数据到clickhouse JdbcSink.sink(sql.toString(), new JdbcStatementBuilder<T>() { @Override public void accept(PreparedStatement ps, T t) throws SQLException { // sql preparement设置 }, new JdbcExecutionOptions.Builder() // 1M .withBatchSize(1024 * 1024) // 5 seconds .withBatchIntervalMs(5000) // 重试次数 .withMaxRetries(3) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withDriverName(driver) .withUrl(url) .withUsername(user) .withPassword(password) .build() );
本文作者:hedeoer
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!