Flink использует код для отправки задач
Flink использует код для отправки задач

Предисловие

В этой статье используется Flink версии 1.12.7.

Задача отправки кода

Подготовьте папки и файлы

код Язык:javascript
копировать
hadoop fs -mkdir -p /jar/userTask
hadoop fs -mkdir -p /jar/flink12/libdist
hadoop fs -mkdir -p /jar/flink12/lib

Скопируйте необходимые файлы

код Язык:javascript
копировать
hadoop fs -put $FLINK_HOME/examples/batch/WordCount.jar /jar/userTask/WordCount.jar
hadoop fs -put $FLINK_HOME/lib/flink-dist_2.12-1.12.7.jar /jar/flink12/libdist/flink-dist_2.12-1.12.7.jar
hadoop fs -put $FLINK_HOME/lib/* /jar/flink12/lib/

Для просмотра файла вы можете получить доступ к этому адресу

http://hadoop01:50070/explorer.html#/

http://hadoop02:50070/explorer.html#/

Протестируйте это на сервере

код Язык:javascript
копировать
flink run-application -t yarn-application hdfs://hacluster/jar/userTask/WordCount.jar --output hdfs://hacluster/bigdata_study/output03

Добавить зависимости

код Язык:javascript
копировать
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-yarn_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

код

код Язык:javascript
копировать
package cn.psvmc;

import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;

import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import java.util.Collections;

import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;

public class RunFlinkJob {
    public static void main(String[] args) {
        // локальный каталог конфигурации flink, чтобы получить конфигурацию flink
        // Если возникает исключение org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user функция.ошибка
        // Затем добавьте его в flink-config.yaml.
        // classloader.resolve-order: parent-first
        String configurationDirectory = "/data/tools/bigdata/flink-1.12.7/conf";

        //Сохраняем каталог пакета jar, связанный с кластером Flink
        String flinkLibs = "hdfs://hacluster/jar/flink12/lib";
        //пользовательская банка
        String userJarPath = "hdfs://hacluster/jar/userTask/WordCount.jar";
        String flinkDistJar = "hdfs://hacluster/jar/flink12/libdist/flink-dist_2.12-1.12.7.jar";

        YarnClient yarnClient = YarnClient.createYarnClient();
        org.apache.hadoop.conf.Configuration entries = new org.apache.hadoop.conf.Configuration();
        entries.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/yarn-site.xml"));
        entries.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/hdfs-site.xml"));
        entries.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/core-site.xml"));
        YarnConfiguration yarnConfiguration = new YarnConfiguration(entries);
        yarnClient.init(yarnConfiguration);
        yarnClient.start();

        // Установите журнал. В противном случае вы не сможете просмотреть журнал.
        YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever
                .create(yarnClient);

        //Получаем конфигурацию флинка
        Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
                configurationDirectory
        );

        flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);

        flinkConfiguration.set(
                PipelineOptions.JARS,
                Collections.singletonList(userJarPath)
        );

        Path remoteLib = new Path(flinkLibs);
        flinkConfiguration.set(
                YarnConfigOptions.PROVIDED_LIB_DIRS,
                Collections.singletonList(remoteLib.toString())
        );

//        flinkConfiguration.set(
//                YarnConfigOptions.FLINK_DIST_JAR,
//                flinkDistJar
//        );

        // Установите режим ПРИЛОЖЕНИЯ.
        flinkConfiguration.set(
                DeploymentOptions.TARGET,
                YarnDeploymentTarget.APPLICATION.getName()
        );

        // yarn application name
        flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "zApplication");

//        flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024", MEGA_BYTES));
//        flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024", MEGA_BYTES));


        YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);

        ClusterSpecification clusterSpecification = new ClusterSpecification
                .ClusterSpecificationBuilder()
                .createClusterSpecification();

        // Установите параметры и основной класс пользовательского jar-файла.
//        ApplicationConfiguration appConfig = new ApplicationConfiguration(args, "org.apache.flink.examples.java.wordcount.WordCount");
        ApplicationConfiguration appConfig = new ApplicationConfiguration(args, null);
        YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
                flinkConfiguration,
                yarnConfiguration,
                yarnClient,
                clusterInformationRetriever,
                true
        );

        try {
            ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
                    clusterSpecification,
                    appConfig
            );

            ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();

            ApplicationId applicationId = clusterClient.getClusterId();
            String webInterfaceURL = clusterClient.getWebInterfaceURL();
            System.out.println("applicationId is {}" + applicationId);
            System.out.println("webInterfaceURL is {}" + webInterfaceURL);

            // покидать
            // yarnClusterDescriptor.killCluster(applicationId);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Посмотреть пряжу

http://hadoop02:8088/cluster

Вызов выполнения скрипта

код Язык:javascript
копировать
package cn.psvmc;

import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.sun.istack.logging.Logger;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class ConnectionSSH {
  private static final Logger logger = Logger.getLogger(ConnectionSSH.class);
  public static void main(String[] args) throws JSchException, IOException {
    JSch jsch = new JSch();
    String pubKeyPath = "C:\\Users\\Administrator\\.ssh\\id_rsa";
    jsch.addIdentity(pubKeyPath);
    String username = "root";
    String host = "192.168.7.101";
    Session session =jsch.getSession(username, host, 22);//Подготовка к подключению
    session.setConfig("StrictHostKeyChecking", "no");
    session.connect();
    String command = "flink run -t yarn-per-job $FLINK_HOME/examples/batch/WordCount.jar";
    ChannelExec channel=(ChannelExec)session.openChannel("exec");
    channel.setCommand(command);
    BufferedReader in = new BufferedReader(new InputStreamReader(channel.getInputStream()));
    channel.connect();

    String msg;
    while((msg = in.readLine()) != null){
      System.out.println(msg);
    }
    channel.disconnect();
    session.disconnect();
  }
}

Использовать пароль

код Язык:javascript
копировать
JSch jsch = new JSch();
String username = "root";
String host = "192.168.7.101";
Session session =jsch.getSession(username, host, 22);//Подготовка к подключению
session.setConfig("StrictHostKeyChecking", "no");
session.setPassword("zhangjian");
session.connect();

Использовать ключ

код Язык:javascript
копировать
JSch jsch = new JSch();
String pubKeyPath = "C:\\Users\\Administrator\\.ssh\\id_rsa";
jsch.addIdentity(pubKeyPath);
String username = "root";
String host = "192.168.7.101";
Session session =jsch.getSession(username, host, 22);//Подготовка к подключению
session.setConfig("StrictHostKeyChecking", "no");
session.connect();

Вызов выполнения скрипта2

Помимо запуска скриптов, этот класс также может копировать файлы.

полагаться:

код Язык:javascript
копировать
<dependency>
    <groupId>ch.ethz.ganymed</groupId>
    <artifactId>ganymed-ssh2</artifactId>
    <version>build210</version>
</dependency>

Инструменты

код Язык:javascript
копировать
package cn.psvmc;

import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.SCPClient;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;

import java.io.*;
/**
 * Описание: Подключитесь к серверу Linux и выполните соответствующие команды оболочки.
 */
public class ConnectLinuxCommand {
    private static final Logger logger = Logger.getLogger(ConnectLinuxCommand.class);

    private static final String DEFAULTCHARTSET = "UTF-8";
    private static Connection conn;

    /**
     * @Title: login
     * @Description: Метод имени пользователя и пароля  Удаленный вход на Linux-сервер
     * @return: Boolean
     */
    public static Boolean login(RemoteConnect remoteConnect) {
        boolean flag = false;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect();// соединять
            flag = conn.authenticateWithPassword(remoteConnect.getUserName(), remoteConnect.getPassword());// Сертификация
            if (flag) {
                logger.info("Сертификацияуспех!");
            } else {
                logger.error("Сертификациянеудача!");
                conn.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flag;
    }

    public static Boolean loginWithoutPwd(RemoteConnect remoteConnect) {
        boolean flag = true;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect();// соединять
            boolean authenticationPartialSuccess = conn.isAuthenticationPartialSuccess();
            System.out.println("authenticationPartialSuccess = " + authenticationPartialSuccess);
            logger.info("Сертификацияуспех!");
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * @param remoteConnect объединять информационный объект
     * @param keyFile       Объект файла указывает на файл, содержащий закрытый ключ пользователя DSA или RSA в формате OpenSSH** (PEM, не может быть потерян" ----- BEGIN DSA PRIVATE KEY-----" or "-----BEGIN RSA PRIVATE КЛЮЧ -----" тег
     * @param keyfilePass   Если файл ключа зашифрован Этот параметр необходимо использовать для расшифровки. Он может иметь значение null, если шифрование отсутствует.
     * @return Boolean
     * @Title: loginByKey
     * @Description: режим секретного ключа  Удаленный вход на Linux-сервер
     */
    public static Boolean loginByFileKey(RemoteConnect remoteConnect, File keyFile, String keyfilePass) {
        boolean flag = false;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect();
            // Авторизоваться Сертификация
            flag = conn.authenticateWithPublicKey(remoteConnect.getUserName(), keyFile, keyfilePass);
            if (flag) {
                logger.info("Сертификацияуспех!");
            } else {
                logger.error("Сертификациянеудача!");
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * @param remoteConnect объединять информационный объект
     * @param keys          Символ [], содержащий закрытый ключ пользователя DSA или RSA (формат ключа OpenSSH, вы не можете потерять "----- begin Закрытый ключ DSA -----" или "----- BEGIN RSA PRIVATE KEY ----- "Тег. Массив символов может содержать символы новой строки/новой строки.
     * @param keyPass       Если массив символов секретного ключа зашифрован  Это поле необходимо для расшифровки  В противном случае не требуется, может быть нулевым
     * @return Boolean
     * @Title: loginByCharsKey
     * @Description: режим секретного ключа  Удаленный вход на Linux-сервер
     */
    public static Boolean loginByCharsKey(RemoteConnect remoteConnect, char[] keys, String keyPass) {
        boolean flag = false;
        try {
            conn = new Connection(remoteConnect.getIp());
            conn.connect();
            // Авторизоваться Сертификация
            flag = conn.authenticateWithPublicKey(remoteConnect.getUserName(), keys, keyPass);
            if (flag) {
                logger.info("Сертификацияуспех!");
            } else {
                logger.error("Сертификациянеудача!");
                conn.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return flag;
    }

    /**
     * @param cmd Команда сценария
     * @Title: execute
     * @Description: Удаленно выполнять сценарии или команды shll
     * @return: result Команда выполняется и возвращается результат.
     */
    public static String runCmd(String cmd) {
        String result = "";
        try {
            Session session = conn.openSession();// открыть сеанс
            session.execCommand(cmd);// выполнить команду
            result = processStdout(session.getStdout(), DEFAULTCHARTSET);
            // Если стандартный вывод пуст, при выполнении скрипта произошла ошибка.
            if (StringUtils.isBlank(result)) {
                result = processStdout(session.getStderr(), DEFAULTCHARTSET);
            }
            conn.close();
            session.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * @return String Значение результата, возвращаемое после успешного выполнения команды. Если выполнение команды завершается неудачно, возвращается пустая строка, а не ноль.
     * @Title: executeSuccess
     * @Description: Удаленно выполнять сценарии оболочки или команды
     */
    public static String runCmdSuccess(String cmd) {
        String result = "";
        try {
            Session session = conn.openSession();// открыть сеанс
            session.execCommand(cmd);// выполнить команду
            result = processStdout(session.getStdout(), DEFAULTCHARTSET);
            conn.close();
            session.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }

    /**
     * @param in      объект входного потока
     * @param charset кодирование
     * @return String Вернуть в виде обычного текста
     * @Title: processStdout
     * @Description: Анализ возвращаемых результатов выполнения скрипта
     */
    public static String processStdout(InputStream in, String charset) {
        InputStream stdout = new StreamGobbler(in);
        StringBuilder buffer = new StringBuilder();
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(stdout, charset));
            String line = null;
            while ((line = br.readLine()) != null) {
                buffer.append(line).append("\n");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return buffer.toString();
    }

    /**
     * @return String
     * @Description: Свяжите Linux-сервер через имя пользователя и пароль
     */
    public static String runCmd(String ip, String userName, String password, String commandStr) {
        logger.info(
                "ConnectLinuxCommand  scpGet===" +
                        "ip:" + ip +
                        "  userName:" + userName +
                        "  commandStr:" + commandStr
        );

        String returnStr = "";
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        try {
            if (login(remoteConnect)) {
                returnStr = runCmd(commandStr);
                System.out.println(returnStr);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return returnStr;
    }

    public static boolean connectLinuxWithoutPwd(String ip, String userName, String commandStr) {
        logger.info("ConnectLinuxCommand  scpGet===" + "ip:" + ip + "  userName:" + userName + "  commandStr:"
                + commandStr);

        String returnStr = "";
        boolean result = true;
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        try {
            if (loginWithoutPwd(remoteConnect)) {
                returnStr = runCmd(commandStr);
                System.out.println(result);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        if (StringUtils.isBlank(returnStr)) {
            result = false;
        }
        return result;
    }

    /**
     * @param password   Пароль (другие серверы)
     * @param remoteFile Местоположение файла (другие серверы)
     * @param localDir   Этот каталог сервера
     * @Title: scpGet
     * @Description: Получить файлы с других серверов в указанный каталог на этом сервере
     */
    public static void scpPull(String ip, String userName, String password, String remoteFile, String localDir)
            throws IOException {

        logger.info("ConnectLinuxCommand  scpGet===" + "ip:" + ip + "  userName:" + userName + "  remoteFile:"
                + remoteFile + "  localDir:" + localDir);
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        if (login(remoteConnect)) {
            SCPClient client = new SCPClient(conn);
            client.get(remoteFile, localDir);
            conn.close();
        }
    }


    /**
     * Переместить файлы, копировать на другие компьютеры
     * @param ip Удаленный IP
     * @param userName имя удаленного пользователя
     * @param password удаленный пароль
     * @param localFile локальный файл
     * @param remoteDir удаленный каталог
     * @throws IOException аномальный
     */
    public static void scpPush(String ip, String userName, String password, String localFile, String remoteDir)
            throws IOException {
        logger.info("ConnectLinuxCommand  scpPut===" + "ip:" + ip + "  userName:" + userName + "  localFile:"
                + localFile + "  remoteDir:" + remoteDir);
        RemoteConnect remoteConnect = new RemoteConnect();
        remoteConnect.setIp(ip);
        remoteConnect.setUserName(userName);
        remoteConnect.setPassword(password);
        if (login(remoteConnect)) {
            SCPClient client = new SCPClient(conn);
            client.put(localFile, remoteDir);
            conn.close();
        }
    }
}

RemoteConnect

код Язык:javascript
копировать
public class RemoteConnect {
    String ip;
    String userName;
    String password;

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

тест

код Язык:javascript
копировать
package cn.psvmc;

public class CLCTest {
    public static void main(String[] args) {
        mTest1();
    }

    public static  void mTest1() {
        System.out.println("--------------------------------------");
        String commandStr="flink run -t yarn-per-job $FLINK_HOME/examples/batch/WordCount.jar";
        String result=ConnectLinuxCommand.runCmd("192.168.7.101","root","zhangjian",commandStr);
        System.out.println("Результат:"+результат);
        System.out.println("--------------------------------------");
    }

    public static void mTest2() {
        try {
            ConnectLinuxCommand.scpPull("192.168.7.101","root","zhangjian", "/root/test.txt", "d:/aa");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void mTest3() {
        try {
            ConnectLinuxCommand.scpPush("192.168.7.101","root","zhangjian", "d:/aa/test2.txt", "/root/");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
boy illustration
Неразрушающее увеличение изображений одним щелчком мыши, чтобы сделать их более четкими артефактами искусственного интеллекта, включая руководства по установке и использованию.
boy illustration
Копикодер: этот инструмент отлично работает с Cursor, Bolt и V0! Предоставьте более качественные подсказки для разработки интерфейса (создание навигационного веб-сайта с использованием искусственного интеллекта).
boy illustration
Новый бесплатный RooCline превосходит Cline v3.1? ! Быстрее, умнее и лучше вилка Cline! (Независимое программирование AI, порог 0)
boy illustration
Разработав более 10 проектов с помощью Cursor, я собрал 10 примеров и 60 подсказок.
boy illustration
Я потратил 72 часа на изучение курсорных агентов, и вот неоспоримые факты, которыми я должен поделиться!
boy illustration
Идеальная интеграция Cursor и DeepSeek API
boy illustration
DeepSeek V3 снижает затраты на обучение больших моделей
boy illustration
Артефакт, увеличивающий количество очков: на основе улучшения характеристик препятствия малым целям Yolov8 (SEAM, MultiSEAM).
boy illustration
DeepSeek V3 раскручивался уже три дня. Сегодня я попробовал самопровозглашенную модель «ChatGPT».
boy illustration
Open Devin — инженер-программист искусственного интеллекта с открытым исходным кодом, который меньше программирует и больше создает.
boy illustration
Эксклюзивное оригинальное улучшение YOLOv8: собственная разработка SPPF | SPPF сочетается с воспринимаемой большой сверткой ядра UniRepLK, а свертка с большим ядром + без расширения улучшает восприимчивое поле
boy illustration
Популярное и подробное объяснение DeepSeek-V3: от его появления до преимуществ и сравнения с GPT-4o.
boy illustration
9 основных словесных инструкций по доработке академических работ с помощью ChatGPT, эффективных и практичных, которые стоит собрать
boy illustration
Вызовите deepseek в vscode для реализации программирования с помощью искусственного интеллекта.
boy illustration
Познакомьтесь с принципами сверточных нейронных сетей (CNN) в одной статье (суперподробно)
boy illustration
50,3 тыс. звезд! Immich: автономное решение для резервного копирования фотографий и видео, которое экономит деньги и избавляет от беспокойства.
boy illustration
Cloud Native|Практика: установка Dashbaord для K8s, графика неплохая
boy illustration
Краткий обзор статьи — использование синтетических данных при обучении больших моделей и оптимизации производительности
boy illustration
MiniPerplx: новая поисковая система искусственного интеллекта с открытым исходным кодом, спонсируемая xAI и Vercel.
boy illustration
Конструкция сервиса Synology Drive сочетает проникновение в интрасеть и синхронизацию папок заметок Obsidian в облаке.
boy illustration
Центр конфигурации————Накос
boy illustration
Начинаем с нуля при разработке в облаке Copilot: начать разработку с минимальным использованием кода стало проще
boy illustration
[Серия Docker] Docker создает мультиплатформенные образы: практика архитектуры Arm64
boy illustration
Обновление новых возможностей coze | Я использовал coze для создания апплета помощника по исправлению домашних заданий по математике
boy illustration
Советы по развертыванию Nginx: практическое создание статических веб-сайтов на облачных серверах
boy illustration
Feiniu fnos использует Docker для развертывания личного блокнота Notepad
boy illustration
Сверточная нейронная сеть VGG реализует классификацию изображений Cifar10 — практический опыт Pytorch
boy illustration
Начало работы с EdgeonePages — новым недорогим решением для хостинга веб-сайтов
boy illustration
[Зона легкого облачного игрового сервера] Управление игровыми архивами
boy illustration
Развертывание SpringCloud-проекта на базе Docker и Docker-Compose