diff --git a/app/src/main/scala/hstream/server/KafkaBroker.scala b/app/src/main/scala/hstream/server/KafkaBroker.scala index 6dbf699..22dc4ba 100644 --- a/app/src/main/scala/hstream/server/KafkaBroker.scala +++ b/app/src/main/scala/hstream/server/KafkaBroker.scala @@ -237,6 +237,8 @@ class KafkaBroker( s"""docker run -d --network host --name $metaServerContainerName -v $storeConfig:$storeConfig:ro $image hstream-meta-server --host 127.0.0.1 --port $metaServerPort --backend $storeConfig + --log-level trace + --log-flush-immediately """.stripMargin.linesIterator.mkString(" ").trim info(s"=> Start meta server by: $metaServerCmd") Utils.runCommand(metaServerCmd) @@ -317,6 +319,9 @@ class KafkaBroker( s"docker run --rm --network host zookeeper:3.6 zkCli.sh -server 127.0.0.1:$metastorePort deleteall /hstream".! // === spec 2: hornbill } else if (spec == 2) { + // Dump meta server logs + info("=> dump meta server logs...") + dumpMetaContainerLogs() // Remove meta server container val metaServerContainerName = config.testingConfig .getOrElse( @@ -370,7 +375,9 @@ class KafkaBroker( val containerLogsDir = config.testingConfig .getOrElse("container_logs_dir", throw new IllegalArgumentException("container.logs_dir is required")) .asInstanceOf[Path] - Files.createDirectories(containerLogsDir) + if (!Files.exists(containerLogsDir)) { + Files.createDirectories(containerLogsDir) + } // FIXME: use "docker logs" may cause incomplete logs (? need to investigate) // @@ -406,4 +413,45 @@ class KafkaBroker( } } + private def dumpMetaContainerLogs() = { + val metaServerContainerName = config.testingConfig + .getOrElse( + "metaserver_container_name", + throw new IllegalArgumentException("metaserver_container_name is required") + ) + .asInstanceOf[String] + val containerLogsDir = config.testingConfig + .getOrElse("container_logs_dir", throw new IllegalArgumentException("container.logs_dir is required")) + .asInstanceOf[Path] + if (!Files.exists(containerLogsDir)) { + Files.createDirectories(containerLogsDir) + } + val fileName = Paths.get(s"$containerLogsDir/$metaServerContainerName.log") + if (!Files.exists(fileName)) { + Files.createFile(fileName) + } + val writer = Files.newBufferedWriter(fileName, StandardOpenOption.APPEND) + val processLogger = ProcessLogger( + stdout => writer.write(stdout + "\n"), + stderr => writer.write(stderr + "\n") + ) + try { + val cmd = Seq("docker", "logs", metaServerContainerName) + val code = Process(cmd).!(processLogger) + if (code != 0) { + error(s"Failed to dump logs to $fileName, exit code: $code") + } else { + // add a separator line to separate logs from different runs + writer.write("\n=================================================\n\n") + info(s"Dump logs to $fileName") + } + } catch { + case e: Exception => + error(s"Failed to dump logs to $fileName, error: $e") + } finally { + writer.flush() + writer.close() + } + } + }