公共静态类CountAgg实现AggregateFunction<;用户行为,Tuple2<;长,布卢姆过滤器<;字符串>;>;,Long>;{
//创建累加器
@凌驾
公共Tuple2<;长,布卢姆过滤器<;字符串>;>;create累加器(){
返回Tuple2。of(0L,BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8),100000,0.01));
}
@凌驾
公共Tuple2<;长,布卢姆过滤器<;字符串>;>;添加(UserBehavior元素、Tuple2长度、BloomFilter字符串累加器){
如果(!acculator.f1.mightContain(ele.userId))
{
累加器。f1。put(ele.userId);
累加器。f0+=1L;
}
回流蓄能器;
}
@凌驾
publicLonggetResult(Tuple2<;Long,BloomFilter<;String>;>;累加器){
返回蓄能器。f0;
}
@凌驾
公共Tuple2<;长,布卢姆过滤器<;字符串>;>;合并(Tuple2<;Long,BloomFilter<;String>;>;>;longBloomFilterTuple2,Tuple2<;Long,BloomFilter<;String>;>;>;acc1){
返回null;
}
}
完整代码
day08包装;
导入组织。阿帕奇。平民压紧乌提尔斯。字符集;
导入组织。阿帕奇。弗林克。应用程序编程接口。常见的活动时间。SerializableTimestAssigner;
导入组织。阿帕奇。弗林克。应用程序编程接口。常见的活动时间。水印策略;
导入组织。阿帕奇。弗林克。应用程序编程接口。常见的功能。聚合函数;
导入组织。阿帕奇。弗林克。应用程序编程接口。常见的功能。映射函数;
导入组织。阿帕奇。弗林克。应用程序编程接口。JAVA元组。Tuple2;
导入组织。阿帕奇。弗林克。方解石遮住的。通用域名格式。谷歌。常见的搞砸布卢姆过滤器;
导入组织。阿帕奇。弗林克。方解石遮住的。通用域名格式。谷歌。常见的搞砸漏斗;
导入组织。阿帕奇。弗林克。流动。应用程序编程接口。数据流。数据流源;
导入组织。阿帕奇。弗林克。流动。应用程序编程接口。环境流光执行环境;
导入组织。阿帕奇。弗林克。流动。应用程序编程接口。功能。开窗。处理窗口功能;
导入组织。阿帕奇。弗林克。流动。应用程序编程接口。开窗。转让人。翻滚的小窗户;
导入组织。阿帕奇。弗林克。流动。应用程序编程接口。开窗。时间时间
导入组织。阿帕奇。弗林克。流动。应用程序编程接口。开窗。窗户。时间窗;
导入组织。阿帕奇。弗林克。util。收藏家;
导入java。sql。时间戳;
导入java。util。哈希集;
/**
*@program:bigData_uu;learn
*@说明:独立访客人数
*@作者:有趣先生
*@create:2021-09-2615:51
**/
公共类UserCountOne{
公共静态类用户行为{
公共字符串用户ID;
公共字符串itemId;
公共字符串类别;公共字符串行为;
公共长时间戳;
公共用户行为{
}
公共用户行为(字符串用户ID、字符串项目ID、字符串类别ID、字符串行为、长时间戳){
这userId=userId;
这itemId=itemId;
这categoryId=categoryId;
这行为=行为;
这时间戳=时间戳;
}
@凌驾
公共字符串toString(){
返回";用户行为{"+
"userId=';"+userId+';''+
",itemId=';"+itemId+';''+
",类别ID=';"+类别ID+';''+
",行为=';"+行为+';''+
",时间戳=";+新时间戳(时间戳)+
'}';
}
}
公共静态类CountAgg实现AggregateFunction<;用户行为,Tuple2<;长,布卢姆过滤器<;字符串>;>;,Long>;{
//创建累加器
@凌驾
公共Tuple2<;长,布卢姆过滤器<;字符串>;>;create累加器(){
返回Tuple2。of(0L,BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8),100000,0.01));
}
@凌驾
公共Tuple2<;长,布卢姆过滤器<;字符串>;>;添加(UserBehavior元素、Tuple2长度、BloomFilter字符串累加器){
如果(!acculator.f1.mightContain(ele.userId))
{
累加器。f1。put(ele.userId);
累加器。f0+=1L;
}
回流蓄能器;
}
@凌驾
publicLonggetResult(Tuple2<;Long,BloomFilter<;String>;>;累加器){
返回蓄能器。f0;
}
@凌驾
公共Tuple2<;长,布卢姆过滤器<;字符串>;>;合并(Tuple2<;Long,BloomFilter<;String>;>;>;longBloomFilterTuple2,Tuple2<;Long,BloomFilter<;String>;>;>;acc1){
返回null;
}
}
公共静态类WindowResult扩展了ProcessWindowFunction<;Long、String、Boolean、TimeWindow>;{
@凌驾
publicvoid进程(Booleans、Contextctx、Iterable<;Long>;ele、Collector<;String>;out)抛出异常{
字符串开始=新的时间戳(ctx.window()。getStart())。toString();
字符串结束=新的时间戳(ctx.window()。getEnd())。toString();长计数=ele。迭代器()。next();
出来收集(";窗口";+开始+";~";+结束+";);
}
}
公共静态voidmain(字符串[]args){
StreamExecutionEnvironmentenv=StreamExecutionEnvironment。getExecutionEnvironment();
环境。设置平行度(1);
DataStreamSource<;字符串>;source=env。readTextFile(";G://bigData_learn/Flink_learn/src/main/resources/UserBehavior.csv";);
来源映射(新的映射函数<;字符串,用户行为>;(){
@凌驾
公共用户行为映射(字符串值)引发异常{
字符串[]arr=value。分裂(";,";);
返回新的用户行为(arr[0]、arr[1]、arr[2]、arr[3]、Long.parseLong(arr[4])*1000L);
}
}).过滤器(v->;v.behavior.equals(";pv";)
.安全性和水印(
水印策略书信电报;用户行为>;forMonotonousTimestamps()
.withTimestampAssigner(新的SerializableTimstampAssigner<;UserBehavior>;(){
@凌驾
公共长提取时间戳(UserBehaviorele,长记录时间戳){
返回ele。时间戳;
}
})
).keyBy(v->;真)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(newCountAgg(),newWindowResult())
.print();
Stringname=UserBehavior。班getName();
试试看{
环境。执行(姓名);
}捕获(例外e)
{
e、printStackTrace();
}
}
}
结果显示
window2017-11-2617:59:35.0~2017-11-2617:59:40.0的紫外线统计值为:68
window2017-11-2617:59:40.0~2017-11-2617:59:45.0的紫外线统计值为:55
window2017-11-2617:59:45.0~2017-11-2617:59:50.0的紫外线统计值为:64
window2017-11-2617:59:50.0~2017-11-2617:59:55.0的紫外线统计值为:54
window2017-11-2617:59:55.0~2017-11-2618:00:00.0的紫外线统计值为:67
window2017-11-2618:00:00.0~2017-11-2618:00:05.0的紫外线统计为:13
最新评论