JustPaste.it

Error Log

                 STARTING DF 


>Generation DF
22/05/09 19:28:26 ERROR TaskSetManager: Task 26 in stage 6.0 failed 4 times; aborting job
An error occurred while calling o3857.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in stage 6.0 failed 4 times, most recent failure: Lost task 26.3 in stage 6.0 (TID 197) (ff0283b06c39 executor 1): org.apache.spark.sql.execution.QueryExecutionException: Encounter error while reading parquet files. One possible cause: Parquet column cannot be converted in the corresponding files. Details: 
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:222)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.agg_doAggregateWithKeys_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file hdfs://0.0.0.0:9000/3/master/order/paid_20220324.parquet/part-00039-ff535f97-50b2-4d1b-9dd2-b119e4c6eda2-c000.snappy.parquet
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
        at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
        ... 27 more
Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.Double ([B and java.lang.Double are in module java.base of loader 'bootstrap')
        at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:116)
        at org.apache.spark.sql.catalyst.expressions.MutableDouble.update(SpecificInternalRow.scala:118)
        at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.update(SpecificInternalRow.scala:248)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.set(ParquetRowConverter.scala:174)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addBinary(ParquetRowConverter.scala:92)
        at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:317)
        at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:367)
        at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:226)
        ... 32 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.sql.execution.QueryExecutionException: Encounter error while reading parquet files. One possible cause: Parquet column cannot be converted in the corresponding files. Details: 
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:222)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.agg_doAggregateWithKeys_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        ... 1 more
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file hdfs://0.0.0.0:9000/3/master/order/paid_20220324.parquet/part-00039-ff535f97-50b2-4d1b-9dd2-b119e4c6eda2-c000.snappy.parquet
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
        at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
        ... 27 more
Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.Double ([B and java.lang.Double are in module java.base of loader 'bootstrap')
        at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:116)
        at org.apache.spark.sql.catalyst.expressions.MutableDouble.update(SpecificInternalRow.scala:118)
        at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.update(SpecificInternalRow.scala:248)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.set(ParquetRowConverter.scala:174)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addBinary(ParquetRowConverter.scala:92)
        at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:317)
        at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:367)
        at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:226)
        ... 32 more

df_tmp_order_product empty
> END generation DF
22/05/09 19:28:35 ERROR TaskSetManager: Task 26 in stage 8.0 failed 4 times; aborting job
Traceback (most recent call last):
  File "/workspaces/batch/src/work/knn.py", line 60, in <module>
    df.show()
  File "/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 484, in show
  File "/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o4003.showString.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
SortAggregate(key=[cid#0, product_id#2], functions=[max(sid#1), max(buy_product#3), max(cart_product#4), max(view_product#5)], output=[cid#0, product_id#2, sid#528, buy_product#529, cart_product#530, view_product#531, rating#532])
+- *(15) Sort [cid#0 ASC NULLS FIRST, product_id#2 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(cid#0, product_id#2, 200), ENSURE_REQUIREMENTS, [id=#443]
      +- SortAggregate(key=[cid#0, product_id#2], functions=[partial_max(sid#1), partial_max(buy_product#3), partial_max(cart_product#4), partial_max(view_product#5)], output=[cid#0, product_id#2, max#548, max#549, max#550, max#551])
         +- *(14) Sort [cid#0 ASC NULLS FIRST, product_id#2 ASC NULLS FIRST], false, 0
            +- Union
               :- *(1) Filter isnotnull(product_id#2)
               :  +- *(1) Scan ExistingRDD[cid#0,sid#1,product_id#2,buy_product#3,cart_product#4,view_product#5]
               :- *(3) Project [cid#21, sid#12, col#136.product_id AS product_id#140, 5 AS buy_product#144, null AS cart_product#353, null AS view_product#354]
               :  +- *(3) Filter (isnotnull(col#136) AND isnotnull(col#136.product_id))
               :     +- Generate explode(items#31), [sid#12, cid#21], false, [col#136]
               :        +- *(2) Project [sid#12, cid#21, items#31]
               :           +- *(2) Filter (((isnotnull(is_bot#52) AND (is_bot#52 = false)) AND (size(items#31, true) > 0)) AND isnotnull(items#31))
               :              +- FileScan parquet [sid#12,cid#21,items#31,is_bot#52] Batched: false, DataFilters: [isnotnull(is_bot#52), (is_bot#52 = false), (size(items#31, true) > 0), isnotnull(items#31)], Format: Parquet, Location: InMemoryFileIndex[hdfs://0.0.0.0:9000/3/master/order/paid_20220215.parquet, hdfs://0.0.0.0:9000/3..., PartitionFilters: [], PushedFilters: [IsNotNull(is_bot), EqualTo(is_bot,false), IsNotNull(items)], ReadSchema: struct<sid:string,cid:string,items:array<struct<discount:double,discount_tax:double,price:double,...
               :- *(8) Project [coalesce(CASE WHEN (cid#159 = ) THEN null ELSE cid#159 END, cid_tmp_2#392) AS cid#412, sid#149, product_id#180, null AS buy_product#455, 2 AS cart_product#244, null AS view_product#456]
               :  +- *(8) Filter NOT (coalesce(CASE WHEN (cid#159 = ) THEN null ELSE cid#159 END, cid_tmp_2#392) = )
               :     +- *(8) BroadcastHashJoin [sid#149], [sid#12], LeftOuter, BuildRight, false
               :        :- *(8) Project [cid#159, sid#149, product_id#180]
               :        :  +- *(8) Filter ((isnotnull(is_bot#194) AND (is_bot#194 = false)) AND isnotnull(product_id#180))
               :        :     +- *(8) ColumnarToRow
               :        :        +- FileScan parquet [sid#149,cid#159,product_id#180,is_bot#194] Batched: true, DataFilters: [isnotnull(is_bot#194), (is_bot#194 = false), isnotnull(product_id#180)], Format: Parquet, Location: InMemoryFileIndex[hdfs://0.0.0.0:9000/3/master/product/cart_20220304.parquet, hdfs://0.0.0.0:9000..., PartitionFilters: [], PushedFilters: [IsNotNull(is_bot), EqualTo(is_bot,false), IsNotNull(product_id)], ReadSchema: struct<sid:string,cid:string,product_id:string,is_bot:boolean>
               :        +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]),false), [id=#324]
               :           +- Union
               :              :- *(5) Project [cid#21 AS cid_tmp_2#392, sid#12]
               :              :  +- Generate explode(items#31), [sid#12, cid#21], false, [col#136]
               :              :     +- *(4) Project [sid#12, cid#21, items#31]
               :              :        +- *(4) Filter (((((((isnotnull(is_bot#52) AND isnotnull(cid#21)) AND isnotnull(sid#12)) AND (is_bot#52 = false)) AND NOT (cid#21 = )) AND NOT (sid#12 = )) AND (size(items#31, true) > 0)) AND isnotnull(items#31))
               :              :           +- FileScan parquet [sid#12,cid#21,items#31,is_bot#52] Batched: false, DataFilters: [isnotnull(is_bot#52), isnotnull(cid#21), isnotnull(sid#12), (is_bot#52 = false), NOT (cid#21 = )..., Format: Parquet, Location: InMemoryFileIndex[hdfs://0.0.0.0:9000/3/master/order/paid_20220215.parquet, hdfs://0.0.0.0:9000/3..., PartitionFilters: [], PushedFilters: [IsNotNull(is_bot), IsNotNull(cid), IsNotNull(sid), EqualTo(is_bot,false), Not(EqualTo(cid,)), No..., ReadSchema: struct<sid:string,cid:string,items:array<struct<discount:double,discount_tax:double,price:double,...
               :              :- *(6) Project [cid#159 AS cid_tmp_2#540, sid#149]
               :              :  +- *(6) Filter (((((isnotnull(is_bot#194) AND isnotnull(cid#159)) AND isnotnull(sid#149)) AND (is_bot#194 = false)) AND NOT (cid#159 = )) AND NOT (sid#149 = ))
               :              :     +- *(6) ColumnarToRow
               :              :        +- FileScan parquet [sid#149,cid#159,is_bot#194] Batched: true, DataFilters: [isnotnull(is_bot#194), isnotnull(cid#159), isnotnull(sid#149), (is_bot#194 = false), NOT (cid#15..., Format: Parquet, Location: InMemoryFileIndex[hdfs://0.0.0.0:9000/3/master/product/cart_20220304.parquet, hdfs://0.0.0.0:9000..., PartitionFilters: [], PushedFilters: [IsNotNull(is_bot), IsNotNull(cid), IsNotNull(sid), EqualTo(is_bot,false), Not(EqualTo(cid,)), No..., ReadSchema: struct<sid:string,cid:string,is_bot:boolean>
               :              +- *(7) Project [cid#259 AS cid_tmp_2#541, sid#249]
               :                 +- *(7) Filter (((((isnotnull(is_bot#294) AND isnotnull(cid#259)) AND isnotnull(sid#249)) AND (is_bot#294 = false)) AND NOT (cid#259 = )) AND NOT (sid#249 = ))
               :                    +- *(7) ColumnarToRow
               :                       +- FileScan parquet [sid#249,cid#259,is_bot#294] Batched: true, DataFilters: [isnotnull(is_bot#294), isnotnull(cid#259), isnotnull(sid#249), (is_bot#294 = false), NOT (cid#25..., Format: Parquet, Location: InMemoryFileIndex[hdfs://0.0.0.0:9000/3/master/product/view_20220328.parquet, hdfs://0.0.0.0:9000..., PartitionFilters: [], PushedFilters: [IsNotNull(is_bot), IsNotNull(cid), IsNotNull(sid), EqualTo(is_bot,false), Not(EqualTo(cid,)), No..., ReadSchema: struct<sid:string,cid:string,is_bot:boolean>
               +- *(13) Project [coalesce(CASE WHEN (cid#259 = ) THEN null ELSE cid#259 END, cid_tmp_2#392) AS cid#440, sid#249, product_id#280, null AS buy_product#467, null AS cart_product#468, 1 AS view_product#344]
                  +- *(13) Filter NOT (coalesce(CASE WHEN (cid#259 = ) THEN null ELSE cid#259 END, cid_tmp_2#392) = )
                     +- *(13) BroadcastHashJoin [sid#249], [sid#12], LeftOuter, BuildRight, false
                        :- *(13) Project [cid#259, sid#249, product_id#280]
                        :  +- *(13) Filter ((isnotnull(is_bot#294) AND (is_bot#294 = false)) AND isnotnull(product_id#280))
                        :     +- *(13) ColumnarToRow
                        :        +- FileScan parquet [sid#249,cid#259,product_id#280,is_bot#294] Batched: true, DataFilters: [isnotnull(is_bot#294), (is_bot#294 = false), isnotnull(product_id#280)], Format: Parquet, Location: InMemoryFileIndex[hdfs://0.0.0.0:9000/3/master/product/view_20220328.parquet, hdfs://0.0.0.0:9000..., PartitionFilters: [], PushedFilters: [IsNotNull(is_bot), EqualTo(is_bot,false), IsNotNull(product_id)], ReadSchema: struct<sid:string,cid:string,product_id:string,is_bot:boolean>
                        +- ReusedExchange [cid_tmp_2#392, sid#12], BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]),false), [id=#324]

        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
        at org.apache.spark.sql.execution.aggregate.SortAggregateExec.doExecute(SortAggregateExec.scala:54)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:321)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:439)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(cid#0, product_id#2, 200), ENSURE_REQUIREMENTS, [id=#443]
+- SortAggregate(key=[cid#0, product_id#2], functions=[partial_max(sid#1), partial_max(buy_product#3), partial_max(cart_product#4), partial_max(view_product#5)], output=[cid#0, product_id#2, max#548, max#549, max#550, max#551])
   +- *(14) Sort [cid#0 ASC NULLS FIRST, product_id#2 ASC NULLS FIRST], false, 0
      +- Union
         :- *(1) Filter isnotnull(product_id#2)
         :  +- *(1) Scan ExistingRDD[cid#0,sid#1,product_id#2,buy_product#3,cart_product#4,view_product#5]
         :- *(3) Project [cid#21, sid#12, col#136.product_id AS product_id#140, 5 AS buy_product#144, null AS cart_product#353, null AS view_product#354]
         :  +- *(3) Filter (isnotnull(col#136) AND isnotnull(col#136.product_id))
         :     +- Generate explode(items#31), [sid#12, cid#21], false, [col#136]
         :        +- *(2) Project [sid#12, cid#21, items#31]
         :           +- *(2) Filter (((isnotnull(is_bot#52) AND (is_bot#52 = false)) AND (size(items#31, true) > 0)) AND isnotnull(items#31))
         :              +- FileScan parquet [sid#12,cid#21,items#31,is_bot#52] Batched: false, DataFilters: [isnotnull(is_bot#52), (is_bot#52 = false), (size(items#31, true) > 0), isnotnull(items#31)], Format: Parquet, Location: InMemoryFileIndex[hdfs://0.0.0.0:9000/3/master/order/paid_20220215.parquet, hdfs://0.0.0.0:9000/3..., PartitionFilters: [], PushedFilters: [IsNotNull(is_bot), EqualTo(is_bot,false), IsNotNull(items)], ReadSchema: struct<sid:string,cid:string,items:array<struct<discount:double,discount_tax:double,price:double,...
         :- *(8) Project [coalesce(CASE WHEN (cid#159 = ) THEN null ELSE cid#159 END, cid_tmp_2#392) AS cid#412, sid#149, product_id#180, null AS buy_product#455, 2 AS cart_product#244, null AS view_product#456]
         :  +- *(8) Filter NOT (coalesce(CASE WHEN (cid#159 = ) THEN null ELSE cid#159 END, cid_tmp_2#392) = )
         :     +- *(8) BroadcastHashJoin [sid#149], [sid#12], LeftOuter, BuildRight, false
         :        :- *(8) Project [cid#159, sid#149, product_id#180]
         :        :  +- *(8) Filter ((isnotnull(is_bot#194) AND (is_bot#194 = false)) AND isnotnull(product_id#180))
         :        :     +- *(8) ColumnarToRow
         :        :        +- FileScan parquet [sid#149,cid#159,product_id#180,is_bot#194] Batched: true, DataFilters: [isnotnull(is_bot#194), (is_bot#194 = false), isnotnull(product_id#180)], Format: Parquet, Location: InMemoryFileIndex[hdfs://0.0.0.0:9000/3/master/product/cart_20220304.parquet, hdfs://0.0.0.0:9000..., PartitionFilters: [], PushedFilters: [IsNotNull(is_bot), EqualTo(is_bot,false), IsNotNull(product_id)], ReadSchema: struct<sid:string,cid:string,product_id:string,is_bot:boolean>
         :        +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]),false), [id=#324]
         :           +- Union
         :              :- *(5) Project [cid#21 AS cid_tmp_2#392, sid#12]
         :              :  +- Generate explode(items#31), [sid#12, cid#21], false, [col#136]
         :              :     +- *(4) Project [sid#12, cid#21, items#31]
         :              :        +- *(4) Filter (((((((isnotnull(is_bot#52) AND isnotnull(cid#21)) AND isnotnull(sid#12)) AND (is_bot#52 = false)) AND NOT (cid#21 = )) AND NOT (sid#12 = )) AND (size(items#31, true) > 0)) AND isnotnull(items#31))
         :              :           +- FileScan parquet [sid#12,cid#21,items#31,is_bot#52] Batched: false, DataFilters: [isnotnull(is_bot#52), isnotnull(cid#21), isnotnull(sid#12), (is_bot#52 = false), NOT (cid#21 = )..., Format: Parquet, Location: InMemoryFileIndex[hdfs://0.0.0.0:9000/3/master/order/paid_20220215.parquet, hdfs://0.0.0.0:9000/3..., PartitionFilters: [], PushedFilters: [IsNotNull(is_bot), IsNotNull(cid), IsNotNull(sid), EqualTo(is_bot,false), Not(EqualTo(cid,)), No..., ReadSchema: struct<sid:string,cid:string,items:array<struct<discount:double,discount_tax:double,price:double,...
         :              :- *(6) Project [cid#159 AS cid_tmp_2#540, sid#149]
         :              :  +- *(6) Filter (((((isnotnull(is_bot#194) AND isnotnull(cid#159)) AND isnotnull(sid#149)) AND (is_bot#194 = false)) AND NOT (cid#159 = )) AND NOT (sid#149 = ))
         :              :     +- *(6) ColumnarToRow
         :              :        +- FileScan parquet [sid#149,cid#159,is_bot#194] Batched: true, DataFilters: [isnotnull(is_bot#194), isnotnull(cid#159), isnotnull(sid#149), (is_bot#194 = false), NOT (cid#15..., Format: Parquet, Location: InMemoryFileIndex[hdfs://0.0.0.0:9000/3/master/product/cart_20220304.parquet, hdfs://0.0.0.0:9000..., PartitionFilters: [], PushedFilters: [IsNotNull(is_bot), IsNotNull(cid), IsNotNull(sid), EqualTo(is_bot,false), Not(EqualTo(cid,)), No..., ReadSchema: struct<sid:string,cid:string,is_bot:boolean>
         :              +- *(7) Project [cid#259 AS cid_tmp_2#541, sid#249]
         :                 +- *(7) Filter (((((isnotnull(is_bot#294) AND isnotnull(cid#259)) AND isnotnull(sid#249)) AND (is_bot#294 = false)) AND NOT (cid#259 = )) AND NOT (sid#249 = ))
         :                    +- *(7) ColumnarToRow
         :                       +- FileScan parquet [sid#249,cid#259,is_bot#294] Batched: true, DataFilters: [isnotnull(is_bot#294), isnotnull(cid#259), isnotnull(sid#249), (is_bot#294 = false), NOT (cid#25..., Format: Parquet, Location: InMemoryFileIndex[hdfs://0.0.0.0:9000/3/master/product/view_20220328.parquet, hdfs://0.0.0.0:9000..., PartitionFilters: [], PushedFilters: [IsNotNull(is_bot), IsNotNull(cid), IsNotNull(sid), EqualTo(is_bot,false), Not(EqualTo(cid,)), No..., ReadSchema: struct<sid:string,cid:string,is_bot:boolean>
         +- *(13) Project [coalesce(CASE WHEN (cid#259 = ) THEN null ELSE cid#259 END, cid_tmp_2#392) AS cid#440, sid#249, product_id#280, null AS buy_product#467, null AS cart_product#468, 1 AS view_product#344]
            +- *(13) Filter NOT (coalesce(CASE WHEN (cid#259 = ) THEN null ELSE cid#259 END, cid_tmp_2#392) = )
               +- *(13) BroadcastHashJoin [sid#249], [sid#12], LeftOuter, BuildRight, false
                  :- *(13) Project [cid#259, sid#249, product_id#280]
                  :  +- *(13) Filter ((isnotnull(is_bot#294) AND (is_bot#294 = false)) AND isnotnull(product_id#280))
                  :     +- *(13) ColumnarToRow
                  :        +- FileScan parquet [sid#249,cid#259,product_id#280,is_bot#294] Batched: true, DataFilters: [isnotnull(is_bot#294), (is_bot#294 = false), isnotnull(product_id#280)], Format: Parquet, Location: InMemoryFileIndex[hdfs://0.0.0.0:9000/3/master/product/view_20220328.parquet, hdfs://0.0.0.0:9000..., PartitionFilters: [], PushedFilters: [IsNotNull(is_bot), EqualTo(is_bot,false), IsNotNull(product_id)], ReadSchema: struct<sid:string,cid:string,product_id:string,is_bot:boolean>
                  +- ReusedExchange [cid_tmp_2#392, sid#12], BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]),false), [id=#324]

        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:163)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
        at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
        at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
        at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:132)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:746)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.aggregate.SortAggregateExec.$anonfun$doExecute$1(SortAggregateExec.scala:56)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
        ... 34 more
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
SortAggregate(key=[cid#0, product_id#2], functions=[partial_max(sid#1), partial_max(buy_product#3), partial_max(cart_product#4), partial_max(view_product#5)], output=[cid#0, product_id#2, max#548, max#549, max#550, max#551])
+- *(14) Sort [cid#0 ASC NULLS FIRST, product_id#2 ASC NULLS FIRST], false, 0
   +- Union
      :- *(1) Filter isnotnull(product_id#2)
      :  +- *(1) Scan ExistingRDD[cid#0,sid#1,product_id#2,buy_product#3,cart_product#4,view_product#5]
      :- *(3) Project [cid#21, sid#12, col#136.product_id AS product_id#140, 5 AS buy_product#144, null AS cart_product#353, null AS view_product#354]
      :  +- *(3) Filter (isnotnull(col#136) AND isnotnull(col#136.product_id))
      :     +- Generate explode(items#31), [sid#12, cid#21], false, [col#136]
      :        +- *(2) Project [sid#12, cid#21, items#31]
      :           +- *(2) Filter (((isnotnull(is_bot#52) AND (is_bot#52 = false)) AND (size(items#31, true) > 0)) AND isnotnull(items#31))
      :              +- FileScan parquet [sid#12,cid#21,items#31,is_bot#52] Batched: false, DataFilters: [isnotnull(is_bot#52), (is_bot#52 = false), (size(items#31, true) > 0), isnotnull(items#31)], Format: Parquet, Location: InMemoryFileIndex[hdfs://0.0.0.0:9000/3/master/order/paid_20220215.parquet, hdfs://0.0.0.0:9000/3..., PartitionFilters: [], PushedFilters: [IsNotNull(is_bot), EqualTo(is_bot,false), IsNotNull(items)], ReadSchema: struct<sid:string,cid:string,items:array<struct<discount:double,discount_tax:double,price:double,...
      :- *(8) Project [coalesce(CASE WHEN (cid#159 = ) THEN null ELSE cid#159 END, cid_tmp_2#392) AS cid#412, sid#149, product_id#180, null AS buy_product#455, 2 AS cart_product#244, null AS view_product#456]
      :  +- *(8) Filter NOT (coalesce(CASE WHEN (cid#159 = ) THEN null ELSE cid#159 END, cid_tmp_2#392) = )
      :     +- *(8) BroadcastHashJoin [sid#149], [sid#12], LeftOuter, BuildRight, false
      :        :- *(8) Project [cid#159, sid#149, product_id#180]
      :        :  +- *(8) Filter ((isnotnull(is_bot#194) AND (is_bot#194 = false)) AND isnotnull(product_id#180))
      :        :     +- *(8) ColumnarToRow
      :        :        +- FileScan parquet [sid#149,cid#159,product_id#180,is_bot#194] Batched: true, DataFilters: [isnotnull(is_bot#194), (is_bot#194 = false), isnotnull(product_id#180)], Format: Parquet, Location: InMemoryFileIndex[hdfs://0.0.0.0:9000/3/master/product/cart_20220304.parquet, hdfs://0.0.0.0:9000..., PartitionFilters: [], PushedFilters: [IsNotNull(is_bot), EqualTo(is_bot,false), IsNotNull(product_id)], ReadSchema: struct<sid:string,cid:string,product_id:string,is_bot:boolean>
      :        +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]),false), [id=#324]
      :           +- Union
      :              :- *(5) Project [cid#21 AS cid_tmp_2#392, sid#12]
      :              :  +- Generate explode(items#31), [sid#12, cid#21], false, [col#136]
      :              :     +- *(4) Project [sid#12, cid#21, items#31]
      :              :        +- *(4) Filter (((((((isnotnull(is_bot#52) AND isnotnull(cid#21)) AND isnotnull(sid#12)) AND (is_bot#52 = false)) AND NOT (cid#21 = )) AND NOT (sid#12 = )) AND (size(items#31, true) > 0)) AND isnotnull(items#31))
      :              :           +- FileScan parquet [sid#12,cid#21,items#31,is_bot#52] Batched: false, DataFilters: [isnotnull(is_bot#52), isnotnull(cid#21), isnotnull(sid#12), (is_bot#52 = false), NOT (cid#21 = )..., Format: Parquet, Location: InMemoryFileIndex[hdfs://0.0.0.0:9000/3/master/order/paid_20220215.parquet, hdfs://0.0.0.0:9000/3..., PartitionFilters: [], PushedFilters: [IsNotNull(is_bot), IsNotNull(cid), IsNotNull(sid), EqualTo(is_bot,false), Not(EqualTo(cid,)), No..., ReadSchema: struct<sid:string,cid:string,items:array<struct<discount:double,discount_tax:double,price:double,...
      :              :- *(6) Project [cid#159 AS cid_tmp_2#540, sid#149]
      :              :  +- *(6) Filter (((((isnotnull(is_bot#194) AND isnotnull(cid#159)) AND isnotnull(sid#149)) AND (is_bot#194 = false)) AND NOT (cid#159 = )) AND NOT (sid#149 = ))
      :              :     +- *(6) ColumnarToRow
      :              :        +- FileScan parquet [sid#149,cid#159,is_bot#194] Batched: true, DataFilters: [isnotnull(is_bot#194), isnotnull(cid#159), isnotnull(sid#149), (is_bot#194 = false), NOT (cid#15..., Format: Parquet, Location: InMemoryFileIndex[hdfs://0.0.0.0:9000/3/master/product/cart_20220304.parquet, hdfs://0.0.0.0:9000..., PartitionFilters: [], PushedFilters: [IsNotNull(is_bot), IsNotNull(cid), IsNotNull(sid), EqualTo(is_bot,false), Not(EqualTo(cid,)), No..., ReadSchema: struct<sid:string,cid:string,is_bot:boolean>
      :              +- *(7) Project [cid#259 AS cid_tmp_2#541, sid#249]
      :                 +- *(7) Filter (((((isnotnull(is_bot#294) AND isnotnull(cid#259)) AND isnotnull(sid#249)) AND (is_bot#294 = false)) AND NOT (cid#259 = )) AND NOT (sid#249 = ))
      :                    +- *(7) ColumnarToRow
      :                       +- FileScan parquet [sid#249,cid#259,is_bot#294] Batched: true, DataFilters: [isnotnull(is_bot#294), isnotnull(cid#259), isnotnull(sid#249), (is_bot#294 = false), NOT (cid#25..., Format: Parquet, Location: InMemoryFileIndex[hdfs://0.0.0.0:9000/3/master/product/view_20220328.parquet, hdfs://0.0.0.0:9000..., PartitionFilters: [], PushedFilters: [IsNotNull(is_bot), IsNotNull(cid), IsNotNull(sid), EqualTo(is_bot,false), Not(EqualTo(cid,)), No..., ReadSchema: struct<sid:string,cid:string,is_bot:boolean>
      +- *(13) Project [coalesce(CASE WHEN (cid#259 = ) THEN null ELSE cid#259 END, cid_tmp_2#392) AS cid#440, sid#249, product_id#280, null AS buy_product#467, null AS cart_product#468, 1 AS view_product#344]
         +- *(13) Filter NOT (coalesce(CASE WHEN (cid#259 = ) THEN null ELSE cid#259 END, cid_tmp_2#392) = )
            +- *(13) BroadcastHashJoin [sid#249], [sid#12], LeftOuter, BuildRight, false
               :- *(13) Project [cid#259, sid#249, product_id#280]
               :  +- *(13) Filter ((isnotnull(is_bot#294) AND (is_bot#294 = false)) AND isnotnull(product_id#280))
               :     +- *(13) ColumnarToRow
               :        +- FileScan parquet [sid#249,cid#259,product_id#280,is_bot#294] Batched: true, DataFilters: [isnotnull(is_bot#294), (is_bot#294 = false), isnotnull(product_id#280)], Format: Parquet, Location: InMemoryFileIndex[hdfs://0.0.0.0:9000/3/master/product/view_20220328.parquet, hdfs://0.0.0.0:9000..., PartitionFilters: [], PushedFilters: [IsNotNull(is_bot), EqualTo(is_bot,false), IsNotNull(product_id)], ReadSchema: struct<sid:string,cid:string,product_id:string,is_bot:boolean>
               +- ReusedExchange [cid_tmp_2#392, sid#12], BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]),false), [id=#324]

        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
        at org.apache.spark.sql.execution.aggregate.SortAggregateExec.doExecute(SortAggregateExec.scala:54)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:118)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:118)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:151)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:149)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.$anonfun$doExecute$1(ShuffleExchangeExec.scala:166)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
        ... 53 more
Caused by: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in stage 8.0 failed 4 times, most recent failure: Lost task 26.3 in stage 8.0 (TID 231) (ff0283b06c39 executor 1): org.apache.spark.sql.execution.QueryExecutionException: Encounter error while reading parquet files. One possible cause: Parquet column cannot be converted in the corresponding files. Details: 
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:222)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file hdfs://0.0.0.0:9000/3/master/order/paid_20220324.parquet/part-00039-ff535f97-50b2-4d1b-9dd2-b119e4c6eda2-c000.snappy.parquet
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
        at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
        ... 25 more
Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.Double ([B and java.lang.Double are in module java.base of loader 'bootstrap')
        at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:116)
        at org.apache.spark.sql.catalyst.expressions.MutableDouble.update(SpecificInternalRow.scala:118)
        at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.update(SpecificInternalRow.scala:248)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.set(ParquetRowConverter.scala:174)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addBinary(ParquetRowConverter.scala:92)
        at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:317)
        at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:367)
        at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:226)
        ... 30 more

Driver stacktrace:
        at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:194)
        at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:515)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcast$1(SparkPlan.scala:193)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:189)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:203)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareRelation(BroadcastHashJoinExec.scala:217)
        at org.apache.spark.sql.execution.joins.HashJoin.codegenOuter(HashJoin.scala:497)
        at org.apache.spark.sql.execution.joins.HashJoin.codegenOuter$(HashJoin.scala:496)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:40)
        at org.apache.spark.sql.execution.joins.HashJoin.doConsume(HashJoin.scala:352)
        at org.apache.spark.sql.execution.joins.HashJoin.doConsume$(HashJoin.scala:349)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:40)
        at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:194)
        at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:149)
        at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:41)
        at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:87)
        at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:194)
        at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:149)
        at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:113)
        at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:238)
        at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:194)
        at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:149)
        at org.apache.spark.sql.execution.ColumnarToRowExec.consume(Columnar.scala:66)
        at org.apache.spark.sql.execution.ColumnarToRowExec.doProduce(Columnar.scala:191)
        at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
        at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
        at org.apache.spark.sql.execution.ColumnarToRowExec.produce(Columnar.scala:66)
        at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:153)
        at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
        at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
        at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:113)
        at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:54)
        at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
        at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
        at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:41)
        at org.apache.spark.sql.execution.joins.HashJoin.doProduce(HashJoin.scala:346)
        at org.apache.spark.sql.execution.joins.HashJoin.doProduce$(HashJoin.scala:345)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:40)
        at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
        at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
        at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:40)
        at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:153)
        at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
        at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
        at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:113)
        at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:54)
        at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:95)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:90)
        at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:90)
        at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:41)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:655)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:718)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.UnionExec.$anonfun$doExecute$5(basicPhysicalOperators.scala:667)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.apache.spark.sql.execution.UnionExec.doExecute(basicPhysicalOperators.scala:667)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.InputAdapter.inputRDD(WholeStageCodegenExec.scala:525)
        at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs(WholeStageCodegenExec.scala:453)
        at org.apache.spark.sql.execution.InputRDDCodegen.inputRDDs$(WholeStageCodegenExec.scala:452)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:496)
        at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:132)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:746)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.aggregate.SortAggregateExec.$anonfun$doExecute$1(SortAggregateExec.scala:56)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
        ... 65 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in stage 8.0 failed 4 times, most recent failure: Lost task 26.3 in stage 8.0 (TID 231) (ff0283b06c39 executor 1): org.apache.spark.sql.execution.QueryExecutionException: Encounter error while reading parquet files. One possible cause: Parquet column cannot be converted in the corresponding files. Details: 
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:222)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file hdfs://0.0.0.0:9000/3/master/order/paid_20220324.parquet/part-00039-ff535f97-50b2-4d1b-9dd2-b119e4c6eda2-c000.snappy.parquet
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
        at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
        ... 25 more
Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.Double ([B and java.lang.Double are in module java.base of loader 'bootstrap')
        at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:116)
        at org.apache.spark.sql.catalyst.expressions.MutableDouble.update(SpecificInternalRow.scala:118)
        at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.update(SpecificInternalRow.scala:248)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.set(ParquetRowConverter.scala:174)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addBinary(ParquetRowConverter.scala:92)
        at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:317)
        at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:367)
        at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:226)
        ... 30 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
        at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
        at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:397)
        at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.$anonfun$relationFuture$1(BroadcastExchangeExec.scala:118)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:185)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        ... 1 more
Caused by: org.apache.spark.sql.execution.QueryExecutionException: Encounter error while reading parquet files. One possible cause: Parquet column cannot be converted in the corresponding files. Details: 
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:187)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:222)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        ... 3 more
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file hdfs://0.0.0.0:9000/3/master/order/paid_20220324.parquet/part-00039-ff535f97-50b2-4d1b-9dd2-b119e4c6eda2-c000.snappy.parquet
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
        at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
        ... 25 more
Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.Double ([B and java.lang.Double are in module java.base of loader 'bootstrap')
        at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:116)
        at org.apache.spark.sql.catalyst.expressions.MutableDouble.update(SpecificInternalRow.scala:118)
        at org.apache.spark.sql.catalyst.expressions.SpecificInternalRow.update(SpecificInternalRow.scala:248)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetRowConverter$RowUpdater.set(ParquetRowConverter.scala:174)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetPrimitiveConverter.addBinary(ParquetRowConverter.scala:92)
        at org.apache.parquet.column.impl.ColumnReaderImpl$2$6.writeValue(ColumnReaderImpl.java:317)
        at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:367)
        at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:226)
        ... 30 more