В API Java используйте локальный режим flink для использования тем Kafka и сохранения данных непосредственно в hdfs.
флинк версия 1.13
Кафка версия 0.8
Хадуп версии 3.1.4
Чтобы выполнить требование Flink по получению данных из Kafka и записи их в HDFS в реальном времени, обычно необходимо запустить следующие компоненты:
[root@hadoop10 ~]# jps
3073 SecondaryNameNode
2851 DataNode
2708 NameNode
12854 Jps
1975 StandaloneSessionClusterEntrypoint
2391 QuorumPeerMain
2265 TaskManagerRunner
9882 ConsoleProducer
9035 Kafka
3517 NodeManager
3375 ResourceManager
Убедитесь, что Zookeeper запущен, поскольку Kafka Consumer от Flink зависит от Zookeeper.
Убедитесь, что сервер Kafka работает, поскольку Kafka Consumer Flink должен подключиться к Kafka Broker.
Запустите JobManager и TaskManager Flink, которые являются основными компонентами для выполнения задач Flink.
Убедитесь, что эти компоненты работают, чтобы задание Flink могло правильно использовать данные из Kafka и записывать их в HDFS.
kafka-topics.sh --zookeeper hadoop10:2181 --create --topic topic1 --partitions 1 --replication-factor 1
kafka-console-producer.sh --broker-list hadoop10:9092 --topic topic1
Это все зависимости проекта, включая flink, spark, hbase, ck и т. д. Фактические требования не требуют всех зависимостей, и их можно загрузить с зеркальных станций с открытым исходным кодом Alibaba Cloud или Maven.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink-test</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<flink.version>1.13.6</flink.version>
<hbase.version>2.4.0</hbase.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!--<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.14.6</version>
</dependency>-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.32</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-2.2_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.20</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-ssh</artifactId>
<version>2.8</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>wagon-maven-plugin</artifactId>
<version>1.0</version>
<configuration>
<!--загрузитьизместныйjarиз Расположение-->
<fromFile>target/${project.build.finalName}.jar</fromFile>
<!--удаленная копияизадрес-->
<url>scp://root:root@hadoop10:/opt/app</url>
</configuration>
</plugin>
</plugins>
</build>
</project>
topic1
тема,Запишите данные непосредственно в hdf.import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class Test9_kafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop10:9092");
properties.setProperty("group.id", "test");
// Использование FlinkKafkaConsumer в качестве источника данных
DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));
String outputPath = "hdfs://hadoop10:8020/out240102";
// Запись данных в HDFS с помощью StreamingFileSink
StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.build();
// Добавьте приемник для записи данных Kafka непосредственно в HDFS.
ds1.addSink(sink);
ds1.print();
env.execute("Flink Kafka HDFS");
}
}
Запустите код идеи, программа начнет выполняться, а консоль пуста, за исключением журнала. На рисунке ниже показан скриншот пользовательской консоли после получения данных от производителя.
Запустите продюсер и напишите данные. Данные не имеют ограничений по формату и могут быть заполнены как угодно. Данные, отправленные в это время, можно распечатать на экране консоли на ШАГЕ 1.
Проверьте соответствующий каталог в HDFS, и вы увидите, что данные записаны. Я создал здесь несколько незавершенных файлов, потому что несколько раз тестировал их и несколько раз запускал неработающий код. После того как IDE распечатается на экране, он записывается на HDFS диск. Проходит определенный период времени и нужно подождать. После обновления данных в HDFS можно увидеть процесс изменения размера файла от 0 до . записываемые данные.
package day2;
import day2.CustomProcessFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class Test9_kafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop10:9092");
properties.setProperty("group.id", "test");
// Использование FlinkKafkaConsumer в качестве источника данных
DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));
String outputPath = "hdfs://hadoop10:8020/out240102";
// Запись данных в HDFS с помощью StreamingFileSink
StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.build();
// во временное окно НайсоданныеписатьHDFS ds1.process(new CustomProcessFunction()) // Использовать пользовательские ProcessFunction
.addSink(sink);
// Выполнить программу
env.execute("Flink Kafka HDFS");
}
}
package day2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
public class CustomProcessFunction extends ProcessFunction<String, String> {
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
// Здесь вы можете добавить конкретную логику, например запись данных в HDFS.
System.out.println(value); // Распечатать результаты на экране
out.collect(value);
}
}