当前位置: 首页 > news >正文

极速云建站淘金企业网站建设服务

极速云建站,淘金企业网站建设服务,施工企业三大体系认证,长沙亚町设计使用转换算子是产生一个新的rdd,此时在driver端会生成一个逻辑上的执行计划,但任务还没有执行。但所谓的行动算子,其实就是触发作业执行的方法(runJob)。底层代码调用的是环境对象的runJob方法。 1. reduce 函数源码&…

使用转换算子是产生一个新的rdd,此时在driver端会生成一个逻辑上的执行计划,但任务还没有执行。但所谓的行动算子,其实就是触发作业执行的方法(runJob)。底层代码调用的是环境对象的runJob方法。

1. reduce

函数源码:

def reduce(f: (T, T) => T): T = withScope {val cleanF = sc.clean(f)val reducePartition: Iterator[T] => Option[T] = iter => {if (iter.hasNext) {Some(iter.reduceLeft(cleanF))} else {None}}var jobResult: Option[T] = Noneval mergeResult = (_: Int, taskResult: Option[T]) => {if (taskResult.isDefined) {jobResult = jobResult match {case Some(value) => Some(f(value, taskResult.get))case None => taskResult}}}sc.runJob(this, reducePartition, mergeResult)// Get the final result out of our Option, or throw an exception if the RDD was emptyjobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))}

函数说明:

简而言之,就是先聚合分区内的数据,再聚合分区间的数据。

object Spark01_RDD_reduce_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)val i = rdd.reduce(_ + _)println(i)// 10// 分区数据:[1, 2], [3, 4]// reduce聚集分区的所有元素:先聚合分区内的数据,再聚合分区间的数据// [1, 2]=>3, [3, 4]=>7    3 + 7 => 10}
}

2. collect

函数源码:

def collect(): Array[T] = withScope {val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)Array.concat(results: _*)}

函数说明:

在驱动程序中,以数组Array的形式返回数据集的所有元素。

object Spark02_RDD_collect_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)val mapRDD = rdd.map(_ * 2)// 代码运行到collect时才开始触发执行任务。在这之前只是在driver构建一个逻辑的执行计划。// collect源码存在runJob函数。println(mapRDD.collect().mkString(","))// 将executor端的分区内的数据按分区有序的生成一个数组并返回到driver端// 调用collect函数的输出:2,4,6,8mapRDD.foreach(println)// 不调用collect函数的输出:// 2    6     8     4无序}
}

3. count 和 first

函数源码:

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sumdef first(): T = withScope {take(1) match {case Array(t) => tcase _ => throw new UnsupportedOperationException("empty collection")}}

first函数底层调用了take函数,take函数底层调用了runJob函数,所以first也是行动算子。

函数说明:

object Spark03_RDD_count_first_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)// count函数获取rdd的数据的个数val cnt = rdd.count()// 4println(cnt)// first获取数据源中数据的第一个元素val first = rdd.first()println(first)// 1sc.stop()}
}

4. take和takeOrdered

函数源码:

def take(num: Int): Array[T] = withScope {val scaleUpFactor = Math.max(conf.get(RDD_LIMIT_SCALE_UP_FACTOR), 2)if (num == 0) {new Array[T](0)} else {val buf = new ArrayBuffer[T]val totalParts = this.partitions.lengthvar partsScanned = 0while (buf.size < num && partsScanned < totalParts) {// The number of partitions to try in this iteration. It is ok for this number to be// greater than totalParts because we actually cap it at totalParts in runJob.var numPartsToTry = 1Lval left = num - buf.sizeif (partsScanned > 0) {// If we didn't find any rows after the previous iteration, quadruple and retry.// Otherwise, interpolate the number of partitions we need to try, but overestimate// it by 50%. We also cap the estimation in the end.if (buf.isEmpty) {numPartsToTry = partsScanned * scaleUpFactor} else {// As left > 0, numPartsToTry is always >= 1numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toIntnumPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)}}val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)res.foreach(buf ++= _.take(num - buf.size))partsScanned += p.size}buf.toArray}}

函数说明:

object Spark05_RDD_take_takeOrdered_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(4, 3, 2, 1, 6, 7), 3)println(rdd.take(3).mkString(" "))// 4, 3, 2// take方法返回的是一个数组// 从数据源取前N个数据println(rdd.takeOrdered(3).mkString(", "))// 1, 2, 3// takeOrdered方法返回的是一个有序数组,第二个参数可以传入排序规则。// 源码中的Ordering特质继承自Comparator[T],即相当于java中的比较器。// 从数据源获取前N个有序的数据sc.stop()}
}

5. aggregate

函数源码:

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {// Clone the zero value since we will also be serializing it as part of tasksvar jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())val cleanSeqOp = sc.clean(seqOp)val cleanCombOp = sc.clean(combOp)val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)val mergeResult = (_: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)sc.runJob(this, aggregatePartition, mergeResult)jobResult}

函数说明:

分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。

object Spark04_RDD_aggregate_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)val i = rdd.aggregate(0)(_ + _, _ + _)println(i)// 10// 0 + 1 + 2 => 3, 0 + 3 + 4 => 7// 0 + 3 + 7 => 10val i1 = rdd.aggregate(10)(_ + _, _ + _)println(i1)// 40// 10 + 1 + 2 => 13,  10 + 3 + 4 => 17// 10 + 13 + 17 => 40sc.stop()}
}

6. fold

函数源码;

略。

函数说明:

折叠操作,aggregate的简化版操作。

即aggregate的分区内计算的逻辑和分区间计算的逻辑相同。

7. countByKey和countByValue

countByKey函数源码:

def countByKey(): Map[K, Long] = self.withScope {self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap}

返回值类型是map集合。即Map[K, long]

K即是key-value的key类型,Long则是key出现的次数。

countByValue函数源码:

def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope {map(value => (value, null)).countByKey()}

得知,底层调用了map方法,将rdd的每个数据映射为一个元组然后调用countByKey方法。

object Spark06_RDD_countByKey_countByValue_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("a", 2)))// 返回的是Map集合,key是key-value的key,value是key出现的countval rdd_key = rdd.countByKey()println(rdd_key)// Map(a -> 2, b -> 1, c -> 1)// a作为key出现了两次,b作为key出现了一次...val rdd_value = rdd.countByValue()println(rdd_value)// Map((a,2) -> 1, (b,2) -> 1, (c,3) -> 1, (a,1) -> 1)// 底层调用了map方法和countbyKey方法sc.stop()}
}

8. save相关算子

save相关算子包括

saveAsTextFile, saveAsObjectFile, saveAsSequenceFile

函数源码:

def saveAsTextFile(path: String): Unit = withScope {saveAsTextFile(path, null)}def saveAsObjectFile(path: String): Unit = withScope {this.mapPartitions(iter => iter.grouped(10).map(_.toArray)).map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))).saveAsSequenceFile(path)}def saveAsSequenceFile(path: String,codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {def anyToWritable[U <% Writable](u: U): Writable = u// TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and// valueWritableClass at the compile time. To implement that, we need to add type parameters to// SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a// breaking change.val convertKey = self.keyClass != _keyWritableClassval convertValue = self.valueClass != _valueWritableClasslogInfo("Saving as sequence file of type " +s"(${_keyWritableClass.getSimpleName},${_valueWritableClass.getSimpleName})" )val format = classOf[SequenceFileOutputFormat[Writable, Writable]]val jobConf = new JobConf(self.context.hadoopConfiguration)if (!convertKey && !convertValue) {self.saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (!convertKey && convertValue) {self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (convertKey && !convertValue) {self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)} else if (convertKey && convertValue) {self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)}}
}

函数说明:

object Spark06_RDD_save_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("a", 2)))// 保存在文件中rdd.saveAsTextFile("datas")// 序列化成对象保存在文件中rdd.saveAsObjectFile("datas")// 要求数据的格式必须为key-value类型rdd.saveAsSequenceFile("datas")sc.stop()}
}

9. foreach

函数源码:

底层一直在调用runJob函数。

def foreach(f: T => Unit): Unit = withScope {val cleanF = sc.clean(f)sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))}

函数说明:

分布式遍历RDD中的每一个元素,调用该函数。

object Spark07_RDD_foreach_action {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rdd_action")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("a", 2)), 2)rdd.collect().foreach(println)// collect会按分区建立的顺序把数据采集过来到driver端
//    (a,1)
//    (b,2)
//    (c,3)
//    (a,2)println("*************")// 而foreach直接在executor端内存数据的打印rdd.foreach(println)
//    (c,3)//(a,2)//(a,1)//(b,2)sc.stop()}
}

http://www.yayakq.cn/news/432336/

相关文章:

  • 网站制作 建站php网站开发什么
  • 辽宁自适应网站建设公司提高网站权重的作用
  • 电子商务网站建设的技术综述关于医疗保障局门户网站建设
  • jquery 的网站模板深圳网站建设者
  • 3g小说网站中国网站排名网官网
  • 泉州住房建设局网站给学校建网站那个系统好
  • 专业手机网站开发wordpress 直接连接数据库文件
  • 做网站域名后缀选择成都户外网站建设
  • 汕头seo建站wordpress购物模板
  • 精致的个人网站商务网站建设实训报告1600字
  • 自己建网站做淘宝客wordpress卢松松评论模板
  • 南京好的网站设计公司网站建立的方式是什么
  • 北京网站建设公司新闻网页界面设计的分类
  • 网站系统建设思想如何写公众号商城怎么开
  • 做网站源码需要多少钱国外的app设计网站
  • 做网络推广网站有哪些湖南株洲静默
  • 网站建设先进材料wordpress主题调度
  • 惠州网站建设找哪个公司房地产家居网络平台
  • 自学网站免费上海高端网站建设
  • 个人网站名字取名怎么做网站建设与管理试题一
  • 购物网站建设比较好的wordpress 创业
  • 电子商务官方网站东莞网站建设哪里好
  • 网站源码带采集怎样开通微信公众号平台
  • 公司做个网站好还是做公众号好如何自己设计装修效果图
  • asp网站仿制网站一般用什么软件做的
  • 移动网站开发语言中国网重庆
  • 免费行情网站软件ipad 设计网站
  • 网站网络设计是怎么做的网页设计师学历要求
  • 建设网站企业网上银行登录入口美团网站开发形式
  • 做网站江西外贸建站行业好做吗