示例代码（TestSparkSqlSplit.java）：
import static org.apache.spark.sql.functions.col;import static org.apache.spark.sql.functions.split;import static org.apache.spark.sql.functions.explode;import java.util.ArrayList;import java.util.List;import org.apache.spark.sql.Dataset;import org.apache.spark.sql.Row;import org.apache.spark.sql.SparkSession;public class TestSparkSqlSplit { public static void main(String[] args){ SparkSession sparkSession =SparkSession.builder().appName("test").master("local[*]").getOrCreate(); Listitems=new ArrayList (); MyEntity myEntity=new MyEntity(); myEntity.setId("scene_id1,scene_name1;scene_id2,scene_name2|id1"); myEntity.setName("name"); myEntity.setFields("other"); items.add(myEntity); sparkSession.createDataFrame(items, MyEntity.class).createOrReplaceTempView("test"); Dataset rows=sparkSession.sql("select * from test"); rows = rows.withColumn("id", explode(split(split(col("id"), "\\|").getItem(0), ";"))); rows=rows.withColumn("id1",split(rows.col("id"),",").getItem(0)) .withColumn("name1",split(rows.col("id"),",").getItem(1)); rows=rows.withColumn("id",rows.col("id1")) .withColumn("name",rows.col("name1")); rows=rows.drop("id1","name1"); rows.show(); sparkSession.stop(); }}
MyEntity.java
import java.io.Serializable;

/**
 * Plain JavaBean with three {@code String} properties: {@code id}, {@code name} and
 * {@code fields}.
 *
 * <p>Every property is exposed through a conventional getter/setter pair and is {@code null}
 * until its setter is called.
 */
public class MyEntity implements Serializable {

  private String id;
  private String name;
  private String fields;

  // ---- accessors ----------------------------------------------------------

  /** Returns the {@code id} property, or {@code null} if it was never set. */
  public String getId() {
    return this.id;
  }

  /** Returns the {@code name} property, or {@code null} if it was never set. */
  public String getName() {
    return this.name;
  }

  /** Returns the {@code fields} property, or {@code null} if it was never set. */
  public String getFields() {
    return this.fields;
  }

  // ---- mutators -----------------------------------------------------------

  /** Replaces the {@code id} property with {@code id}. */
  public void setId(final String id) {
    this.id = id;
  }

  /** Replaces the {@code name} property with {@code name}. */
  public void setName(final String name) {
    this.name = name;
  }

  /** Replaces the {@code fields} property with {@code fields}. */
  public void setFields(final String fields) {
    this.fields = fields;
  }
}
打印结果（rows.show() 的输出）：
18/12/05 17:28:53 INFO codegen.CodeGenerator: Code generated in 36.359731 ms
+------+---------+-----------+
|fields|       id|       name|
+------+---------+-----------+
| other|scene_id1|scene_name1|
| other|scene_id2|scene_name2|
+------+---------+-----------+