В этой статье используется Flink версии 1.12.7.
Подготовьте папки и файлы
hadoop fs -mkdir -p /jar/userTask
hadoop fs -mkdir -p /jar/flink12/libdist
hadoop fs -mkdir -p /jar/flink12/lib
Скопируйте необходимые файлы
hadoop fs -put $FLINK_HOME/examples/batch/WordCount.jar /jar/userTask/WordCount.jar
hadoop fs -put $FLINK_HOME/lib/flink-dist_2.12-1.12.7.jar /jar/flink12/libdist/flink-dist_2.12-1.12.7.jar
hadoop fs -put $FLINK_HOME/lib/* /jar/flink12/lib/
Для просмотра файла вы можете получить доступ к этому адресу
http://hadoop01:50070/explorer.html#/
http://hadoop02:50070/explorer.html#/
Протестируйте это на сервере
flink run-application -t yarn-application hdfs://hacluster/jar/userTask/WordCount.jar --output hdfs://hacluster/bigdata_study/output03
Добавить зависимости
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
код
package cn.psvmc;
import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import java.util.Collections;
import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
public class RunFlinkJob {
public static void main(String[] args) {
// локальный каталог конфигурации flink, чтобы получить конфигурацию flink
// Если возникает исключение org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user функция.ошибка
// Затем добавьте его в flink-config.yaml.
// classloader.resolve-order: parent-first
String configurationDirectory = "/data/tools/bigdata/flink-1.12.7/conf";
//Сохраняем каталог пакета jar, связанный с кластером Flink
String flinkLibs = "hdfs://hacluster/jar/flink12/lib";
//пользовательская банка
String userJarPath = "hdfs://hacluster/jar/userTask/WordCount.jar";
String flinkDistJar = "hdfs://hacluster/jar/flink12/libdist/flink-dist_2.12-1.12.7.jar";
YarnClient yarnClient = YarnClient.createYarnClient();
org.apache.hadoop.conf.Configuration entries = new org.apache.hadoop.conf.Configuration();
entries.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/yarn-site.xml"));
entries.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/hdfs-site.xml"));
entries.addResource(new Path("/data/tools/bigdata/hadoop-2.7.7/etc/hadoop/core-site.xml"));
YarnConfiguration yarnConfiguration = new YarnConfiguration(entries);
yarnClient.init(yarnConfiguration);
yarnClient.start();
// Установите журнал. В противном случае вы не сможете просмотреть журнал.
YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever
.create(yarnClient);
//Получаем конфигурацию флинка
Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
configurationDirectory
);
flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
flinkConfiguration.set(
PipelineOptions.JARS,
Collections.singletonList(userJarPath)
);
Path remoteLib = new Path(flinkLibs);
flinkConfiguration.set(
YarnConfigOptions.PROVIDED_LIB_DIRS,
Collections.singletonList(remoteLib.toString())
);
// flinkConfiguration.set(
// YarnConfigOptions.FLINK_DIST_JAR,
// flinkDistJar
// );
// Установите режим ПРИЛОЖЕНИЯ.
flinkConfiguration.set(
DeploymentOptions.TARGET,
YarnDeploymentTarget.APPLICATION.getName()
);
// yarn application name
flinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "zApplication");
// flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024", MEGA_BYTES));
// flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024", MEGA_BYTES));
YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration, configurationDirectory);
ClusterSpecification clusterSpecification = new ClusterSpecification
.ClusterSpecificationBuilder()
.createClusterSpecification();
// Установите параметры и основной класс пользовательского jar-файла.
// ApplicationConfiguration appConfig = new ApplicationConfiguration(args, "org.apache.flink.examples.java.wordcount.WordCount");
ApplicationConfiguration appConfig = new ApplicationConfiguration(args, null);
YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
flinkConfiguration,
yarnConfiguration,
yarnClient,
clusterInformationRetriever,
true
);
try {
ClusterClientProvider<ApplicationId> clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(
clusterSpecification,
appConfig
);
ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();
ApplicationId applicationId = clusterClient.getClusterId();
String webInterfaceURL = clusterClient.getWebInterfaceURL();
System.out.println("applicationId is {}" + applicationId);
System.out.println("webInterfaceURL is {}" + webInterfaceURL);
// покидать
// yarnClusterDescriptor.killCluster(applicationId);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Посмотреть пряжу
package cn.psvmc;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import com.sun.istack.logging.Logger;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
public class ConnectionSSH {
private static final Logger logger = Logger.getLogger(ConnectionSSH.class);
public static void main(String[] args) throws JSchException, IOException {
JSch jsch = new JSch();
String pubKeyPath = "C:\\Users\\Administrator\\.ssh\\id_rsa";
jsch.addIdentity(pubKeyPath);
String username = "root";
String host = "192.168.7.101";
Session session =jsch.getSession(username, host, 22);//Подготовка к подключению
session.setConfig("StrictHostKeyChecking", "no");
session.connect();
String command = "flink run -t yarn-per-job $FLINK_HOME/examples/batch/WordCount.jar";
ChannelExec channel=(ChannelExec)session.openChannel("exec");
channel.setCommand(command);
BufferedReader in = new BufferedReader(new InputStreamReader(channel.getInputStream()));
channel.connect();
String msg;
while((msg = in.readLine()) != null){
System.out.println(msg);
}
channel.disconnect();
session.disconnect();
}
}
Использовать пароль
JSch jsch = new JSch();
String username = "root";
String host = "192.168.7.101";
Session session =jsch.getSession(username, host, 22);//Подготовка к подключению
session.setConfig("StrictHostKeyChecking", "no");
session.setPassword("zhangjian");
session.connect();
Использовать ключ
JSch jsch = new JSch();
String pubKeyPath = "C:\\Users\\Administrator\\.ssh\\id_rsa";
jsch.addIdentity(pubKeyPath);
String username = "root";
String host = "192.168.7.101";
Session session =jsch.getSession(username, host, 22);//Подготовка к подключению
session.setConfig("StrictHostKeyChecking", "no");
session.connect();
Помимо запуска скриптов, этот класс также может копировать файлы.
полагаться:
<dependency>
<groupId>ch.ethz.ganymed</groupId>
<artifactId>ganymed-ssh2</artifactId>
<version>build210</version>
</dependency>
Инструменты
package cn.psvmc;
import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.SCPClient;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import java.io.*;
/**
* Описание: Подключитесь к серверу Linux и выполните соответствующие команды оболочки.
*/
public class ConnectLinuxCommand {
private static final Logger logger = Logger.getLogger(ConnectLinuxCommand.class);
private static final String DEFAULTCHARTSET = "UTF-8";
private static Connection conn;
/**
* @Title: login
* @Description: Метод имени пользователя и пароля Удаленный вход на Linux-сервер
* @return: Boolean
*/
public static Boolean login(RemoteConnect remoteConnect) {
boolean flag = false;
try {
conn = new Connection(remoteConnect.getIp());
conn.connect();// соединять
flag = conn.authenticateWithPassword(remoteConnect.getUserName(), remoteConnect.getPassword());// Сертификация
if (flag) {
logger.info("Сертификацияуспех!");
} else {
logger.error("Сертификациянеудача!");
conn.close();
}
} catch (IOException e) {
e.printStackTrace();
}
return flag;
}
public static Boolean loginWithoutPwd(RemoteConnect remoteConnect) {
boolean flag = true;
try {
conn = new Connection(remoteConnect.getIp());
conn.connect();// соединять
boolean authenticationPartialSuccess = conn.isAuthenticationPartialSuccess();
System.out.println("authenticationPartialSuccess = " + authenticationPartialSuccess);
logger.info("Сертификацияуспех!");
} catch (IOException e) {
e.printStackTrace();
}
return flag;
}
/**
* @param remoteConnect объединять информационный объект
* @param keyFile Объект файла указывает на файл, содержащий закрытый ключ пользователя DSA или RSA в формате OpenSSH** (PEM, не может быть потерян" ----- BEGIN DSA PRIVATE KEY-----" or "-----BEGIN RSA PRIVATE КЛЮЧ -----" тег
* @param keyfilePass Если файл ключа зашифрован Этот параметр необходимо использовать для расшифровки. Он может иметь значение null, если шифрование отсутствует.
* @return Boolean
* @Title: loginByKey
* @Description: режим секретного ключа Удаленный вход на Linux-сервер
*/
public static Boolean loginByFileKey(RemoteConnect remoteConnect, File keyFile, String keyfilePass) {
boolean flag = false;
try {
conn = new Connection(remoteConnect.getIp());
conn.connect();
// Авторизоваться Сертификация
flag = conn.authenticateWithPublicKey(remoteConnect.getUserName(), keyFile, keyfilePass);
if (flag) {
logger.info("Сертификацияуспех!");
} else {
logger.error("Сертификациянеудача!");
conn.close();
}
} catch (Exception e) {
e.printStackTrace();
}
return flag;
}
/**
* @param remoteConnect объединять информационный объект
* @param keys Символ [], содержащий закрытый ключ пользователя DSA или RSA (формат ключа OpenSSH, вы не можете потерять "----- begin Закрытый ключ DSA -----" или "----- BEGIN RSA PRIVATE KEY ----- "Тег. Массив символов может содержать символы новой строки/новой строки.
* @param keyPass Если массив символов секретного ключа зашифрован Это поле необходимо для расшифровки В противном случае не требуется, может быть нулевым
* @return Boolean
* @Title: loginByCharsKey
* @Description: режим секретного ключа Удаленный вход на Linux-сервер
*/
public static Boolean loginByCharsKey(RemoteConnect remoteConnect, char[] keys, String keyPass) {
boolean flag = false;
try {
conn = new Connection(remoteConnect.getIp());
conn.connect();
// Авторизоваться Сертификация
flag = conn.authenticateWithPublicKey(remoteConnect.getUserName(), keys, keyPass);
if (flag) {
logger.info("Сертификацияуспех!");
} else {
logger.error("Сертификациянеудача!");
conn.close();
}
} catch (Exception e) {
e.printStackTrace();
}
return flag;
}
/**
* @param cmd Команда сценария
* @Title: execute
* @Description: Удаленно выполнять сценарии или команды shll
* @return: result Команда выполняется и возвращается результат.
*/
public static String runCmd(String cmd) {
String result = "";
try {
Session session = conn.openSession();// открыть сеанс
session.execCommand(cmd);// выполнить команду
result = processStdout(session.getStdout(), DEFAULTCHARTSET);
// Если стандартный вывод пуст, при выполнении скрипта произошла ошибка.
if (StringUtils.isBlank(result)) {
result = processStdout(session.getStderr(), DEFAULTCHARTSET);
}
conn.close();
session.close();
} catch (IOException e) {
e.printStackTrace();
}
return result;
}
/**
* @return String Значение результата, возвращаемое после успешного выполнения команды. Если выполнение команды завершается неудачно, возвращается пустая строка, а не ноль.
* @Title: executeSuccess
* @Description: Удаленно выполнять сценарии оболочки или команды
*/
public static String runCmdSuccess(String cmd) {
String result = "";
try {
Session session = conn.openSession();// открыть сеанс
session.execCommand(cmd);// выполнить команду
result = processStdout(session.getStdout(), DEFAULTCHARTSET);
conn.close();
session.close();
} catch (IOException e) {
e.printStackTrace();
}
return result;
}
/**
* @param in объект входного потока
* @param charset кодирование
* @return String Вернуть в виде обычного текста
* @Title: processStdout
* @Description: Анализ возвращаемых результатов выполнения скрипта
*/
public static String processStdout(InputStream in, String charset) {
InputStream stdout = new StreamGobbler(in);
StringBuilder buffer = new StringBuilder();
try {
BufferedReader br = new BufferedReader(new InputStreamReader(stdout, charset));
String line = null;
while ((line = br.readLine()) != null) {
buffer.append(line).append("\n");
}
} catch (IOException e) {
e.printStackTrace();
}
return buffer.toString();
}
/**
* @return String
* @Description: Свяжите Linux-сервер через имя пользователя и пароль
*/
public static String runCmd(String ip, String userName, String password, String commandStr) {
logger.info(
"ConnectLinuxCommand scpGet===" +
"ip:" + ip +
" userName:" + userName +
" commandStr:" + commandStr
);
String returnStr = "";
RemoteConnect remoteConnect = new RemoteConnect();
remoteConnect.setIp(ip);
remoteConnect.setUserName(userName);
remoteConnect.setPassword(password);
try {
if (login(remoteConnect)) {
returnStr = runCmd(commandStr);
System.out.println(returnStr);
}
} catch (Exception e) {
e.printStackTrace();
}
return returnStr;
}
public static boolean connectLinuxWithoutPwd(String ip, String userName, String commandStr) {
logger.info("ConnectLinuxCommand scpGet===" + "ip:" + ip + " userName:" + userName + " commandStr:"
+ commandStr);
String returnStr = "";
boolean result = true;
RemoteConnect remoteConnect = new RemoteConnect();
remoteConnect.setIp(ip);
remoteConnect.setUserName(userName);
try {
if (loginWithoutPwd(remoteConnect)) {
returnStr = runCmd(commandStr);
System.out.println(result);
}
} catch (Exception e) {
e.printStackTrace();
}
if (StringUtils.isBlank(returnStr)) {
result = false;
}
return result;
}
/**
* @param password Пароль (другие серверы)
* @param remoteFile Местоположение файла (другие серверы)
* @param localDir Этот каталог сервера
* @Title: scpGet
* @Description: Получить файлы с других серверов в указанный каталог на этом сервере
*/
public static void scpPull(String ip, String userName, String password, String remoteFile, String localDir)
throws IOException {
logger.info("ConnectLinuxCommand scpGet===" + "ip:" + ip + " userName:" + userName + " remoteFile:"
+ remoteFile + " localDir:" + localDir);
RemoteConnect remoteConnect = new RemoteConnect();
remoteConnect.setIp(ip);
remoteConnect.setUserName(userName);
remoteConnect.setPassword(password);
if (login(remoteConnect)) {
SCPClient client = new SCPClient(conn);
client.get(remoteFile, localDir);
conn.close();
}
}
/**
* Переместить файлы, копировать на другие компьютеры
* @param ip Удаленный IP
* @param userName имя удаленного пользователя
* @param password удаленный пароль
* @param localFile локальный файл
* @param remoteDir удаленный каталог
* @throws IOException аномальный
*/
public static void scpPush(String ip, String userName, String password, String localFile, String remoteDir)
throws IOException {
logger.info("ConnectLinuxCommand scpPut===" + "ip:" + ip + " userName:" + userName + " localFile:"
+ localFile + " remoteDir:" + remoteDir);
RemoteConnect remoteConnect = new RemoteConnect();
remoteConnect.setIp(ip);
remoteConnect.setUserName(userName);
remoteConnect.setPassword(password);
if (login(remoteConnect)) {
SCPClient client = new SCPClient(conn);
client.put(localFile, remoteDir);
conn.close();
}
}
}
RemoteConnect
public class RemoteConnect {
String ip;
String userName;
String password;
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
тест
package cn.psvmc;
public class CLCTest {
public static void main(String[] args) {
mTest1();
}
public static void mTest1() {
System.out.println("--------------------------------------");
String commandStr="flink run -t yarn-per-job $FLINK_HOME/examples/batch/WordCount.jar";
String result=ConnectLinuxCommand.runCmd("192.168.7.101","root","zhangjian",commandStr);
System.out.println("Результат:"+результат);
System.out.println("--------------------------------------");
}
public static void mTest2() {
try {
ConnectLinuxCommand.scpPull("192.168.7.101","root","zhangjian", "/root/test.txt", "d:/aa");
} catch (Exception e) {
e.printStackTrace();
}
}
public static void mTest3() {
try {
ConnectLinuxCommand.scpPush("192.168.7.101","root","zhangjian", "d:/aa/test2.txt", "/root/");
} catch (Exception e) {
e.printStackTrace();
}
}
}