A Deep Dive into Memory Management in the Big Data Processing Engine Flink
Memory Model
Flink can use both on-heap and off-heap memory. The memory model is shown in the figure:


The memory Flink uses is divided into on-heap and off-heap memory. By purpose it breaks down into task memory, network memory, managed memory, and framework memory. Task, network, and managed memory count toward slot memory, while framework memory is shared across the whole TaskManager.
On-heap memory holds user code, the HeapStateBackend, and part of the framework's execution memory.
Off-heap memory is memory not virtualized by the JVM; it maps directly to operating-system memory addresses. It includes framework off-heap memory, JVM direct memory, and native memory.
Direct memory can serve as network transfer buffers. Network memory falls under direct memory: Flink uses it for zero-copy transfers, reducing the number of kernel-space to user-space copies and enabling more efficient I/O.
The JVM metaspace stores metadata for the classes the JVM loads; the more classes loaded, the more space it needs. JVM overhead covers other JVM costs such as native memory, the code cache, and thread stacks.
Managed memory is used mainly by the RocksDBStateBackend and by batch operators, and also falls under native memory. For the RocksDBStateBackend, each piece of state maps to a RocksDB column family with its own write buffer; RocksDB is built on an LSM-tree structure, and its native memory footprint is roughly blockCache + writeBufferNum * writeBuffer + index. Off-heap memory is shared between processes, and having the JVM virtualize a large heap is slow, so using off-heap memory avoids that cost. It has drawbacks too: monitoring and debugging are more complex, and for short-lived segments on-heap memory is cheaper. In some cases Flink operates directly on binary data, avoiding deserialization overhead. If the data to be processed exceeds the memory limit, part of it is spilled to disk.
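As a back-of-the-envelope illustration of the RocksDB footprint formula above (blockCache + writeBufferNum * writeBuffer + index), here is a minimal sketch; the sizes used are invented example values, not Flink or RocksDB defaults:

```java
public class RocksDbMemoryEstimate {

    // All sizes in MiB. The formula mirrors the text above: block cache, plus one
    // write buffer per column family (i.e. per state), plus index/filter blocks.
    static long estimateNativeMemoryMb(long blockCacheMb, int writeBufferNum,
                                       long writeBufferMb, long indexMb) {
        return blockCacheMb + (long) writeBufferNum * writeBufferMb + indexMb;
    }

    public static void main(String[] args) {
        // Assumed example: 256 MiB block cache, 4 states with 64 MiB write buffers, 32 MiB index.
        System.out.println(estimateNativeMemoryMb(256, 4, 64, 32)); // 256 + 4*64 + 32 = 544
    }
}
```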
Memory Management
Similar to the page mechanism in an operating system, Flink manages memory in pages. The structures corresponding to pages are the data views and MemorySegment. MemorySegment is Flink's smallest unit of memory allocation, 32 KB by default, and it can live on the heap or off it; Flink accesses both kinds of memory through the MemorySegment abstraction. Building on Flink's serialization machinery (covered in the next section), MemorySegment provides methods for reading and writing binary data, and Flink reads from and writes to segments through DataInputView and DataOutputView. HeapMemorySegment manages on-heap memory only, while HybridMemorySegment manages both on-heap and off-heap memory. When a MemorySegment manages JVM heap memory, it holds a reference to a byte array and addresses memory relative to that array; HeapMemorySegment operates on that internal byte-array reference.
public abstract class MemorySegment {

    /**
     * The heap byte array object relative to which we access the memory.
     *
     * <p>Is non-<tt>null</tt> if the memory is on the heap, and is <tt>null</tt>, if the memory is
     * off the heap. If we have this buffer, we must never void this reference, or the memory
     * segment will point to undefined addresses outside the heap and may in out-of-order execution
     * cases cause segmentation faults.
     */
    protected final byte[] heapMemory;

    /**
     * The address to the data, relative to the heap memory byte array. If the heap memory byte
     * array is <tt>null</tt>, this becomes an absolute memory address outside the heap.
     */
    protected long address;
}
HeapMemorySegment is used to allocate and access on-heap memory.
public final class HeapMemorySegment extends MemorySegment {

    /**
     * An extra reference to the heap memory, so we can let byte array checks fail by the built-in
     * checks automatically without extra checks.
     */
    private byte[] memory;

    public void free() {
        super.free();
        this.memory = null;
    }

    public final void get(DataOutput out, int offset, int length) throws IOException {
        out.write(this.memory, offset, length);
    }
}
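The heap-backed read/write path above can be sketched end to end with a toy page class (a drastic simplification of HeapMemorySegment; the class and method names here are illustrative, not Flink's API):

```java
// Toy fixed-size memory page, loosely modelled on Flink's 32 KB MemorySegment.
public class ToySegment {
    public static final int PAGE_SIZE = 32 * 1024;
    private final byte[] heapMemory = new byte[PAGE_SIZE];

    void putInt(int offset, int value) {
        // Big-endian write of 4 bytes at the given page offset.
        heapMemory[offset]     = (byte) (value >>> 24);
        heapMemory[offset + 1] = (byte) (value >>> 16);
        heapMemory[offset + 2] = (byte) (value >>> 8);
        heapMemory[offset + 3] = (byte) value;
    }

    int getInt(int offset) {
        // Reassemble the 4 bytes back into an int.
        return ((heapMemory[offset] & 0xff) << 24)
             | ((heapMemory[offset + 1] & 0xff) << 16)
             | ((heapMemory[offset + 2] & 0xff) << 8)
             |  (heapMemory[offset + 3] & 0xff);
    }
}
```

This is the essence of how a segment lets Flink treat memory as raw binary pages rather than as Java objects.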
HybridMemorySegment supports both on-heap and off-heap memory, using the JVM's Unsafe operations. If the reference object o is non-null (the on-heap case), the address that follows is a relative offset, and the call operates on the given object (for example a byte array) at that relative position. If o is null (the off-heap case), the memory being operated on is not JVM heap memory, the address is the absolute address of some memory block, and the same calls operate on that block directly.
public final class HybridMemorySegment extends MemorySegment {

    @Override
    public ByteBuffer wrap(int offset, int length) {
        if (address <= addressLimit) {
            if (heapMemory != null) {
                return ByteBuffer.wrap(heapMemory, offset, length);
            }
            else {
                try {
                    ByteBuffer wrapper = offHeapBuffer.duplicate();
                    wrapper.limit(offset + length);
                    wrapper.position(offset);
                    return wrapper;
                }
                catch (IllegalArgumentException e) {
                    throw new IndexOutOfBoundsException();
                }
            }
        }
        else {
            throw new IllegalStateException("segment has been freed");
        }
    }
}
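The same on-heap/off-heap duality can be approximated with the JDK's ByteBuffer API instead of Unsafe. This toy stand-in for HybridMemorySegment is illustrative only, not Flink code:

```java
import java.nio.ByteBuffer;

// One access path over both kinds of memory, in the spirit of HybridMemorySegment.
public class HybridAccess {
    private final ByteBuffer buffer; // heap-backed or direct (off-heap)

    HybridAccess(boolean offHeap, int capacity) {
        // allocateDirect returns memory outside the JVM heap; allocate is heap-backed.
        this.buffer = offHeap ? ByteBuffer.allocateDirect(capacity)
                              : ByteBuffer.allocate(capacity);
    }

    // Callers use the same methods regardless of where the bytes actually live.
    void putLong(int offset, long value) { buffer.putLong(offset, value); }
    long getLong(int offset)             { return buffer.getLong(offset); }
    boolean isOffHeap()                  { return buffer.isDirect(); }
}
```

The design point is the same as Flink's: one uniform read/write interface hides whether the backing memory is a heap byte array or a native allocation.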
Flink creates MemorySegments through a MemorySegmentFactory. For data access that spans segments, Flink abstracts a view over the memory: DataInputView for reading and DataOutputView for writing.
/**
 * This interface defines a view over some memory that can be used to sequentially read the contents of the memory.
 * The view is typically backed by one or more {@link org.apache.flink.core.memory.MemorySegment}.
 */
@Public
public interface DataInputView extends DataInput {
    // Implementations hold references to one or more MemorySegments; that group of
    // segments can be seen as a memory page from which Flink reads sequentially.
    // (A Java interface cannot declare instance fields, so the backing segments
    // live in the implementing classes.)

    /**
     * Reads up to {@code len} bytes of memory and stores it into {@code b} starting at offset {@code off}.
     * It returns the number of read bytes or -1 if there is no more data left.
     *
     * @param b byte array to store the data to
     * @param off offset into byte array
     * @param len byte length to read
     * @return the number of actually read bytes, or -1 if there is no more data left
     */
    int read(byte[] b, int off, int len) throws IOException;
}
DataOutputView is the view for writing data; an output view holds references to multiple MemorySegments, and Flink writes to the segments sequentially.
/**
 * This interface defines a view over some memory that can be used to sequentially write contents to the memory.
 * The view is typically backed by one or more {@link org.apache.flink.core.memory.MemorySegment}.
 */
@Public
public interface DataOutputView extends DataOutput {
    // As with DataInputView, the backing MemorySegments are held by the
    // implementing classes, not by the interface itself.

    /**
     * Copies {@code numBytes} bytes from the source to this view.
     *
     * @param source The source to copy the bytes from.
     * @param numBytes The number of bytes to copy.
     */
    void write(DataInputView source, int numBytes) throws IOException;
}
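The paged sequential-write idea behind these views can be sketched as follows (a toy model; Flink's real logic lives in classes such as AbstractPagedOutputView):

```java
import java.util.ArrayList;
import java.util.List;

// Sequential writer over a list of fixed-size byte[] "segments".
public class ToyPagedOutputView {
    private final int segmentSize;
    private final List<byte[]> segments = new ArrayList<>();
    private byte[] current;
    private int positionInSegment;

    ToyPagedOutputView(int segmentSize) {
        this.segmentSize = segmentSize;
        advance();
    }

    // Allocate a fresh segment and make it the current write target.
    private void advance() {
        current = new byte[segmentSize];
        segments.add(current);
        positionInSegment = 0;
    }

    void writeByte(byte b) {
        if (positionInSegment == segmentSize) {
            advance(); // current page is full: move on to the next segment
        }
        current[positionInSegment++] = b;
    }

    int segmentCount() { return segments.size(); }
}
```

The caller just writes a byte stream; the view transparently crosses segment boundaries, which is exactly what lets Flink treat a group of pages as one contiguous writable region.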
For the managed memory described in the previous section, Flink uses a MemoryManager. Managed memory uses only off-heap memory; it is used mainly for sorting, hashing, and caching in batch processing (per community discussion, streaming will also use it in the future), and in streaming it backs part of the RocksDBStateBackend. The MemoryManager manages MemorySegments through a memory pool.
/**
 * The memory manager governs the memory that Flink uses for sorting, hashing, caching or off-heap state backends
 * (e.g. RocksDB). Memory is represented either in {@link MemorySegment}s of equal size or in reserved chunks of certain
 * size. Operators allocate the memory either by requesting a number of memory segments or by reserving chunks.
 * Any allocated memory has to be released to be reused later.
 *
 * <p>The memory segments are represented as off-heap unsafe memory regions (both via {@link HybridMemorySegment}).
 * Releasing a memory segment will make it re-claimable by the garbage collector, but does not necessarily immediately
 * releases the underlying memory.
 */
public class MemoryManager {

    /**
     * Allocates a set of memory segments from this memory manager.
     *
     * <p>The total allocated memory will not exceed its size limit, announced in the constructor.
     *
     * @param owner The owner to associate with the memory segment, for the fallback release.
     * @param target The list into which to put the allocated memory pages.
     * @param numberOfPages The number of pages to allocate.
     * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
     *                                   of memory pages any more.
     */
    public void allocatePages(
            Object owner,
            Collection<MemorySegment> target,
            int numberOfPages) throws MemoryAllocationException {
    }

    private static void freeSegment(MemorySegment segment, @Nullable Collection<MemorySegment> segments) {
        segment.free();
        if (segments != null) {
            segments.remove(segment);
        }
    }
}
The free() method itself is defined on MemorySegment, not on the MemoryManager:
/**
 * Frees this memory segment.
 *
 * <p>After this operation has been called, no further operations are possible on the memory
 * segment and will fail. The actual memory (heap or off-heap) will only be released after this
 * memory segment object has become garbage collected.
 */
public void free() {
    // this ensures we can place no more data and trigger
    // the checks for the freed segment
    address = addressLimit + 1;
}
For the network memory mentioned in the previous section, Flink adds a NetworkBuffer layer of encapsulation. A buffer is also backed by a MemorySegment, and Flink manages buffers through buffer pools. Each TaskManager has one NetworkBufferPool shared by all tasks on that TaskManager, and each task's LocalBufferPool obtains the memory it needs from the NetworkBufferPool; all of this is off-heap memory allocated by Flink.
When an upstream operator writes data to a ResultPartition, it requests buffers and uses a BufferBuilder to write the data into a MemorySegment; when a downstream operator consumes data from a ResultSubpartition, it reads from the MemorySegment through a BufferConsumer. BufferBuilder and BufferConsumer correspond one to one. This flow is also tied to Flink's backpressure mechanism, as shown in the figure.

/**
 * A buffer pool used to manage a number of {@link Buffer} instances from the
 * {@link NetworkBufferPool}.
 *
 * <p>Buffer requests are mediated to the network buffer pool to ensure dead-lock
 * free operation of the network stack by limiting the number of buffers per
 * local buffer pool. It also implements the default mechanism for buffer
 * recycling, which ensures that every buffer is ultimately returned to the
 * network buffer pool.
 *
 * <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}). It
 * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to
 * match its new size.
 */
class LocalBufferPool implements BufferPool {

    @Nullable
    private MemorySegment requestMemorySegment(int targetChannel) throws IOException {
        MemorySegment segment = null;
        synchronized (availableMemorySegments) {
            returnExcessMemorySegments();

            if (availableMemorySegments.isEmpty()) {
                segment = requestMemorySegmentFromGlobal();
            }
            // segment may have been released by buffer pool owner
            if (segment == null) {
                segment = availableMemorySegments.poll();
            }
            if (segment == null) {
                availabilityHelper.resetUnavailable();
            }
            if (segment != null && targetChannel != UNKNOWN_CHANNEL) {
                if (subpartitionBuffersCount[targetChannel]++ == maxBuffersPerChannel) {
                    unavailableSubpartitionsCount++;
                    availabilityHelper.resetUnavailable();
                }
            }
        }
        return segment;
    }
}
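The request/recycle cycle implemented by the buffer pools can be sketched with a toy pool (greatly simplified: no global pool, no availability listeners, no per-channel limits — this is not Flink's API):

```java
import java.util.ArrayDeque;

// Toy buffer pool: hand out fixed-size buffers up to a quota, recycle them for reuse.
public class ToyBufferPool {
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();
    private final int bufferSize;
    private int remainingQuota; // how many more buffers this pool may still create

    ToyBufferPool(int numBuffers, int bufferSize) {
        this.bufferSize = bufferSize;
        this.remainingQuota = numBuffers;
    }

    /** Returns a buffer, or null if the pool is exhausted (the caller would then back-pressure). */
    byte[] requestBuffer() {
        byte[] buf = available.poll();
        if (buf == null && remainingQuota > 0) {
            remainingQuota--;
            buf = new byte[bufferSize];
        }
        return buf;
    }

    /** Returns a buffer to the pool so a later request can reuse it. */
    void recycle(byte[] buffer) {
        available.offer(buffer);
    }
}
```

Exhausting the pool is precisely the signal Flink's backpressure builds on: a producer that cannot obtain a buffer must wait, which propagates slowdown upstream.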
/**
 * A result partition for data produced by a single task.
 *
 * <p>This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
 * a result partition is a collection of {@link Buffer} instances. The buffers are organized in one
 * or more {@link ResultSubpartition} instances, which further partition the data depending on the
 * number of consuming tasks and the data {@link DistributionPattern}.
 *
 * <p>Tasks, which consume a result partition have to request one of its subpartitions. The request
 * happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel}).
 *
 * <p>The life-cycle of each result partition has three (possibly overlapping) phases:
 * Produce, Consume, and Release.
 */
public abstract class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {

    @Override
    public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
        checkInProduceState();
        return bufferPool.requestBufferBuilderBlocking(targetChannel);
    }
}
Custom Serialization Framework
Flink implements a custom serialization mechanism for the basic data types it supports. Because the schema of a Flink dataset is relatively fixed, the schema information only needs to be stored once, saving space. Serialization is the conversion between Java objects and binary data. Flink uses TypeInformation's createSerializer method to create a serializer for each type, which performs the serialization and deserialization. The type information is extracted by the TypeExtractor from method signatures and class information when the StreamTransformation is built, and is stored in the StreamConfig.
/**
 * Creates a serializer for the type. The serializer may use the ExecutionConfig
 * for parameterization.
 *
 * @param config The config used to parameterize the serializer.
 * @return A serializer for this type.
 */
@PublicEvolving
public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);
/**
 * A utility for reflection analysis on classes, to determine the return type of implementations of transformation
 * functions.
 */
@Public
public class TypeExtractor {

    /**
     * Creates a {@link TypeInformation} from the given parameters.
     *
     * <p>If the given {@code instance} implements {@link ResultTypeQueryable}, its information
     * is used to determine the type information. Otherwise, the type information is derived
     * based on the given class information.
     *
     * @param instance instance to determine type information for
     * @param baseClass base class of {@code instance}
     * @param clazz class of {@code instance}
     * @param returnParamPos index of the return type in the type arguments of {@code clazz}
     * @param <OUT> output type
     * @return type information
     */
    @SuppressWarnings("unchecked")
    @PublicEvolving
    public static <OUT> TypeInformation<OUT> createTypeInfo(Object instance, Class<?> baseClass, Class<?> clazz,
            int returnParamPos) {
        if (instance instanceof ResultTypeQueryable) {
            return ((ResultTypeQueryable<OUT>) instance).getProducedType();
        } else {
            return createTypeInfo(baseClass, clazz, returnParamPos, null, null);
        }
    }
}

For nested data types, Flink serializes from the innermost fields outward; the inner results form part of the outer serialized result. On deserialization, the binary data is read sequentially from memory and turned back into Java objects using the offsets. Flink's built-in serialization is very storage-dense, since only the values of the corresponding types need to be stored.
Flink's Table module builds the BinaryRow data structure on top of MemorySegment, which further reduces deserialization overhead: when deserializing, only the required fields need to be deserialized rather than the whole object.
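The BinaryRow idea of touching only the required field can be sketched with a toy fixed-layout row (the layout and names are invented for illustration and are not Flink's BinaryRow format):

```java
import java.nio.ByteBuffer;

// Toy row with a fixed layout: [int id (4 B)][long timestamp (8 B)][double value (8 B)].
public class ToyBinaryRow {
    static final int ID_OFFSET = 0, TS_OFFSET = 4, VALUE_OFFSET = 12, ROW_BYTES = 20;

    // Serialize the fields in a fixed order; the schema is stored once, not per record.
    static byte[] serialize(int id, long ts, double value) {
        return ByteBuffer.allocate(ROW_BYTES).putInt(id).putLong(ts).putDouble(value).array();
    }

    /** Reads only the timestamp field by its known offset, skipping the rest of the record. */
    static long readTimestamp(byte[] row) {
        return ByteBuffer.wrap(row).getLong(TS_OFFSET);
    }
}
```

Because every field sits at a known offset, a comparison or projection on one column never pays the cost of materializing the whole row as a Java object.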

You can also register subtypes and custom serializers. Types that Flink cannot serialize itself are handed to Kryo, and if Kryo cannot handle them either, serialization is forced through Avro. Kryo's performance is lower than Flink's built-in serialization, so during development you can call env.getConfig().disableGenericTypes() to disable Kryo and stick as much as possible to the data types covered by Flink's own serializers.
Cache-Friendly Data Structures
The CPU's L1, L2, and L3 caches are much faster to read than main memory, by a large multiple. The other important program property is the principle of locality: programs tend to reuse the data and instructions they used most recently. There are two kinds of locality: temporal locality, where recently accessed content is likely to be accessed again soon, and spatial locality, where items at nearby addresses are likely to be accessed within a short time.

Designing cache-friendly data structures around these two properties improves cache hit rates and locality. Flink applies this mainly in sorting. Normally a pointer references a <key, value> record, and a sort must follow the pointer to fetch the actual data before comparing; that is a random memory access with very poor cache locality. Using a serialized fixed-length key plus a pointer instead stores the keys contiguously in memory, avoiding random access and also improving the CPU cache hit rate. When sorting two records, the keys are compared first; if they differ, the result is immediate and only the pointers are swapped, never the actual data. Only when the keys are equal are the records the pointers actually reference compared.
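The fixed-length key + pointer scheme can be sketched as follows: serialized keys are compared byte-wise, only the (key, pointer) entries are reordered, and the records themselves never move. This is a toy model of Flink's normalized-key sorting, not its actual implementation (Flink additionally packs the keys contiguously into sort buffers):

```java
import java.util.Arrays;

// Sort records by a fixed-length serialized key plus a pointer into the record store.
public class ToyKeyPointerSort {

    // Byte-wise unsigned comparison of serialized keys, as a normalized-key sort would do.
    static int compareKeys(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = (a[i] & 0xff) - (b[i] & 0xff);
            if (c != 0) return c;
        }
        return Integer.compare(a.length, b.length);
    }

    /** Returns the sorted order of record indices; records themselves are never moved. */
    static int[] sortPointers(byte[][] keys, String[] records) {
        Integer[] order = new Integer[records.length];
        for (int i = 0; i < order.length; i++) order[i] = i;
        // Compare the serialized keys first; only on a tie fall back to the full record.
        Arrays.sort(order, (a, b) -> {
            int c = compareKeys(keys[a], keys[b]);
            return c != 0 ? c : records[a].compareTo(records[b]);
        });
        int[] result = new int[order.length];
        for (int i = 0; i < order.length; i++) result[i] = order[i];
        return result;
    }
}
```

Most comparisons are settled on the compact keys, which sit close together in memory, so the sort rarely chases pointers into the record store.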

That concludes this deep dive into memory management in the big data processing engine Flink; for more material on Flink memory management, see the other related articles on 腳本之家.