package xyz.noark.orm.write;

import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import xyz.noark.core.annotation.Autowired;
import xyz.noark.core.annotation.Value;
import xyz.noark.core.thread.NamedThreadFactory;
import xyz.noark.log.LogHelper;
import xyz.noark.orm.DataModular;
import xyz.noark.orm.EntityMapping;
import xyz.noark.orm.accessor.DataAccessor;

/* loaded from: input_file:xyz/noark/orm/write/AbstractAsyncWriteService.class */
/**
 * Base implementation of {@link AsyncWriteService} that buffers entity
 * insert/update/delete operations in per-group {@link AsyncWriteContainer}s.
 *
 * <p>Containers are created on demand through a Caffeine {@link LoadingCache}
 * keyed by the group id returned from
 * {@link #analysisGroupIdByEntity(EntityMapping, Object)}. A container that has
 * not been accessed for {@code offlineInterval} seconds is evicted; on eviction
 * it is flushed synchronously and then closed.
 */
public abstract class AbstractAsyncWriteService implements AsyncWriteService {

    /** How long {@link #shutdown()} waits for the pool to drain before forcing termination. */
    private static final long SHUTDOWN_TIMEOUT_MINUTES = 10L;

    /** Accessor handed to every container created by this service. */
    @Autowired
    protected DataAccessor dataAccessor;

    /** Pool shared by all containers and by the periodic cache clean-up task; created in {@link #init()}. */
    protected ScheduledExecutorService scheduledExecutorService;

    /** Group id -> write-back container; created in {@link #init()}. */
    private LoadingCache<Serializable, AsyncWriteContainer> containers;

    /** Idle time, in seconds, after which a container is evicted; also the period of the clean-up task. */
    private final int offlineInterval = 3600;

    /** Interval of the timed save, in seconds (per the start-up log message). */
    @Value(DataModular.DATA_SAVE_INTERVAL)
    protected int saveInterval = 300;

    /** Batch size passed to each container. */
    @Value(DataModular.DATA_BATCH_NUM)
    protected int batchOperateNum = 256;

    /** Core size of the scheduled thread pool. */
    @Value("data.thread.pool.size")
    private int threadPoolSize = 4;

    /** Name prefix given to the pool's threads. */
    @Value("data.thread.name.prefix")
    private String threadNamePrefix = "async-write-data";

    /**
     * Initializes the service: calls {@code judgeAccessType()} on the data
     * accessor, creates the scheduled thread pool and the container cache, and
     * schedules a periodic {@code cleanUp()} of the cache so that idle
     * containers are evicted even when nobody touches the cache.
     */
    @Override
    public void init() {
        this.dataAccessor.judgeAccessType();
        LogHelper.logger.info("初始化数据存储模块，批量存档数量为{},定时存档的时间间隔为{}秒", new Object[]{this.batchOperateNum, this.saveInterval});

        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(this.threadPoolSize, (ThreadFactory) new NamedThreadFactory(this.threadNamePrefix));

        this.containers = Caffeine.newBuilder()
                .expireAfterAccess(this.offlineInterval, TimeUnit.SECONDS)
                .removalListener(new RemovalListener<Serializable, AsyncWriteContainer>() {
                    @Override
                    public void onRemoval(Serializable groupId, AsyncWriteContainer container, RemovalCause cause) {
                        destroyContainer(groupId, container);
                    }
                })
                .build(new CacheLoader<Serializable, AsyncWriteContainer>() {
                    @Override
                    public AsyncWriteContainer load(Serializable groupId) {
                        return createContainer(groupId);
                    }
                });

        // Expiration is otherwise only processed as a side effect of cache activity.
        this.scheduledExecutorService.scheduleAtFixedRate(() -> this.containers.cleanUp(), this.offlineInterval, this.offlineInterval, TimeUnit.SECONDS);
    }

    /**
     * Builds the write-back container for a group that has none yet.
     *
     * @param groupId the group the new container will serve
     * @return a new container wired to this service's executor and accessor
     */
    private AsyncWriteContainer createContainer(Serializable groupId) {
        LogHelper.logger.info("创建异步回写容器 groupId={}", new Object[]{groupId});
        return new AsyncWriteContainer(groupId, this.saveInterval, this.scheduledExecutorService, this.dataAccessor, this.batchOperateNum);
    }

    /**
     * Flushes and closes a container that was removed from the cache.
     *
     * @param groupId   the group the container served
     * @param container the removed container
     */
    private void destroyContainer(Serializable groupId, AsyncWriteContainer container) {
        LogHelper.logger.info("销毁{}秒都没有读写操作的异步回写容器 groupId={}", new Object[]{this.offlineInterval, groupId});
        try {
            container.syncFlush();
        } finally {
            // Always close, even when the final flush fails, so the container is not leaked.
            container.close();
        }
    }

    /**
     * Queues an insert of the given entity.
     *
     * @param entityMapping mapping metadata of the entity type
     * @param t             the entity to insert
     */
    @Override
    public <T> void insert(EntityMapping<T> entityMapping, T t) {
        operation(entityMapping, t, OperateType.INSERT);
    }

    /**
     * Queues a delete of the given entity.
     *
     * @param entityMapping mapping metadata of the entity type
     * @param t             the entity to delete
     */
    @Override
    public <T> void delete(EntityMapping<T> entityMapping, T t) {
        operation(entityMapping, t, OperateType.DELETE);
    }

    /**
     * Queues a delete for every entity in the list, one operation per entity.
     *
     * @param entityMapping mapping metadata of the entity type
     * @param list          the entities to delete
     */
    @Override
    public <T> void deleteAll(EntityMapping<T> entityMapping, List<T> list) {
        for (T entity : list) {
            operation(entityMapping, entity, OperateType.DELETE);
        }
    }

    /**
     * Queues an update of the given entity.
     *
     * @param entityMapping mapping metadata of the entity type
     * @param t             the entity to update
     */
    @Override
    public <T> void update(EntityMapping<T> entityMapping, T t) {
        operation(entityMapping, t, OperateType.UPDATE);
    }

    /**
     * Routes one operation to the container of the entity's group, creating
     * the container through the cache loader if it does not exist yet.
     *
     * @param entityMapping mapping metadata of the entity type
     * @param t             the entity the operation applies to
     * @param operateType   which operation to queue
     */
    protected <T> void operation(EntityMapping<T> entityMapping, T t, OperateType operateType) {
        Serializable groupId = analysisGroupIdByEntity(entityMapping, t);
        AsyncWriteContainer container = this.containers.get(groupId);
        switch (operateType) {
            case INSERT:
                container.insert(entityMapping, t);
                break;
            case DELETE:
                container.delete(entityMapping, t);
                break;
            case UPDATE:
                container.update(entityMapping, t);
                break;
            default:
                LogHelper.logger.warn("这是要干嘛？ type={},entity={}", new Object[]{operateType, t});
                break;
        }
    }

    /**
     * Determines which group (and therefore which container) an entity belongs to.
     *
     * @param entityMapping mapping metadata of the entity type
     * @param t             the entity being written
     * @return the group id used as the container cache key
     */
    protected abstract <T> Serializable analysisGroupIdByEntity(EntityMapping<T> entityMapping, T t);

    /**
     * Flushes every container synchronously, then shuts the thread pool down,
     * waiting up to {@code SHUTDOWN_TIMEOUT_MINUTES} minutes for running tasks.
     * If the wait times out or the calling thread is interrupted, the pool is
     * terminated with {@code shutdownNow()}; on interruption the thread's
     * interrupt flag is restored.
     */
    @Override
    public void shutdown() {
        LogHelper.logger.info("开始通知数据保存任务线程池关闭.");
        syncFlushAll();
        this.scheduledExecutorService.shutdown();
        try {
            if (this.scheduledExecutorService.awaitTermination(SHUTDOWN_TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
                LogHelper.logger.info("数据保存任务线程池已全部回写完，关闭成功.");
            } else {
                // Do not report success when the pool had to be killed.
                this.scheduledExecutorService.shutdownNow();
                LogHelper.logger.warn("数据保存任务线程池在{}分钟内未能全部回写完，已强制关闭.", new Object[]{SHUTDOWN_TIMEOUT_MINUTES});
            }
        } catch (InterruptedException e) {
            LogHelper.logger.error("数据保存任务线程池停机时发生异常.", new Object[]{e});
            this.scheduledExecutorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Synchronously flushes every container currently held in the cache.
     */
    public void syncFlushAll() {
        for (AsyncWriteContainer container : this.containers.asMap().values()) {
            container.syncFlush();
        }
    }

    /**
     * Submits the container of the given group to the thread pool, if such a
     * container currently exists. A group without a container has nothing
     * pending, so no container is created for it.
     *
     * @param serializable the group id whose container should be flushed
     */
    public void asyncFlushByGroupId(Serializable serializable) {
        // getIfPresent, not get: get() would run the loader and cache an empty container.
        AsyncWriteContainer container = this.containers.getIfPresent(serializable);
        if (container != null) {
            this.scheduledExecutorService.submit(container);
        }
    }
}
