/*
 * Decompiled with CFR 0.152.
 */
package com.alipay.sofa.registry.server.data.remoting.dataserver.handler;

import com.alipay.remoting.Connection;
import com.alipay.sofa.registry.common.model.CommonResponse;
import com.alipay.sofa.registry.common.model.GenericResponse;
import com.alipay.sofa.registry.common.model.Node;
import com.alipay.sofa.registry.common.model.dataserver.Datum;
import com.alipay.sofa.registry.common.model.dataserver.GetDataRequest;
import com.alipay.sofa.registry.common.model.dataserver.NotifyFetchDatumRequest;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.remoting.ChannelHandler;
import com.alipay.sofa.registry.remoting.Server;
import com.alipay.sofa.registry.remoting.exchange.Exchange;
import com.alipay.sofa.registry.server.data.bootstrap.DataServerConfig;
import com.alipay.sofa.registry.server.data.cache.DataServerCache;
import com.alipay.sofa.registry.server.data.cache.DatumCache;
import com.alipay.sofa.registry.server.data.change.DataChangeTypeEnum;
import com.alipay.sofa.registry.server.data.change.DataSourceTypeEnum;
import com.alipay.sofa.registry.server.data.change.event.DataChangeEventCenter;
import com.alipay.sofa.registry.server.data.executor.ExecutorFactory;
import com.alipay.sofa.registry.server.data.remoting.dataserver.DataServerConnectionFactory;
import com.alipay.sofa.registry.server.data.remoting.handler.AbstractServerHandler;
import com.alipay.sofa.registry.server.data.util.TimeUtil;
import com.alipay.sofa.registry.util.ParaCheckUtil;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;

public class NotifyFetchDatumHandler
extends AbstractServerHandler<NotifyFetchDatumRequest> {
    private static final Logger LOGGER = LoggerFactory.getLogger(NotifyFetchDatumHandler.class);
    @Autowired
    private DataServerCache dataServerCache;
    @Autowired
    private DataServerConnectionFactory dataServerConnectionFactory;
    @Autowired
    private DataChangeEventCenter dataChangeEventCenter;
    @Autowired
    private DataServerConfig dataServerConfig;
    @Autowired
    private Exchange boltExchange;
    @Autowired
    private DataServerConfig dataServerBootstrapConfig;

    @Override
    public void checkParam(NotifyFetchDatumRequest request) throws RuntimeException {
        ParaCheckUtil.checkNotBlank((String)request.getIp(), (String)"ip");
    }

    @Override
    public Object doHandle(Channel channel, NotifyFetchDatumRequest request) {
        ParaCheckUtil.checkNotBlank((String)request.getIp(), (String)"ip");
        Map versionMap = request.getDataVersionMap();
        long version = request.getChangeVersion();
        String ip = request.getIp();
        if (version >= this.dataServerCache.getCurVersion()) {
            if (versionMap.isEmpty()) {
                LOGGER.info("[NotifyFetchDatumHandler] get changeVersion map is empty,change version is {},current version is {},ip is {}", new Object[]{version, this.dataServerCache.getCurVersion(), ip});
                this.dataServerCache.synced(version, ip);
            } else {
                ExecutorFactory.getCommonExecutor().execute(() -> {
                    for (Map.Entry dataCenterEntry : versionMap.entrySet()) {
                        String dataCenter = (String)dataCenterEntry.getKey();
                        Map map = (Map)dataCenterEntry.getValue();
                        for (Map.Entry dataInfoEntry : map.entrySet()) {
                            String dataInfoId = (String)dataInfoEntry.getKey();
                            Datum datum = DatumCache.get(dataCenter, dataInfoId);
                            if (datum != null) {
                                long inVersion = (Long)dataInfoEntry.getValue();
                                long currentVersion = datum.getVersion();
                                if (currentVersion > inVersion) {
                                    LOGGER.info("[NotifyFetchDatumHandler] ignore fetch because changeVersion {} is less than {},dataInfoId={},dataCenter={}", new Object[]{inVersion, currentVersion, dataInfoId, dataCenter});
                                    continue;
                                }
                                if (datum.getVersion() == ((Long)dataInfoEntry.getValue()).longValue() && !datum.getPubMap().isEmpty()) continue;
                            }
                            this.fetchDatum(ip, dataCenter, dataInfoId);
                        }
                    }
                    this.dataServerCache.synced(version, ip);
                });
            }
        } else {
            LOGGER.info("[NotifyFetchDatumHandler] ignore notify because changeVersion {} is less than {},ip is {}", new Object[]{version, this.dataServerCache.getCurVersion(), ip});
        }
        return CommonResponse.buildSuccessResponse();
    }

    private void fetchDatum(String targetIp, String dataCenter, String dataInfoId) {
        while (this.dataServerCache.getDataServers(this.dataServerConfig.getLocalDataCenter()).keySet().contains(targetIp)) {
            Connection connection = this.dataServerConnectionFactory.getConnection(targetIp);
            if (connection == null || !connection.isFine()) {
                throw new RuntimeException(String.format("connection of %s is not available", targetIp));
            }
            try {
                Server syncServer = this.boltExchange.getServer(Integer.valueOf(this.dataServerBootstrapConfig.getSyncDataPort()));
                GenericResponse response = (GenericResponse)syncServer.sendSync(syncServer.getChannel(connection.getRemoteAddress()), (Object)new GetDataRequest(dataInfoId, dataCenter), this.dataServerBootstrapConfig.getRpcTimeout());
                if (response.isSuccess()) {
                    Datum datum = (Datum)((Map)response.getData()).get(dataCenter);
                    if (datum != null) {
                        this.dataChangeEventCenter.sync(DataChangeTypeEnum.COVER, DataSourceTypeEnum.BACKUP, datum);
                        LOGGER.info("[NotifyFetchDatumHandler] fetch datum success,dataInfoId={},dataCenter={},targetIp={}", new Object[]{datum.getDataInfoId(), datum.getDataCenter(), targetIp});
                    }
                    break;
                }
                throw new RuntimeException(response.getMessage());
            }
            catch (Exception e) {
                LOGGER.error("[NotifyFetchDatumHandler] fetch datum error", (Throwable)e);
                TimeUtil.randomDelay(500);
            }
        }
    }

    public CommonResponse buildFailedResponse(String msg) {
        return CommonResponse.buildFailedResponse((String)msg);
    }

    public ChannelHandler.HandlerType getType() {
        return ChannelHandler.HandlerType.PROCESSER;
    }

    @Override
    public Class interest() {
        return NotifyFetchDatumRequest.class;
    }

    @Override
    protected Node.NodeType getConnectNodeType() {
        return Node.NodeType.DATA;
    }
}

