Spark SelectDB Connector

Quick introduction

Spark SelectDB Connector supports writing large amounts of upstream data to SelectDB Cloud.

Implementation principle

The Spark SelectDB Connector is built on top of SelectDB Cloud's stage import method. It calls the SelectDB Cloud API (/copy/upload), which returns a redirected object storage address; the byte stream is then sent to that address over HTTP. Finally, Copy Into (/copyinto) imports the data from the object storage bucket into SelectDB Cloud. For details on the stage import method, see User Guide / Data Import / Stage Import.
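
The flow can be sketched with curl against the endpoints named above. This is only an illustration of the mechanism, not the connector's exact requests: the fileName header and the /copyinto request body are assumptions, and the host and credentials reuse the example values from later on this page.

# Steps 1 and 2: ask /copy/upload for an upload address; the server replies
# with a redirect to object storage, which -L follows to upload the file (-T).
curl -u admin:selectdb2022 -L \
    -H "fileName: batch_0.json" \
    -T batch_0.json \
    http://127.0.0.1:47968/copy/upload

# Step 3: run Copy Into so the uploaded file is imported into the target table.
curl -u admin:selectdb2022 -X POST \
    -H "Content-Type: application/json" \
    -d '{"sql": "copy into test.test_order from @~(\"batch_0.json\") properties (\"file.type\"=\"json\")"}' \
    http://127.0.0.1:47968/copyinto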

Version support

Connector        Runtime Jar
2.3.4-2.11-1.0   spark-selectdb-connector-2.3.4_2.11-1.0
3.1.2-2.12-1.0   spark-selectdb-connector-3.1.2_2.12-1.0
3.2.0-2.12-1.0   spark-selectdb-connector-3.2.0_2.12-1.0

How to use

To use spark-selectdb-connector, copy the downloaded jar package to Spark's ClassPath. For example, if Spark runs in Local mode, put the file in the jars/ folder. If Spark runs in Yarn cluster mode, put the file in the pre-deployment package: upload spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar to HDFS and add its HDFS path to the spark.yarn.jars parameter, as follows.

  • Upload spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar to HDFS.
hdfs dfs -mkdir /spark-jars/
hdfs dfs -put /your_local_path/spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar /spark-jars/
  • Add the spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar dependency to the cluster.
spark.yarn.jars=hdfs:///spark-jars/spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar
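
For a one-off test, the jar can also be supplied on the command line instead of being pre-deployed. A minimal sketch, reusing the jar paths above (your_application.jar is a placeholder):

# Local mode: attach the connector to a single spark-shell session.
spark-shell --jars /your_local_path/spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar

# Yarn cluster mode: set spark.yarn.jars at submit time.
spark-submit \
    --master yarn \
    --conf spark.yarn.jars=hdfs:///spark-jars/spark-selectdb-connector-2.3.4-2.11-1.0-SNAPSHOT.jar \
    your_application.jar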

To reference the connector in a project, install it into the local Maven repository with mvn install, then add the dependency in Maven as follows.

<dependency>
    <groupId>com.selectdb.spark</groupId>
    <artifactId>spark-selectdb-connector-3.1_2.12</artifactId>
    <version>1.0</version>
</dependency>
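
If you only have the jar file rather than a source checkout, one way to install it under the coordinates above is mvn install:install-file. A sketch, assuming the runtime jar from the version table sits in the current directory:

mvn install:install-file \
    -Dfile=spark-selectdb-connector-3.1.2_2.12-1.0.jar \
    -DgroupId=com.selectdb.spark \
    -DartifactId=spark-selectdb-connector-3.1_2.12 \
    -Dversion=1.0 \
    -Dpackaging=jar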

Example of use

Write with Spark SQL

val selectdbHttpPort = "127.0.0.1:47968"
val selectdbJdbc = "jdbc:mysql://127.0.0.1:18836/test"
val selectdbUser = "admin"
val selectdbPwd = "selectdb2022"
val selectdbTable = "test.test_order"

// In spark-shell, `spark` is the predefined SparkSession. Register the
// SelectDB table as a temporary view; the ${...} placeholders are filled
// in by Scala string interpolation.
spark.sql(
  s"""
     |CREATE TEMPORARY VIEW test_order
     |USING selectdb
     |OPTIONS(
     |  "table.identifier"="${selectdbTable}",
     |  "jdbc.url"="${selectdbJdbc}",
     |  "http.port"="${selectdbHttpPort}",
     |  "user"="${selectdbUser}",
     |  "password"="${selectdbPwd}",
     |  "sink.properties.file.type"="json"
     |)
     |""".stripMargin)

// Write into SelectDB Cloud through the view.
spark.sql("insert into test_order select order_id, order_amount, order_status from tmp_tb")

Write via DataFrame

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Build a small test DataFrame matching the target table's columns.
val df = spark.createDataFrame(Seq(
  ("1", 100, "Pending"),
  ("2", 200, null),
  ("3", 300, "received")
)).toDF("order_id", "order_amount", "order_status")

df.write
  .format("selectdb")
  .option("selectdb.http.port", selectdbHttpPort)
  .option("selectdb.table.identifier", selectdbTable)
  .option("user", selectdbUser)
  .option("password", selectdbPwd)
  .option("sink.batch.size", 4)
  .option("sink.max-retries", 2)
  .option("sink.properties.file.column_separator", "\t")
  .option("sink.properties.file.line_delimiter", "\n")
  .save()

Configuration

Key                        Default Value  Comment                                                                Required
selectdb.http.port         --             SelectDB Cloud HTTP address                                            Y
selectdb.jdbc.url          --             SelectDB Cloud JDBC address; this option is used by the Spark SQL mode Y
selectdb.table.identifier  --             SelectDB Cloud table name, in the format dbname.tablename, for example db1.tbl1  Y
user                       --             Username for accessing SelectDB Cloud                                  Y
password                   --             Password for accessing SelectDB Cloud                                  Y
sink.batch.size            100000         Maximum number of rows written to SelectDB Cloud per batch             N
sink.max-retries           3              Number of retries after a failed write to SelectDB                     N
sink.properties.*          --             Import parameters for Copy Into, for example "sink.properties.file.type"="json"; for more Copy Into parameters, see the Copy Into section of the SelectDB documentation  N