java - createDataFrame() throws exception when passing a JavaRDD that contains an ArrayType column in Spark 2.1 -
I want to create a DataFrame (aka Dataset&lt;Row&gt; in Spark 2.1) with createDataFrame(). It works when I pass a List&lt;Row&gt; parameter, but throws an exception when I pass a JavaRDD&lt;Row&gt;.
[code]
SparkSession ss = SparkSession.builder().appName("Spark test").master("local[4]").getOrCreate();

List<Row> data = Arrays.asList(
    RowFactory.create(Arrays.asList("a", "b", "c")),
    RowFactory.create(Arrays.asList("a", "b", "c"))
);

StructType schema = new StructType(new StructField[]{
    DataTypes.createStructField("col_1", DataTypes.createArrayType(DataTypes.StringType), false)
});
This code works well:
ss.createDataFrame(data, schema).show();

+---------+
|    col_1|
+---------+
|[a, b, c]|
|[a, b, c]|
+---------+
But when I pass a JavaRDD as the first parameter, it throws an exception:
JavaRDD<Row> rdd = JavaSparkContext.fromSparkContext(ss.sparkContext()).parallelize(data);
ss.createDataFrame(rdd, schema).show(); // throws exception
[exception]
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 1 times, most recent failure: Lost task 2.0 in stage 1.0 (TID 3, localhost, executor driver): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.util.Arrays$ArrayList is not a valid external type for schema of array<string>
mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.Object), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.Object)), StringType), true), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, col_1), ArrayType(StringType,true))) AS col_1#0
+- mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.Object), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.Object)), StringType), true), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, col_1), ArrayType(StringType,true)))
   :- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.Object)), StringType), true)
   :  +- validateexternaltype(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.Object)), StringType)
   :     +- lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.Object))
   +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, col_1), ArrayType(StringType,true))
      +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, col_1)
         +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
            +- input[0, org.apache.spark.sql.Row, true]

    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:293)
    at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:547)
    at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:547)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.util.Arrays$ArrayList is not a valid external type for schema of array<string>
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:290)
    ... 17 more
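Note that the type named in the exception, java.util.Arrays$ArrayList, is the private list class returned by Arrays.asList — it is not java.util.ArrayList. A minimal plain-Java check (no Spark needed) confirms what the encoder is actually seeing for the array column:

```java
import java.util.Arrays;
import java.util.List;

public class ArraysAsListType {
    public static void main(String[] args) {
        List<String> value = Arrays.asList("a", "b", "c");
        // Prints "java.util.Arrays$ArrayList" -- the exact type the
        // row encoder rejects for the array<string> column.
        System.out.println(value.getClass().getName());
    }
}
```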
Any help is appreciated.
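For reference, one workaround I have seen suggested (a sketch, not verified here — it assumes the row encoder on the RDD path accepts a plain Java array for an ArrayType column) is to build the array column from a String[] instead of the List returned by Arrays.asList:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class ArrayColumnWorkaround {
    public static void main(String[] args) {
        SparkSession ss = SparkSession.builder().appName("Spark test").master("local[4]").getOrCreate();

        // Pass the array column value as a String[] rather than a List.
        // The cast to Object makes RowFactory.create treat the array as a
        // single column value instead of expanding it as varargs.
        List<Row> data = Arrays.asList(
            RowFactory.create((Object) new String[]{"a", "b", "c"}),
            RowFactory.create((Object) new String[]{"a", "b", "c"})
        );

        StructType schema = new StructType(new StructField[]{
            DataTypes.createStructField("col_1", DataTypes.createArrayType(DataTypes.StringType), false)
        });

        JavaRDD<Row> rdd = JavaSparkContext.fromSparkContext(ss.sparkContext()).parallelize(data);
        ss.createDataFrame(rdd, schema).show();

        ss.stop();
    }
}
```

This requires Spark on the classpath to run; the List&lt;Row&gt; overload works either way, so the change only matters for the JavaRDD path.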