Seatunnel
очень прост в использовании,высокая производительность、поддерживатьпрямая трансляция
иАвтономная пакетная обработка
Продукты для массовой обработки данных,Структурировано наApache Spark
и Apache Flink
выше,Адрес проекта с открытым исходным кодом: https://github.com/apache/incubator-seatunnel.
Seatunnel ранее был известен как Waterdrop.,После смены названия он был официально преобразован в проект Apache.,При этом два названия также соответствуют разным версиям.,Waterdrop относится к версии 1.x.,Seatunnel относится к версии 2.x.,Для 1.xи2.x имеются следующие отличия:
Ключевые функции | 1.x | 2.x |
---|---|---|
поддержка искры | yes | yes |
Поддержка | no | yes |
Основной язык разработки | scala | java |
Основные инструменты сборки | sbt | maven |
Apache Spark и Apache Flink — отличное достижение для распределенной и потоковой обработки данных.,Однако высокий порог использования требует от персонала, занимающегося обработкой данных, изучения сложного рабочего механизма и API Sparkiflink, чтобы использовать его более плавно.,Чтобы снизить порог обработки данных,И сделайте Sparkiflink проще в использовании,Сократите затраты на обучение,Ускорьте внедрение распределенной обработки данных в производственных средах,Появился Seatunnel.
Ключевые функции | Seatunnel | FlinkX | StreamX | DataX |
---|---|---|---|---|
Поддерживает ли искра | yes | no | yes | no |
Поддерживает ли флинк | да, совместимость с высокими версиями не очень хорошая | да, совместимость с высокими версиями не очень хорошая | да, более высокие версии имеют хорошую совместимость | no |
Сложность развертывания | легкий | середина | сложнее | легкий |
Сравнение основных функций | etl, синхронизация данных | Синхронизация данных | Визуальное развертывание задачи Flink | Синхронизация данных |
входить
-> Конвертировать
-> выход
,Для более сложной обработки данных,По сути, это также комбинация следующих типов поведения:public interface Plugin<T> extends Serializable {
// Ключ файла конфигурации
String RESULT_TABLE_NAME = "result_table_name";
String SOURCE_TABLE_NAME = "source_table_name";
// Установите конфигурацию для каждого плагина
void setConfig(Config config);
// Получить конфигурацию плагина
Config getConfig();
// Проверка конфигурации
CheckResult checkConfig();
// Подготовка перед подключением
void prepare(T prepareEnv);
}
trait BaseSparkSource[Data] extends BaseSource[SparkEnvironment] {
protected var config: Config = ConfigFactory.empty()
override def setConfig(config: Config): Unit = this.config = config
override def getConfig: Config = config
def getData(env: SparkEnvironment): Data;
}
trait BaseSparkTransform extends BaseTransform[SparkEnvironment] {
protected var config: Config = ConfigFactory.empty()
override def setConfig(config: Config): Unit = this.config = config
override def getConfig: Config = config
def process(data: Dataset[Row], env: SparkEnvironment): Dataset[Row];
}
trait BaseSparkSink[OUT] extends BaseSink[SparkEnvironment] {
protected var config: Config = ConfigFactory.empty()
override def setConfig(config: Config): Unit = this.config = config
override def getConfig: Config = config
def output(data: Dataset[Row], env: SparkEnvironment): OUT;
}
trait SparkStreamingSource[T] extends BaseSparkSource[DStream[T]] {
def beforeOutput(): Unit = {}
def afterOutput(): Unit = {}
def rdd2dataset(sparkSession: SparkSession, rdd: RDD[T]): Dataset[Row]
def start(env: SparkEnvironment, handler: Dataset[Row] => Unit): Unit = {
getData(env).foreachRDD(rdd => {
val dataset = rdd2dataset(env.getSparkSession, rdd)
handler(dataset)
})
}
}
public interface FlinkBatchSource<T> extends BaseFlinkSource {
DataSet<T> getData(FlinkEnvironment env);
}
public interface FlinkBatchTransform<IN, OUT> extends BaseFlinkTransform {
DataSet<OUT> processBatch(FlinkEnvironment env, DataSet<IN> data);
}
public interface FlinkBatchSink<IN, OUT> extends BaseFlinkSink {
DataSink<OUT> outputBatch(FlinkEnvironment env, DataSet<IN> inDataSet);
}
public interface FlinkStreamSource<T> extends BaseFlinkSource {
DataStream<T> getData(FlinkEnvironment env);
}
public interface FlinkStreamTransform<IN, OUT> extends BaseFlinkTransform {
DataStream<OUT> processStream(FlinkEnvironment env, DataStream<IN> dataStream);
}
public interface FlinkStreamSink<IN, OUT> extends BaseFlinkSink {
DataStreamSink<OUT> outputStream(FlinkEnvironment env, DataStream<IN> dataStream);
}
Полное имя SPI Сервис Provider Interface,Это набор интерфейсов, предоставляемых Java, которые реализованы или расширены третьими сторонами.,Его можно использовать для включения расширений платформы и замены компонентов.,Роль SPI — найти реализации сервисов для этих расширенных API.
API-(Application Programming Interface
)большую часть времени,Всеразработчик
Разработать интерфейс и завершить его реализацию,звонящий
Полагайтесь только на вызовы интерфейса,и не имеет права выбирать другую реализацию. С точки зрения пользователей,API используется непосредственно разработчиками приложений.,SPI-(Service Provider Interface
)дазвонящий
разработать спецификации интерфейса,Предоставляется сторонним сторонам для реализациизвонящий
Выберите нужную вам внешнюю реализацию。С точки зрения пользователей,SPI Используется расширителями фреймворка
package com.tyrantlucifer;
public interface Animal {
void shut();
}
package com.tyrantlucifer;
import java.util.ServiceLoader;
public class Main {
public static void main(String[] args) {
ServiceLoader<Animal> services = ServiceLoader.load(Animal.class);
for (Animal service : services) {
service.shut();
}
}
}
package com.tyrantlucifer;
public class Cat implements Animal {
public void shut() {
System.out.println("cat shut miao miao!!!");
}
}
package com.tyrantlucifer;
public class Dog implements Animal{
public void shut() {
System.out.println("dog shut wang wang!!!");
}
}
Зарегистрировать Спи,Вам необходимо создать новый файл с полным именем класса интерфейса в разделе resources/META-INF/services.,Например, наш интерфейс на этот разcom.tyrantlucifer.Animal
,Затем создайте новыйcom.tyrantlucifer.Animal
документ,И добавьте в файл свой собственный класс реализации:
com.tyrantlucifer.Cat
com.tyrantlucifer.Dog
spark {
spark.streaming.batchDuration = 5
spark.app.name = "seatunnel"
spark.ui.port = 13000
}
input {
socketStream {}
}
filter {
split {
fields = ["msg", "name"]
delimiter = ","
}
}
output {
stdout {}
}
env {
execution.parallelism = 1
}
source {
SocketStream{
result_table_name = "fake"
field_name = "info"
}
}
transform {
Split{
separator = "#"
fields = ["name","age"]
}
sql {
sql = "select * from (select info,split(info) as info_row from fake) t1"
}
}
sink {
ConsoleSink {}
}
class MyStdout extends BaseOutput {
var config: Config = ConfigFactory.empty()
/**
* Set Config.
* */
override def setConfig(config: Config): Unit = {
this.config = config
}
/**
* Get Config.
* */
override def getConfig(): Config = {
this.config
}
override def checkConfig(): (Boolean, String) = {
if (!config.hasPath("limit") || (config.hasPath("limit") && config.getInt("limit") >= -1)) {
(true, "")
} else {
(false, "please specify [limit] as Number[-1, " + Int.MaxValue + "]")
}
}
override def prepare(spark: SparkSession): Unit = {
super.prepare(spark)
val defaultConfig = ConfigFactory.parseMap(
Map(
"limit" -> 100,
"format" -> "plain" // plain | json | schema
)
)
config = config.withFallback(defaultConfig)
}
override def process(df: Dataset[Row]): Unit = {
val limit = config.getInt("limit")
var format = config.getString("format")
if (config.hasPath("serializer")) {
format = config.getString("serializer")
}
format match {
case "plain" => {
if (limit == -1) {
df.show(Int.MaxValue, false)
} else if (limit > 0) {
df.show(limit, false)
}
}
case "json" => {
if (limit == -1) {
df.toJSON.take(Int.MaxValue).foreach(s => println(s))
} else if (limit > 0) {
df.toJSON.take(limit).foreach(s => println(s))
}
}
case "schema" => {
df.printSchema()
}
}
}
}