Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,65 +18,78 @@
package com.datasophon.common.utils;

import com.datasophon.common.model.ProcInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OlapUtils {

private static final Logger logger = LoggerFactory.getLogger(OlapUtils.class);

public static ExecResult addFollower(String feMaster, String hostname) {
ExecResult execResult = new ExecResult();
String sql = "ALTER SYSTEM add FOLLOWER \"" + hostname + ":9010\";";
logger.info("Add fe to cluster , the sql is {}", sql);
try {
List<ProcInfo> frontends = showFrontends(feMaster);
for (ProcInfo proc : frontends) {
if (proc.getHostName().equalsIgnoreCase(hostname)) {
logger.info("Follower {} already exists. Skip adding.", hostname);
// 幂等处理:已存在也返回成功
execResult.setExecResult(true);
return execResult;
}
}
String sql = "ALTER SYSTEM add FOLLOWER \"" + hostname + ":9010\";";
logger.info("Add fe to cluster , the sql is {}", sql);
executeSql(feMaster, hostname, sql);
execResult.setExecResult(true);
} catch (ClassNotFoundException e) {
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
return execResult;
}

public static ExecResult addObserver(String feMaster, String hostname) {
ExecResult execResult = new ExecResult();
String sql = "ALTER SYSTEM add OBSERVER \"" + hostname + ":9010\";";
logger.info("Add fe to cluster , the sql is {}", sql);
try {
List<ProcInfo> frontends = showFrontends(feMaster);
for (ProcInfo proc : frontends) {
if (proc.getHostName().equalsIgnoreCase(hostname)) {
logger.info("Observer {} already exists. Skip adding.", hostname);
execResult.setExecResult(true);
return execResult;
}
}
String sql = "ALTER SYSTEM add OBSERVER \"" + hostname + ":9010\";";
logger.info("Add fe to cluster , the sql is {}", sql);
executeSql(feMaster, hostname, sql);
execResult.setExecResult(true);
} catch (ClassNotFoundException e) {
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
return execResult;
}

public static ExecResult addBackend(String feMaster, String hostname) {
ExecResult execResult = new ExecResult();
String sql = "ALTER SYSTEM add BACKEND \"" + hostname + ":9050\";";
logger.info("Add be to cluster , the sql is {}", sql);

try {
List<ProcInfo> backends = showBackends(feMaster);
for (ProcInfo proc : backends) {
if (proc.getHostName().equalsIgnoreCase(hostname)) {
logger.info("Backend {} already exists. Skip adding.", hostname);
execResult.setExecResult(true);
return execResult;
}
}
String sql = "ALTER SYSTEM add BACKEND \"" + hostname + ":9050\";";
logger.info("Add be to cluster , the sql is {}", sql);
executeSql(feMaster, hostname, sql);
execResult.setExecResult(true);
} catch (ClassNotFoundException e) {
} catch (ClassNotFoundException | SQLException e) {
e.printStackTrace();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
return execResult;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package com.datasophon.worker.strategy;

import akka.actor.ActorRef;
import cn.hutool.core.net.NetUtil;
import com.datasophon.common.command.OlapOpsType;
import com.datasophon.common.command.OlapSqlExecCommand;
import com.datasophon.common.command.ServiceRoleOperateCommand;
Expand All @@ -26,9 +28,6 @@
import com.datasophon.worker.handler.ServiceHandler;
import com.datasophon.worker.utils.ActorUtils;

import akka.actor.ActorRef;
import cn.hutool.core.net.NetUtil;

public class BEHandlerStrategy extends AbstractHandlerStrategy implements ServiceRoleStrategy {

public BEHandlerStrategy(String serviceName, String serviceRoleName) {
Expand All @@ -41,30 +40,43 @@ public ExecResult handler(ServiceRoleOperateCommand command) {
ServiceHandler serviceHandler = new ServiceHandler(command.getServiceName(), command.getServiceRoleName());

if (command.getCommandType().equals(CommandType.INSTALL_SERVICE)) {
logger.info("add be to cluster");
logger.info("prepare to add be to cluster before start");
try {

OlapSqlExecCommand sqlExecCommand = new OlapSqlExecCommand();
sqlExecCommand.setFeMaster(command.getMasterHost());
sqlExecCommand.setHostName(NetUtil.getLocalhostStr());
sqlExecCommand.setOpsType(OlapOpsType.ADD_BE);

ActorUtils.getRemoteActor(command.getManagerHost(), "masterNodeProcessingActor")
.tell(sqlExecCommand, ActorRef.noSender());

logger.info("add be command sent to master successfully");

} catch (Exception e) {
logger.error("add backend failed: {}", ThrowableUtils.getStackTrace(e));
startResult.setExecResult(false);
startResult.setExecOut("Failed to add backend to FE before start.");
return startResult;
}

logger.info("starting slave be...");
startResult = serviceHandler.start(command.getStartRunner(), command.getStatusRunner(),
command.getDecompressPackageName(), command.getRunAs());

if (startResult.getExecResult()) {
try {
OlapSqlExecCommand sqlExecCommand = new OlapSqlExecCommand();
sqlExecCommand.setFeMaster(command.getMasterHost());
// 使用IP 否则应用侧使用时需要设置host
sqlExecCommand.setHostName(NetUtil.getLocalhostStr());
sqlExecCommand.setOpsType(OlapOpsType.ADD_BE);
ActorUtils.getRemoteActor(command.getManagerHost(), "masterNodeProcessingActor")
.tell(sqlExecCommand, ActorRef.noSender());
} catch (Exception e) {
logger.error("add backend failed {}", ThrowableUtils.getStackTrace(e));
}
logger.info("slave be start success");
} else {
logger.error("slave be start failed");
}

} else {
// 非安装类命令,直接启动
startResult = serviceHandler.start(command.getStartRunner(), command.getStatusRunner(),
command.getDecompressPackageName(), command.getRunAs());
}

return startResult;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package com.datasophon.worker.strategy;

import akka.actor.ActorRef;
import cn.hutool.core.net.NetUtil;
import cn.hutool.json.JSONUtil;
import com.datasophon.common.command.OlapOpsType;
import com.datasophon.common.command.OlapSqlExecCommand;
import com.datasophon.common.command.ServiceRoleOperateCommand;
Expand All @@ -29,11 +32,6 @@

import java.util.ArrayList;

import akka.actor.ActorRef;

import cn.hutool.core.net.NetUtil;
import cn.hutool.json.JSONUtil;

public class FEHandlerStrategy extends AbstractHandlerStrategy implements ServiceRoleStrategy {

public FEHandlerStrategy(String serviceName, String serviceRoleName) {
Expand All @@ -43,11 +41,33 @@ public FEHandlerStrategy(String serviceName, String serviceRoleName) {
@Override
public ExecResult handler(ServiceRoleOperateCommand command) {
ExecResult startResult = new ExecResult();
logger.info("FEHandlerStrategy start fe" + JSONUtil.toJsonStr(command));
logger.info("FEHandlerStrategy start fe: {}", JSONUtil.toJsonStr(command));
ServiceHandler serviceHandler = new ServiceHandler(command.getServiceName(), command.getServiceRoleName());

if (command.getCommandType() == CommandType.INSTALL_SERVICE) {
if (command.isSlave()) {
logger.info("first start fe");
logger.info("prepare to add slave fe follower before start");

try {
// 先尝试添加 follower
OlapSqlExecCommand sqlExecCommand = new OlapSqlExecCommand();
sqlExecCommand.setFeMaster(command.getMasterHost());
sqlExecCommand.setHostName(NetUtil.getLocalhostStr());
sqlExecCommand.setOpsType(OlapOpsType.ADD_FE_FOLLOWER);

ActorUtils.getRemoteActor(command.getManagerHost(), "masterNodeProcessingActor")
.tell(sqlExecCommand, ActorRef.noSender());

logger.info("slave fe follower add command sent to master");

} catch (Exception e) {
logger.error("add slave fe follower failed: {}", ThrowableUtils.getStackTrace(e));
startResult.setExecResult(false);
startResult.setExecOut("Failed to add follower before start.");
return startResult;
}

logger.info("starting slave fe...");
ArrayList<String> commands = new ArrayList<>();
commands.add("--helper");
commands.add(command.getMasterHost() + ":9010");
Expand All @@ -57,24 +77,16 @@ public ExecResult handler(ServiceRoleOperateCommand command) {
startRunner.setProgram(command.getStartRunner().getProgram());
startRunner.setArgs(commands);
startRunner.setTimeout("600");

startResult = serviceHandler.start(startRunner, command.getStatusRunner(),
command.getDecompressPackageName(), command.getRunAs());

if (startResult.getExecResult()) {
// add follower
try {
OlapSqlExecCommand sqlExecCommand = new OlapSqlExecCommand();
sqlExecCommand.setFeMaster(command.getMasterHost());
sqlExecCommand.setHostName(NetUtil.getLocalhostStr());
sqlExecCommand.setOpsType(OlapOpsType.ADD_FE_FOLLOWER);
ActorUtils.getRemoteActor(command.getManagerHost(), "masterNodeProcessingActor")
.tell(sqlExecCommand, ActorRef.noSender());
logger.info("slave fe start success");
} catch (Exception e) {
logger.error("add slave fe failed {}", ThrowableUtils.getStackTrace(e));
}
logger.info("slave fe started successfully");
} else {
logger.error("slave fe start failed");
}

} else {
startResult = serviceHandler.start(command.getStartRunner(), command.getStatusRunner(),
command.getDecompressPackageName(), command.getRunAs());
Expand All @@ -83,6 +95,8 @@ public ExecResult handler(ServiceRoleOperateCommand command) {
startResult = serviceHandler.start(command.getStartRunner(), command.getStatusRunner(),
command.getDecompressPackageName(), command.getRunAs());
}

return startResult;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package com.datasophon.worker.strategy;

import akka.actor.ActorRef;
import cn.hutool.core.net.NetUtil;
import cn.hutool.json.JSONUtil;
import com.datasophon.common.command.OlapOpsType;
import com.datasophon.common.command.OlapSqlExecCommand;
import com.datasophon.common.command.ServiceRoleOperateCommand;
Expand All @@ -29,11 +32,6 @@

import java.util.ArrayList;

import akka.actor.ActorRef;

import cn.hutool.core.net.NetUtil;
import cn.hutool.json.JSONUtil;

public class FEObserverHandlerStrategy extends AbstractHandlerStrategy implements ServiceRoleStrategy {

public FEObserverHandlerStrategy(String serviceName, String serviceRoleName) {
Expand All @@ -43,10 +41,32 @@ public FEObserverHandlerStrategy(String serviceName, String serviceRoleName) {
@Override
public ExecResult handler(ServiceRoleOperateCommand command) {
ExecResult startResult = new ExecResult();
logger.info("FEObserverHandlerStrategy start fe observer" + JSONUtil.toJsonStr(command));
logger.info("FEObserverHandlerStrategy start fe observer: {}", JSONUtil.toJsonStr(command));

ServiceHandler serviceHandler = new ServiceHandler(command.getServiceName(), command.getServiceRoleName());

if (command.getCommandType() == CommandType.INSTALL_SERVICE) {
logger.info("first start fe observer");
logger.info("prepare to add fe observer before start");
// 提前发送 add observer 命令
try {
OlapSqlExecCommand sqlExecCommand = new OlapSqlExecCommand();
sqlExecCommand.setFeMaster(command.getMasterHost());
sqlExecCommand.setHostName(NetUtil.getLocalhostStr());
sqlExecCommand.setOpsType(OlapOpsType.ADD_FE_OBSERVER);

ActorUtils.getRemoteActor(command.getManagerHost(), "masterNodeProcessingActor")
.tell(sqlExecCommand, ActorRef.noSender());

logger.info("add fe observer command sent to master successfully");

} catch (Exception e) {
logger.error("add fe observer failed: {}", ThrowableUtils.getStackTrace(e));
startResult.setExecResult(false);
startResult.setExecOut("Failed to add fe observer before start.");
return startResult;
}

logger.info("starting fe observer...");
ArrayList<String> commands = new ArrayList<>();
commands.add("--helper");
commands.add(command.getMasterHost() + ":9010");
Expand All @@ -56,28 +76,23 @@ public ExecResult handler(ServiceRoleOperateCommand command) {
startRunner.setProgram(command.getStartRunner().getProgram());
startRunner.setArgs(commands);
startRunner.setTimeout("60");

startResult = serviceHandler.start(startRunner, command.getStatusRunner(),
command.getDecompressPackageName(), command.getRunAs());

if (startResult.getExecResult()) {
// add observer
try {
OlapSqlExecCommand sqlExecCommand = new OlapSqlExecCommand();
sqlExecCommand.setFeMaster(command.getMasterHost());
sqlExecCommand.setHostName(NetUtil.getLocalhostStr());
sqlExecCommand.setOpsType(OlapOpsType.ADD_FE_OBSERVER);
ActorUtils.getRemoteActor(command.getManagerHost(), "masterNodeProcessingActor")
.tell(sqlExecCommand, ActorRef.noSender());
} catch (Exception e) {
logger.error("add fe observer failed {}", ThrowableUtils.getStackTrace(e));
}
logger.info("fe observer start success");
} else {
logger.error("fe observer start failed");
}

} else {
// 非安装类命令直接启动
startResult = serviceHandler.start(command.getStartRunner(), command.getStatusRunner(),
command.getDecompressPackageName(), command.getRunAs());
}

return startResult;
}

}