重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
在使用kafka high-level的consumer,使用多线程消费数据时报错,简单分析一下原因下载 ,ConsumerIterator取不到消息时会阻塞,并且将内部状态置为FAILED,当其他线程访问时就会抛出异常。
公司主营业务:成都做网站、成都网站设计、移动网站开发等业务。帮助企业客户真正实现互联网宣传,提高企业的竞争能力。创新互联建站是一支青春激扬、勤奋敬业、活力青春激扬、勤奋敬业、活力澎湃、和谐高效的团队。公司秉承以“开放、自由、严谨、自律”为核心的企业文化,感谢他们对我们的高要求,感谢他们从不同领域给我们带来的挑战,让我们激情的团队有机会用头脑与智慧不断的给客户带来惊喜。创新互联建站推出太子河免费做网站回馈大家。
Java代码
def hasNext(): Boolean = {
if(state == FAILED) //处于FAILED状态时,另外线程访问会直接异常
throw new IllegalStateException("Iterator is in failed state")
state match {
case DONE => false
case READY => true
case _ => maybeComputeNext()
}
}
def maybeComputeNext(): Boolean = {
state = FAILED //重置了状态
nextItem = Some(makeNext())
if(state == DONE) {
false
} else {
state = READY
true
}
}
下载
protected def makeNext(): MessageAndMetadata[K, V] = {
var currentDataChunk: FetchedDataChunk = null
// if we don't have an iterator, get one
var localCurrent = current.get()
if(localCurrent == null || !localCurrent.hasNext) {
if (consumerTimeoutMs < 0)
currentDataChunk = channel.take //channel是BlockingQueue这里会阻塞
else {
currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
if (currentDataChunk == null) {
// reset state to make the iterator re-iterable
resetState()
throw new ConsumerTimeoutException
}
}
//省略部分代码
}