/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.mirror;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.ForwardingAdmin;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.mirror.ConfigPropertyFilter;
import org.apache.kafka.connect.mirror.MirrorSourceConfig;
import org.apache.kafka.connect.mirror.MirrorSourceTask;
import org.apache.kafka.connect.mirror.MirrorUtils;
import org.apache.kafka.connect.mirror.ReplicationPolicy;
import org.apache.kafka.connect.mirror.Scheduler;
import org.apache.kafka.connect.mirror.SourceAndTarget;
import org.apache.kafka.connect.mirror.TopicFilter;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MirrorSourceConnector
extends SourceConnector {
    private static final Logger log = LoggerFactory.getLogger(MirrorSourceConnector.class);
    private static final ResourcePatternFilter ANY_TOPIC = new ResourcePatternFilter(ResourceType.TOPIC, null, PatternType.ANY);
    private static final AclBindingFilter ANY_TOPIC_ACL = new AclBindingFilter(ANY_TOPIC, AccessControlEntryFilter.ANY);
    private static final String READ_COMMITTED = IsolationLevel.READ_COMMITTED.toString();
    private static final String EXACTLY_ONCE_SUPPORT_CONFIG = "exactly.once.support";
    private final AtomicBoolean noAclAuthorizer = new AtomicBoolean(false);
    private Scheduler scheduler;
    private MirrorSourceConfig config;
    private SourceAndTarget sourceAndTarget;
    private String connectorName;
    private TopicFilter topicFilter;
    private ConfigPropertyFilter configPropertyFilter;
    private List<TopicPartition> knownSourceTopicPartitions = Collections.emptyList();
    private List<TopicPartition> knownTargetTopicPartitions = Collections.emptyList();
    private ReplicationPolicy replicationPolicy;
    private int replicationFactor;
    private Admin sourceAdminClient;
    private Admin targetAdminClient;
    private boolean heartbeatsReplicationEnabled;

    public MirrorSourceConnector() {
    }

    MirrorSourceConnector(List<TopicPartition> knownSourceTopicPartitions, MirrorSourceConfig config) {
        this.knownSourceTopicPartitions = knownSourceTopicPartitions;
        this.config = config;
    }

    MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy replicationPolicy, TopicFilter topicFilter, ConfigPropertyFilter configPropertyFilter) {
        this(sourceAndTarget, replicationPolicy, topicFilter, configPropertyFilter, true);
    }

    MirrorSourceConnector(SourceAndTarget sourceAndTarget, ReplicationPolicy replicationPolicy, TopicFilter topicFilter, ConfigPropertyFilter configPropertyFilter, boolean heartbeatsReplicationEnabled) {
        this.sourceAndTarget = sourceAndTarget;
        this.replicationPolicy = replicationPolicy;
        this.topicFilter = topicFilter;
        this.configPropertyFilter = configPropertyFilter;
        this.heartbeatsReplicationEnabled = heartbeatsReplicationEnabled;
    }

    MirrorSourceConnector(Admin sourceAdminClient, Admin targetAdminClient, MirrorSourceConfig config) {
        this.sourceAdminClient = sourceAdminClient;
        this.targetAdminClient = targetAdminClient;
        this.config = config;
    }

    public void start(Map<String, String> props) {
        long start = System.currentTimeMillis();
        this.config = new MirrorSourceConfig(props);
        if (!this.config.enabled()) {
            return;
        }
        this.connectorName = this.config.connectorName();
        this.sourceAndTarget = new SourceAndTarget(this.config.sourceClusterAlias(), this.config.targetClusterAlias());
        this.topicFilter = this.config.topicFilter();
        this.configPropertyFilter = this.config.configPropertyFilter();
        this.replicationPolicy = this.config.replicationPolicy();
        this.replicationFactor = this.config.replicationFactor();
        this.sourceAdminClient = this.config.forwardingAdmin(this.config.sourceAdminConfig("replication-source-admin"));
        this.targetAdminClient = this.config.forwardingAdmin(this.config.targetAdminConfig("replication-target-admin"));
        this.heartbeatsReplicationEnabled = this.config.heartbeatsReplicationEnabled();
        this.scheduler = new Scheduler(((Object)((Object)this)).getClass(), this.config.entityLabel(), this.config.adminTimeout());
        this.scheduler.execute(this::createOffsetSyncsTopic, "creating upstream offset-syncs topic");
        this.scheduler.execute(this::loadTopicPartitions, "loading initial set of topic-partitions");
        this.scheduler.execute(this::computeAndCreateTopicPartitions, "creating downstream topic-partitions");
        this.scheduler.execute(this::refreshKnownTargetTopics, "refreshing known target topics");
        this.scheduler.scheduleRepeating(this::syncTopicAcls, this.config.syncTopicAclsInterval(), "syncing topic ACLs");
        this.scheduler.scheduleRepeating(this::syncTopicConfigs, this.config.syncTopicConfigsInterval(), "syncing topic configs");
        this.scheduler.scheduleRepeatingDelayed(this::refreshTopicPartitions, this.config.refreshTopicsInterval(), "refreshing topics");
        log.info("Started {} with {} topic-partitions.", (Object)this.connectorName, (Object)this.knownSourceTopicPartitions.size());
        log.info("Starting {} took {} ms.", (Object)this.connectorName, (Object)(System.currentTimeMillis() - start));
    }

    public void stop() {
        long start = System.currentTimeMillis();
        if (!this.config.enabled()) {
            return;
        }
        Utils.closeQuietly((AutoCloseable)this.scheduler, (String)"scheduler");
        Utils.closeQuietly((AutoCloseable)this.topicFilter, (String)"topic filter");
        Utils.closeQuietly((AutoCloseable)this.configPropertyFilter, (String)"config property filter");
        Utils.closeQuietly((AutoCloseable)this.sourceAdminClient, (String)"source admin client");
        Utils.closeQuietly((AutoCloseable)this.targetAdminClient, (String)"target admin client");
        log.info("Stopping {} took {} ms.", (Object)this.connectorName, (Object)(System.currentTimeMillis() - start));
    }

    public Class<? extends Task> taskClass() {
        return MirrorSourceTask.class;
    }

    public List<Map<String, String>> taskConfigs(int maxTasks) {
        if (!this.config.enabled() || this.knownSourceTopicPartitions.isEmpty()) {
            return Collections.emptyList();
        }
        int numTasks = Math.min(maxTasks, this.knownSourceTopicPartitions.size());
        ArrayList roundRobinByTask = new ArrayList(numTasks);
        for (int i2 = 0; i2 < numTasks; ++i2) {
            roundRobinByTask.add(new ArrayList());
        }
        int count = 0;
        for (TopicPartition partition : this.knownSourceTopicPartitions) {
            int index = count % numTasks;
            ((List)roundRobinByTask.get(index)).add(partition);
            ++count;
        }
        return IntStream.range(0, numTasks).mapToObj(i -> this.config.taskConfigForTopicPartitions((List)roundRobinByTask.get(i), i)).collect(Collectors.toList());
    }

    public ConfigDef config() {
        return MirrorSourceConfig.CONNECTOR_CONFIG_DEF;
    }

    public org.apache.kafka.common.config.Config validate(Map<String, String> props) {
        List configValues = super.validate(props).configValues();
        this.validateExactlyOnceConfigs(props, configValues);
        MirrorSourceConnector.validateEmitOffsetSyncConfigs(props, configValues);
        return new org.apache.kafka.common.config.Config(configValues);
    }

    private static void validateEmitOffsetSyncConfigs(Map<String, String> props, List<ConfigValue> configValues) {
        boolean offsetSyncsConfigured = props.keySet().stream().anyMatch(conf -> conf.startsWith("offset-syncs-") || conf.startsWith("offset-syncs.topic."));
        if ("false".equals(props.get("emit.offset-syncs.enabled")) && offsetSyncsConfigured) {
            ConfigValue emitOffsetSyncs = configValues.stream().filter(prop -> "emit.offset-syncs.enabled".equals(prop.name())).findAny().orElseGet(() -> {
                ConfigValue result = new ConfigValue("emit.offset-syncs.enabled");
                configValues.add(result);
                return result;
            });
            emitOffsetSyncs.addErrorMessage("MirrorSourceConnector can't setup offset-syncs feature while emit.offset-syncs.enabled set to false");
        }
    }

    private void validateExactlyOnceConfigs(Map<String, String> props, List<ConfigValue> configValues) {
        if ("required".equals(props.get(EXACTLY_ONCE_SUPPORT_CONFIG)) && !this.consumerUsesReadCommitted(props)) {
            ConfigValue exactlyOnceSupport = configValues.stream().filter(cv -> EXACTLY_ONCE_SUPPORT_CONFIG.equals(cv.name())).findAny().orElseGet(() -> {
                ConfigValue result = new ConfigValue(EXACTLY_ONCE_SUPPORT_CONFIG);
                configValues.add(result);
                return result;
            });
            exactlyOnceSupport.addErrorMessage("MirrorSourceConnector can only provide exactly-once guarantees when its source consumer is configured with isolation.level set to '" + READ_COMMITTED + "'; otherwise, records from aborted and uncommitted transactions will be replicated from the source cluster to the target cluster.");
        }
    }

    public String version() {
        return AppInfoParser.getVersion();
    }

    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
        return this.consumerUsesReadCommitted(props) ? ExactlyOnceSupport.SUPPORTED : ExactlyOnceSupport.UNSUPPORTED;
    }

    public boolean alterOffsets(Map<String, String> connectorConfig, Map<Map<String, ?>, Map<String, ?>> offsets) {
        for (Map.Entry<Map<String, ?>, Map<String, ?>> offsetEntry : offsets.entrySet()) {
            Map<String, ?> sourceOffset = offsetEntry.getValue();
            if (sourceOffset == null) continue;
            Map<String, ?> sourcePartition = offsetEntry.getKey();
            if (sourcePartition == null) {
                throw new ConnectException("Source partitions may not be null");
            }
            MirrorUtils.validateSourcePartitionString(sourcePartition, "cluster");
            MirrorUtils.validateSourcePartitionString(sourcePartition, "topic");
            MirrorUtils.validateSourcePartitionPartition(sourcePartition);
            MirrorUtils.validateSourceOffset(sourcePartition, sourceOffset, false);
        }
        return true;
    }

    private boolean consumerUsesReadCommitted(Map<String, String> props) {
        Object consumerIsolationLevel = MirrorSourceConfig.sourceConsumerConfig(props).get("isolation.level");
        return Objects.equals(READ_COMMITTED, consumerIsolationLevel);
    }

    List<TopicPartition> findSourceTopicPartitions() throws InterruptedException, ExecutionException {
        Set<String> topics = this.listTopics(this.sourceAdminClient).stream().filter(this::shouldReplicateTopic).collect(Collectors.toSet());
        return this.describeTopics(this.sourceAdminClient, topics).stream().flatMap(MirrorSourceConnector::expandTopicDescription).collect(Collectors.toList());
    }

    List<TopicPartition> findTargetTopicPartitions() throws InterruptedException, ExecutionException {
        Set<String> topics = this.listTopics(this.targetAdminClient).stream().filter(t -> this.sourceAndTarget.source().equals(this.replicationPolicy.topicSource(t))).filter(t -> !t.equals(this.config.checkpointsTopic())).collect(Collectors.toSet());
        return this.describeTopics(this.targetAdminClient, topics).stream().flatMap(MirrorSourceConnector::expandTopicDescription).collect(Collectors.toList());
    }

    void refreshTopicPartitions() throws InterruptedException, ExecutionException {
        List<TopicPartition> sourceTopicPartitions = this.findSourceTopicPartitions();
        List<TopicPartition> targetTopicPartitions = this.findTargetTopicPartitions();
        HashSet<TopicPartition> sourceTopicPartitionsSet = new HashSet<TopicPartition>(sourceTopicPartitions);
        HashSet<TopicPartition> knownSourceTopicPartitionsSet = new HashSet<TopicPartition>(this.knownSourceTopicPartitions);
        Set upstreamTargetTopicPartitions = targetTopicPartitions.stream().map(x -> new TopicPartition(this.replicationPolicy.upstreamTopic(x.topic()), x.partition())).collect(Collectors.toSet());
        HashSet<TopicPartition> missingInTarget = new HashSet<TopicPartition>(sourceTopicPartitions);
        missingInTarget.removeAll(upstreamTargetTopicPartitions);
        this.knownTargetTopicPartitions = targetTopicPartitions;
        if (!knownSourceTopicPartitionsSet.equals(sourceTopicPartitionsSet) || !missingInTarget.isEmpty()) {
            HashSet<TopicPartition> newTopicPartitions = new HashSet<TopicPartition>(sourceTopicPartitions);
            newTopicPartitions.removeAll(knownSourceTopicPartitionsSet);
            HashSet<TopicPartition> deletedTopicPartitions = knownSourceTopicPartitionsSet;
            deletedTopicPartitions.removeAll(sourceTopicPartitionsSet);
            log.info("Found {} new topic-partitions on {}. Found {} deleted topic-partitions on {}. Found {} topic-partitions missing on {}.", new Object[]{newTopicPartitions.size(), this.sourceAndTarget.source(), deletedTopicPartitions.size(), this.sourceAndTarget.source(), missingInTarget.size(), this.sourceAndTarget.target()});
            log.trace("Found new topic-partitions on {}: {}", (Object)this.sourceAndTarget.source(), newTopicPartitions);
            log.trace("Found deleted topic-partitions on {}: {}", (Object)this.sourceAndTarget.source(), deletedTopicPartitions);
            log.trace("Found missing topic-partitions on {}: {}", (Object)this.sourceAndTarget.target(), missingInTarget);
            this.knownSourceTopicPartitions = sourceTopicPartitions;
            this.computeAndCreateTopicPartitions();
            this.context.requestTaskReconfiguration();
        }
    }

    private void loadTopicPartitions() throws InterruptedException, ExecutionException {
        this.knownSourceTopicPartitions = this.findSourceTopicPartitions();
        this.knownTargetTopicPartitions = this.findTargetTopicPartitions();
    }

    private void refreshKnownTargetTopics() throws InterruptedException, ExecutionException {
        this.knownTargetTopicPartitions = this.findTargetTopicPartitions();
    }

    private Set<String> topicsBeingReplicated() {
        Set<String> knownTargetTopics = this.toTopics(this.knownTargetTopicPartitions);
        return this.knownSourceTopicPartitions.stream().map(TopicPartition::topic).distinct().filter(x -> knownTargetTopics.contains(this.formatRemoteTopic((String)x))).collect(Collectors.toSet());
    }

    private Set<String> toTopics(Collection<TopicPartition> tps) {
        return tps.stream().map(TopicPartition::topic).collect(Collectors.toSet());
    }

    void syncTopicAcls() throws InterruptedException, ExecutionException {
        Optional<Collection<AclBinding>> rawBindings = this.listTopicAclBindings();
        if (rawBindings.isEmpty()) {
            return;
        }
        List<AclBinding> filteredBindings = rawBindings.get().stream().filter(x -> x.pattern().resourceType() == ResourceType.TOPIC).filter(x -> x.pattern().patternType() == PatternType.LITERAL).filter(this::shouldReplicateAcl).filter(x -> this.shouldReplicateTopic(x.pattern().name())).map(this::targetAclBinding).collect(Collectors.toList());
        this.updateTopicAcls(filteredBindings);
    }

    void syncTopicConfigs() throws InterruptedException, ExecutionException {
        Map<String, Config> sourceConfigs = this.describeTopicConfigs(this.topicsBeingReplicated());
        Map<String, Config> targetConfigs = sourceConfigs.entrySet().stream().collect(Collectors.toMap(x -> this.formatRemoteTopic((String)x.getKey()), x -> this.targetConfig((Config)x.getValue(), true)));
        this.incrementalAlterConfigs(targetConfigs);
    }

    private void createOffsetSyncsTopic() {
        if (this.config.emitOffsetSyncsEnabled()) {
            try (ForwardingAdmin offsetSyncsAdminClient = this.config.forwardingAdmin(this.config.offsetSyncsTopicAdminConfig());){
                MirrorUtils.createSinglePartitionCompactedTopic(this.config.offsetSyncsTopic(), this.config.offsetSyncsTopicReplicationFactor(), (Admin)offsetSyncsAdminClient);
            }
        }
    }

    void computeAndCreateTopicPartitions() throws ExecutionException, InterruptedException {
        Map<String, Long> sourceTopicToPartitionCounts = this.knownSourceTopicPartitions.stream().collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        Map<String, Long> targetTopicToPartitionCounts = this.knownTargetTopicPartitions.stream().collect(Collectors.groupingBy(TopicPartition::topic, Collectors.counting())).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        Set<String> knownSourceTopics = sourceTopicToPartitionCounts.keySet();
        Set<String> knownTargetTopics = targetTopicToPartitionCounts.keySet();
        Map sourceToRemoteTopics = knownSourceTopics.stream().collect(Collectors.toMap(Function.identity(), this::formatRemoteTopic));
        Map partitionedSourceTopics = knownSourceTopics.stream().collect(Collectors.partitioningBy(sourceTopic -> knownTargetTopics.contains(sourceToRemoteTopics.get(sourceTopic)), Collectors.toSet()));
        Set existingSourceTopics = partitionedSourceTopics.get(true);
        Set<String> newSourceTopics = partitionedSourceTopics.get(false);
        if (!newSourceTopics.isEmpty()) {
            this.createNewTopics(newSourceTopics, sourceTopicToPartitionCounts);
        }
        Map sourceTopicsWithNewPartitions = existingSourceTopics.stream().filter(sourceTopic -> {
            String targetTopic = (String)sourceToRemoteTopics.get(sourceTopic);
            return (Long)sourceTopicToPartitionCounts.get(sourceTopic) > (Long)targetTopicToPartitionCounts.get(targetTopic);
        }).collect(Collectors.toMap(Function.identity(), sourceTopicToPartitionCounts::get));
        if (!sourceTopicsWithNewPartitions.isEmpty()) {
            Map<String, NewPartitions> newTargetPartitions = sourceTopicsWithNewPartitions.entrySet().stream().collect(Collectors.toMap(sourceTopicAndPartitionCount -> (String)sourceToRemoteTopics.get(sourceTopicAndPartitionCount.getKey()), sourceTopicAndPartitionCount -> NewPartitions.increaseTo((int)((Long)sourceTopicAndPartitionCount.getValue()).intValue())));
            this.createNewPartitions(newTargetPartitions);
        }
    }

    void createNewTopics(Set<String> newSourceTopics, Map<String, Long> sourceTopicToPartitionCounts) throws ExecutionException, InterruptedException {
        Map<String, Config> sourceTopicToConfig = this.describeTopicConfigs(newSourceTopics);
        Map<String, NewTopic> newTopics = newSourceTopics.stream().map(sourceTopic -> {
            String remoteTopic = this.formatRemoteTopic((String)sourceTopic);
            int partitionCount = ((Long)sourceTopicToPartitionCounts.get(sourceTopic)).intValue();
            Map<String, String> configs = MirrorSourceConnector.configToMap(this.targetConfig((Config)sourceTopicToConfig.get(sourceTopic), false));
            return new NewTopic(remoteTopic, partitionCount, (short)this.replicationFactor).configs(configs);
        }).collect(Collectors.toMap(NewTopic::name, Function.identity()));
        this.createNewTopics(newTopics);
    }

    void createNewTopics(Map<String, NewTopic> newTopics) throws ExecutionException, InterruptedException {
        MirrorUtils.adminCall(() -> {
            this.targetAdminClient.createTopics(newTopics.values(), new CreateTopicsOptions()).values().forEach((k, v) -> v.whenComplete((x, e) -> {
                if (e != null) {
                    log.warn("Could not create topic {}.", k, e);
                } else {
                    log.info("Created remote topic {} with {} partitions.", k, (Object)((NewTopic)newTopics.get(k)).numPartitions());
                }
            }));
            return null;
        }, () -> String.format("create topics %s on %s cluster", newTopics, this.config.targetClusterAlias()));
    }

    void createNewPartitions(Map<String, NewPartitions> newPartitions) throws ExecutionException, InterruptedException {
        MirrorUtils.adminCall(() -> {
            this.targetAdminClient.createPartitions(newPartitions).values().forEach((k, v) -> v.whenComplete((x, e) -> {
                if (!(e instanceof InvalidPartitionsException)) {
                    if (e != null) {
                        log.warn("Could not create topic-partitions for {}.", k, e);
                    } else {
                        log.info("Increased size of {} to {} partitions.", k, (Object)((NewPartitions)newPartitions.get(k)).totalCount());
                    }
                }
            }));
            return null;
        }, () -> String.format("create partitions %s on %s cluster", newPartitions, this.config.targetClusterAlias()));
    }

    private Set<String> listTopics(Admin adminClient) throws InterruptedException, ExecutionException {
        return MirrorUtils.adminCall(() -> (Set)adminClient.listTopics().names().get(), () -> "list topics on " + this.actualClusterAlias(adminClient) + " cluster");
    }

    private Optional<Collection<AclBinding>> listTopicAclBindings() throws InterruptedException, ExecutionException {
        return MirrorUtils.adminCall(() -> {
            Collection bindings;
            try {
                bindings = (Collection)this.sourceAdminClient.describeAcls(ANY_TOPIC_ACL).values().get();
            }
            catch (ExecutionException e) {
                if (e.getCause() instanceof SecurityDisabledException) {
                    if (this.noAclAuthorizer.compareAndSet(false, true)) {
                        log.info("No ACL authorizer is configured on the source Kafka cluster, so no topic ACL syncing will take place. Consider disabling topic ACL syncing by setting sync.topic.acls.enabled to 'false'.");
                    } else {
                        log.debug("Source-side ACL authorizer still not found; skipping topic ACL sync");
                    }
                    return Optional.empty();
                }
                throw e;
            }
            return Optional.of(bindings);
        }, () -> "describe ACLs on " + this.config.sourceClusterAlias() + " cluster");
    }

    private Collection<TopicDescription> describeTopics(Admin adminClient, Collection<String> topics) throws InterruptedException, ExecutionException {
        return MirrorUtils.adminCall(() -> ((Map)adminClient.describeTopics(topics).allTopicNames().get()).values(), () -> String.format("describe topics %s on %s cluster", topics, this.actualClusterAlias(adminClient)));
    }

    static Map<String, String> configToMap(Config config) {
        return config.entries().stream().collect(Collectors.toMap(ConfigEntry::name, ConfigEntry::value));
    }

    void incrementalAlterConfigs(Map<String, Config> topicConfigs) throws ExecutionException, InterruptedException {
        HashMap configOps = new HashMap();
        for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) {
            ArrayList<AlterConfigOp> ops = new ArrayList<AlterConfigOp>();
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey());
            for (ConfigEntry config : topicConfig.getValue().entries()) {
                if (config.isDefault() && !this.shouldReplicateSourceDefault(config.name())) {
                    ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE));
                    continue;
                }
                ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET));
            }
            configOps.put(configResource, ops);
        }
        log.trace("Syncing configs for {} topics.", (Object)configOps.size());
        MirrorUtils.adminCall(() -> {
            this.targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> {
                if (e instanceof UnsupportedVersionException) {
                    log.error("Failed to sync configs for topic {} on cluster {} with IncrementalAlterConfigs API", new Object[]{k.name(), this.sourceAndTarget.target(), e});
                    this.context.raiseError((Exception)new ConnectException("the target cluster '" + this.sourceAndTarget.target() + "' is not compatible with IncrementalAlterConfigs API", e));
                } else {
                    log.warn("Could not alter configuration of topic {}.", (Object)k.name(), e);
                }
            }));
            return null;
        }, () -> String.format("incremental alter topic configs %s on %s cluster", topicConfigs, this.config.targetClusterAlias()));
    }

    private void updateTopicAcls(List<AclBinding> bindings) throws ExecutionException, InterruptedException {
        log.trace("Syncing {} topic ACL bindings.", (Object)bindings.size());
        MirrorUtils.adminCall(() -> {
            this.targetAdminClient.createAcls((Collection)bindings).values().forEach((k, v) -> v.whenComplete((x, e) -> {
                if (e != null) {
                    log.warn("Could not sync ACL of topic {}.", (Object)k.pattern().name(), e);
                }
            }));
            return null;
        }, () -> String.format("create ACLs %s on %s cluster", bindings, this.config.targetClusterAlias()));
    }

    private static Stream<TopicPartition> expandTopicDescription(TopicDescription description) {
        String topic = description.name();
        return description.partitions().stream().map(x -> new TopicPartition(topic, x.partition()));
    }

    Map<String, Config> describeTopicConfigs(Set<String> topics) throws InterruptedException, ExecutionException {
        Set resources = topics.stream().map(x -> new ConfigResource(ConfigResource.Type.TOPIC, x)).collect(Collectors.toSet());
        return MirrorUtils.adminCall(() -> ((Map)this.sourceAdminClient.describeConfigs((Collection)resources).all().get()).entrySet().stream().collect(Collectors.toMap(x -> ((ConfigResource)x.getKey()).name(), Map.Entry::getValue)), () -> String.format("describe configs for topics %s on %s cluster", topics, this.config.sourceClusterAlias()));
    }

    Config targetConfig(Config sourceConfig, boolean incremental) {
        List entries = sourceConfig.entries().stream().filter(x -> incremental || x.isDefault() && this.shouldReplicateSourceDefault(x.name()) || !x.isDefault()).filter(x -> !x.isReadOnly() && !x.isSensitive()).filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG).filter(x -> this.shouldReplicateTopicConfigurationProperty(x.name())).collect(Collectors.toList());
        return new Config(entries);
    }

    private static AccessControlEntry downgradeAllowAllACL(AccessControlEntry entry) {
        return new AccessControlEntry(entry.principal(), entry.host(), AclOperation.READ, entry.permissionType());
    }

    AclBinding targetAclBinding(AclBinding sourceAclBinding) {
        String targetTopic = this.formatRemoteTopic(sourceAclBinding.pattern().name());
        AccessControlEntry entry = sourceAclBinding.entry().permissionType() == AclPermissionType.ALLOW && sourceAclBinding.entry().operation() == AclOperation.ALL ? MirrorSourceConnector.downgradeAllowAllACL(sourceAclBinding.entry()) : sourceAclBinding.entry();
        return new AclBinding(new ResourcePattern(ResourceType.TOPIC, targetTopic, PatternType.LITERAL), entry);
    }

    boolean shouldReplicateTopic(String topic) {
        return (this.topicFilter.shouldReplicateTopic(topic) || this.heartbeatsReplicationEnabled && this.replicationPolicy.isHeartbeatsTopic(topic)) && !this.replicationPolicy.isInternalTopic(topic) && !this.isCycle(topic);
    }

    boolean shouldReplicateAcl(AclBinding aclBinding) {
        return aclBinding.entry().permissionType() != AclPermissionType.ALLOW || aclBinding.entry().operation() != AclOperation.WRITE;
    }

    boolean shouldReplicateTopicConfigurationProperty(String property) {
        return this.configPropertyFilter.shouldReplicateConfigProperty(property);
    }

    boolean shouldReplicateSourceDefault(String property) {
        return this.configPropertyFilter.shouldReplicateSourceDefault(property);
    }

    boolean isCycle(String topic) {
        String source = this.replicationPolicy.topicSource(topic);
        if (source == null) {
            return false;
        }
        if (source.equals(this.sourceAndTarget.target())) {
            return true;
        }
        String upstreamTopic = this.replicationPolicy.upstreamTopic(topic);
        if (upstreamTopic == null || upstreamTopic.equals(topic)) {
            return false;
        }
        return this.isCycle(upstreamTopic);
    }

    String formatRemoteTopic(String topic) {
        return this.replicationPolicy.formatRemoteTopic(this.sourceAndTarget.source(), topic);
    }

    private String actualClusterAlias(Admin adminClient) {
        return adminClient.equals((Object)this.sourceAdminClient) ? this.config.sourceClusterAlias() : this.config.targetClusterAlias();
    }
}

