IP узла | Имя узла | роль узла | Запустить сервис |
---|---|---|---|
192.168.179.4 | node1 | Master1 | webserver,scheduler |
192.168.179.5 | node2 | Master2 | websever,scheduler |
192.168.179.6 | node3 | Worker1 | worker |
192.168.179.7 | node4 | Worker2 | worker |
Чтобы установить anconda и python3.7, обратитесь к разделу «Установка Airflow на одном узле».
Установите системные зависимости, необходимые для воздушного потока, на каждом узле.
yum -y install mysql-devel gcc gcc-devel python-devel gcc-c++ cyrus-sasl cyrus-sasl-devel cyrus-sasl-lib
Конфигурация переменной среды airflow для каждого узла
vim /etc/profile
export AIRFLOW_HOME=/root/airflow
#Сделаем переменные среды Конфигурации эффективными
source /etc/profile
Переключите среду воздушного потока на каждом узле, установите воздушный поток и укажите версию 2.1.3.
(python37) conda activate python37
(python37) pip install apache-airflow==2.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple
По умолчанию Airflow устанавливается в каталог $ANCONDA_HOME/envs/python37/lib/python3.7/site-packages/airflow. Конфигурация AIRFLOW_HOME После установки Airflow каталог хранения файлов находится в каталоге AIRFLOW_HOME. Вы можете просмотреть информацию об установленной версии Airflow на каждом узле:
(python37) airflow version
2.1.3
Создайте соответствующую библиотеку в Mysql и задайте параметры.
Мы используем mysql для базы данных метаданных, используемой aiflow, и создаем информацию о библиотеке и таблице, используемую airflow, в mysql узла node2.
CREATE DATABASE airflow CHARACTER SET utf8;
create user 'airflow'@'%' identified by '123456';
grant all privileges on airflow.* to 'airflow'@'%';
flush privileges;
Измените «/etc/my.cnf» на узле установки mysql node2 и добавьте следующий контент в mysqld:
[mysqld]
explicit_defaults_for_timestamp=1
После изменения значения «my.cnf», указанного выше, перезапустите Mysql. После перезапуска вы можете проверить, действуют ли соответствующие параметры:
#Перезапускаем MySQL
[root@node2 ~]# service mysqld restart
#Повторно войдите в запрос MySQL
mysql> show variables like 'explicit_defaults_for_timestamp';
Каждый узел КонфигурацияAirflow Файл airflow.cfg
Изменить AIRFLOW_HOME/Файл airflow.cfg,Убедитесь, что все машины используют один и тот же файл конфигурации.,Конфигурацияairflow.cfg на узле node1,Конфигурацияследующее:
[core]
dags_folder = /root/airflow/dags
#Изменить часовой пояс
default_timezone = Asia/Shanghai
#Конфигурация Тип Executor, рекомендация по кластеру Конфигурация CeleryExecutor
executor = CeleryExecutor
# Конфигурациябаза данных
sql_alchemy_conn=mysql+mysqldb://airflow:123456@node2:3306/airflow?use_unicode=true&charset=utf8
[webserver]
#Установить часовой пояс
default_ui_timezone = Asia/Shanghai
[celery]
#КонфигурацияCelery Очередь сообщений, используемая брокером
broker_url = redis://node4:6379/0
#КонфигурацияCelery После завершения задачи брокера статус обновляется с помощью библиотеки
result_backend = db+mysql://root:123456@node2:3306/airflow
Отправьте файл airflow.cfg узла node1 на узлы node2, node3 и node4:
(python37) [root@node1 airflow]# scp ./airflow.cfg node2:`pwd`
(python37) [root@node1 airflow]# scp ./airflow.cfg node3:`pwd`
(python37) [root@node1 airflow]# scp ./airflow.cfg node4:`pwd`
При использовании инициализации данных базы данных Airflow вам необходимо использовать пакет, который подключается к mysql. Выполните следующую команду, чтобы установить пакет Python, соответствующий mysql.
(python37) # pip install mysqlclient -i https://pypi.tuna.tsinghua.edu.cn/simple
(python37) [root@node1 airflow]# airflow db init
После инициализации соответствующая таблица будет создана в библиотеке воздушного потока MySQL.
Выполните следующую команду на узле node1, чтобы создать информацию о пользователе для работы Airflow:
airflow users create \
--username airflow \
--firstname airflow \
--lastname airflow \
--role Admin \
--email xx@qq.com
После завершения выполнения установите пароль «123456» и подтвердите его, чтобы завершить создание информации администратора Airflow.
Авторизоватьсяhttps://github.com/teamclairvoyant/airflow-scheduler-failover-controllerскачать airflow-scheduler-failover-controller Сторонний компонент, загрузите скачанный zip-пакет на узел 1. В каталоге «/software».
Установите unzip на узле node1 и распакуйте компонент аварийного переключения:
(python37) [root@node1 software]# yum -y install unzip
(python37) [root@node1 software]# unzip ./airflow-scheduler-failover-controller-master.zip
Пакеты зависимостей, необходимые для аварийного переключения, необходимо установить на узле node1.
(python37) [root@node1 software]# cd /software/airflow-scheduler-failover-controller-master
(python37) [root@node1 airflow-scheduler-failover-controller-master]# pip install -e .
(python37) [root@node1 ~]# scheduler_failover_controller init
Adding Scheduler Failover configs to Airflow config file...
Finished adding Scheduler Failover configs to Airflow config file.
Finished Initializing Configurations to allow Scheduler Failover Controller to run. Please update the airflow.cfg with your desired configurations.
Примечание. При инициализации воздушного потока Конфигурация будет добавлена в файл airflow.cfg Конфигурация, поэтому сначала необходимо установить воздушный поток и его сущность.
Сначала измените AIRFLOW_HOME/airflow.cfg узла node1.
[scheduler_failover]
# Конфигурацияairflow Главный узел, где Конфигурация — node1, node2, оба узла не должны иметь паролей.
scheduler_nodes_in_cluster = node1,node2
#В строке 1088 обратите особое внимание на то, что точку с запятой необходимо убрать, иначе Планировщик не сможет нормально запуститься при его автоматическом перезапуске позже.
airflow_scheduler_start_command = export AIRFLOW_HOME=/root/airflow;nohup airflow scheduler >> ~/airflow/logs/scheduler.logs &
После завершения настройки вы можете проверить Airflow с помощью следующей команды: Главный узел:
(python37) [root@node1 airflow]# scheduler_failover_controller test_connection
Testing Connection for host 'node1'
(True, ['Connection Succeeded', ''])
Testing Connection for host 'node2'
(True, ['Connection Succeeded\n'])
Синхронно отправьте файл airflow.cfg конфигурации узла node1 на узлы node2, node3 и node4:
(python37) [root@node1 ~]# cd /root/airflow/
(python37) [root@node1 airflow]# scp airflow.cfg node2:`pwd`
(python37) [root@node1 airflow]# scp airflow.cfg node3:`pwd`
(python37) [root@node1 airflow]# scp airflow.cfg node4:`pwd`
(python37) [root@node1 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3
(python37) [root@node2 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3
(python37) [root@node3 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3
(python37) [root@node4 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3
#Фоновый запуск по умолчанию может использовать -D , используя здесь -D, иногда невозможно нормально запустить соответствующий процесс Airflow.
airflow webserver
airflow scheduler
airflow webserver
Запустите Worker на узлах node3 и node4:
(python37) [root@node3 ~]# airflow celery worker
(python37) [root@node4 ~]# airflow celery worker
(python37) [root@node1 airflow]# nohup scheduler_failover_controller start > /root/airflow/logs/scheduler_failover/scheduler_failover_run.log &
На этом этапе создание кластера высокой доступности Airflow завершено.
Введите node1:8080 в браузере, чтобы просмотреть веб-интерфейс Airflow:
существоватьВсе узлы в кластере Airflow{AIRFLOW_HOME}Создать в каталогеdagsОглавление,Подготовьте следующие два сценария оболочки и поместите следующие два сценария в каталог $AIRFLOW_HOME/dags.,BashOperator по умолчанию при выполнении сценариев,По умолчанию соответствующий скрипт находится во временном каталоге /tmp/airflow**.,Поскольку имя временного каталога неизвестно.,Здесь рекомендуется при выполнении скрипта,Напишите абсолютный путь в «bash_command». Если вы хотите написать относительный путь,Вы можете поместить скрипт в каталог /tmp.,Для выполнения команды в «bash_command» напишите «sh. Также можно использовать ../xxx.sh».
first_shell.sh
#!/bin/bash
dt=$1
echo "==== execute first shell ===="
echo "---- first : time is ${dt}"
second_shell.sh
#!/bin/bash
dt=$1
echo "==== execute second shell ===="
echo "---- second : time is ${dt}"
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
default_args = {
'owner':'zhangsan',
'start_date':datetime(2021, 9, 23),
'retries': 1, # Количество неудачных попыток
'retry_delay': timedelta(minutes=5) # Интервал повторения ошибки
}
dag = DAG(
dag_id = 'execute_shell_sh',
default_args=default_args,
schedule_interval=timedelta(minutes=1)
)
first=BashOperator(
task_id='first',
#Рекомендуется прописывать абсолютный путь к скрипту
bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
dag = dag
)
second=BashOperator(
task_id='second',
#Рекомендуется прописывать абсолютный путь к скрипту
bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
dag=dag
)
first >> second
Запишите приведенное выше содержимое в файлexecute_shell.py.,начальство传到Все узлы Airflow{AIRFLOW_HOME}/dagsОглавление下。
Прежде чем перезапустить Airflow, сначала завершите процессы веб-сервера и планировщика на узле node1, завершите процессы веб-сервера и планировщика на узле node2, а также завершите рабочие процессы на узлах node3 и node4.
Если каждый процесс запускается в фоновом режиме, проверьте режим фонового процесса:
(python37) [root@node1 dags]# ps aux |grep webserver
(python37) [root@node1 dags]# ps aux |grep scheduler
(python37) [root@node2 dags]# ps aux |grep webserver
(python37) [root@node2 dags]# ps aux |grep scheduler
(python37) [root@node3 ~]# ps aux|grep "celery worker"
(python37) [root@node4 ~]# ps aux|grep "celery worker"
Найдите номер процесса, соответствующий соответствующей команде запуска, и завершите его.
После перезапуска войдите в Airflow WebUI для просмотра задач:
После нажатия задачи «успех» вы увидите журнал успешного выполнения скрипта:
Когда мы закрываем веб-сервер узла node1, мы можем напрямую получить доступ к веб-интерфейсу воздушного потока через узел node2:
На узле node1 найдите процесс «scheduler» и завершите его, чтобы проверить, вступит ли в силу планировщик HA:
(python37) [root@node1 ~]# ps aux|grep scheduler
root 23744 0.9 3.3 326940 63028 pts/2 S 00:08 0:02 airflow scheduler -- DagFileProcessorManager
#kill Отменить процесс планировщика
(python37) [root@node1 ~]# kill -9 23744
#Доступ к веб-серверу webui
# Проверьте журнал процесса Scheduler_failover_controller на узле node1 на предмет действия по запуску schudler. Примечание. Здесь сначала запустите с узла 1. Если он не может запуститься, запустите Schduler с других главных узлов.