/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.heap.NestedStateMap;
import org.apache.flink.runtime.state.heap.StateMapSnapshot;

/**
 * Snapshot of a {@link NestedStateMap} that can serialize the map's contents to a
 * {@link DataOutputView}.
 *
 * <p>The snapshot reads the namespace map of the owning state map when {@link #writeState} is
 * called; the data is organized as {@code namespace -> (key -> state)}.
 *
 * @param <K> type of the key
 * @param <N> type of the namespace
 * @param <S> type of the state
 */
public class NestedStateMapSnapshot<K, N, S>
        extends StateMapSnapshot<K, N, S, NestedStateMap<K, N, S>> {

    /**
     * Creates a snapshot for the given state map.
     *
     * @param owningStateMap the {@link NestedStateMap} this snapshot belongs to
     */
    public NestedStateMapSnapshot(NestedStateMap<K, N, S> owningStateMap) {
        super(owningStateMap);
    }

    /**
     * Writes all (optionally filtered/transformed) entries of the owning state map.
     *
     * <p>Output layout: one {@code int} holding the total number of entries, followed by one
     * {@code (namespace, key, state)} triple per entry, each element written with its
     * respective serializer.
     *
     * @param keySerializer serializer for the keys
     * @param namespaceSerializer serializer for the namespaces
     * @param stateSerializer serializer for the state values
     * @param dov output view to write to
     * @param stateSnapshotTransformer optional transformer applied to every state value before
     *     writing; {@code null} writes the state as-is
     * @throws IOException if writing to {@code dov} or serializing an element fails
     */
    @Override
    public void writeState(
            TypeSerializer<K> keySerializer,
            TypeSerializer<N> namespaceSerializer,
            TypeSerializer<S> stateSerializer,
            @Nonnull DataOutputView dov,
            @Nullable StateSnapshotTransformer<S> stateSnapshotTransformer)
            throws IOException {
        Map<N, Map<K, S>> mappings =
                filterMappingsIfNeeded(owningStateMap.getNamespaceMap(), stateSnapshotTransformer);

        // The entry count is written first and must match the number of triples that follow,
        // so it is computed on the already-filtered mappings.
        dov.writeInt(countMappingsInKeyGroup(mappings));

        for (Map.Entry<N, Map<K, S>> namespaceEntry : mappings.entrySet()) {
            N namespace = namespaceEntry.getKey();
            for (Map.Entry<K, S> entry : namespaceEntry.getValue().entrySet()) {
                // The namespace is repeated for every entry.
                namespaceSerializer.serialize(namespace, dov);
                keySerializer.serialize(entry.getKey(), dov);
                stateSerializer.serialize(entry.getValue(), dov);
            }
        }
    }

    /**
     * Applies the transformer, if any, to every state value of the given mappings.
     *
     * @param keyGroupMap mappings of the form {@code namespace -> (key -> state)}
     * @param stateSnapshotTransformer transformer to apply, or {@code null} for none
     * @return {@code keyGroupMap} itself when no transformer is given; otherwise a new map
     *     holding the transformed values, without entries whose transformed value is
     *     {@code null} and without namespaces that are left with no entries
     */
    private Map<N, Map<K, S>> filterMappingsIfNeeded(
            Map<N, Map<K, S>> keyGroupMap,
            @Nullable StateSnapshotTransformer<S> stateSnapshotTransformer) {
        if (stateSnapshotTransformer == null) {
            return keyGroupMap;
        }

        Map<N, Map<K, S>> filtered = new HashMap<>();
        for (Map.Entry<N, Map<K, S>> namespaceEntry : keyGroupMap.entrySet()) {
            Map<K, S> filteredNamespaceMap = new HashMap<>();
            for (Map.Entry<K, S> keyEntry : namespaceEntry.getValue().entrySet()) {
                S transformedValue = stateSnapshotTransformer.filterOrTransform(keyEntry.getValue());
                // A null result means the entry is filtered out.
                if (transformedValue != null) {
                    filteredNamespaceMap.put(keyEntry.getKey(), transformedValue);
                }
            }
            // Only keep namespaces that still contain at least one entry.
            if (!filteredNamespaceMap.isEmpty()) {
                filtered.put(namespaceEntry.getKey(), filteredNamespaceMap);
            }
        }
        return filtered;
    }

    /**
     * Counts the entries across all namespaces.
     *
     * @param keyGroupMap mappings of the form {@code namespace -> (key -> state)}
     * @return the sum of the sizes of all per-namespace maps
     */
    private int countMappingsInKeyGroup(Map<N, Map<K, S>> keyGroupMap) {
        int count = 0;
        for (Map<K, S> namespaceMap : keyGroupMap.values()) {
            count += namespaceMap.size();
        }
        return count;
    }
}

