diff --git a/datasophon-common/src/main/java/com/datasophon/common/utils/OlapUtils.java b/datasophon-common/src/main/java/com/datasophon/common/utils/OlapUtils.java index 13b34369..b101124f 100644 --- a/datasophon-common/src/main/java/com/datasophon/common/utils/OlapUtils.java +++ b/datasophon-common/src/main/java/com/datasophon/common/utils/OlapUtils.java @@ -18,65 +18,78 @@ package com.datasophon.common.utils; import com.datasophon.common.model.ProcInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; +import java.sql.*; import java.util.ArrayList; import java.util.List; import java.util.Objects; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class OlapUtils { private static final Logger logger = LoggerFactory.getLogger(OlapUtils.class); public static ExecResult addFollower(String feMaster, String hostname) { ExecResult execResult = new ExecResult(); - String sql = "ALTER SYSTEM add FOLLOWER \"" + hostname + ":9010\";"; - logger.info("Add fe to cluster , the sql is {}", sql); try { + List frontends = showFrontends(feMaster); + for (ProcInfo proc : frontends) { + if (proc.getHostName().equalsIgnoreCase(hostname)) { + logger.info("Follower {} already exists. 
Skip adding.", hostname); + // Idempotent handling: an already-registered follower is also reported as success + execResult.setExecResult(true); + return execResult; + } + } + String sql = "ALTER SYSTEM add FOLLOWER \"" + hostname + ":9010\";"; + logger.info("Add fe to cluster , the sql is {}", sql); executeSql(feMaster, hostname, sql); execResult.setExecResult(true); - } catch (ClassNotFoundException e) { + } catch (ClassNotFoundException | SQLException e) { e.printStackTrace(); - } catch (SQLException throwables) { - throwables.printStackTrace(); } return execResult; } public static ExecResult addObserver(String feMaster, String hostname) { ExecResult execResult = new ExecResult(); - String sql = "ALTER SYSTEM add OBSERVER \"" + hostname + ":9010\";"; - logger.info("Add fe to cluster , the sql is {}", sql); try { + List frontends = showFrontends(feMaster); + for (ProcInfo proc : frontends) { + if (proc.getHostName().equalsIgnoreCase(hostname)) { + logger.info("Observer {} already exists. Skip adding.", hostname); + execResult.setExecResult(true); + return execResult; + } + } + String sql = "ALTER SYSTEM add OBSERVER \"" + hostname + ":9010\";"; + logger.info("Add fe to cluster , the sql is {}", sql); executeSql(feMaster, hostname, sql); execResult.setExecResult(true); - } catch (ClassNotFoundException e) { + } catch (ClassNotFoundException | SQLException e) { e.printStackTrace(); - } catch (SQLException throwables) { - throwables.printStackTrace(); } return execResult; } public static ExecResult addBackend(String feMaster, String hostname) { ExecResult execResult = new ExecResult(); - String sql = "ALTER SYSTEM add BACKEND \"" + hostname + ":9050\";"; - logger.info("Add be to cluster , the sql is {}", sql); - try { + List backends = showBackends(feMaster); + for (ProcInfo proc : backends) { + if (proc.getHostName().equalsIgnoreCase(hostname)) { + logger.info("Backend {} already exists. 
Skip adding.", hostname); + execResult.setExecResult(true); + return execResult; + } + } + String sql = "ALTER SYSTEM add BACKEND \"" + hostname + ":9050\";"; + logger.info("Add be to cluster , the sql is {}", sql); executeSql(feMaster, hostname, sql); execResult.setExecResult(true); - } catch (ClassNotFoundException e) { + } catch (ClassNotFoundException | SQLException e) { e.printStackTrace(); - } catch (SQLException throwables) { - throwables.printStackTrace(); } return execResult; } diff --git a/datasophon-worker/src/main/java/com/datasophon/worker/strategy/BEHandlerStrategy.java b/datasophon-worker/src/main/java/com/datasophon/worker/strategy/BEHandlerStrategy.java index 4887c298..412e1402 100644 --- a/datasophon-worker/src/main/java/com/datasophon/worker/strategy/BEHandlerStrategy.java +++ b/datasophon-worker/src/main/java/com/datasophon/worker/strategy/BEHandlerStrategy.java @@ -17,6 +17,8 @@ package com.datasophon.worker.strategy; +import akka.actor.ActorRef; +import cn.hutool.core.net.NetUtil; import com.datasophon.common.command.OlapOpsType; import com.datasophon.common.command.OlapSqlExecCommand; import com.datasophon.common.command.ServiceRoleOperateCommand; @@ -26,9 +28,6 @@ import com.datasophon.worker.handler.ServiceHandler; import com.datasophon.worker.utils.ActorUtils; -import akka.actor.ActorRef; -import cn.hutool.core.net.NetUtil; - public class BEHandlerStrategy extends AbstractHandlerStrategy implements ServiceRoleStrategy { public BEHandlerStrategy(String serviceName, String serviceRoleName) { @@ -41,30 +40,43 @@ public ExecResult handler(ServiceRoleOperateCommand command) { ServiceHandler serviceHandler = new ServiceHandler(command.getServiceName(), command.getServiceRoleName()); if (command.getCommandType().equals(CommandType.INSTALL_SERVICE)) { - logger.info("add be to cluster"); + logger.info("prepare to add be to cluster before start"); + try { + + OlapSqlExecCommand sqlExecCommand = new OlapSqlExecCommand(); + 
sqlExecCommand.setFeMaster(command.getMasterHost()); + sqlExecCommand.setHostName(NetUtil.getLocalhostStr()); + sqlExecCommand.setOpsType(OlapOpsType.ADD_BE); + + ActorUtils.getRemoteActor(command.getManagerHost(), "masterNodeProcessingActor") + .tell(sqlExecCommand, ActorRef.noSender()); + + logger.info("add be command sent to master successfully"); + + } catch (Exception e) { + logger.error("add backend failed: {}", ThrowableUtils.getStackTrace(e)); + startResult.setExecResult(false); + startResult.setExecOut("Failed to add backend to FE before start."); + return startResult; + } + logger.info("starting slave be..."); startResult = serviceHandler.start(command.getStartRunner(), command.getStatusRunner(), command.getDecompressPackageName(), command.getRunAs()); + if (startResult.getExecResult()) { - try { - OlapSqlExecCommand sqlExecCommand = new OlapSqlExecCommand(); - sqlExecCommand.setFeMaster(command.getMasterHost()); - // 使用IP 否则应用侧使用时需要设置host - sqlExecCommand.setHostName(NetUtil.getLocalhostStr()); - sqlExecCommand.setOpsType(OlapOpsType.ADD_BE); - ActorUtils.getRemoteActor(command.getManagerHost(), "masterNodeProcessingActor") - .tell(sqlExecCommand, ActorRef.noSender()); - } catch (Exception e) { - logger.error("add backend failed {}", ThrowableUtils.getStackTrace(e)); - } logger.info("slave be start success"); } else { logger.error("slave be start failed"); } + } else { + // Non-install commands: start the service directly startResult = serviceHandler.start(command.getStartRunner(), command.getStatusRunner(), command.getDecompressPackageName(), command.getRunAs()); } + return startResult; } + } diff --git a/datasophon-worker/src/main/java/com/datasophon/worker/strategy/FEHandlerStrategy.java b/datasophon-worker/src/main/java/com/datasophon/worker/strategy/FEHandlerStrategy.java index 859d8b0c..d72583c0 100644 --- a/datasophon-worker/src/main/java/com/datasophon/worker/strategy/FEHandlerStrategy.java +++ 
b/datasophon-worker/src/main/java/com/datasophon/worker/strategy/FEHandlerStrategy.java 
@@ -17,6 +17,9 @@ package com.datasophon.worker.strategy; +import akka.actor.ActorRef; +import cn.hutool.core.net.NetUtil; +import cn.hutool.json.JSONUtil; import com.datasophon.common.command.OlapOpsType; import com.datasophon.common.command.OlapSqlExecCommand; import com.datasophon.common.command.ServiceRoleOperateCommand; @@ -29,11 +32,6 @@ import java.util.ArrayList; -import akka.actor.ActorRef; - -import cn.hutool.core.net.NetUtil; -import cn.hutool.json.JSONUtil; - public class FEHandlerStrategy extends AbstractHandlerStrategy implements ServiceRoleStrategy { public FEHandlerStrategy(String serviceName, String serviceRoleName) { @@ -43,11 +41,33 @@ public FEHandlerStrategy(String serviceName, String serviceRoleName) { @Override public ExecResult handler(ServiceRoleOperateCommand command) { ExecResult startResult = new ExecResult(); - logger.info("FEHandlerStrategy start fe" + JSONUtil.toJsonStr(command)); + logger.info("FEHandlerStrategy start fe: {}", JSONUtil.toJsonStr(command)); ServiceHandler serviceHandler = new ServiceHandler(command.getServiceName(), command.getServiceRoleName()); + if (command.getCommandType() == CommandType.INSTALL_SERVICE) { if (command.isSlave()) { - logger.info("first start fe"); + logger.info("prepare to add slave fe follower before start"); + + try { + // Try to add the follower first + OlapSqlExecCommand sqlExecCommand = new OlapSqlExecCommand(); + sqlExecCommand.setFeMaster(command.getMasterHost()); + sqlExecCommand.setHostName(NetUtil.getLocalhostStr()); + sqlExecCommand.setOpsType(OlapOpsType.ADD_FE_FOLLOWER); + + ActorUtils.getRemoteActor(command.getManagerHost(), "masterNodeProcessingActor") + .tell(sqlExecCommand, ActorRef.noSender()); + + logger.info("slave fe follower add command sent to master"); + + } catch (Exception e) { + logger.error("add slave fe follower failed: {}", ThrowableUtils.getStackTrace(e)); + startResult.setExecResult(false); + startResult.setExecOut("Failed to add follower before start."); + return startResult; + } 
+ + logger.info("starting slave fe..."); ArrayList commands = new ArrayList<>(); commands.add("--helper"); commands.add(command.getMasterHost() + ":9010"); @@ -57,24 +77,16 @@ public ExecResult handler(ServiceRoleOperateCommand command) { startRunner.setProgram(command.getStartRunner().getProgram()); startRunner.setArgs(commands); startRunner.setTimeout("600"); + startResult = serviceHandler.start(startRunner, command.getStatusRunner(), command.getDecompressPackageName(), command.getRunAs()); + if (startResult.getExecResult()) { - // add follower - try { - OlapSqlExecCommand sqlExecCommand = new OlapSqlExecCommand(); - sqlExecCommand.setFeMaster(command.getMasterHost()); - sqlExecCommand.setHostName(NetUtil.getLocalhostStr()); - sqlExecCommand.setOpsType(OlapOpsType.ADD_FE_FOLLOWER); - ActorUtils.getRemoteActor(command.getManagerHost(), "masterNodeProcessingActor") - .tell(sqlExecCommand, ActorRef.noSender()); - logger.info("slave fe start success"); - } catch (Exception e) { - logger.error("add slave fe failed {}", ThrowableUtils.getStackTrace(e)); - } + logger.info("slave fe started successfully"); } else { logger.error("slave fe start failed"); } + } else { startResult = serviceHandler.start(command.getStartRunner(), command.getStatusRunner(), command.getDecompressPackageName(), command.getRunAs()); @@ -83,6 +95,8 @@ public ExecResult handler(ServiceRoleOperateCommand command) { startResult = serviceHandler.start(command.getStartRunner(), command.getStatusRunner(), command.getDecompressPackageName(), command.getRunAs()); } + return startResult; } + } diff --git a/datasophon-worker/src/main/java/com/datasophon/worker/strategy/FEObserverHandlerStrategy.java b/datasophon-worker/src/main/java/com/datasophon/worker/strategy/FEObserverHandlerStrategy.java index cd0ae1b9..a5a1d8ca 100644 --- a/datasophon-worker/src/main/java/com/datasophon/worker/strategy/FEObserverHandlerStrategy.java +++ 
b/datasophon-worker/src/main/java/com/datasophon/worker/strategy/FEObserverHandlerStrategy.java @@ -17,6 +17,9 @@ package com.datasophon.worker.strategy; +import akka.actor.ActorRef; +import cn.hutool.core.net.NetUtil; +import cn.hutool.json.JSONUtil; import com.datasophon.common.command.OlapOpsType; import com.datasophon.common.command.OlapSqlExecCommand; import com.datasophon.common.command.ServiceRoleOperateCommand; @@ -29,11 +32,6 @@ import java.util.ArrayList; -import akka.actor.ActorRef; - -import cn.hutool.core.net.NetUtil; -import cn.hutool.json.JSONUtil; - public class FEObserverHandlerStrategy extends AbstractHandlerStrategy implements ServiceRoleStrategy { public FEObserverHandlerStrategy(String serviceName, String serviceRoleName) { @@ -43,10 +41,32 @@ public FEObserverHandlerStrategy(String serviceName, String serviceRoleName) { @Override public ExecResult handler(ServiceRoleOperateCommand command) { ExecResult startResult = new ExecResult(); - logger.info("FEObserverHandlerStrategy start fe observer" + JSONUtil.toJsonStr(command)); + logger.info("FEObserverHandlerStrategy start fe observer: {}", JSONUtil.toJsonStr(command)); + ServiceHandler serviceHandler = new ServiceHandler(command.getServiceName(), command.getServiceRoleName()); + if (command.getCommandType() == CommandType.INSTALL_SERVICE) { - logger.info("first start fe observer"); + logger.info("prepare to add fe observer before start"); + // Send the add observer command ahead of the start + try { + OlapSqlExecCommand sqlExecCommand = new OlapSqlExecCommand(); + sqlExecCommand.setFeMaster(command.getMasterHost()); + sqlExecCommand.setHostName(NetUtil.getLocalhostStr()); + sqlExecCommand.setOpsType(OlapOpsType.ADD_FE_OBSERVER); + + ActorUtils.getRemoteActor(command.getManagerHost(), "masterNodeProcessingActor") + .tell(sqlExecCommand, ActorRef.noSender()); + + logger.info("add fe observer command sent to master successfully"); + + } catch (Exception e) { + logger.error("add fe observer failed: {}", 
ThrowableUtils.getStackTrace(e)); + startResult.setExecResult(false); + startResult.setExecOut("Failed to add fe observer before start."); + return startResult; + } + + logger.info("starting fe observer..."); ArrayList commands = new ArrayList<>(); commands.add("--helper"); commands.add(command.getMasterHost() + ":9010"); @@ -56,28 +76,23 @@ public ExecResult handler(ServiceRoleOperateCommand command) { startRunner.setProgram(command.getStartRunner().getProgram()); startRunner.setArgs(commands); startRunner.setTimeout("60"); + startResult = serviceHandler.start(startRunner, command.getStatusRunner(), command.getDecompressPackageName(), command.getRunAs()); + if (startResult.getExecResult()) { - // add observer - try { - OlapSqlExecCommand sqlExecCommand = new OlapSqlExecCommand(); - sqlExecCommand.setFeMaster(command.getMasterHost()); - sqlExecCommand.setHostName(NetUtil.getLocalhostStr()); - sqlExecCommand.setOpsType(OlapOpsType.ADD_FE_OBSERVER); - ActorUtils.getRemoteActor(command.getManagerHost(), "masterNodeProcessingActor") - .tell(sqlExecCommand, ActorRef.noSender()); - } catch (Exception e) { - logger.error("add fe observer failed {}", ThrowableUtils.getStackTrace(e)); - } logger.info("fe observer start success"); } else { logger.error("fe observer start failed"); } + } else { + // Non-install commands: start the service directly startResult = serviceHandler.start(command.getStartRunner(), command.getStatusRunner(), command.getDecompressPackageName(), command.getRunAs()); } + return startResult; } + }