
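A Detailed Look at Memory Management in the Flink Big Data Processing Engine

 Updated: 2021-05-20 09:08:17   Author: Huawei Cloud Developer Community  
Flink is a big data processing engine that runs on the JVM. The JVM brings several problems: Java objects have low storage density, a full GC is expensive, GC causes stop-the-world (STW) pauses, and an OOM hurts stability. To reduce the cost of frequent serialization and deserialization, Flink manages on-heap and off-heap memory itself and can, in some scenarios, operate directly on binary data. This article explains how that works.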

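Memory Model

Flink can use both on-heap and off-heap memory. The memory model is shown in the figure:

The memory Flink uses is divided into on-heap memory and off-heap memory. By purpose, it is divided into task memory, network memory, managed memory, and framework memory. Task, network, and managed memory are counted toward slot memory; framework memory is shared across the TaskManager.

On-heap memory comprises the memory used by user code, by the HeapStateBackend, and by framework execution.

Off-heap memory is memory that is not managed by the JVM heap and maps directly to operating-system memory addresses. It comprises the off-heap memory used by framework execution, JVM off-heap memory, direct memory, native memory, and so on.

Direct memory can be used for network transfer buffers. Network memory is a kind of direct memory; Flink can use it for zero-copy transfers, which reduces the number of copies between kernel space and user space and makes I/O more efficient.

JVM metaspace stores the metadata of the classes the JVM loads; the more classes are loaded, the more space is needed. JVM overhead covers the JVM's other costs, such as native memory, the code cache, and thread stacks.

Managed memory is used mainly by the RocksDBStateBackend and by batch operators, and it is also native memory. The RocksDBStateBackend is backed by RocksDB, which is built on an LSM data structure: each state corresponds to one column family with its own write buffers. The native memory RocksDB occupies is roughly blockCache + writeBufferNum * writeBuffer + index.

Off-heap memory can also be shared between processes, and because it takes the JVM a long time to set up a large heap, using off-heap memory avoids that cost. Off-heap memory has drawbacks too: monitoring and debugging are more complicated, and for short-lived segments on-heap memory is cheaper. In some cases Flink operates directly on binary data and avoids part of the deserialization overhead. If the data to be processed exceeds the memory limit, part of it is stored on disk.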
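As a rough illustration of the formula above, the following sketch adds up the three terms for a single state (one column family). The class name, method name, and all the sample sizes are assumptions chosen for the example; they are not Flink or RocksDB defaults.

```java
// Hypothetical helper: estimates RocksDB native memory for one column family
// using blockCache + writeBufferNum * writeBuffer + index.
final class RocksDbMemoryEstimate {

    /** All sizes are in bytes. */
    static long estimateBytes(long blockCacheBytes,
                              int writeBufferNum,
                              long writeBufferBytes,
                              long indexBytes) {
        return blockCacheBytes + (long) writeBufferNum * writeBufferBytes + indexBytes;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        // Assumed sample values: 256 MiB block cache, 2 write buffers of 64 MiB, 16 MiB index.
        long estimate = estimateBytes(256 * mib, 2, 64 * mib, 16 * mib);
        System.out.println(estimate / mib + " MiB");
    }
}
```

Because every state has its own write buffers, the middle term grows with the number of states, which is why jobs with many states tend to need more managed memory.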

Memory Management

Flink imitates the operating system's paging mechanism and manages memory in pages. The data structures that correspond to a page are the data views and MemorySegment. A MemorySegment is the smallest unit of memory allocation in Flink, 32 KB by default, and can live either on the heap or off the heap; Flink accesses both kinds of memory through it. Together with Flink's serialization mechanism (covered in a later section), MemorySegment provides methods for reading and writing binary data, and Flink uses DataInputView and DataOutputView to read from and write to MemorySegments. HeapMemorySegment manages on-heap memory, and HybridMemorySegment manages both on-heap and off-heap memory. When a MemorySegment manages JVM heap memory, it holds a reference to a byte array that points to the memory; HeapMemorySegment operates through that internal byte array reference.

public abstract class MemorySegment {
    /**
     * The heap byte array object relative to which we access the memory.
     * For heap memory this references the memory being accessed; for off-heap memory it is null.
     * <p>Is non-<tt>null</tt> if the memory is on the heap, and is <tt>null</tt>, if the memory is
     * off the heap. If we have this buffer, we must never void this reference, or the memory
     * segment will point to undefined addresses outside the heap and may in out-of-order execution
     * cases cause segmentation faults.
     */
    protected final byte[] heapMemory;
    /**
     * The address to the data, relative to the heap memory byte array. If the heap memory byte
     * array is <tt>null</tt>, this becomes an absolute memory address outside the heap.
     * The address relative to the byte array.
     */
    protected long address;  
}

HeapMemorySegment is used for memory allocated on the heap.

public final class HeapMemorySegment extends MemorySegment {
    /**
     * An extra reference to the heap memory, so we can let byte array checks fail by the built-in
     * checks automatically without extra checks.
     * A byte array reference that points to this memory segment.
     */
    private byte[] memory;
    public void free() {
        super.free();
        this.memory = null;
    }
 
    public final void get(DataOutput out, int offset, int length) throws IOException {
        out.write(this.memory, offset, length);
    }
}

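HybridMemorySegment supports both on-heap and off-heap memory. Flink relies on the JVM's Unsafe operations. If the object o is not null, this is the on-heap case: the address that follows is a relative position, and the operation is applied at that position relative to the object (for example, an array). If the object o is null, the memory is not JVM heap memory and this is the off-heap case: the address that follows is the absolute address of a memory block, and the call operates on that block.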

public final class HybridMemorySegment extends MemorySegment {
  @Override
    public ByteBuffer wrap(int offset, int length) {
        if (address <= addressLimit) {
            if (heapMemory != null) {
                return ByteBuffer.wrap(heapMemory, offset, length);
            }
            else {
                try {
                    ByteBuffer wrapper = offHeapBuffer.duplicate();
                    wrapper.limit(offset + length);
                    wrapper.position(offset);
                    return wrapper;
                }
                catch (IllegalArgumentException e) {
                    throw new IndexOutOfBoundsException();
                }
            }
        }
        else {
            throw new IllegalStateException("segment has been freed");
        }
    }
}
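The idea of one access API over both kinds of memory can be sketched with plain JDK classes. The example below is not Flink's implementation: it uses `java.nio.ByteBuffer` instead of Unsafe, and the class `SegmentSketch` and its methods are hypothetical names introduced only for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a memory segment: the same read/write calls work
// whether the bytes live in a heap array or in off-heap (direct) memory.
final class SegmentSketch {
    private final ByteBuffer buffer;
    private boolean freed;

    private SegmentSketch(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    static SegmentSketch onHeap(int size) {
        return new SegmentSketch(ByteBuffer.wrap(new byte[size]));
    }

    static SegmentSketch offHeap(int size) {
        return new SegmentSketch(ByteBuffer.allocateDirect(size));
    }

    boolean isOffHeap() {
        return buffer.isDirect();
    }

    void putLong(int offset, long value) {
        checkNotFreed();
        buffer.putLong(offset, value); // absolute write, does not move the position
    }

    long getLong(int offset) {
        checkNotFreed();
        return buffer.getLong(offset);
    }

    // After free(), every access fails, mirroring the "segment has been freed" check.
    void free() {
        freed = true;
    }

    private void checkNotFreed() {
        if (freed) {
            throw new IllegalStateException("segment has been freed");
        }
    }

    public static void main(String[] args) {
        SegmentSketch heap = SegmentSketch.onHeap(32 * 1024);
        SegmentSketch direct = SegmentSketch.offHeap(32 * 1024);
        heap.putLong(8, 42L);
        direct.putLong(8, 42L);
        System.out.println(heap.getLong(8) == direct.getLong(8));
    }
}
```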

Flink creates MemorySegments through MemorySegmentFactory; a MemorySegment is the smallest unit of memory allocation in Flink. For data access that spans several MemorySegments, Flink provides view abstractions: DataInputView for reading data and DataOutputView for writing data.

/**
 * This interface defines a view over some memory that can be used to sequentially read the contents of the memory.
 * The view is typically backed by one or more {@link org.apache.flink.core.memory.MemorySegment}.
 */
@Public
public interface DataInputView extends DataInput {
    // Note: implementations of this view hold references to MemorySegments
    // (for example, a MemorySegment[] field). The group of segments can be treated
    // as a memory page, and Flink reads the data in them sequentially.
/**
     * Reads up to {@code len} bytes of memory and stores it into {@code b} starting at offset {@code off}.
     * It returns the number of read bytes or -1 if there is no more data left.
     * @param b byte array to store the data to
     * @param off offset into byte array
     * @param len byte length to read
     * @return the number of actually read bytes of -1 if there is no more data left
     */
    int read(byte[] b, int off, int len) throws IOException;
}

DataOutputView is the view for writing data. An output view holds references to multiple MemorySegments, and Flink writes to the segments sequentially.

/**
 * This interface defines a view over some memory that can be used to sequentially write contents to the memory.
 * The view is typically backed by one or more {@link org.apache.flink.core.memory.MemorySegment}.
 */
@Public
public interface DataOutputView extends DataOutput {
    // Note: implementations of this view hold references to MemorySegments
    // (for example, a List<MemorySegment> field).
    /**
     * Copies {@code numBytes} bytes from the source to this view.
     * @param source The source to copy the bytes from.
     * @param numBytes The number of bytes to copy.
     */
    void write(DataInputView source, int numBytes) throws IOException;
}
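A view that writes sequentially across fixed-size segments can be sketched as follows. This is a simplified illustration, not Flink's DataOutputView implementation: segments are plain `byte[]` arrays, and `PagedOutputSketch` with its methods is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sequential writer over fixed-size segments ("pages").
final class PagedOutputSketch {
    private final int segmentSize;
    private final List<byte[]> segments = new ArrayList<>();
    private int positionInSegment;

    PagedOutputSketch(int segmentSize) {
        this.segmentSize = segmentSize;
        this.positionInSegment = segmentSize; // forces a new segment on the first write
    }

    // Writes bytes one after another, moving to a fresh segment when the current one is full.
    void write(byte[] source) {
        for (byte b : source) {
            if (positionInSegment == segmentSize) {
                segments.add(new byte[segmentSize]);
                positionInSegment = 0;
            }
            segments.get(segments.size() - 1)[positionInSegment++] = b;
        }
    }

    int segmentCount() {
        return segments.size();
    }

    // Maps a logical offset to (segment index, offset inside that segment).
    byte readAt(int logicalOffset) {
        return segments.get(logicalOffset / segmentSize)[logicalOffset % segmentSize];
    }

    public static void main(String[] args) {
        PagedOutputSketch out = new PagedOutputSketch(4);
        out.write(new byte[] {1, 2, 3, 4, 5, 6});
        System.out.println(out.segmentCount() + " segments, byte at offset 4 = " + out.readAt(4));
    }
}
```

The caller sees one continuous stream of bytes; the segment boundaries are hidden behind the view.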

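Flink uses the MemoryManager to manage the managed memory described in the previous section. Managed memory uses only off-heap memory. It is used mainly for sorting, hashing, and caching in batch processing (according to the community, stream processing will also use it in the future), and in stream processing it serves as part of the memory of the RocksDBStateBackend. The MemoryManager manages MemorySegments through a memory pool.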

/**
 * The memory manager governs the memory that Flink uses for sorting, hashing, caching or off-heap state backends
 * (e.g. RocksDB). Memory is represented either in {@link MemorySegment}s of equal size or in reserved chunks of certain
 * size. Operators allocate the memory either by requesting a number of memory segments or by reserving chunks.
 * Any allocated memory has to be released to be reused later.
 * <p>The memory segments are represented as off-heap unsafe memory regions (both via {@link HybridMemorySegment}).
 * Releasing a memory segment will make it re-claimable by the garbage collector, but does not necessarily immediately
 * releases the underlying memory.
 */
public class MemoryManager {
 /**
     * Allocates a set of memory segments from this memory manager.
     * <p>The total allocated memory will not exceed its size limit, announced in the constructor.
     * @param owner The owner to associate with the memory segment, for the fallback release.
     * @param target The list into which to put the allocated memory pages.
     * @param numberOfPages The number of pages to allocate.
     * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
     *                                   of memory pages any more.
     */
    public void allocatePages(
            Object owner,
            Collection<MemorySegment> target,
            int numberOfPages) throws MemoryAllocationException {
}

private static void freeSegment(MemorySegment segment, @Nullable Collection<MemorySegment> segments) {
        segment.free();
        if (segments != null) {
            segments.remove(segment);
        }
    }
    // The free() method below belongs to MemorySegment; freeSegment() above calls it.
/**
     * Frees this memory segment.
     * <p>After this operation has been called, no further operations are possible on the memory
     * segment and will fail. The actual memory (heap or off-heap) will only be released after this
     * memory segment object has become garbage collected.
     */
    public void free() {
        // this ensures we can place no more data and trigger
        // the checks for the freed segment
        address = addressLimit + 1;
    }
}
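The allocate-and-release cycle can be sketched with a small page pool. This is a simplified illustration under stated assumptions: pages are heap `byte[]` arrays rather than off-heap segments, there is no owner tracking, and `PagePoolSketch` and its methods are hypothetical names, not the MemoryManager API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;

// Hypothetical fixed-size page pool: pages are handed out and must be released to be reused.
final class PagePoolSketch {
    private final Deque<byte[]> freePages = new ArrayDeque<>();

    PagePoolSketch(int totalPages, int pageSize) {
        for (int i = 0; i < totalPages; i++) {
            freePages.push(new byte[pageSize]);
        }
    }

    // Fails instead of exceeding the pool's size limit.
    void allocatePages(Collection<byte[]> target, int numberOfPages) {
        if (numberOfPages > freePages.size()) {
            throw new IllegalStateException(
                "requested " + numberOfPages + " pages, only " + freePages.size() + " available");
        }
        for (int i = 0; i < numberOfPages; i++) {
            target.add(freePages.pop());
        }
    }

    // Returns the pages to the pool and empties the caller's collection.
    void release(Collection<byte[]> pages) {
        freePages.addAll(pages);
        pages.clear();
    }

    int availablePages() {
        return freePages.size();
    }

    public static void main(String[] args) {
        PagePoolSketch pool = new PagePoolSketch(4, 32 * 1024);
        List<byte[]> mine = new ArrayList<>();
        pool.allocatePages(mine, 3);
        System.out.println("available after allocate: " + pool.availablePages());
        pool.release(mine);
        System.out.println("available after release: " + pool.availablePages());
    }
}
```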

For the network memory described in the previous section, Flink adds a buffer abstraction, NetworkBuffer. A buffer is also backed by a MemorySegment, and Flink manages buffers through buffer pools. Each TaskManager has one NetworkBufferPool that is shared by all tasks on that TaskManager, and the memory needed by each task's LocalBufferPool is requested from the NetworkBufferPool. All of it is off-heap memory allocated by Flink.

When an upstream operator writes data to a ResultPartition, it requests a buffer and uses a BufferBuilder to write the data into a MemorySegment. When a downstream operator consumes data from a ResultSubpartition, it uses a BufferConsumer to read the data from the MemorySegment. BufferBuilder and BufferConsumer correspond one to one. This flow is also tied to Flink's back-pressure mechanism, as shown in the figure.

/**
 * A buffer pool used to manage a number of {@link Buffer} instances from the
 * {@link NetworkBufferPool}.
 * <p>Buffer requests are mediated to the network buffer pool to ensure dead-lock
 * free operation of the network stack by limiting the number of buffers per
 * local buffer pool. It also implements the default mechanism for buffer
 * recycling, which ensures that every buffer is ultimately returned to the
 * network buffer pool.
 * <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It
 * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to
 * match its new size.
 */
class LocalBufferPool implements BufferPool {
@Nullable
    private MemorySegment requestMemorySegment(int targetChannel) throws IOException {
        MemorySegment segment = null;
        synchronized (availableMemorySegments) {
            returnExcessMemorySegments();

            if (availableMemorySegments.isEmpty()) {
                segment = requestMemorySegmentFromGlobal();
            }
            // segment may have been released by buffer pool owner
            if (segment == null) {
                segment = availableMemorySegments.poll();
            }
            if (segment == null) {
                availabilityHelper.resetUnavailable();
            }
            if (segment != null && targetChannel != UNKNOWN_CHANNEL) {
                if (subpartitionBuffersCount[targetChannel]++ == maxBuffersPerChannel) {
                    unavailableSubpartitionsCount++;
                    availabilityHelper.resetUnavailable();
                }
            }
        }
        return segment;
    }
    }
    /**
 * A result partition for data produced by a single task.
 *
 * <p>This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
 * a result partition is a collection of {@link Buffer} instances. The buffers are organized in one
 * or more {@link ResultSubpartition} instances, which further partition the data depending on the
 * number of consuming tasks and the data {@link DistributionPattern}.
 * <p>Tasks, which consume a result partition have to request one of its subpartitions. The request
 * happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel})
 * <p>The life-cycle of each result partition has three (possibly overlapping) phases:
 * Produce, Consume, and Release.
 * (The original Javadoc continues with sections on buffer management and state management.)
 */
public abstract class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
      @Override
    public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
        checkInProduceState();
        return bufferPool.requestBufferBuilderBlocking(targetChannel);
    }
}
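The relationship between a shared global pool and a capped local pool can be sketched as below. This is a simplified model, not Flink's LocalBufferPool: it has no blocking requests, no per-channel limits, and no resizing, and the class names `GlobalPoolSketch` and `LocalPoolSketch` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical global pool shared by all tasks of one process.
final class GlobalPoolSketch {
    private final Deque<byte[]> segments = new ArrayDeque<>();

    GlobalPoolSketch(int count, int segmentSize) {
        for (int i = 0; i < count; i++) {
            segments.push(new byte[segmentSize]);
        }
    }

    synchronized byte[] request() {
        return segments.poll(); // null when the pool is exhausted
    }

    synchronized void recycle(byte[] segment) {
        segments.push(segment);
    }
}

// Hypothetical per-task pool: borrows from the global pool up to a fixed cap.
final class LocalPoolSketch {
    private final GlobalPoolSketch global;
    private final int maxSegments;
    private int inUse;

    LocalPoolSketch(GlobalPoolSketch global, int maxSegments) {
        this.global = global;
        this.maxSegments = maxSegments;
    }

    // Returns null when no buffer is available; a producer would then have to wait,
    // which is how a shortage of buffers slows the upstream side down.
    synchronized byte[] request() {
        if (inUse >= maxSegments) {
            return null;
        }
        byte[] segment = global.request();
        if (segment != null) {
            inUse++;
        }
        return segment;
    }

    synchronized void recycle(byte[] segment) {
        inUse--;
        global.recycle(segment);
    }

    public static void main(String[] args) {
        LocalPoolSketch local = new LocalPoolSketch(new GlobalPoolSketch(3, 1024), 2);
        byte[] first = local.request();
        local.request();
        System.out.println("third request granted: " + (local.request() != null));
        local.recycle(first);
        System.out.println("request after recycle granted: " + (local.request() != null));
    }
}
```

Capping each local pool keeps one busy task from draining the shared pool and starving the others.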

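Custom Serialization Framework

Flink implements a custom serialization mechanism for the basic data types it supports. Because the objects in a Flink data set have a relatively fixed structure, a single copy of the schema information can be kept, which saves storage space. Serialization is the conversion between Java objects and binary data. Flink uses the createSerializer method of TypeInformation to create the serializer for each type, and that serializer performs serialization and deserialization. The type information is extracted by the TypeExtractor from method signatures, class information, and so on when the StreamTransformation is built, and it is stored in the StreamConfig.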

/**
     * Creates a serializer for the type. The serializer may use the ExecutionConfig
     * for parameterization.
     * Creates the serializer for the corresponding type.
     * @param config The config used to parameterize the serializer.
     * @return A serializer for this type.
     */
    @PublicEvolving
    public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);
/**
 * A utility for reflection analysis on classes, to determine the return type of implementations of transformation
 * functions.
 */
@Public
public class TypeExtractor {
/**
 * Creates a {@link TypeInformation} from the given parameters.
     * If the given {@code instance} implements {@link ResultTypeQueryable}, its information
     * is used to determine the type information. Otherwise, the type information is derived
     * based on the given class information.
     * @param instance            instance to determine type information for
     * @param baseClass            base class of {@code instance}
     * @param clazz                class of {@code instance}
     * @param returnParamPos    index of the return type in the type arguments of {@code clazz}
     * @param <OUT>                output type
     * @return type information
     */
    @SuppressWarnings("unchecked")
    @PublicEvolving
    public static <OUT> TypeInformation<OUT> createTypeInfo(Object instance, Class<?> baseClass, Class<?> clazz,
 int returnParamPos) {
        if (instance instanceof ResultTypeQueryable) {
            return ((ResultTypeQueryable<OUT>) instance).getProducedType();
        } else {
            return createTypeInfo(baseClass, clazz, returnParamPos, null, null);
        }
    }
}

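For nested data types, Flink serializes starting from the innermost fields, and the inner results make up the outer serialized result. During deserialization, the binary data is read sequentially from memory and turned back into Java objects according to offsets. Flink's built-in serialization has very high storage density because only the values of the corresponding types are serialized.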
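The way inner results compose into the outer result can be sketched with the JDK's `DataOutputStream`. This is only an illustration of the principle, not Flink's TypeSerializer API; the `Person` and `Address` types and all method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical nested type: only field values are written, no class or field names.
final class NestedSerializationSketch {

    static final class Address {
        final String city;
        final int zip;
        Address(String city, int zip) { this.city = city; this.zip = zip; }
    }

    static final class Person {
        final String name;
        final int age;
        final Address address;
        Person(String name, int age, Address address) {
            this.name = name; this.age = age; this.address = address;
        }
    }

    static void writeAddress(DataOutput out, Address a) throws IOException {
        out.writeUTF(a.city);
        out.writeInt(a.zip);
    }

    static Address readAddress(DataInput in) throws IOException {
        return new Address(in.readUTF(), in.readInt());
    }

    // The outer serializer delegates to the inner one; the inner bytes become part of the outer bytes.
    static void writePerson(DataOutput out, Person p) throws IOException {
        out.writeUTF(p.name);
        out.writeInt(p.age);
        writeAddress(out, p.address);
    }

    // Fields are read back in exactly the order they were written.
    static Person readPerson(DataInput in) throws IOException {
        return new Person(in.readUTF(), in.readInt(), readAddress(in));
    }

    static byte[] toBytes(Person p) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        writePerson(new DataOutputStream(bytes), p);
        return bytes.toByteArray();
    }

    static Person fromBytes(byte[] data) throws IOException {
        return readPerson(new DataInputStream(new ByteArrayInputStream(data)));
    }

    public static void main(String[] args) throws IOException {
        byte[] data = toBytes(new Person("Ann", 30, new Address("Oslo", 150)));
        Person copy = fromBytes(data);
        System.out.println(copy.name + ", " + copy.address.city + ", " + data.length + " bytes");
    }
}
```

Since both sides know the schema, the byte stream carries nothing but the values, which is where the high storage density comes from.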

Flink's Table module builds the BinaryRow data structure on top of MemorySegment to reduce deserialization overhead further: when deserialization is needed, only the relevant fields are deserialized rather than the whole object.
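Reading a single field without decoding the rest of the row can be sketched with fixed-width slots. This is a strong simplification and not the actual BinaryRow layout: it assumes every field is a `long` stored in an 8-byte slot, and `FixedWidthRowSketch` and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;

// Hypothetical row format: field i lives at byte offset i * 8.
final class FixedWidthRowSketch {
    private static final int SLOT_BYTES = 8;

    static ByteBuffer writeRow(long... fields) {
        ByteBuffer row = ByteBuffer.allocate(fields.length * SLOT_BYTES);
        for (int i = 0; i < fields.length; i++) {
            row.putLong(i * SLOT_BYTES, fields[i]);
        }
        return row;
    }

    // Jumps straight to the slot of the requested field; other fields are never touched.
    static long getLong(ByteBuffer row, int fieldIndex) {
        return row.getLong(fieldIndex * SLOT_BYTES);
    }

    public static void main(String[] args) {
        ByteBuffer row = writeRow(7L, 8L, 9L);
        System.out.println(getLong(row, 1));
    }
}
```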

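You can also register subtypes and custom serializers. Types that Flink cannot serialize itself are handed to Kryo; if Kryo is not suitable either, Avro can be forced as the serializer instead. Kryo is slower than Flink's built-in serialization, so during development you can call env.getConfig().disableGenericTypes() to disable Kryo and, wherever possible, use data types that Flink's built-in serializers support.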

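Cache-Friendly Data Structures

Reading from the CPU's L1, L2, and L3 caches is many times faster than reading from main memory. Another important program property is the principle of locality: programs tend to reuse the data and instructions they used recently. There are two kinds of locality. Temporal locality means that recently accessed content is likely to be accessed again soon; spatial locality means that items at neighboring addresses are likely to be accessed close together in time.

Designing cache-friendly data structures around these two properties improves the cache hit rate and locality. The technique is applied mainly to sorting. Normally a pointer refers to a <key, value> object, and sorting has to follow the pointer to the actual data before it can compare, which involves random memory access and gives poor cache locality. Flink instead uses a serialized fixed-length key plus a pointer, so the keys are stored contiguously in memory; this avoids random memory access and improves the CPU cache hit rate. To order two records, the keys are compared first. If they differ, the result is returned directly and only the pointers need to be swapped, not the actual data. If they are equal, the data the pointers refer to is compared.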
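The fixed-length key plus pointer layout can be sketched as follows. This is an illustration of the technique rather than Flink's sorter: records are Java strings, the key is the first two characters packed into 32 bits, the pointer is an array index, and a simple insertion sort is used. All names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical sort index: each long holds a fixed-length key prefix (high 32 bits)
// and a pointer to the full record (low 32 bits), stored contiguously in one array.
final class NormalizedKeySortSketch {

    // Packs the first two chars so that numeric order of the key matches
    // String.compareTo order of the two-character prefix.
    static long normalizedKey(String s) {
        long c0 = s.length() > 0 ? s.charAt(0) : 0;
        long c1 = s.length() > 1 ? s.charAt(1) : 0;
        return (c0 << 16) | c1;
    }

    static int compare(long a, long b, String[] records) {
        int byKey = Long.compare(a >>> 32, b >>> 32);
        if (byKey != 0) {
            return byKey; // decided from the contiguous keys alone
        }
        // Keys tie: follow the pointers and compare the full records.
        return records[(int) a].compareTo(records[(int) b]);
    }

    // Returns record positions in sorted order; the records themselves never move.
    static int[] sortedPointers(String[] records) {
        long[] index = new long[records.length];
        for (int i = 0; i < records.length; i++) {
            index[i] = (normalizedKey(records[i]) << 32) | i;
        }
        for (int i = 1; i < index.length; i++) { // insertion sort over the index entries
            long entry = index[i];
            int j = i - 1;
            while (j >= 0 && compare(index[j], entry, records) > 0) {
                index[j + 1] = index[j];
                j--;
            }
            index[j + 1] = entry;
        }
        int[] pointers = new int[index.length];
        for (int i = 0; i < index.length; i++) {
            pointers[i] = (int) index[i];
        }
        return pointers;
    }

    public static void main(String[] args) {
        String[] records = {"pear", "apple", "apricot", "fig"};
        System.out.println(Arrays.toString(sortedPointers(records)));
    }
}
```

In this sample only "apple" and "apricot" share a key prefix, so only that one comparison has to touch the full records.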
