当前位置: 首页 > news >正文

建设银行不良资产处置网站新公司做网站多少钱

建设银行不良资产处置网站,新公司做网站多少钱,湛江网站建站建设,有什么做任务赚钱的网站一、Transform 转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑. 二、基本转换算子 2.1、map(映射) 将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素…

一、Transform

转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑.
在这里插入图片描述

二、基本转换算子

2.1、map(映射)

将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素

package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;public class Transform_map {public static void main(String[] args) throws Exception {ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> s_num = svn.fromElements(1, 2, 3, 4, 5);s_num.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer values) throws Exception {return values*values;}}).print();}
}

2.2、filter(过滤)

根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃

package com.lyh.flink05;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;public class Transform_filter {public static void main(String[] args) throws Exception {ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> elements = svn.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
//        elements.filter(new FilterFunction<Integer>() {
//            @Override
//            public boolean filter(Integer integer) throws Exception {
//                if (integer % 2 == 0 )
//                        return false;
//                        else {
//                            return true;
//                }
//            }
//        }).print();elements.filter(value -> value % 2 != 0).print();}
}

2.3、flatMap(扁平映射)

消费一个元素并产生零个或多个元素

package com.lyh.flink05;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;import java.util.concurrent.ExecutionException;public class flatMap {public static void main(String[] args) throws Exception {ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();DataSource<Integer> dataSource = svn.fromElements(1, 2, 3);dataSource.flatMap(new FlatMapFunction<Integer, Integer>() {@Overridepublic void flatMap(Integer integer, Collector<Integer> collector) throws Exception {collector.collect(integer*integer);collector.collect(integer*integer*integer);}}).print();}
}

三、聚合算子

3.1、keyBy(按键分区)

把流中的数据分到不同的分区(并行度)中.具有相同key的元素会分到同一个分区中

package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyBy_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L)).keyBy(0) // 以数组的第一个元素作为key.map((MapFunction<Tuple2<Long, Long>, String>) longLongTuple2 -> "key:" + longLongTuple2.f0 + ",value:" + longLongTuple2.f1).print();env.execute("execute");}
}

3.2、sum,min,max,minBy,maxBy(简单聚合)

KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream

package com.lyh.flink05;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyBy_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L)).keyBy(0) // 以数组的第一个元素作为key.sum(1).print();env.execute("execute");}
}

3.3、reduce(归约聚合)

一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
在这里插入图片描述

package com.lyh.flink05;import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class keyByReduce {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.fromElements(Tuple2.of(1L,2L),Tuple2.of(2L,4L),Tuple2.of(2L,9L),Tuple2.of(1L,9L),Tuple2.of(1L,2L),Tuple2.of(2L,3L)).keyBy(0).reduce(new ReduceFunction<Tuple2<Long, Long>>() {@Overridepublic Tuple2<Long, Long> reduce(Tuple2<Long, Long> values1, Tuple2<Long, Long> values2) throws Exception {return new Tuple2<>(values1.f0,values1.f1+values2.f1);}}).print();env.execute();}
}

3.4、process(底层处理)

process算子在Flink算是一个比较底层的算子, 很多类型的流上都可以调用, 可以从流中获取更多的信息(不仅仅数据本身),process 用法比较灵活,后面再做专门研究。

package com.lyh.flink05;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class Process_s {public static void main(String[] args) throws Exception {// 获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);SingleOutputStreamOperator<Integer> processed = streamSource.process(new ProcessFunction<Integer, Integer>() {@Overridepublic void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {if (value % 3 == 0) {//测流数据ctx.output(new OutputTag<Integer>("3%0", TypeInformation.of(Integer.class)), value);}if (value % 3 == 1) {//测流数据ctx.output(new OutputTag<Integer>("3%1", TypeInformation.of(Integer.class)), value);}//主流 ,数据out.collect(value);}});DataStream<Integer> output0 = processed.getSideOutput(new OutputTag<>("3%0",TypeInformation.of(Integer.class)));DataStream<Integer> output1 = processed.getSideOutput(new OutputTag<>("3%1",TypeInformation.of(Integer.class)));output1.print();env.execute();}
}

四、合流算子

4.1、connect(连接)

在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。
Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

package com.lyh.flink05;import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.protocol.types.Field;public class connect_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3, 4, 5);DataStreamSource<String> data2 = env.fromElements("a", "b", "c");ConnectedStreams<Integer, String> data3 = data1.connect(data2);data3.getFirstInput().print("data1");data3.getSecondInput().print("data2");env.execute();}
}

4.2、union(合并)

package com.lyh.flink05;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class union_s {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3);DataStreamSource<Integer> data2 = env.fromElements(555,666);DataStreamSource<Integer> data3 = env.fromElements(999);DataStream<Integer> data = data1.union(data2).union(data3);data.print();env.execute();}
}
http://www.mnyf.cn/news/35283.html

相关文章:

  • 没有网站 淘宝客培训网站推广
  • 齐诺网站建设东莞网站建设做网站网站seo推广哪家值得信赖
  • 万江做网站连云港百度推广总代理
  • 国外有哪些做deal的网站如何制作简易网站
  • 怎样注册网站域名如何让新网站被收录
  • iis发布网站慢全网营销的公司
  • 上海做网站费用百度一下你就知道搜索
  • 档案网站建设的原则创建免费网站
  • wordpress在线客服插件seo网站排名后退
  • wordpress识图搜索代码全国seo搜索排名优化公司
  • 西安网站制作推广网络推广和网络营销的区别
  • phpcms网站打开空白seo结算系统
  • 电商网站获取流量的方法seo标题优化关键词怎么选
  • 物流网站首页图片饥饿营销的十大案例
  • 网站怎样制作 优帮云it培训班真的有用吗
  • 建设网站要用到什么语言三只松鼠网络营销策略
  • 为wordpress添加虚拟用户权限网站免费优化软件
  • WordPress和ftp区别优化大师电脑版
  • 手机网站在线客服长沙百度网站排名优化
  • 为什么资讯网站荣誉被收录全国最新疫情实时状况地图
  • 汕头市濠江区政府门户网站关键词优化软件有哪些
  • 东莞建站模板代理百度引擎搜索
  • 刚开始的网站开发公司网销怎么做
  • 中国免费建站网建立网站费用大概需要多少钱
  • 泉州网站建设 推广电商运营
  • wordpress侧边栏导航seo运营是什么意思
  • 义乌外贸公司网站网站建设维护
  • 和萝莉做的电影网站seo搜索引擎优化课程
  • 多用户商城app源码站长seo软件
  • 如何java做网站成都高端企业网站建设