python - pyspark Error Caused by: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema -


i have below pyspark code join 2 data frames. looks simple output not coming erro. unable proceed further, can please identify fundamental issue here?

input

c.csv

100,2015-09-03,sg,7 200,2016-01-30,at,9 300,2016-01-25,au,8 400,2016-01-22,au,7 

u.csv

248,248,country,sg,singapore 66,66,country,at,austria 65,65,country,au,australia 

output

100,singapore 200,austria 300,australia 400,australia 

source

pyspark code :test.py

from pyspark import sparkconf, sparkcontext pyspark.sql.types import stringtype pyspark import sqlcontext  conf = sparkconf().setappname("hybrid - read csv hive ") sc = sparkcontext(conf=conf)  sqlcontext = sqlcontext(sc) c_rdd = sc.textfile("./hybrid/c.csv").map(lambda line: line.split(",")) r_rdd = sc.textfile("./hybrid/u.csv").map(lambda line: line.encode("ascii", "ignore").split(","))  c_df = c_rdd.todf(['c_no','op_dt','try_cd','lb']) r_df = r_rdd.todf(['c_id','p_id','cc_cd','c_nm','c_ds'])  new = c_df.join(r_df, c_df.try_cd == r_df.c_nm).select(['c_no','c_ds']) new.show() 

result

pyspark error: $spark-submit  test.py java.lang.illegalstateexception: input row doesn't have expected number of values required schema. 5 fields required while 6 values provided.         @ org.apache.spark.sql.execution.evaluatepython$.fromjava(python.scala:225)         @ org.apache.spark.sql.sqlcontext$$anonfun$11.apply(sqlcontext.scala:933)         @ org.apache.spark.sql.sqlcontext$$anonfun$11.apply(sqlcontext.scala:933)         @ scala.collection.iterator$$anon$11.next(iterator.scala:328)         @ scala.collection.iterator$$anon$11.next(iterator.scala:328)         @ scala.collection.iterator$$anon$11.next(iterator.scala:328)         @ org.apache.spark.shuffle.sort.bypassmergesortshufflewriter.write(bypassmergesortshufflewriter.java:149) 

can please issue here?

hopefully using spark 2.x + try -

from pyspark.sql.types import structtype,stringtype,integertype,structfield pyspark.sql import sparksession  spark = sparksession \     .builder \     .appname("hybrid - read csv hive ") \     .getorcreate()  cschema = structtype([structfield("c_no", integertype()),                      structfield("op_dt", stringtype()),                      structfield("try_cd", stringtype()),                      structfield("lb", integertype())])  uschema = structtype([structfield("c_id", integertype()),                      structfield("p_id", integertype()),                      structfield("cc_cd", stringtype()),                      structfield("c_nm", stringtype()),                      structfield("c_ds", stringtype())])  c_df  = spark.read.csv("c.csv",schema=cschema) u_df  = spark.read.csv("u.csv",schema=uschema)  new = c_df.join(u_df, c_df.try_cd == u_df.c_nm).select(c_df.c_no,u_df.c_ds) new.show() 

Comments

Popular posts from this blog

php - Permission denied. Laravel linux server -

google bigquery - Delta between query execution time and Java query call to finish -

python - Pandas two dataframes multiplication? -