国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

簡單介紹ES中的索引存儲(chǔ)類型

這篇具有很好參考價(jià)值的文章主要介紹了簡單介紹ES中的索引存儲(chǔ)類型。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

老鐵們好,我是V,今天我們簡單聊聊ES中的索引存儲(chǔ)類型

支持的存儲(chǔ)類型

目前ES中主要支持以下幾種存儲(chǔ)類型

fs

默認(rèn)文件系統(tǒng)實(shí)現(xiàn)。這將根據(jù)操作環(huán)境選擇最佳實(shí)施,目前會(huì)默認(rèn)啟用hybridfs

simplefs

Simple FS 類型是SimpleFsDirectory使用隨機(jī)訪問文件的文件系統(tǒng)存儲(chǔ)(映射到 Lucene)的直接實(shí)現(xiàn)。這種實(shí)現(xiàn)的并發(fā)性能很差(多個(gè)線程會(huì)成為瓶頸),并且禁用了堆內(nèi)存使用的一些優(yōu)化?;旧鲜褂玫妮^少

niofs

NIO FS 類型使用 NIO 將分片索引存儲(chǔ)在文件系統(tǒng)上(映射到 Lucene NIOFSDirectory)。它允許多個(gè)線程同時(shí)讀取同一個(gè)文件。但是不建議在 Windows 上使用,因?yàn)樵趙indow環(huán)境下Java 實(shí)現(xiàn)中存在一些錯(cuò)誤,并且禁用了堆內(nèi)存使用的某些優(yōu)化。

mmapfs

MMap FS 類型通過將文件映射到內(nèi)存 ( MMapDirectorymmap) 將分片索引存儲(chǔ)在文件系統(tǒng)上(映射到 Lucene)。內(nèi)存映射占用進(jìn)程中虛擬內(nèi)存地址空間的一部分,其大小等于被映射文件的大小。在使用此類之前,請(qǐng)確保您已允許了足夠的 虛擬地址空間。

hybridfs

該類型是niofs和mmaps的混合體,它根據(jù)讀取訪問模式為每種類型的文件選擇最佳的文件系統(tǒng)類型。目前只有 Lucene 術(shù)語詞典、規(guī)范和文檔值文件是內(nèi)存映射的。所有其他文件均使用 NIOFSDirectory 打開。和mmapfs類似,要確保你已允許了足夠的 虛擬地址空間。

如何修改存儲(chǔ)類型

修改存儲(chǔ)類型首先需要關(guān)閉索引,然后修改存儲(chǔ)類型后再打開索引

POST /xxx/_close

PUT /xxx/_settings
{
  "index.store.type": "niofs"
}

POST /xxx/_open

存儲(chǔ)類型源碼解析

決定索引使用哪種存儲(chǔ)類型的代碼如下:

org.elasticsearch.index.store.FsDirectoryFactory#newFSDirectory

protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
    final String storeType =
            indexSettings.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.FS.getSettingsKey());
    IndexModule.Type type;
    if (IndexModule.Type.FS.match(storeType)) {
        type = IndexModule.defaultStoreType(IndexModule.NODE_STORE_ALLOW_MMAP.get(indexSettings.getNodeSettings()));
    } else {
        type = IndexModule.Type.fromSettingsKey(storeType);
    }
    Set<String> preLoadExtensions = new HashSet<>(
        indexSettings.getValue(IndexModule.INDEX_STORE_PRE_LOAD_SETTING));
    switch (type) {
        case HYBRIDFS:
            // Use Lucene defaults
            final FSDirectory primaryDirectory = FSDirectory.open(location, lockFactory);
            if (primaryDirectory instanceof MMapDirectory) {
                MMapDirectory mMapDirectory = (MMapDirectory) primaryDirectory;
                return new HybridDirectory(lockFactory, setPreload(mMapDirectory, lockFactory, preLoadExtensions));
            } else {
                return primaryDirectory;
            }
        case MMAPFS:
            return setPreload(new MMapDirectory(location, lockFactory), lockFactory, preLoadExtensions);
        case SIMPLEFS:
            return new SimpleFSDirectory(location, lockFactory);
        case NIOFS:
            return new NIOFSDirectory(location, lockFactory);
        default:
            throw new AssertionError("unexpected built-in store type [" + type + "]");
    }
}

當(dāng)index.store.type為空、或者fs或者h(yuǎn)ybridfs在linux環(huán)境下都會(huì)選擇hybridfs,即混合使用

niofs和mmapfs

所以上面看似有很多種類型,其實(shí)fs和hybridfs都是基于niofs和mmapfs來實(shí)現(xiàn)的,而simplefs基本上因?yàn)樾阅軉栴}沒有人使用,所以我們著重介紹下niofs和mmapfs實(shí)現(xiàn)。

niofs

niofs的實(shí)現(xiàn)是org.apache.lucene.store.NIOFSDirectory

niofs如何獲取ByteBuffer

niofs對(duì)外提供的ByteBuffer是HeapByteBuffer

流程圖

es的存儲(chǔ)類型,elasticsearch,es,nio,linux

存儲(chǔ)類型打開后返回一個(gè)IndexInput供其他模塊讀取數(shù)據(jù)

# org.apache.lucene.store.NIOFSDirectory#openInput
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
  ensureOpen();
  ensureCanRead(name);
  Path path = getDirectory().resolve(name);
  FileChannel fc = FileChannel.open(path, StandardOpenOption.READ);
  boolean success = false;
  try {
    final NIOFSIndexInput indexInput = new NIOFSIndexInput("NIOFSIndexInput(path=\"" + path + "\")", fc, context);
    success = true;
    return indexInput;
  } finally {
    if (success == false) {
      IOUtils.closeWhileHandlingException(fc);
    }
  }
}

IndexInput的實(shí)現(xiàn)是NIOFSIndexInput

其中主要包含一個(gè)FileChannel、是否克隆、開始坐標(biāo)和結(jié)束坐標(biāo)

static final class NIOFSIndexInput extends BufferedIndexInput {
      /**
       * The maximum chunk size for reads of 16384 bytes.
       */
      private static final int CHUNK_SIZE = 16384;
      
      /** the file channel we will read from */
      protected final FileChannel channel;
      /** is this instance a clone and hence does not own the file to close it */
      boolean isClone = false;
      /** start offset: non-zero in the slice case */
      protected final long off;
      /** end offset (start+length) */
      protected final long end;
 }

父類BufferedIndexInput中的buffer才是真正對(duì)外提供數(shù)據(jù)的對(duì)象

public abstract class BufferedIndexInput extends IndexInput implements RandomAccessInput {

  private static final ByteBuffer EMPTY_BYTEBUFFER = ByteBuffer.allocate(0);

  /** Default buffer size set to {@value #BUFFER_SIZE}. */
  public static final int BUFFER_SIZE = 1024;
  
  /** Minimum buffer size allowed */
  public static final int MIN_BUFFER_SIZE = 8;
  
  // The normal read buffer size defaults to 1024, but
  // increasing this during merging seems to yield
  // performance gains.  However we don't want to increase
  // it too much because there are quite a few
  // BufferedIndexInputs created during merging.  See
  // LUCENE-888 for details.
  /**
   * A buffer size for merges set to {@value #MERGE_BUFFER_SIZE}.
   */
  public static final int MERGE_BUFFER_SIZE = 4096;

  private int bufferSize = BUFFER_SIZE;
  
  private ByteBuffer buffer = EMPTY_BYTEBUFFER;

  private long bufferStart = 0;       // position in file of buffer
  
  ...
 }

其他模塊會(huì)調(diào)用NIOFSIndexInput的父類BufferedIndexInput的readByte readLong readInt等方法獲取數(shù)據(jù),而這些方法又是調(diào)用內(nèi)部的buffer來獲取數(shù)據(jù)

@Override
public final short readShort() throws IOException {
  if (Short.BYTES <= buffer.remaining()) {
    return buffer.getShort();
  } else {
    return super.readShort();
  }
}

@Override
public final int readInt() throws IOException {
  if (Integer.BYTES <= buffer.remaining()) {
    return buffer.getInt();
  } else {
    return super.readInt();
  }
}

@Override
public final long readLong() throws IOException {
  if (Long.BYTES <= buffer.remaining()) {
    return buffer.getLong();
  } else {
    return super.readLong();
  }
}

這些方法都會(huì)嘗試先從buffer中獲取數(shù)據(jù),如果buffer中沒有則調(diào)用父類的的方法

例如readInt會(huì)嘗試先從buffer中獲取數(shù)據(jù),如果buffer中沒有則調(diào)用父類的readInt方法

public int readInt() throws IOException {
  return ((readByte() & 0xFF) << 24) | ((readByte() & 0xFF) << 16)
       | ((readByte() & 0xFF) <<  8) |  (readByte() & 0xFF);
}

而父類的readInt又會(huì)調(diào)用readByte,最終buffer中如果沒有數(shù)據(jù)則會(huì)出發(fā)refill邏輯

@Override
public final byte readByte() throws IOException {
  if (buffer.hasRemaining() == false) {
    refill();
  }
  return buffer.get();
}
BufferedIndexInput#refill

buffer中的數(shù)據(jù)是調(diào)用refill方法首先創(chuàng)建一個(gè)HeapByteBuffer,然后調(diào)用readInternal來填充buffer

# org.apache.lucene.store.BufferedIndexInput#refill
private void refill() throws IOException {
  long start = bufferStart + buffer.position();
  long end = start + bufferSize; // bufferSize是1024
  if (end > length())  // don't read past EOF
    end = length();
  int newLength = (int)(end - start);
  if (newLength <= 0)
    throw new EOFException("read past EOF: " + this);

  if (buffer == EMPTY_BYTEBUFFER) {
    // 創(chuàng)建一個(gè)HeapByteBuffer
    buffer = ByteBuffer.allocate(bufferSize);  // allocate buffer lazily
    // 檢查當(dāng)前坐標(biāo)是否越界
    seekInternal(bufferStart);
  }
  buffer.position(0);
  buffer.limit(newLength);
  bufferStart = start;
  // 填充當(dāng)前的buffer
  readInternal(buffer);
  // Make sure sub classes don't mess up with the buffer.
  assert buffer.order() == ByteOrder.BIG_ENDIAN : buffer.order();
  assert buffer.remaining() == 0 : "should have thrown EOFException";
  assert buffer.position() == newLength;
  // 切換到讀模式
  buffer.flip();
}
NIOFSIndexInput#readInternal

接下來我們來看看readInternal,里面是調(diào)用FileChannel的read方法來填充ByteBuffer

@Override
protected void readInternal(ByteBuffer b) throws IOException {
  long pos = getFilePointer() + off;
  
  if (pos + b.remaining() > end) {
    throw new EOFException("read past EOF: " + this);
  }

  try {
    int readLength = b.remaining();
    while (readLength > 0) {
      final int toRead = Math.min(CHUNK_SIZE, readLength);
      b.limit(b.position() + toRead);
      assert b.remaining() == toRead;
      final int i = channel.read(b, pos);
      if (i < 0) { // be defensive here, even though we checked before hand, something could have changed
        throw new EOFException("read past EOF: " + this + " buffer: " + b + " chunkLen: " + toRead + " end: " + end);
      }
      assert i > 0 : "FileChannel.read with non zero-length bb.remaining() must always read at least one byte (FileChannel is in blocking mode, see spec of ReadableByteChannel)";
      pos += i;
      readLength -= i;
    }
    assert readLength == 0;
  } catch (IOException ioe) {
    throw new IOException(ioe.getMessage() + ": " + this, ioe);
  }
}
FileChannelImpl#read
# sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer, long)
public int read(ByteBuffer dst, long position) throws IOException {
    if (dst == null)
        throw new NullPointerException();
    if (position < 0)
        throw new IllegalArgumentException("Negative position");
    if (!readable)
        throw new NonReadableChannelException();
    if (direct)
        Util.checkChannelPositionAligned(position, alignment);
    ensureOpen();
    if (nd.needsPositionLock()) {
        synchronized (positionLock) {
            return readInternal(dst, position);
        }
    } else {
        return readInternal(dst, position);  // 走這里
    }
}
FileChannelImpl#readInternal
# sun.nio.ch.FileChannelImpl#readInternal
private int readInternal(ByteBuffer dst, long position) throws IOException {
    assert !nd.needsPositionLock() || Thread.holdsLock(positionLock);
    int n = 0;
    int ti = -1;

    try {
        // 標(biāo)記可能會(huì)長時(shí)間block
        beginBlocking();
        ti = threads.add();
        if (!isOpen())
            return -1;
        do {
            n = IOUtil.read(fd, dst, position, direct, alignment, nd);
        } while ((n == IOStatus.INTERRUPTED) && isOpen());
        return IOStatus.normalize(n);
    } finally {
        threads.remove(ti);
        // 解除block
        endBlocking(n > 0);
        assert IOStatus.check(n);
    }
}
IOUtil#read

從緩存中獲取DirectByteBuffer,如果沒有則新建一個(gè)DirectByteBuffer

將文件內(nèi)容讀取到DirectByteBuffer

將DirectByteBuffer所有字節(jié)寫入HeapByteBuffer

# sun.nio.ch.IOUtil#read(java.io.FileDescriptor, java.nio.ByteBuffer, long, boolean, int, sun.nio.ch.NativeDispatcher)
static int read(FileDescriptor fd, ByteBuffer dst, long position,
                boolean directIO, int alignment, NativeDispatcher nd)
    throws IOException
{
    if (dst.isReadOnly())
        throw new IllegalArgumentException("Read-only buffer");
    if (dst instanceof DirectBuffer)
        return readIntoNativeBuffer(fd, dst, position, directIO, alignment, nd);

    // Substitute a native buffer
    ByteBuffer bb;
    int rem = dst.remaining();
    if (directIO) {  // directIO 是 flase
        Util.checkRemainingBufferSizeAligned(rem, alignment);
        bb = Util.getTemporaryAlignedDirectBuffer(rem, alignment);
    } else {
        bb = Util.getTemporaryDirectBuffer(rem); // 從緩存中返回java.nio.DirectByteBuffer
    }
    try {
        // 從fd中根據(jù)坐標(biāo)獲取數(shù)據(jù),directIO為false
        int n = readIntoNativeBuffer(fd, bb, position, directIO, alignment,nd);
        bb.flip();
        if (n > 0)
            // 非常樸實(shí)無華,循環(huán)獲取byte放到目標(biāo)ByteBuffer中
            dst.put(bb);  // dts是HeapByteBuffer
        return n;
    } finally {
        // 釋放ByteBuffer,如果可以嘗試緩存ByteBuffer
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}
Util#getTemporaryDirectBuffer

從緩存中獲取DirectByteBuffer,如果沒有則新建一個(gè)DirectByteBuffer

# sun.nio.ch.Util#getTemporaryDirectBuffer
public static ByteBuffer getTemporaryDirectBuffer(int size) {
    // If a buffer of this size is too large for the cache, there
    // should not be a buffer in the cache that is at least as
    // large. So we'll just create a new one. Also, we don't have
    // to remove the buffer from the cache (as this method does
    // below) given that we won't put the new buffer in the cache.
    if (isBufferTooLarge(size)) {
        return ByteBuffer.allocateDirect(size);
    }

    BufferCache cache = bufferCache.get();
    ByteBuffer buf = cache.get(size);
    if (buf != null) {
        return buf;  // 可能走這里
    } else {
        // No suitable buffer in the cache so we need to allocate a new
        // one. To avoid the cache growing then we remove the first
        // buffer from the cache and free it.
        if (!cache.isEmpty()) {
            buf = cache.removeFirst();
            free(buf);
        }
        return ByteBuffer.allocateDirect(size);  // 返回 java.nio.DirectByteBuffer
    }
}
IOUtil#readIntoNativeBuffer

將文件內(nèi)容讀取到DirectByteBuffer

# sun.nio.ch.IOUtil#readIntoNativeBuffer
private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb,
                                        long position, boolean directIO,
                                        int alignment, NativeDispatcher nd)
    throws IOException
{
    int pos = bb.position();
    int lim = bb.limit();
    assert (pos <= lim);
    int rem = (pos <= lim ? lim - pos : 0);

    if (directIO) {  // directIO是false
        Util.checkBufferPositionAligned(bb, pos, alignment);
        Util.checkRemainingBufferSizeAligned(rem, alignment);
    }

    if (rem == 0)
        return 0;
    int n = 0;
    if (position != -1) {
        // 走的這里 最終調(diào)用linux的pread64
        n = nd.pread(fd, ((DirectBuffer)bb).address() + pos, rem, position);
    } else {
        n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem);
    }
    if (n > 0)
        // 更新bb的position
        bb.position(pos + n);
    return n;
}
FileDispatcherImpl#pread

調(diào)用native方法pread0

# sun.nio.ch.FileDispatcherImpl#pread
int pread(FileDescriptor fd, long address, int len, long position)
    throws IOException
{
    return pread0(fd, address, len, position);
}
static native int pread0(FileDescriptor fd, long address, int len,
                         long position) throws IOException;

FileDispatcherImpl.c.read0

https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/19fb8f93c59dfd791f62d41f332db9e306bc1422/src/java.base/unix/native/libnio/ch/FileDispatcherImpl.c#L89

JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_pread0(JNIEnv *env, jclass clazz, jobject fdo,
                            jlong address, jint len, jlong offset)
{
    jint fd = fdval(env, fdo);
    void *buf = (void *)jlong_to_ptr(address);

    return convertReturnVal(env, pread64(fd, buf, len, offset), JNI_TRUE);
}
ByteBuffer#put

非常樸實(shí)無華,循環(huán)獲取byte放到目標(biāo)ByteBuffer中

# java.nio.ByteBuffer#put(java.nio.ByteBuffer)
public ByteBuffer put(ByteBuffer src) {
    if (src == this)
        throw createSameBufferException();
    if (isReadOnly())
        throw new ReadOnlyBufferException();
    int n = src.remaining();
    if (n > remaining())
        throw new BufferOverflowException();
    for (int i = 0; i < n; i++)
        put(src.get());
    return this;
}
Util#offerFirstTemporaryDirectBuffer

清空DirectByteBuffer,并嘗試將清空后的DirectByteBuffer緩存起來

# sun.nio.ch.Util#offerFirstTemporaryDirectBuffer
static void offerFirstTemporaryDirectBuffer(ByteBuffer buf) {
    // If the buffer is too large for the cache we don't have to
    // check the cache. We'll just free it.
    if (isBufferTooLarge(buf)) {
        free(buf);
        return;
    }

    assert buf != null;
    BufferCache cache = bufferCache.get();
    if (!cache.offerFirst(buf)) {  // 緩存buf
        // cache is full
        free(buf);  // 清空buf
    }

mmapfs

mmapfs的實(shí)現(xiàn)類是org.apache.lucene.store.MMapDirectory

mmapfs如何獲取ByteBuffer

流程圖

es的存儲(chǔ)類型,elasticsearch,es,nio,linux

MMapDirectory#openInput

mmapfs使用MMapDirectory提供openInput返回一個(gè)IndexInput給其他的模塊使用

# org.apache.lucene.store.MMapDirectory#openInput
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
    ensureOpen();
    ensureCanRead(name);
    Path path = directory.resolve(name);
    try (FileChannel c = FileChannel.open(path, StandardOpenOption.READ)) {
        final String resourceDescription = "MMapIndexInput(path=\"" + path.toString() + "\")";
        final boolean useUnmap = getUseUnmap();  // useUnmap=true
        // 根據(jù)ByteBuffer數(shù)量選擇使用SingleBufferImpl或者M(jìn)ultiBufferImpl包裝
        return ByteBufferIndexInput.newInstance(resourceDescription,
            map(resourceDescription, c, 0, c.size()), 
            c.size(), chunkSizePower, new ByteBufferGuard(resourceDescription, useUnmap ? CLEANER : null));  // useUnmap為true
    }
}

ByteBufferIndexInput.newInstance 就是個(gè)空殼

public static ByteBufferIndexInput newInstance(String resourceDescription, ByteBuffer[] buffers, long length, int chunkSizePower, ByteBufferGuard guard) {
  if (buffers.length == 1) {
    return new SingleBufferImpl(resourceDescription, buffers[0], length, chunkSizePower, guard);
  } else {
    return new MultiBufferImpl(resourceDescription, buffers, 0, length, chunkSizePower, guard);
  }
}

其中SingleBufferImpl和MultiBufferImpl本質(zhì)都是代理了ByteBuffer對(duì)象,所以這些我們直接跳過,重點(diǎn)是這個(gè)map方法

MMapDirectory#map

方法返回的ByteBuffer數(shù)組是MappedByteBuffer,具體的實(shí)現(xiàn)類是java.nio.DirectByteBufferR

maxChunkSize的獲取規(guī)則是 Constants.JRE_IS_64BIT ? (1 << 30) : (1 << 28); 在64位系統(tǒng)下是1073741824即1024MB

this.chunkSizePower = 31 Integer.numberOfLeadingZeros(maxChunkSize); = 30

這里面的preload一直是false,所以buffer.load()不會(huì)觸發(fā)

# org.apache.lucene.store.MMapDirectory#map
final ByteBuffer[] map(String resourceDescription, FileChannel fc, long offset, long length) throws IOException {
  if ((length >>> chunkSizePower) >= Integer.MAX_VALUE)
    throw new IllegalArgumentException("RandomAccessFile too big for chunk size: " + resourceDescription);
  
  final long chunkSize = 1L << chunkSizePower;  // chunkSizePower是30, chunkSize是1G
  
  // we always allocate one more buffer, the last one may be a 0 byte one
  // 根據(jù)文件大小和塊大小來計(jì)算分片數(shù)量
  final int nrBuffers = (int) (length >>> chunkSizePower) + 1;
  
  ByteBuffer buffers[] = new ByteBuffer[nrBuffers];
  
  long bufferStart = 0L;
  for (int bufNr = 0; bufNr < nrBuffers; bufNr++) { 
    int bufSize = (int) ( (length > (bufferStart + chunkSize))
        ? chunkSize
            : (length - bufferStart)
        );
    MappedByteBuffer buffer;  // buffer 是 java.nio.DirectByteBufferR
    try {
      buffer = fc.map(MapMode.READ_ONLY, offset + bufferStart, bufSize);
    } catch (IOException ioe) {
      throw convertMapFailedIOException(ioe, resourceDescription, bufSize);
    }
    if (preload) {  // preload=false
      buffer.load();
    }
    buffers[bufNr] = buffer;
    bufferStart += bufSize;
  }
  
  return buffers;
}

重點(diǎn)看下fileChannel.map方法

FileChannelImpl#map

beginBlocking() 方法 標(biāo)記可能會(huì)無限期阻塞,需要和endBlocking配合使用

nd.size(fd) 獲取文件大小,然后校驗(yàn)文件大小是不是小于位點(diǎn)+塊大小,如果是則擴(kuò)張文件,但是我們是只讀模式所以這個(gè)邏輯不會(huì)走

map0(imode, mapPosition, mapSize) 這段邏輯開啟虛擬內(nèi)存地址和文件之間的映射

nd.duplicateForMapping(fd); 創(chuàng)建了一個(gè)新的FileDescriptor

創(chuàng)建新的DirectByteBufferR Util.newMappedByteBufferR(isize, addr + pagePosition, mfd, um);

endBlocking()結(jié)束標(biāo)記阻塞

# sun.nio.ch.FileChannelImpl#map
// mode是MapMode.READ_ONLY
// position是開始坐標(biāo)
// size是小文件則是文件大小,大文件則是塊大小
public MappedByteBuffer map(MapMode mode, long position, long size)
    throws IOException
{
    ensureOpen();
    if (mode == null)
        throw new NullPointerException("Mode is null");
    if (position < 0L)
        throw new IllegalArgumentException("Negative position");
    if (size < 0L)
        throw new IllegalArgumentException("Negative size");
    if (position + size < 0)
        throw new IllegalArgumentException("Position + size overflow");
    if (size > Integer.MAX_VALUE)
        throw new IllegalArgumentException("Size exceeds Integer.MAX_VALUE");

    int imode = -1;
    if (mode == MapMode.READ_ONLY)
        imode = MAP_RO;
    else if (mode == MapMode.READ_WRITE)
        imode = MAP_RW;
    else if (mode == MapMode.PRIVATE)
        imode = MAP_PV;
    assert (imode >= 0);
    if ((mode != MapMode.READ_ONLY) && !writable)
        throw new NonWritableChannelException();
    if (!readable)
        throw new NonReadableChannelException();

    long addr = -1;
    int ti = -1;
    try {
        // 標(biāo)記可能會(huì)一直阻塞,需要和endBlocking配合使用
        beginBlocking();
        ti = threads.add();
        if (!isOpen())
            return null;

        long mapSize;
        int pagePosition;
        synchronized (positionLock) {
            long filesize;
            do {
                filesize = nd.size(fd);  // 獲取文件大小
            } while ((filesize == IOStatus.INTERRUPTED) && isOpen());
            if (!isOpen())
                return null;

            // 這段邏輯是寫入時(shí)需要擴(kuò)張文件,我們是只讀模式不會(huì)走這段邏輯
            if (filesize < position + size) { // Extend file size
                if (!writable) {
                    throw new IOException("Channel not open for writing " +
                        "- cannot extend file to required size");
                }
                int rv;
                do {
                    rv = nd.truncate(fd, position + size);
                } while ((rv == IOStatus.INTERRUPTED) && isOpen());
                if (!isOpen())
                    return null;
            }

            // 正常情況size 大于0
            if (size == 0) {
                addr = 0;
                // a valid file descriptor is not required
                FileDescriptor dummy = new FileDescriptor();
                if ((!writable) || (imode == MAP_RO))
                    return Util.newMappedByteBufferR(0, 0, dummy, null);
                else
                    return Util.newMappedByteBuffer(0, 0, dummy, null);
            }

            // allocationGranularity 16384 即16K
            pagePosition = (int)(position % allocationGranularity);
            long mapPosition = position - pagePosition;
            mapSize = size + pagePosition;
            try {
                // If map0 did not throw an exception, the address is valid
                // 開啟虛擬內(nèi)存地址
                // imode MAP_RO
                addr = map0(imode, mapPosition, mapSize);
            } catch (OutOfMemoryError x) {
                // An OutOfMemoryError may indicate that we've exhausted
                // memory so force gc and re-attempt map
                System.gc();
                try {
                    Thread.sleep(100);
                } catch (InterruptedException y) {
                    Thread.currentThread().interrupt();
                }
                try {
                    addr = map0(imode, mapPosition, mapSize);
                } catch (OutOfMemoryError y) {
                    // After a second OOME, fail
                    throw new IOException("Map failed", y);
                }
            }
        } // synchronized

        // On Windows, and potentially other platforms, we need an open
        // file descriptor for some mapping operations.
        FileDescriptor mfd;
        try {
            mfd = nd.duplicateForMapping(fd);
        } catch (IOException ioe) {
            unmap0(addr, mapSize);
            throw ioe;
        }

        assert (IOStatus.checkAll(addr));
        assert (addr % allocationGranularity == 0);
        int isize = (int)size;
        Unmapper um = new Unmapper(addr, mapSize, isize, mfd);
        if ((!writable) || (imode == MAP_RO)) {
            return Util.newMappedByteBufferR(isize,
                                             addr + pagePosition,
                                             mfd,
                                             um);
        } else {
            return Util.newMappedByteBuffer(isize,
                                            addr + pagePosition,
                                            mfd,
                                            um);
        }
    } finally {
        threads.remove(ti);
        endBlocking(IOStatus.checkAll(addr));
    }
}
FileChannelImpl#map0

這段代碼的是將文件的一部分和虛擬內(nèi)存空間映射起來

# sun.nio.ch.FileChannelImpl#map0
// Creates a new mapping
private native long map0(int prot, long position, long length)
    throws IOException;

https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/19fb8f93c59dfd791f62d41f332db9e306bc1422/src/java.base/unix/native/libnio/ch/FileChannelImpl.c#L74

map0源碼

JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this,
                                     jint prot, jlong off, jlong len)
{
    void *mapAddress = 0;
    jobject fdo = (*env)->GetObjectField(env, this, chan_fd);
    jint fd = fdval(env, fdo);
    int protections = 0;
    int flags = 0;

    if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) {
        protections = PROT_READ;
        flags = MAP_SHARED;
    } else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) {
        protections = PROT_WRITE | PROT_READ;
        flags = MAP_SHARED;
    } else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) {
        protections =  PROT_WRITE | PROT_READ;
        flags = MAP_PRIVATE;
    }

    mapAddress = mmap64(
        0,                    /* Let OS decide location */
        len,                  /* Number of bytes to map */
        protections,          /* File permissions */
        flags,                /* Changes are shared */
        fd,                   /* File descriptor of mapped file */
        off);                 /* Offset into file */

    if (mapAddress == MAP_FAILED) {
        if (errno == ENOMEM) {
            JNU_ThrowOutOfMemoryError(env, "Map failed");
            return IOS_THROWN;
        }
        return handle(env, -1, "Map failed");
    }

    return ((jlong) (unsigned long) mapAddress);
}
Util#newMappedByteBufferR
# sun.nio.ch.Util#newMappedByteBufferR
static MappedByteBuffer newMappedByteBufferR(int size, long addr,
                                             FileDescriptor fd,
                                             Runnable unmapper)
{
    MappedByteBuffer dbb;
    if (directByteBufferRConstructor == null)  
        initDBBRConstructor();  // 如果構(gòu)造器為空則初始化構(gòu)造器
    try {
        // 反射創(chuàng)建新的DirectByteBufferR對(duì)象
        dbb = (MappedByteBuffer)directByteBufferRConstructor.newInstance(
          new Object[] { size,
                         addr,
                         fd,
                         unmapper });
    } catch (InstantiationException |
             IllegalAccessException |
             InvocationTargetException e) {
        throw new InternalError(e);
    }
    return dbb;
}

DirectByteBufferR 構(gòu)造器

protected DirectByteBufferR(int cap, long addr,
                                 FileDescriptor fd,
                                 Runnable unmapper)
{
    super(cap, addr, fd, unmapper);
    this.isReadOnly = true;
}

DirectByteBufferR父類構(gòu)造器

protected DirectByteBuffer(int cap, long addr,
                                 FileDescriptor fd,
                                 Runnable unmapper)
{
    super(-1, 0, cap, cap, fd);
    address = addr;
    cleaner = Cleaner.create(this, unmapper);
    att = null;
}
MappedByteBuffer(int mark, int pos, int lim, int cap, // package-private
                 FileDescriptor fd)
{
    super(mark, pos, lim, cap);
    this.fd = fd;
}

mmapfs如何讀數(shù)據(jù)

@Override
public final byte readByte() throws IOException {
  try {
    // curBuf是 java.nio.DirectByteBufferR
    return guard.getByte(curBuf);  // guard 是 org.apache.lucene.store.ByteBufferGuard
  } catch (BufferUnderflowException e) {
    do {
      curBufIndex++;
      if (curBufIndex >= buffers.length) {
        throw new EOFException("read past EOF: " + this);
      }
      setCurBuf(buffers[curBufIndex]);
      curBuf.position(0);
    } while (!curBuf.hasRemaining());
    return guard.getByte(curBuf);
  } catch (NullPointerException npe) {
    throw new AlreadyClosedException("Already closed: " + this);
  }
}

guard.getByte(curBuf)只是套了一層殼

# org.apache.lucene.store.ByteBufferGuard#getByte(java.nio.ByteBuffer)
public byte getByte(ByteBuffer receiver) {
  ensureValid();
  return receiver.get();
}

最終調(diào)用java.nio.DirectByteBuffer#get()文章來源地址http://www.zghlxwxcb.cn/news/detail-861172.html

# java.nio.DirectByteBuffer#get()
public byte get() {
    try {
        return ((UNSAFE.getByte(ix(nextGetIndex()))));
    } finally {
        Reference.reachabilityFence(this);
    }
}

到了這里,關(guān)于簡單介紹ES中的索引存儲(chǔ)類型的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Elasticsearch分詞詳解:ES分詞介紹、倒排索引介紹、分詞器的作用、停用詞

    詳見:https://blog.csdn.net/weixin_40612128/article/details/123476053

    2024年02月12日
    瀏覽(33)
  • Elasticsearch學(xué)習(xí)-ES中的一些組件介紹

    Elasticsearch學(xué)習(xí)-ES中的一些組件介紹

    ES是什么 Elastic Search簡稱ES, 是一個(gè)高性能的全文檢索框架。它提供存儲(chǔ)、搜索、大數(shù)據(jù)準(zhǔn)實(shí)時(shí)分析等。一般用于提供復(fù)雜搜索的服務(wù)。 ES是基于Lucene進(jìn)行二次開發(fā)的一個(gè)框架,首先Lucene是一個(gè)類庫,業(yè)務(wù)系統(tǒng)中想要使用它,你必須使用Java來作為開發(fā)語言并將其直接集成到你

    2024年02月06日
    瀏覽(22)
  • ES簡單教程(五)使用ElasticsearchRestTemplate手動(dòng)生成ES索引 項(xiàng)目啟動(dòng)自動(dòng)生成ES索引

    其實(shí)使用 SpringBoot 項(xiàng)目玩ES的時(shí)候,人家本身是提供了一個(gè)注解 @Docment 是可以自動(dòng)在項(xiàng)目啟動(dòng)的時(shí)候創(chuàng)建ES索引的! 只不過沒用,因?yàn)?ES 的版本在升級(jí), ElasticsearchRestTemplate 配套的腳手架也在升級(jí),所以你會(huì)在網(wǎng)上遇到一個(gè)情況:搜到的各類解決方案可能都太適配你的情況,

    2024年02月03日
    瀏覽(92)
  • ES(elasticsearch)刪除指定索引

    ES(elasticsearch)刪除指定索引

    需要?jiǎng)h除指定的索引 執(zhí)行命令 比如:DELETE /mysql-status_-2023.06 執(zhí)行結(jié)果: 執(zhí)行命令 比如:HEAD /mysql-status_-2023.06 執(zhí)行結(jié)果: 說明已經(jīng)刪除完畢 刪除命令: DELETE /索引名 查看是否刪除成功: HEAD /索引名 查看索引命令: GET /索引名稱 批量查看索引命令: GET /索引名稱1,索引名稱

    2024年02月11日
    瀏覽(22)
  • 【ES】Elasticsearch-深入理解索引原理

    【ES】Elasticsearch-深入理解索引原理

    索引(Index) ES將數(shù)據(jù)存儲(chǔ)于一個(gè)或多個(gè)索引中,索引是具有類似特性的文檔的集合。類比傳統(tǒng)的關(guān)系型數(shù)據(jù)庫領(lǐng)域來說,索引相當(dāng)于SQL中的一個(gè)數(shù)據(jù)庫,或者一個(gè)數(shù)據(jù)存儲(chǔ)方案(schema)。索引由其名稱(必須為全小寫字符)進(jìn)行標(biāo)識(shí),并通過引用此名稱完成文檔的創(chuàng)建、搜索、更新

    2024年02月04日
    瀏覽(24)
  • ES簡單教程(一)創(chuàng)建ES映射實(shí)體對(duì)象,即索引

    聲明 :本教程可能并不完善,沒有一個(gè)總覽的規(guī)劃,各個(gè)模塊都相對(duì)獨(dú)立,做到哪寫到哪,僅供參考,共同學(xué)習(xí)。 ES的Java映射實(shí)體類主要與ES的索引匹配,跟傳統(tǒng)的數(shù)據(jù)庫稍微有點(diǎn)區(qū)別:ES的索引就相當(dāng)于是表,ES的文檔就相當(dāng)于表里的每一條數(shù)據(jù),大致可以這么理解作為上

    2024年02月12日
    瀏覽(17)
  • ElasticSearch---查詢es集群狀態(tài)、分片、索引

    查看es集群狀態(tài): 如果?后面加上pretty,能讓返回的json格式化。 加上?v的返回結(jié)果,如下: 解釋如下: 查看es分片信息: 查看es分片信息,模糊匹配,比如匹配test: 返回信息如下: 解析如下: 查看狀態(tài)為unassigned的es分片信息: 查看es索引 查看es所有索引: indices表示索引,是

    2024年02月02日
    瀏覽(25)
  • elasticsearch(三)-- 理解ES的索引操作

    elasticsearch(三)-- 理解ES的索引操作

    上一章我們主要學(xué)習(xí)了es的幾個(gè)客戶端,那么我們后面也主要通過kibana客戶端、HighLevelClient高級(jí)客戶端這兩個(gè)來學(xué)習(xí)es. 這一章的學(xué)習(xí)我們主要是學(xué)習(xí)一些Elasticsearch的基礎(chǔ)操作,主要是深入一些概念,比如索引的具體操作,映射的相關(guān)語法,對(duì)數(shù)據(jù)類型,文檔的操作。那么主要

    2024年02月04日
    瀏覽(18)
  • ES 文檔與索引介紹

    ES 文檔與索引介紹

    Python微信訂餐小程序課程視頻 https://blog.csdn.net/m0_56069948/article/details/122285951 Python實(shí)戰(zhàn)量化交易理財(cái)系統(tǒng) https://blog.csdn.net/m0_56069948/article/details/122285941 在之前的文章中,介紹了 ES 整體的架構(gòu)和內(nèi)容,這篇主要針對(duì) ES 最小的存儲(chǔ)單位 - 文檔以及由文檔組成的索引進(jìn)行詳細(xì)介紹。

    2023年04月08日
    瀏覽(13)
  • 【ElasticSearch】更新es索引生命周期策略,策略何時(shí)對(duì)索引生效

    【ElasticSearch】更新es索引生命周期策略,策略何時(shí)對(duì)索引生效

    大家好,我是好學(xué)的小師弟,今天和大家討論下更新es索引生命周期策略后,策略何時(shí)對(duì)索引生效 結(jié)論: 若當(dāng)前索引已應(yīng)用策略A(舊),更新完策略A后,新的策略A會(huì)立即對(duì)原來的已經(jīng)應(yīng)用該策略的索引生效;若當(dāng)前索引符合新策略A的生命周期變化條件,則會(huì)自動(dòng)進(jìn)入下一階段

    2024年02月07日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包