博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark快速入门API① Transformation转换算子
阅读量:3952 次
发布时间:2019-05-24

本文共 2360 字,大约阅读时间需要 7 分钟。

Spark支持两种RDD操作:

transformation和action。transformation操作会针对已有的RDD创建一个新的RDD;

而action则主要是对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并可以返回结果给Driver程序。

Transformation API 的快速入门

//1.需求使用map函数将 将list中的每一个元素*2 返回新的RDD    // 定义一个RDD 数据为1-10的列表    val list = sc.parallelize(1.to(10))    val newList: RDD[Int] = list.map(_ * 2)//2.需求使用filter将大于5的数据过滤掉得到一个新的RDD    // 定义一个RDD 数据为1-10的列表    val list = sc.parallelize(1.to(10))    val newList: RDD[Int] = list.filter(x=>x>5)// 3.需求定义一个1-10列表指定一个2分区 并将分区0的数据每个一个数据*2 1分区的数据*3 返回新的RDD     list.mapPartitionsWithIndex((x, res) => {if (x == 0) {res.map(_ * 2)} else {res.map(_ * 3)}}).foreach(println)//4.需求定义两个列表list1 list2 将两个的数据求一个并集不去重 返回一个新的RDD    val list1 = sc.parallelize(1.to(10), 2)    val list2 = sc.parallelize(3.to(10), 2)    val rel = list1.union(list2).collect()    println(rel.addString(new StringBuilder,","))//5.需求定义两个列list1 list2 求出列表中的交集并返回新的RDD    // 定义一个RDD 数据为1-10的列表    val list1 = sc.parallelize(1.to(10), 2)    val list2 = sc.parallelize(3.to(10), 2)    val unit = list1.intersection(list2)    unit.foreach(println)// 6.需求定义一个列表去掉重复的数据 返回新的RDD    // 定义一个RDD 数据为1-10的列表    val list1 = sc.parallelize(List(1, 2, 4, 3, 3, 2, 2, 2, 3, 3, 5));    // 使用 distinct 去重    val data = list1.distinct().collect()    data.foreach(println)// 7.需求定义一个KV的类型以K进行分组得到一个新的RDD    // 定义一个RDD    val list1 = sc.parallelize(List(("tom", 1), ("zhangsan", 12), ("kill", 1), ("tom", 2)));    val unit = list1.groupByKey()    unit.foreach(println)//8.需求定义一个列表要求将相同的key的求出总和    // 定义一个RDD    val list1 = sc.parallelize(List(("tom", 1), ("zhangsan", 12), ("kill", 1), ("tom", 2)));    val datas: RDD[(String, Int)] = list1.reduceByKey(_ + _)    datas.foreach(println)// 9.需求定义一个列表为kv类型 需要根据key进行排序    // 定义一个RDD    val list1 = sc.parallelize(List(("tom", 1), ("zhangsan", 12), ("kill", 1), ("tom", 2)));    val unit = list1.sortByKey()    unit.foreach(println)//10.需求定义一个列表 将list1中数据按照升序进行排序    var list1 = sc.parallelize(List(1, 2, 3, 4, 5, 0, 9, 8, 7))    val result = list1.sortBy(x => x, true).collect()    result.foreach(println)// 11.需求将list1 与list2中的数据进行join   // 定义上下文对象    val sc = new SparkContext(conf)    val list1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))    val list2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7), ("tom", 2)))    val data = list1.join(list2).collect()    data.foreach(println)

转载地址:http://wokzi.baihongyu.com/

你可能感兴趣的文章
使用CppSQLite3访问SQLite数据库
查看>>
第一个boost程序---timer的使用
查看>>
使用boost asio库实现字节数可控的CS通信
查看>>
linux下串口编程
查看>>
boot asio 非阻塞同步编程---非阻塞的accept和receive。
查看>>
利用ADOX、ADO操纵MDB文件(ACCESS)
查看>>
使用ADO操作MDB,关注数据类型
查看>>
使用windows自带Zip命令压缩文件
查看>>
windows获得文件大小
查看>>
Host 'ETCV3' is blocked because of many connection errors; unblock with 'mysqladmin flush-hosts'
查看>>
OCILIB在VS2008中的使用
查看>>
OCILIB VC2008 效率测试
查看>>
PL/SQL设置NUMBER显示为字符串
查看>>
linux ftp 脚本 -- 使用程序执行脚本
查看>>
MFC CListBox的使用
查看>>
VS2008向MFC 对话框 添加托盘图标(显示和消失)
查看>>
redhat中vsftp开机自启动
查看>>
MySQL存储过程,生成大量数据
查看>>
查询字段值出现多次的字段值
查看>>
SQL Server表存在则进行查重 SQL语句
查看>>