SparkRDD的collectaction不适用于单个elementsize过大的示例分析-成都创新互联网站建设

关于创新互联

多方位宣传企业产品与服务 突出企业形象

公司简介 公司的服务 荣誉资质 新闻动态 联系我们

SparkRDD的collectaction不适用于单个elementsize过大的示例分析

本篇文章为大家展示了Spark RDD的collect action 不适用于单个element size过大的示例分析,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

创新互联企业建站,十年网站建设经验,专注于网站建设技术,精于网页设计,有多年建站和网站代运营经验,设计师为客户打造网络企业风格,提供周到的建站售前咨询和贴心的售后服务。对于做网站、成都网站建设中不同领域进行深入了解和探索,创新互联在网站建设中充分了解客户行业的需求,以灵动的思维在网页中充分展现,通过对客户行业精准市场调研,为客户提供的解决方案。

collect是Spark RDD一个非常易用的action,通过collect可以轻易获得一个RDD当中所有的elements。当这些elements是String类型的时候,可以轻易将整个RDD转化成一个List,简直不要太好用。

不过等一等,这么好用的action有一个弱点,它不适合size比较的element。举个例子来说吧。请看下面这段代码:

... ...

JavaPairInputDStream messages = KafkaUtils.createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

topicsSet

);


JavaDStream lines = messages.map(new Function, String>() {

@Override

public String call(Tuple2 tuple2) {

return tuple2._2();

}

});

lines.foreachRDD(new Function, Void>(){

@Override

public Void call(JavaRDD strJavaRDD) throws Exception {

List messages = strJavaRDD.collect();

List sizeStrs = new ArrayList();

for (String message: messages) {

if (message== null)

continue;

String logStr = "message size is " + message.length();

strs.add(logStr);

}

saveToLog(outputLogPath, strs);

return null;

}

});

... ...

上述这段代码当Kafka中单个message(也就是)的size很小(比如200Bytes)的时候,运行得很好。可是当单个message size变大到一定程度(例如10MB),就会抛出以下异常:

sparkDriver-akka.actor.default-dispatcher-18 2015-10-15 21:52:28,606 ERROR JobSc

heduler - Error running job streaming job 1444971120000 ms.0

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 238.0 failed 4 times, most recent failure: Lost task 0.3 in stage 238.0 (TID421, 127.0.0.1): ExecutorLostFailure (executor 123 lost)

Driver stacktrace:

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1215)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1204)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)

at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1203)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)

at scala.Option.foreach(Option.scala:236)

at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1404)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1365)

at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

原因很简单,collect()无法handle“大数据”。对于10MB size这样的单条message。我们可以用下面这段代码替代上面最后一部分:

lines.foreachRDD(new Function, Void>() {

@Override

public Void call(JavaRDD strJavaRDD) throws Exception {

JavaRDD sizeRDD = strJavaRDD.map(new Function() {

@Override

public String call(String message) throws Exception {

if (message == null)

return null;

String logStr = "Message size is " + message.length();

return logStr;

}

});

List sizeStrs = sizeRDD.collect();

saveToLog(outputLogPat, sizeStrs);

return null;

}

});

上述内容就是Spark RDD的collect action 不适用于单个element size过大的示例分析,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注创新互联行业资讯频道。


网站标题:SparkRDD的collectaction不适用于单个elementsize过大的示例分析
转载来源:http://kswsj.cn/article/gdopes.html

其他资讯