Flink — это платформа обработки потоков с открытым исходным кодом для вычислений с отслеживанием состояния неограниченных и ограниченных потоков данных. В кластере Flink разные роли берут на себя разные обязанности и работают вместе для выполнения задач по обработке данных. Ниже приведен подробный анализ основных ролей в кластере Flink:
Клиент:Код получается и конвертируется клиентом,Затем отправьте его в JobManger. TaskManager:Это реально“люди, которые работают”,Они несут ответственность за все операции по обработке данных. В Flink выполнением операторов и некоторой логикой обработки управляет TaskManager. JobManager:то естьFlinkкластервнутри“стюард”,Централизованно планирует и управляет заданиями и после этого получает задание для выполнения;,Конверсия будет обработана дальше.,Затем распределите задачи по многочисленным диспетчерам задач. На самом деле, JobManager является начальником,Потому что Flink — это механизм распределенной обработки.,Естественно, будет несколько узлов.,Тогда должно быть что-то для управления несколькими узлами.,JobManager отвечает за управление
Примечание. Flink — это очень гибкая платформа обработки. Она поддерживает множество различных сценариев развертывания (например, пряжи или самоуправление) и может легко интегрироваться с различными платформами управления ресурсами. Итак, далее мы дадим краткое введение, чтобы дать всем предварительное понимание, а затем подробно расскажем о применении Flink в различных ситуациях.
Роль Описание:
JobManager — это главный узел в кластере Flink, который играет роль менеджера кластера и отвечает за координацию и управление всем процессом выполнения заданий. Это основной процесс, контролирующий выполнение приложения. Каждое приложение должно контролироваться и выполняться уникальным JobManager.
Основные обязанности:
Высокая доступность:
Высокая доступность JobManager имеет решающее значение для стабильности и надежности всего кластера. В настройке высокой доступности может быть несколько JobManager, один из которых всегда является ведущим, а остальные — резервными.
Роль Описание:
TaskManager является рабочим узлом в кластере Flink и отвечает за фактическое выполнение заданий. За выполнение вычислений фактически отвечает работник, также известный как «работник».
Основные обязанности:
Управление ресурсами:
TaskManager управляет информацией о ресурсах на узле, где он расположен, например, память, диск, сеть и т. д., и сообщает о состоянии ресурса JobManager при его запуске. Чтобы изолировать ресурсы и увеличить количество разрешенных задач, TaskManager вводит концепцию слота. Каждый слот представляет собой поток, используемый для выполнения одного или нескольких операторов.
Роль Описание:
ResourceManager — это менеджер ресурсов в кластере Flink, отвечающий за унифицированное управление и распределение вычислительных ресурсов кластера.
Основные обязанности:
Роль Описание:
Диспетчер — это планировщик в кластере Flink, отвечающий за получение заданий, отправленных клиентами, и распределение заданий в JobManager для выполнения.
Основные обязанности:
Dispatcher также предоставляет интерфейс REST для отправки приложений Flink на выполнение и запуска нового JobMaster для каждого отправленного задания. Он также запускает Flink WebUI для предоставления информации о выполнении заданий.
Роль Описание:
Сервер Blob — это сервер распределения ресурсов в кластере Flink, отвечающий за управление и распределение зависимых ресурсов заданий.
Основные обязанности:
Роль Описание:
ZooKeeper — это служба координации в кластере Flink, которая отвечает за управление метаданными и информацией о состоянии в кластере.
Основные обязанности:
Роль Описание:
Клиент — это клиент, предоставленный программой Flink, и не является частью среды выполнения и выполнения программы.
Основные обязанности:
Кластер Flink обеспечивает эффективную и надежную обработку потока данных благодаря совместной работе нескольких ролей. Каждая роль предполагает определенные обязанности, которые в совокупности обеспечивают бесперебойное выполнение заданий и стабильную работу кластера.
узловой сервер | hadoop102 | hadoop103 | hadoop104 |
---|---|---|---|
Роль | JobManager TaskManager | TaskManager | TaskManager |
Разверните менеджер JobManager на 102 и разверните TaskManager на 102, 103 и 104.
Все информационные пакеты jar и информация для flink 1.17 находятся здесь. Если вы считаете, что баллы необоснованны, оставьте сообщение в области комментариев.
https://download.csdn.net/download/qq_51431069/89625288
https://download.csdn.net/download/qq_51431069/89625288
tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/
vim /opt/module/flink-1.17.0/con/flink-conf.yaml
Измените следующее содержимое
# Адрес узла JobManager.
jobmanager.rpc.address: hadoop102
jobmanager.bind-host: 0.0.0.0
rest.address: hadoop102
rest.bind-address: 0.0.0.0
# Адрес узла TaskManager необходимо настроить как имя текущего компьютера.
taskmanager.bind-host: 0.0.0.0
taskmanager.host: hadoop102
vim /opt/module/flink-1.17.0/con/flink-conf.yaml/workers
Измените следующее содержимое:
hadoop102
hadoop103
hadoop104
vim /opt/module/flink-1.17.0/con/flink-conf.yaml/masters
Измените следующее содержимое:
hadoop102:8081
xsync flink-1.17.0/
Введите Hadoop103
vim /opt/module/flink-1.17.0/conf/flink-conf.yaml
Измените следующее содержимое:
# Адрес узла TaskManager необходимо настроить как имя текущего компьютера.
taskmanager.host: hadoop103
vim /opt/module/flink-1.17.0/conf/flink-conf.yaml
Измените следующее содержимое:
# Адрес узла TaskManager необходимо настроить как имя текущего компьютера.
taskmanager.host: hadoop104
cd /opt/module/flink-1.17.0
bin/start-cluster.sh
jpsall
После успешного запуска вы также можете посетить http://hadoop102:8081 для мониторинга и управления кластером и задачами Flink.
Здесь ясно видно, что количество диспетчеров задач в текущем кластере равно 3, поскольку количество слотов по умолчанию для каждого диспетчера задач равно 1, общее количество слотов и количество доступных слотов равны 3;
существоватьПоследний учебник Flink в 2024 году: от основ до трудоустройства, давайте учиться вместе — Начало работы — Блог CSDNВ этой статье мы написали, чтобы прочитатьsocketПрограммный кейс для отправки слов и подсчета количества слов。На этот раз мы будем использовать программу в качестве примера,Демонстрирует, как отправлять задачи в кластер для выполнения. Конкретные шаги заключаются в следующем.
Примечание. В исполняемой программе, поскольку это сокет, адрес подключения должен быть изменен на адрес, по которому в данный момент находится Hadoop102, иначе будет сообщено об ошибке.
DataStreamSource<String> socket_DS = env.socketTextStream("hadoop102", 9999);
Выполните следующую команду в Hadoop102, чтобы запустить netcat.
nc -lk 9999
существоватьПоследний учебник Flink в 2024 году: от основ до трудоустройства, давайте учиться вместе — Начало работы — Блог CSDNв этой статьеpom.xmlДобавьте конфигурацию упакованного плагина в файл,Подробности следующие
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
предоставленный объем
Следующее добавлено в зависимости Flink.
<scope>provided</scope>
Как правило, если вы выполняете развертывание в производственной среде, лучше всего добавить этот раздел и указать указанную область. Предоставленное означает, что зависимости, связанные с flink, не будут упаковываться во время упаковки, поскольку при развертывании этих зависимостей в кластере происходит развертывание кластера. Он уже существует, поэтому нет необходимости упаковывать его снова.
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId> <!-- фликнуть основные зависимости -->
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId> <!-- флик-клиент -->
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
В среде Maven,когда тысуществоватьpom.xml
Зависимость в файле(dependency)добавить в<scope>provided</scope>
час,Это означает, что зависимость требуется на этапах компиляции и тестирования.,носуществоватьбегатьчасне будет Зависит отMavenизбить Сумкаплагин(нравитьсяmaven-jar-plugin
)Сумка СодержитсуществоватьфинальныйизjarВ сумке。Это因длябегатьчассреда(Сравниватьнравитьсясервер приложенийилиопределенные рамкинравитьсяApache Ожидается, что Flink) предоставит эти зависимости.
Для Apache Flink, когда вы используете Flink в качестве рабочей среды, многие собственные библиотеки и API Flink предоставляются средой выполнения, поэтому вам не нужно включать эти библиотеки в jar-пакет приложения. Сюда входит основная библиотека Flink, API потоковой обработки, API пакетной обработки и т. д.
существоватьFlinkприложениеизpom.xml
серединадобавить в<scope>provided</scope>
из Распространенные сценарии Сумкавключать:
flink-client_<scala-version>
(используется сFlinkкластеркоммуникацияизклиентская библиотека)Обычно устанавливается наprovided
,Потому что Flinkкластер уже включает в себя эти библиотеки.,Вашему приложению не нужно снова включать их при отправке в кластер.provided
。но,пожалуйста, обрати внимание,Из-за совместимости версий проблемы библиотеки Скала могут быть более сложными.,Эта практика не всегда необходима или рекомендуется.provided
。Однако,Для большинства приложений Flink,основнойизFlinkЗависимости(нравитьсяflink-streaming-java_<scala-version>
、flink-connector-kafka_<scala-version>
ждать)Обычно не устанавливаетсяprovided
,Потому что они не предоставляются напрямую Flinkкластером.,Скорее, вашему приложению требуются эти библиотеки при выполнении.
в общем,Воля Зависимостинастраиватьдляprovided
дадля了确保финальныйбить СумкаизприложениеjarНет Сумка Содержит那些существоватьбегатьчассредасередина已经存существоватьиз Библиотека,Это уменьшает размер jar-пакета и уменьшает потенциальные конфликты путей к классам. В контексте Флинка,В основном это касается Клиентской библиотека Flink и некоторые библиотеки, которые могут быть тесно связаны с версией Flinkкластера.
Проблемы, возникающие после предоставленного
Когда ручкаflinkиз Объемнастраиватьстановитьсяпосле предоставления,Оказывается, локальная программа сообщит об ошибке.
Программа ссылается на эту мою статью.
Последний учебник Flink в 2024 году: от основ до трудоустройства, давайте учиться вместе — Начало работы — Блог CSDN
Что касается программы Socket для потоковой обработки, то если вы снова запустите программу локально, может появиться ошибка, указывающая на то, что зависимости не могут быть найдены. Фактически, при компиляции предоставляются блоки, связанные с зависимостями Flink.
Нажмите «Изменить конфигурацию».
Но если программ много, вы не можете проверять их одну за другой, тогда нажмите «По умолчанию» в разделе «Приложение», настройте шаблон для последующих программ, проверьте параметры, и все последующие программы будут запускаться в соответствии с этой конфигурацией.
После настройки плагина вы можете использовать инструмент Maven IDEA для выполнения команды пакета. Однако рекомендуется нажимать кнопку «Очистить» перед каждым пакетом, чтобы очистить ранее упакованный пакет jar. В противном случае могут возникнуть проблемы, если ранее упакованный jar-файл является неактивным. не очищено перед упаковкой. Затем нажмите «Упаковать». Если появится следующее сообщение, упаковка прошла успешно.
Когда появится сообщение INFO BUILD SUCCESS, упаковка прошла успешно.
После завершения упаковки,Вы можете найти необходимый пакет JAR в целевом каталоге.,Будет два JAR-пакета.,flink_flink-1.0-SNAPSHOT.jar и original-flink_flink-1.0-SNAPSHOT.jar
Но если мы добавим предоставленную область видимости в файл pom, между ними не будет никакой разницы.
maven-shade-plugin
Плагин обработки)изjarСумка。maven-shade-plugin
илиshadowJar
ждатьплагин,Этот jar-пакет мог подвергнуться перемещению классов (Relocation), объединению ресурсов и т. д.,Чтобы гарантировать, что проблемы пути к классам в jar-пакетах сведены к минимуму.,Избегайте ошибок во время выполнения, вызванных конфликтами зависимостей.Веб-интерфейс в веб-интерфейсе. Этот jar-пакет был правильно собран.,более подходящийсуществоватьFlinkБегать в окружающей среде。убедитесь, что выиз Конфигурация сборки(нравитьсяMavenизpom.xml
илиGradleизbuild.gradle
)Уже настроен таргетингFlinkизразвертывать需求руководить了适当из Конфигурация,Включая, помимо прочего, перемещение классов, обработку ресурсов и т. д.
Пакет JAR будет загружен, как показано на рисунке ниже:
В основном настройте полное имя основного класса входа в программу, параллелизм выполнения задачи, параметры конфигурации, необходимые для запуска задачи, путь к точке сохранения и т. д., как показано на рисунке ниже. вы можете нажать кнопку «Отправить», чтобы отправить задачу для запуска в кластере.
Таким образом, вы можете напрямую скопировать полное имя пакета программы, которую нужно запустить.
Перед отправкой убедитесь, что netcat включен. В противном случае будет сообщено об ошибке.
Сначала нажмите «Диспетчер задач», затем щелкните узел сервера 192.168.10.104 справа.
Нажмите Stdout, чтобы просмотреть статистику слов приветствия.
Примечание. Если узел Hadoop104 не имеет статистических данных о словах, вы можете проверить их на других узлах TaskManager.
Введите содержимое в порт сокета
Нажмите на задачу, чтобы просмотреть сведения о выполняемой задаче, или нажмите «Отменить задание», чтобы завершить выполнение задачи.
Предполагается, что Flink-кластер запущен.
Помимо отправки задач через WEB-интерфейс, вы также можете отправлять задачи непосредственно через командную строку. Для удобства мы можем сначала загрузить jar-пакет непосредственно в каталог Hadoop102 flink-1.17.0 (это не обязательно, он может находиться в другом каталоге)
[atguigu@hadoop102 flink-1.17.0]$ bin/start-cluster.sh
[atguigu@hadoop102 flink-1.17.0]$ nc -lk 7777
Снова откройте окно Hadoop102, введите путь установки Flink и используйте команду Flink Run в командной строке, чтобы отправить задание.
bin/flink run -m hadoop102:8081 -c wordcount.flink_wc_socket ./flink_flink-1.0-SNAPSHOT.jar
bin/flink
:ЭтоFlinkВ каталоге установкиизbin
子目录серединаизflink
Скрипт,Используется для выполнения различных команд Flink.run
:Этоflink
Скриптизподкоманда,Используется для отправки заданий во Flinkкластер для выполнения.-m hadoop102:8081
:этот Параметры указаныFlinkкластеризMasterузел(JobManager)изадресипорт。существоватьэтот例子середина,Masterузелродыhadoop102
на этой машине,и且监听существовать8081
в порту。ЭтоFlinkкластер Интерфейс управления(Web UI) и порт по умолчанию для отправки заданий.-c wordcount.flink_wc_socket
:этот Параметры указаны Операцияизосновной класс(Main Class)из Полное имя。существоватьэтот例子середина,wordcount.flink_wc_socket
да Сумка Содержитmain
методиздобрыйиз Полное имя,Этот класс является точкой входа в задание. Flink загрузит этот класс,и执行其серединаизmain
Способ начать работу。./flink_flink-1.0-SNAPSHOT.jar
:Это ОтправитьизFlinkОперацияизJARСумкапуть。существоватьэтот例子середина,JARСумка名дляflink_flink-1.0-SNAPSHOT.jar
,Находится в текущем каталоге(Зависит от./
выражать)。этотJARСумка Сумка Содержит了Операцияизвсе зависимостии После компиляциииздобрый文件,Это необходимый компонент для выполнения заданий Flink.Откройте веб-интерфейс в браузере и перейдите по адресу http://hadoop102:8081, чтобы просмотреть состояние выполнения приложения.
Используйте netcat для ввода данных, и вы сможете увидеть соответствующие статистические результаты в стандартном выводе (Stdout) TaskManager.
cat flink-atguigu-standalonesession-0-hadoop102.out