kafka连接错误
来源:4-8 Source API编程之对接Kafka数据

慕容3565349
2022-01-15
14:55:22,491 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils [] - Log file environment variable ‘log.file’ is not set.
14:55:22,495 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils [] - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable ‘log.file’ or configuration key ‘web.log.path’.
14:55:53,698 WARN org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=consumer-test-8, groupId=test] Bootstrap broker 192.168.128.10:9092 (id: -1 rack: null) disconnected
14:56:23,492 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Sink: Print to Std. Out (2/8)#0 (80f6d820c451838a843add794b31f74c) switched from INITIALIZING to FAILED with failure cause: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
Exception in thread “main” org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc0(AkkaInvocationHandler.java:250)atorg.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda0(AkkaInvocationHandler.java:250)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda0(AkkaInvocationHandler.java:250)atorg.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambdanull1(ClassLoadingUtils.java:93)atorg.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)atorg.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda1(ClassLoadingUtils.java:93)atorg.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)atorg.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambdaguardCompletionWithContextClassLoader2(ClassLoadingUtils.java:92)atjava.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)atjava.util.concurrent.CompletableFuture2(ClassLoadingUtils.java:92)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture2(ClassLoadingUtils.java:92)atjava.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)atjava.util.concurrent.CompletableFutureUniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils1.onComplete(AkkaFutureUtils.java:47)atakka.dispatch.OnComplete.internal(Future.scala:300)atakka.dispatch.OnComplete.internal(Future.scala:297)atakka.dispatch.japi1.onComplete(AkkaFutureUtils.java:47)
at akka.dispatch.OnComplete.internal(Future.scala:300)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi1.onComplete(AkkaFutureUtils.java:47)atakka.dispatch.OnComplete.internal(Future.scala:300)atakka.dispatch.OnComplete.internal(Future.scala:297)atakka.dispatch.japiCallbackBridge.apply(Future.scala:224)
at akka.dispatch.japiCallbackBridge.apply(Future.scala:221)atscala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)atakka.pattern.PipeToSupportCallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.pattern.PipeToSupportCallbackBridge.apply(Future.scala:221)atscala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)atakka.pattern.PipeToSupportPipeableFuture$anonfunanonfunanonfunpipeTo1.applyOrElse(PipeToSupport.scala:23)atscala.concurrent.Future.1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.1.applyOrElse(PipeToSupport.scala:23)atscala.concurrent.Future.anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree11(Promise.scala:29)atscala.concurrent.impl.Promise.1(Promise.scala:29)
at scala.concurrent.impl.Promise.1(Promise.scala:29)atscala.concurrent.impl.Promise.anonfun$transform1(Promise.scala:29)atscala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)atakka.dispatch.BatchingExecutor1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor1(Promise.scala:29)atscala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)atakka.dispatch.BatchingExecutorAbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutorBlockableBatch.BlockableBatch.BlockableBatch.anonfun$run1(BatchingExecutor.scala:100)atscala.runtime.java8.JFunction01(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction01(BatchingExecutor.scala:100)atscala.runtime.java8.JFunction0mcVsp.apply(JFunction0sp.apply(JFunction0sp.apply(JFunction0mcVsp.java:12)atscala.concurrent.BlockContextsp.java:12)
at scala.concurrent.BlockContextsp.java:12)atscala.concurrent.BlockContext.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
... 4 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
Process finished with exit code 1
1回答
-
Michael_PK
2022-01-16
192.168.128.10:9092 (id: -1 rack: null) disconnected
你用kafka字段的终端生产和消费测试下,确保你的kafka服务是正常的才行。现在这情况,感觉连不到kafka呢
00
相似问题