2014年7月15日 星期二

使用Java memory mapped file來作為IPC (Inter-Process Communication) 的橋樑

在上一篇的文章我們驗證了Java MappedByteBuffer (透過MemoryMapped file來映射)的讀寫的throughput是非常的好, 所以延續這一篇我們就利用它來做為不同Java processes (也就是不同的JVM)如何利用它來共同分享資料。

Memory Mapped File的WIKI

所謂的memory-mapped file可視為一種虛擬記憶體 (Virtual Memory)的區段 (Segment) 映射,通常都是映射至硬碟或是SSD的非揮發性的記憶裝置上。一但映射好了之後, 就可以如同一般記憶體般地來使用。

好處在於使用這種技術可以提高I/O的效率尤其是對於比較大的檔案,至於對於小檔案來說可能就不是那麼地適合。為啥? 因為作業系統的paging size大約是4 K bytes, 如果我們只映射了5 K bytes的話¸ 那麼就有3 K bytes的paging的空間是浪費了。另外一個好處就是對於一個Size很大的檔案 (比記憶體大), 我們可以使用一小塊記憶體就可以把它映射進來處理。

當然這世上是沒有白吃的午餐的, 使用memory-mapped file有可能會因為page fault (當一段資料被載入到page cache中但是映射到virtual memory的過程還沒完成)而導致速度比正常I/O還差 (還好這種機率不高---擦汗!!!!)

另外要小心的是, 在32位元的作業系統中也不適合使用, 因為32位元的記憶體定址空間有限(大約是4GiB)。

範例程式

以下我將使用兩支Java 的程式來示範如何用memory mapped file來共享資料 (IPC – Inter Process Communication)。一支程式專門用來生產message物件 (producer)及別外一支程式用來消費message物件(consumer), 因為是範例啦, 所以我沒有作太多例外的處理, 理論上兩支程式是可以是時運行的喔!!

IMemoryRepository.java

package ew.blog.memmapipc;

/**
 * 這個interface定義了一個簡單的基於記憶體的訊息存取的方法。
 * 為了便利演繹出blog文章的主軸,在訊息(message)的結構上是固定為:
 * 1.Int - 4 bytes
 * 2.Long - 8 bytes
 * 3.Byte - 1 bytes
 * 
 * 要存放一個message時要先哷叫navigate(index)方法來移動指標到對應的位址,
 * 然後呼叫setInt, setLong及setByte來存放訊息資料。若要取回訊息資料時, 
 * 也是要先哷叫navigate(index)方法來移動指標到對應的位址, 然後再使用
 * getInt, getLong及getByte方法來取回資料
 * 
 * @author ErhWen,Kuo (erhwenkuo@gmail.com)
 *
 */
public interface IMemoryRepository {
 
 void navigate(int index);

 void setInt(int value);

 void setLong(long value);

 void setByte(byte value);

 int getInt();

 long getLong();

 byte getByte();
}

MappedByteBufferRepository.java

package ew.blog.memmapipc;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;

public class MappedByteBufferRepository implements IMemoryRepository{
    // 由於ByteBuffer是一個連續的Memroy區塊, 所以必須要先定義
    // 儲放的順序與大小以便存取
    // 在本範例中, StoreValue有:
    //              1: intValue (int: 4 bytes)
    //              2: longValue (long: 8 bytes)
    //              3: byteValue (byte: 1 bytes)
    // 我們會依序來儲放StoreValue的properties
 
    private final static int intValue_Offset = 0;
    private final static int longValue_Offset = 4;
    private final static int byteValue_Offset = 12;

    // 在本範例message物件的長度(int:4 + long:8 + byte:1 = 13)
    private final static int storageValue_Size = 13; 
 
    private ByteBuffer objectStore;

    //用來作為定位每個一物件在memory中的位置(position)
    private int pos; 
 
    public MappedByteBufferRepository(File memMappedFile, int capacity){    
        RandomAccessFile raf = null;
        FileChannel fc = null;
  
 try {
     raf = new RandomAccessFile(memMappedFile, "rw");
     fc = raf.getChannel();
     objectStore = 
              fc.map(MapMode.READ_WRITE, 0, capacity * storageValue_Size);

     objectStore.order(ByteOrder.nativeOrder());
 } catch (Exception e) {
     e.printStackTrace();
 } finally{
     //把FileChannel與RandomAccessFile物件給Close起來
     try{
  fc.close();
  raf.close();
     } catch (IOException e) {}
     }
 }
 
 public void navigate(int index) {
     pos = index * storageValue_Size;
 }

 public void setInt(int value) {
     objectStore.putInt(pos + intValue_Offset, value);
 }

 public void setLong(long value) {
     objectStore.putLong(pos + longValue_Offset, value);  
 }

 public void setByte(byte value) {
     objectStore.put(pos + byteValue_Offset, value);  
 }

 public int getInt() {
     return objectStore.getInt(pos + intValue_Offset);
 }

 public long getLong() {
     return objectStore.getLong(pos + longValue_Offset);
 }

 public byte getByte() {
     return objectStore.get(pos + byteValue_Offset);
 }

}

MemMappedMsgWriter.java

package ew.blog.memmapipc;

import java.io.File;
public class MemMappedMsgWriter {
    public static void main(String[] args) throws Exception {  
        // 取得temp的目錄路徑
        String tmpDir = System.getProperty("java.io.tmpdir"); 
 
        // 為這個memory mapped的檔案給名
        String memMappedFileName = tmpDir + "/memmappedfile.dat"; 
 
        // 產生一個標準的Java File物件
        File memMappedFile = new File(memMappedFileName);
  
        // 預計產生一千萬個Message (每個Message長度是13 bytes)
        int messageSize = 10000000; 
  
        // 初始一個Memory Mapped File的ByteBuffer來儲存資料
        IMemoryRepository memStore = 
           new MappedByteBufferRepository(memMappedFile, messageSize); 
 
        long start = System.currentTimeMillis();

        for(int i=0; i<messageSize; i++){

            // 移動指標
     memStore.navigate(i);  
     // 儲放資料
     memStore.setInt(i);
     memStore.setLong((long)i);
     memStore.setByte((byte)i);
        }  

        long end = System.currentTimeMillis();
 
        System.out.println(
           String.format("Persist %s Messages, total spends %s ms",
              messageSize, (end-start)));
    }
}

MemMappedMsgReader.java

package ew.blog.memmapipc;

import java.io.File;

public class MemMappedMsgReader {

    public static void main(String[] args) throws Exception {  
        // 取得temp的目錄路徑
 String tmpDir = System.getProperty("java.io.tmpdir"); 
 
 // 為這個memory mapped的檔案給名
 String memMappedFileName = tmpDir + "/memmappedfile.dat"; 
 
 // 產生一個標準的Java File物件
 File memMappedFile = new File(memMappedFileName);
  
 // 預計讀進一千萬個Messages (每個Message長度是13 bytes)
 int messageSize = 10000000; 
  
 // 初始一個Memory Mapped File的ByteBuffer來讀取資料
 IMemoryRepository memStore = 
           new MappedByteBufferRepository(memMappedFile, messageSize);  

 long start = System.currentTimeMillis(); 
 
 for(int i=0; i<messageSize; i++){

     // 移動指標
     memStore.navigate(i);
   
     // 讀取資 料
     int messageIntData = memStore.getInt();
     long messageLongData = memStore.getLong();
     byte messageByteDataq = memStore.getByte();
 }
  
 long end = System.currentTimeMillis();  
 System.out.println(
            String.format("Read %s Messages, total spends %s ms", 
               messageSize, (end-start)));
 }
}

結論

Memory mapped file的技術對於開發IPC (Inter-Process Communication)相關的需求來說應該是一種很不錯的選項。而且讀寫的速度相對使用Socket的方法來說是有效率的多了。

範例的源碼在這裡

沒有留言: