PySpark error: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema
I have the PySpark code below that joins two data frames. It looks simple, but instead of the expected output I get an error and am unable to proceed further. Can someone please identify the fundamental issue here?
Input

c.csv

100,2015-09-03,sg,7
200,2016-01-30,at,9
300,2016-01-25,au,8
400,2016-01-22,au,7

u.csv

248,248,country,sg,singapore
66,66,country,at,austria
65,65,country,au,australia

Expected output

100,singapore
200,austria
300,australia
400,australia
Source

PySpark code: test.py

from pyspark import SparkConf, SparkContext
from pyspark.sql.types import StringType
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("hybrid - read csv hive")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

c_rdd = sc.textFile("./hybrid/c.csv").map(lambda line: line.split(","))
r_rdd = sc.textFile("./hybrid/u.csv").map(lambda line: line.encode("ascii", "ignore").split(","))

c_df = c_rdd.toDF(['c_no', 'op_dt', 'try_cd', 'lb'])
r_df = r_rdd.toDF(['c_id', 'p_id', 'cc_cd', 'c_nm', 'c_ds'])

new = c_df.join(r_df, c_df.try_cd == r_df.c_nm).select(['c_no', 'c_ds'])
new.show()
Result

PySpark error:

$ spark-submit test.py

java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 5 fields are required while 6 values are provided.
    at org.apache.spark.sql.execution.EvaluatePython$.fromJava(python.scala:225)
    at org.apache.spark.sql.SQLContext$$anonfun$11.apply(SQLContext.scala:933)
    at org.apache.spark.sql.SQLContext$$anonfun$11.apply(SQLContext.scala:933)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
Can someone please identify the issue here?
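The exception itself names the fundamental issue: toDF() builds each row from whatever line.split(",") returns, and at least one line of u.csv splits into six values while the declared schema has only five columns (for example, a field containing an embedded comma). The sample rows shown all split cleanly into five fields, so the offending line is presumably elsewhere in the real file. A minimal sketch to locate such lines, assuming the file path from the question:

from pyspark import SparkConf, SparkContext

sc = SparkContext(conf=SparkConf().setAppName("find bad rows"))

# Keep only the lines whose comma-split width differs from the
# five columns the schema expects, then print them for inspection.
bad = (sc.textFile("./hybrid/u.csv")
         .map(lambda line: line.split(","))
         .filter(lambda fields: len(fields) != 5))

for fields in bad.collect():
    print(fields)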
Hopefully you are using Spark 2.x+. Try this:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("hybrid - read csv hive") \
    .getOrCreate()

cSchema = StructType([StructField("c_no", IntegerType()),
                      StructField("op_dt", StringType()),
                      StructField("try_cd", StringType()),
                      StructField("lb", IntegerType())])

uSchema = StructType([StructField("c_id", IntegerType()),
                      StructField("p_id", IntegerType()),
                      StructField("cc_cd", StringType()),
                      StructField("c_nm", StringType()),
                      StructField("c_ds", StringType())])

c_df = spark.read.csv("c.csv", schema=cSchema)
u_df = spark.read.csv("u.csv", schema=uSchema)

new = c_df.join(u_df, c_df.try_cd == u_df.c_nm).select(c_df.c_no, u_df.c_ds)
new.show()
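If the real file does contain malformed rows, the built-in CSV reader can also drop them up front instead of letting a later stage fail; a sketch using the reader's mode option together with the uSchema defined above:

# Drop any row that does not match the declared five-column schema
# rather than propagating it into the join.
u_df = spark.read.csv("u.csv", schema=uSchema, mode="DROPMALFORMED")

With clean input, new.show() should then produce the four expected rows shown above.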