/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.*;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.schedule.ScheduleMessageService;
import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Store all metadata downtime for recovery, data protection reliability
*/
public class CommitLog {
// Message's MAGIC CODE daa320a7
public final static int MESSAGE_MAGIC_CODE = -626843481;
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// End of file empty MAGIC CODE cbd43194
protected final static int BLANK_MAGIC_CODE = -875286124;
//文件队列,通过它来获取MappedFile(代表commitLog文件夹)
protected final MappedFileQueue mappedFileQueue;
protected final DefaultMessageStore defaultMessageStore;
//刷盘线程:同步刷盘和异步刷盘 GroupCommitService同步刷盘线程,FlushRealTimeService异步刷盘线程
private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
//暂存池专用缓存刷盘线程:CommitRealTimeService专用写入缓存writeBuffer提交线程(需要开启writeBuffer才起作用)
private final FlushCommitLogService commitLogService;
private final AppendMessageCallback appendMessageCallback;
private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
//topic每个分区的消费偏移量(一个map结构)
protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
protected volatile long confirmOffset = -1L;
private volatile long beginTimeInLock = 0;
//锁:可重入锁 和 自旋锁
protected final PutMessageLock putMessageLock;
//CommitLog初始化,开启后台刷盘线程
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
this.defaultMessageStore = defaultMessageStore;
//刷盘类型:同步刷盘、异步刷盘
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
//开启同步刷盘线程
this.flushCommitLogService = new GroupCommitService();
} else {
//开启异步刷盘线程
this.flushCommitLogService = new FlushRealTimeService();
}
//暂存池异步刷盘:专用缓存writeBuffer刷盘提交线程(将写入暂存池中的数据,进行异步刷盘操作)
this.commitLogService = new CommitRealTimeService();
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
@Override
protected MessageExtBatchEncoder initialValue() {
return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
//put数据写入锁:重入锁 或 自旋锁,默认是自旋锁
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
}
public boolean load() {
boolean result = this.mappedFileQueue.load();
log.info("load commit log " + (result ? "OK" : "Failed"));
return result;
}
//启动刷盘线程(GroupCommitService同步刷盘线程,FlushRealTimeService异步刷盘线程)
public void start() {
//启动刷盘线程
this.flushCommitLogService.start();
//专用缓存提交:根据暂存池开关判断是否启动 writeBuffer的提交线程(专用缓存打开相当于加了一个二级缓存writeBuffer,没打开,只有一个一级缓存pageCache)
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.start();
}
}
public void shutdown() {
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.shutdown();
}
this.flushCommitLogService.shutdown();
}
public long flush() {
//提交专用缓存
this.mappedFileQueue.commit(0);
//刷盘
this.mappedFileQueue.flush(0);
return this.mappedFileQueue.getFlushedWhere();
}
public long getMaxOffset() {
return this.mappedFileQueue.getMaxOffset();
}
public long remainHowManyDataToCommit() {
return this.mappedFileQueue.remainHowManyDataToCommit();
}
public long remainHowManyDataToFlush() {
return this.mappedFileQueue.remainHowManyDataToFlush();
}
//删除过期的commitLog文件
public int deleteExpiredFile(
final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately
) {
return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}
/**
* Read CommitLog data, use data replication
*/
public SelectMappedBufferResult getData(final long offset) {
return this.getData(offset, offset == 0);
}
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
return result;
}
return null;
}
/**
* When the normal exit, data recovery, all memory data have been flush
*/
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
boolean checkCRCOnRecover = this.defau