Платформа планирования больших данных Airflow (8): построение и тестирование распределенного кластера Airflow
Платформа планирования больших данных Airflow (8): построение и тестирование распределенного кластера Airflow

Создание и тестирование распределенного кластера Airflow

один、Планирование узла

IP узла

Имя узла

роль узла

Запустить сервис

192.168.179.4

node1

Master1

webserver,scheduler

192.168.179.5

node2

Master2

websever,scheduler

192.168.179.6

node3

Worker1

worker

192.168.179.7

node4

Worker2

worker

два、Этапы построения кластера воздушного потока

1. Установите python3.7 на все узлы.

Чтобы установить anconda и python3.7, обратитесь к разделу «Установка Airflow на одном узле».

2、Установите воздушный поток на всех узлах

Установите системные зависимости, необходимые для воздушного потока, на каждом узле.

Язык кода:javascript
копировать
yum -y install mysql-devel gcc gcc-devel python-devel gcc-c++ cyrus-sasl cyrus-sasl-devel cyrus-sasl-lib 

Конфигурация переменной среды airflow для каждого узла

Язык кода:javascript
копировать
vim /etc/profile
export AIRFLOW_HOME=/root/airflow

#Сделаем переменные среды Конфигурации эффективными
source /etc/profile

Переключите среду воздушного потока на каждом узле, установите воздушный поток и укажите версию 2.1.3.

Язык кода:javascript
копировать
(python37)   conda activate python37
(python37) pip install apache-airflow==2.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple

По умолчанию Airflow устанавливается в каталог $ANCONDA_HOME/envs/python37/lib/python3.7/site-packages/airflow. Конфигурация AIRFLOW_HOME После установки Airflow каталог хранения файлов находится в каталоге AIRFLOW_HOME. Вы можете просмотреть информацию об установленной версии Airflow на каждом узле:

Язык кода:javascript
копировать
(python37)  airflow version
2.1.3

Создайте соответствующую библиотеку в Mysql и задайте параметры.

Мы используем mysql для базы данных метаданных, используемой aiflow, и создаем информацию о библиотеке и таблице, используемую airflow, в mysql узла node2.

Язык кода:javascript
копировать
CREATE DATABASE airflow CHARACTER SET utf8;
create user 'airflow'@'%' identified by '123456';
grant all privileges on airflow.* to 'airflow'@'%';
flush privileges;

Измените «/etc/my.cnf» на узле установки mysql node2 и добавьте следующий контент в mysqld:

Язык кода:javascript
копировать
[mysqld]
explicit_defaults_for_timestamp=1

После изменения значения «my.cnf», указанного выше, перезапустите Mysql. После перезапуска вы можете проверить, действуют ли соответствующие параметры:

Язык кода:javascript
копировать
#Перезапускаем MySQL
[root@node2 ~]# service mysqld restart

#Повторно войдите в запрос MySQL
mysql> show variables like 'explicit_defaults_for_timestamp';

Каждый узел КонфигурацияAirflow Файл airflow.cfg

Изменить AIRFLOW_HOME/Файл airflow.cfg,Убедитесь, что все машины используют один и тот же файл конфигурации.,Конфигурацияairflow.cfg на узле node1,Конфигурацияследующее:

Язык кода:javascript
копировать
[core]
dags_folder = /root/airflow/dags

#Изменить часовой пояс
default_timezone = Asia/Shanghai

#Конфигурация Тип Executor, рекомендация по кластеру Конфигурация CeleryExecutor
executor = CeleryExecutor

# Конфигурациябаза данных
sql_alchemy_conn=mysql+mysqldb://airflow:123456@node2:3306/airflow?use_unicode=true&charset=utf8

[webserver]
#Установить часовой пояс
default_ui_timezone = Asia/Shanghai

[celery]
#КонфигурацияCelery Очередь сообщений, используемая брокером
broker_url = redis://node4:6379/0
#КонфигурацияCelery После завершения задачи брокера статус обновляется с помощью библиотеки
result_backend = db+mysql://root:123456@node2:3306/airflow

Отправьте файл airflow.cfg узла node1 на узлы node2, node3 и node4:

Язык кода:javascript
копировать
(python37) [root@node1 airflow]# scp ./airflow.cfg  node2:`pwd`
(python37) [root@node1 airflow]# scp ./airflow.cfg  node3:`pwd`
(python37) [root@node1 airflow]# scp ./airflow.cfg  node4:`pwd`

три、ИнициализироватьAirflow

1、Установите необходимые пакеты зависимостей Python на каждом узле.

При использовании инициализации данных базы данных Airflow вам необходимо использовать пакет, который подключается к mysql. Выполните следующую команду, чтобы установить пакет Python, соответствующий mysql.

Язык кода:javascript
копировать
(python37) #  pip install mysqlclient -i https://pypi.tuna.tsinghua.edu.cn/simple

2、существоватьnode1начальство ИнициализироватьAirflow база данных

Язык кода:javascript
копировать
(python37) [root@node1 airflow]# airflow db init

После инициализации соответствующая таблица будет создана в библиотеке воздушного потока MySQL.

Четыре、Создайте информацию о пользователе-администраторе

Выполните следующую команду на узле node1, чтобы создать информацию о пользователе для работы Airflow:

Язык кода:javascript
копировать
airflow users create \
    --username airflow \
    --firstname airflow \
    --lastname airflow \
    --role Admin \
    --email xx@qq.com

После завершения выполнения установите пароль «123456» и подтвердите его, чтобы завершить создание информации администратора Airflow.

пять、КонфигурацияScheduler HA

1、Скачать компонент аварийного переключения

Авторизоватьсяhttps://github.com/teamclairvoyant/airflow-scheduler-failover-controllerскачать airflow-scheduler-failover-controller Сторонний компонент, загрузите скачанный zip-пакет на узел 1. В каталоге «/software».

Установите unzip на узле node1 и распакуйте компонент аварийного переключения:

Язык кода:javascript
копировать
(python37) [root@node1 software]# yum -y install unzip
(python37) [root@node1 software]# unzip ./airflow-scheduler-failover-controller-master.zip

2、Используйте pip для установки пакетов зависимостей, необходимых для аварийного переключения.

Пакеты зависимостей, необходимые для аварийного переключения, необходимо установить на узле node1.

Язык кода:javascript
копировать
(python37) [root@node1 software]# cd /software/airflow-scheduler-failover-controller-master
(python37) [root@node1 airflow-scheduler-failover-controller-master]# pip install -e .

3、аварийное переключение инициализации узла node1

Язык кода:javascript
копировать
(python37) [root@node1 ~]# scheduler_failover_controller init
Adding Scheduler Failover configs to Airflow config file...
Finished adding Scheduler Failover configs to Airflow config file.
Finished Initializing Configurations to allow Scheduler Failover Controller to run. Please update the airflow.cfg with your desired configurations.

Примечание. При инициализации воздушного потока Конфигурация будет добавлена ​​в файл airflow.cfg Конфигурация, поэтому сначала необходимо установить воздушный поток и его сущность.

4、Измените airflow.cfg

Сначала измените AIRFLOW_HOME/airflow.cfg узла node1.

Язык кода:javascript
копировать
[scheduler_failover]
# Конфигурацияairflow Главный узел, где Конфигурация — node1, node2, оба узла не должны иметь паролей.
scheduler_nodes_in_cluster = node1,node2

#В строке 1088 обратите особое внимание на то, что точку с запятой необходимо убрать, иначе Планировщик не сможет нормально запуститься при его автоматическом перезапуске позже.
airflow_scheduler_start_command = export AIRFLOW_HOME=/root/airflow;nohup airflow scheduler >> ~/airflow/logs/scheduler.logs &

После завершения настройки вы можете проверить Airflow с помощью следующей команды: Главный узел:

Язык кода:javascript
копировать
(python37) [root@node1 airflow]# scheduler_failover_controller test_connection
Testing Connection for host 'node1'
(True, ['Connection Succeeded', ''])
Testing Connection for host 'node2'
(True, ['Connection Succeeded\n'])

Синхронно отправьте файл airflow.cfg конфигурации узла node1 на узлы node2, node3 и node4:

Язык кода:javascript
копировать
(python37) [root@node1 ~]# cd /root/airflow/
(python37) [root@node1 airflow]# scp airflow.cfg node2:`pwd`
(python37) [root@node1 airflow]# scp airflow.cfg node3:`pwd`
(python37) [root@node1 airflow]# scp airflow.cfg node4:`pwd`

шесть、Запустить кластер Airflow

1、Установите пакет Python, который запускает Airflow на всех узлах.

Язык кода:javascript
копировать
(python37) [root@node1 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3
(python37) [root@node2 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3
(python37) [root@node3 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3
(python37) [root@node4 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3

2、Запустите соответствующий процесс на узле Master1 (node1).

Язык кода:javascript
копировать
#Фоновый запуск по умолчанию может использовать -D , используя здесь -D, иногда невозможно нормально запустить соответствующий процесс Airflow.
airflow webserver
airflow scheduler

3、Запустите соответствующий процесс на узле Мастер2 (node2).

Язык кода:javascript
копировать
airflow webserver

4、Запустите Worker на узлах Worker1(node3), Worker2(node4).

Запустите Worker на узлах node3 и node4:

Язык кода:javascript
копировать
(python37) [root@node3 ~]# airflow celery worker
(python37) [root@node4 ~]# airflow celery worker

5、Запустите планировщик высокой доступности на узле 1.

Язык кода:javascript
копировать
(python37) [root@node1 airflow]# nohup scheduler_failover_controller start > /root/airflow/logs/scheduler_failover/scheduler_failover_run.log &

На этом этапе создание кластера высокой доступности Airflow завершено.

Семь、Посетите Airflow КластерWebUI

Введите node1:8080 в браузере, чтобы просмотреть веб-интерфейс Airflow:

восемь、Тест воздушного потока HA

1. Подготовьте сценарий оболочки

существоватьВсе узлы в кластере Airflow{AIRFLOW_HOME}Создать в каталогеdagsОглавление,Подготовьте следующие два сценария оболочки и поместите следующие два сценария в каталог $AIRFLOW_HOME/dags.,BashOperator по умолчанию при выполнении сценариев,По умолчанию соответствующий скрипт находится во временном каталоге /tmp/airflow**.,Поскольку имя временного каталога неизвестно.,Здесь рекомендуется при выполнении скрипта,Напишите абсолютный путь в «bash_command». Если вы хотите написать относительный путь,Вы можете поместить скрипт в каталог /tmp.,Для выполнения команды в «bash_command» напишите «sh. Также можно использовать ../xxx.sh».

first_shell.sh

Язык кода:javascript
копировать
#!/bin/bash

dt=$1

echo "==== execute first shell ===="

echo "---- first : time is ${dt}"

second_shell.sh

Язык кода:javascript
копировать
#!/bin/bash

dt=$1

echo "==== execute second shell ===="

echo "---- second : time is ${dt}"

2、Написать воздушный поток python Конфигурация

Язык кода:javascript
копировать
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner':'zhangsan',
    'start_date':datetime(2021, 9, 23),
    'retries': 1,  # Количество неудачных попыток
    'retry_delay': timedelta(minutes=5) # Интервал повторения ошибки
}

dag = DAG(
    dag_id = 'execute_shell_sh',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=BashOperator(
    task_id='first',
    #Рекомендуется прописывать абсолютный путь к скрипту
    bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
    dag = dag
)

second=BashOperator(
    task_id='second',
    #Рекомендуется прописывать абсолютный путь к скрипту
    bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
    dag=dag
)

first >> second

Запишите приведенное выше содержимое в файлexecute_shell.py.,начальство传到Все узлы Airflow{AIRFLOW_HOME}/dagsОглавление下。

3、Перезапустите Airflow и войдите в Airflow. WebUI для просмотра соответствующего расписания

Прежде чем перезапустить Airflow, сначала завершите процессы веб-сервера и планировщика на узле node1, завершите процессы веб-сервера и планировщика на узле node2, а также завершите рабочие процессы на узлах node3 и node4.

Если каждый процесс запускается в фоновом режиме, проверьте режим фонового процесса:

Язык кода:javascript
копировать
(python37) [root@node1 dags]# ps aux |grep webserver
(python37) [root@node1 dags]# ps aux |grep scheduler
(python37) [root@node2 dags]# ps aux |grep webserver
(python37) [root@node2 dags]# ps aux |grep scheduler
(python37) [root@node3 ~]# ps aux|grep "celery worker"
(python37) [root@node4 ~]# ps aux|grep "celery worker"
Найдите номер процесса, соответствующий соответствующей команде запуска, и завершите его.

После перезапуска войдите в Airflow WebUI для просмотра задач:

После нажатия задачи «успех» вы увидите журнал успешного выполнения скрипта:

4、Тест воздушного потока HA

Когда мы закрываем веб-сервер узла node1, мы можем напрямую получить доступ к веб-интерфейсу воздушного потока через узел node2:

На узле node1 найдите процесс «scheduler» и завершите его, чтобы проверить, вступит ли в силу планировщик HA:

Язык кода:javascript
копировать
(python37) [root@node1 ~]# ps aux|grep scheduler

root      23744  0.9  3.3 326940 63028 pts/2    S    00:08   0:02 airflow scheduler -- DagFileProcessorManager

#kill Отменить процесс планировщика

(python37) [root@node1 ~]# kill -9 23744



#Доступ к веб-серверу webui

# Проверьте журнал процесса Scheduler_failover_controller на узле node1 на предмет действия по запуску schudler. Примечание. Здесь сначала запустите с узла 1. Если он не может запуститься, запустите Schduler с других главных узлов.

boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose