
Performance Testing: SequoiaDB vs. MongoDB vs. Cassandra vs. HBase

Published 2014-09-22 14:58 | Source: CSDN | Author: 云知秋

Abstract: NoSQL databases improve scalability and high availability by relaxing some relational-database properties, such as consistency and the relational model, making up for the shortcomings of relational databases in many Internet-scale applications. As a result, different NoSQL systems have different killer applications, and here we explore them through benchmark testing.

Appendix C: SequoiaDB Interface

package com.yahoo.ycsb.db;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.bson.BSONObject;
import org.bson.BasicBSONObject;
import org.bson.types.BasicBSONList;

import com.sequoiadb.base.SequoiadbOption;
import com.sequoiadb.base.SequoiadbDatasource;
import com.sequoiadb.base.CollectionSpace;
import com.sequoiadb.base.DBCollection;
import com.sequoiadb.base.DBCursor;
import com.sequoiadb.base.Sequoiadb;


public class SequoiaDBClient extends DB {

	/** Used to include a field in a response. */
	protected static final Integer INCLUDE = Integer.valueOf(1);

	/** The key field name. */
	//private static final String KEY_FIELD = "key";
	private static final String KEY_FIELD = "_id";
	private static final String DEFAULT_INSERTMODE = "single";
    
	/**
	 * Count the number of times initialized to teardown on the last
	 * {@link #cleanup()}.
	 */
	private static final AtomicInteger initCount = new AtomicInteger(0);
	private static int bulknum = 0;

	/** Shared SequoiaDB connection pool and the per-thread connection. */
	private static SequoiadbDatasource sdbpools = null;
	private Sequoiadb sdb = null;
  
	/** CollectionSpace instance. */
	private CollectionSpace cs = null;
	private DBCollection cl       = null;
	private static String keyfield   = null;
	private static String insertmode = null;
	private List<BSONObject> objs    = new ArrayList<BSONObject>(); 
	//private int callnumber =0;
	private static String spacename = "ycsb";
	
	//private DBCollection collection = null;

	/**
	 * Initialize any state for this DB. Called once per DB instance; there is
	 * one DB instance per client thread.
	 */
	public void init() throws DBException {
		initCount.incrementAndGet();
		synchronized (INCLUDE) {
			if (sdb != null) {
				return;
			}
			
			try{
			   if (sdbpools != null){
				   sdb = sdbpools.getConnection();
				   cs = sdb.getCollectionSpace(spacename);
				   return;
			   }
			}catch(Exception e){
				e.printStackTrace();
				return;
			}
			
			// initialize sequoiadb driver
			Properties props = getProperties();
			String host = props.getProperty("sequoiadb.host", "localhost");
			String port = props.getProperty("sequoiadb.port", "11810");
			keyfield = props.getProperty("sequoiadb.keyfield", "_id");
			int maxConnectionnum = Integer.parseInt(props.getProperty("sequoiadb.maxConnectionnum", "100"));
			// "sequoiadb.maxidleconnnum" and "sequoiadb.recheckperiod" (seconds) are assumed property names
			int maxidleconnnum = Integer.parseInt(props.getProperty("sequoiadb.maxidleconnnum", "10"));
			int period = Integer.parseInt(props.getProperty("sequoiadb.recheckperiod", "300"));

			spacename = props.getProperty("sequoiadb.space", spacename);
			insertmode = props.getProperty("sequoiadb.insertmode", DEFAULT_INSERTMODE);
			bulknum = Integer.parseInt(props.getProperty("sequoiadb.bulknumber", "5000"));
			
			try {
				SequoiadbOption sdbOption = new SequoiadbOption();
				sdbOption.setMaxConnectionNum(maxConnectionnum);
				sdbOption.setMaxIdeNum(maxidleconnnum);
				sdbOption.setRecheckCyclePeriod(period*1000);
				
				sdbpools = new SequoiadbDatasource(host+":"+port,"","",sdbOption);
				// need to append db to url.
				//sdb = new Sequoiadb(host, Integer.parseInt(port), "", "");
				sdb = sdbpools.getConnection();
				if (!sdb.isCollectionSpaceExist(spacename)) {
					cs = sdb.createCollectionSpace(spacename);
				} else {
					cs = sdb.getCollectionSpace(spacename);
				}
				System.out.println("sequoiadb connection created with " + host
						+ ":" + port);

			} catch (Exception e1) {
				System.err
						.println("Could not initialize Sequoiadb connection pool for Loader: "
								+ e1.toString());
				e1.printStackTrace();
				throw new DBException(e1.toString());
			}
		}
	}

	/**
	 * Cleanup any state for this DB. Called once per DB instance; there is one
	 * DB instance per client thread.
	 */
	public void cleanup() throws DBException {
		initCount.decrementAndGet();
		try {
			if (0 != objs.size()){
				cl.bulkInsert(objs, DBCollection.FLG_INSERT_CONTONDUP);
			}
			sdbpools.close(sdb);
		} catch (Exception e1) {
			System.err.println("Could not close Sequoiadb connection pool: "
					+ e1.toString());
			e1.printStackTrace();
			return;
		}
	}
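	/**
	 * List the names of all data groups in the cluster, skipping the system
	 * catalog group, so that the collection can later be split across them.
	 */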
	private List<String> getAllDataGroup(){
		// Collect the names of all data groups, skipping the catalog group
		List<String> groups = new ArrayList<String>();
		DBCursor cursor = sdb.getList(Sequoiadb.SDB_LIST_GROUPS, null, null, null);
		while (cursor.hasNext()){
			BSONObject obj = cursor.getNext();
			String groupname = (String)obj.get("GroupName");
			if (!groupname.equals("SYSCatalogGroup")){
				groups.add(groupname);
			}	
		}
		return groups;
	}
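	/**
	 * Read the hash partition count ("Partition") of the given collection
	 * from the catalog snapshot.
	 */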
	
	private int getPartition(String spacename, String tablename){
		// Look up the collection's partition count in the catalog
		BSONObject condition = new BasicBSONObject();
		condition.put("Name", spacename + "." + tablename);
		DBCursor cr = sdb.getSnapshot(Sequoiadb.SDB_SNAP_CATALOG, condition, null, null);
		int Partition = 0;
		while(cr.hasNext()){
			BSONObject obj = cr.getNext();
			Partition = ((Integer)obj.get("Partition")).intValue();
		}
		return Partition;
	}
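	/**
	 * Read the data group that currently holds the collection (the first
	 * entry of "CataInfo" in the catalog snapshot).
	 */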
	
	private String getSrcDataGroup(String spacename, String tablename){
		// Get the source data group from the catalog
		BSONObject condition = new BasicBSONObject();
		condition.put("Name", spacename + "." + tablename);
		DBCursor cr = sdb.getSnapshot(Sequoiadb.SDB_SNAP_CATALOG, condition, null, null);
		String srcgroup = "";
		while(cr.hasNext()){
			BSONObject obj = cr.getNext();
			BasicBSONList catainfo = (BasicBSONList)obj.get("CataInfo");
			srcgroup=(String)((BSONObject)catainfo.get(0)).get("GroupName");
		}
		
		return srcgroup;
	}
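	/**
	 * Spread a newly created hash-sharded collection evenly across all data
	 * groups: each group receives Partition / groups.size() consecutive hash
	 * partitions, and the leftover partitions are handed out one per group.
	 */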
	
	private void splitCollection(DBCollection cl, String spacename, String tablename){
		// Get the data groups, the partition count and the source group
		List<String> groups = getAllDataGroup();
		int Partition = getPartition(spacename, tablename);
		String srcgroup = getSrcDataGroup(spacename, tablename);
		
		int part = Partition / groups.size();
		int remainder = Partition % groups.size();
		int startpart = Partition - remainder;
		for (int i=0;i<groups.size();++i){
			// Split this group's partition range off the source group
			BSONObject start = new BasicBSONObject();
			start.put("Partition", i*part);
			BSONObject end = new BasicBSONObject();
			end.put("Partition", (i+1)*part);
			if (!groups.get(i).equals(srcgroup)){
				cl.split(srcgroup, groups.get(i), start,end);
				
				if (0 != remainder){
					// also hand this group one of the leftover partitions
					BSONObject remainderstart = new BasicBSONObject();
					remainderstart.put("Partition", startpart);
					BSONObject remainderend = new BasicBSONObject();
					remainderend.put("Partition", startpart + 1);
					cl.split(srcgroup, groups.get(i), remainderstart, remainderend);
					++startpart; // advance so the next leftover partition goes to the next group
					--remainder;
				}
			}
		}
	}
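	/**
	 * Create the collection hash-sharded on the key field, split it across
	 * the data groups, and build a unique index on the key field when it is
	 * not "_id".
	 */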
	
	private void createCollection(String table) throws DBException {
		BSONObject options = new BasicBSONObject();
		BSONObject subobj = new BasicBSONObject();
		subobj.put(KEY_FIELD, 1);
		options.put("ShardingKey", subobj);
		options.put("ShardingType", "hash");
		options.put("EnsureShardingIndex", false);
		
		cl = cs.createCollection(table, options);
		splitCollection(cl,spacename,table);
		
		if (0 != keyfield.compareTo("_id")){
			cl.createIndex("index",
				           "{" + keyfield + ":1}", true, true);
		}
	}
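	/**
	 * Lazily obtain the collection for this thread, (re)acquiring the pooled
	 * connection and collection space as needed. The collection is created on
	 * first use with double-checked locking so only one thread creates it; on
	 * failure the connection is returned to the pool and the lookup retried.
	 */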
	
	private DBCollection getCollection(String table){
		if (sdb == null){
			try{
				sdb = sdbpools.getConnection();
			}catch(Exception e)
			{
				e.printStackTrace();
				return null;
			}
		}
		
		if (cs == null){
			try{
				cs = sdb.getCollectionSpace(spacename);
			}catch(Exception e)
			{
				e.printStackTrace();
				return null;
			}
		}
		
		if (cl == null){
			try {
				boolean bExist = cs.isCollectionExist(table);
				if (!bExist) {
					synchronized (INCLUDE) {
						if (cs.isCollectionExist(table)) {
							cl = cs.getCollection(table);
						} else {
							createCollection(table);
						}
				    }
				} else {
					cl = cs.getCollection(table);
				}
			}catch(Exception e)
			{
				e.printStackTrace();
				cl = null;
				sdbpools.close(sdb);
				sdb = null;
				return getCollection(table);
			}
		}
		return cl;
	}
	
	@Override
	public int read(String table, String key, Set<String> fields,
			HashMap<String, ByteIterator> result) {
		DBCursor cursor = null;
		DBCollection collection = null;
		try {
			collection = getCollection(table);
			if (collection == null) {
				System.out.println("Failed to get collection " + table);
				return 1;
			}

			BSONObject query = new BasicBSONObject().append(keyfield, key);
			BSONObject fieldsToReturn = null;
			if (fields != null) {
				fieldsToReturn = new BasicBSONObject();
				Iterator<String> iter = fields.iterator();
				while (iter.hasNext()) {
					fieldsToReturn.put(iter.next(), "");
				}
			}

			cursor = collection.query(query, fieldsToReturn, null, null);
			if (cursor != null && cursor.hasNext()) {
				HashMap<String, ByteIterator> resultMap = new HashMap<String, ByteIterator>();

				fillMap(resultMap, cursor.getNext());
				result.putAll(resultMap);
				return 0;
			} else {
				return 1;
			}
		} catch (Exception e) {
			System.err.println(e.toString());
			e.printStackTrace();
			return 1;
		} finally {
			if (cursor != null) {
				cursor.close();
			}
		}
	}

	/**
	 * TODO - Finish
	 * 
	 * @param resultMap
	 * @param obj
	 */
	@SuppressWarnings("unchecked")
	protected void fillMap(HashMap<String, ByteIterator> resultMap,
			               BSONObject obj) {
		Map<String, Object> objMap = obj.toMap();
		for (Map.Entry<String, Object> entry : objMap.entrySet()) {
			if (entry.getValue() instanceof byte[]) {
				resultMap.put(entry.getKey(), new ByteArrayByteIterator(
						(byte[]) entry.getValue()));
			}
		}
	}

	@Override
	public int scan(String table, String startkey, int recordcount,
			Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
		DBCursor cursor = null;
		try {
			DBCollection collection = getCollection(table);
			
			BSONObject scanRange = new BasicBSONObject().append("$gte",
					                     startkey);
			BSONObject query = new BasicBSONObject().append(keyfield,
					                 scanRange);
			BSONObject fieldsToReturn = null;
			if (fields != null) {
				fieldsToReturn = new BasicBSONObject();
				Iterator<String> iter = fields.iterator();
				while (iter.hasNext()) {
					fieldsToReturn.put(iter.next(), "");
				}
			}

			cursor = collection.query(query, fieldsToReturn, null, null, 0,
					                  recordcount);
			while (cursor.hasNext()) {
				// Copy only the byte[] fields of each record into the result map (see fillMap()).
				HashMap<String, ByteIterator> resultMap = new HashMap<String, ByteIterator>();
				BSONObject obj = cursor.getNext();
				fillMap(resultMap, obj);
				result.add(resultMap);
			}
			return 0;
		} catch (Exception e) {
			System.err.println(e.toString());
			e.printStackTrace();
			return 1;
		} finally {
			if (cursor != null) {
				cursor.close();
			}
		}
	}

	@Override
	public int update(String table, String key,
			HashMap<String, ByteIterator> values) {
		try {
			DBCollection collection = getCollection(table);
			if (collection == null) {
				System.out.println("Failed to get collection " + table);
				return -1;
			}
			
			BSONObject query = new BasicBSONObject().append(keyfield, key);
			BSONObject update = new BasicBSONObject();
			BSONObject fieldsToSet = new BasicBSONObject();
			
			Iterator<String> keys = values.keySet().iterator();
			while (keys.hasNext()) {
				String tmpKey = keys.next();
				fieldsToSet.put(tmpKey, values.get(tmpKey).toArray());
			}
			
			update.put("$set", fieldsToSet);
			collection.update(query, update, null);
			return 0;
		} catch (Exception e) {
			System.err.println(e.toString());
			e.printStackTrace();
			return 1;
		}
	}
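	/**
	 * Insert a single record. In the default "single" mode each record is
	 * written immediately; otherwise records are buffered and flushed with
	 * bulkInsert() every "sequoiadb.bulknumber" records (any remainder is
	 * flushed in cleanup()).
	 */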

	@Override
	public int insert(String table, String key,
			HashMap<String, ByteIterator> values) {
		try {
			DBCollection collection = getCollection(table);
			BSONObject record = new BasicBSONObject().append(keyfield, key);
			for (String k : values.keySet()) {
				record.put(k, values.get(k).toArray());
			}
			
			if (insertmode.equals(DEFAULT_INSERTMODE)){
				collection.insert(record);
			}
			else{
				if (objs.size() != bulknum){
					objs.add(record);
				}
				
				if (objs.size() ==  bulknum){
					collection.bulkInsert(objs,DBCollection.FLG_INSERT_CONTONDUP);
					objs.clear();
				}
			}
			return 0;
		} catch (Exception e) {
			System.err.println(e.toString());
			e.printStackTrace();
			return 1;
		}
	}

	@Override
	public int delete(String table, String key) {
		try {
			DBCollection collection = getCollection(table);
			BSONObject record = new BasicBSONObject().append(keyfield, key);
			collection.delete(record);
			return 0;
		} catch (Exception e) {
			System.err.println(e.toString());
			e.printStackTrace();
			return 1;
		}
	}
	
	public static void main(String[] args){
		
		Properties props = new Properties();
		props.setProperty("sequoiadb.host", "192.168.30.63");
		props.setProperty("sequoiadb.port", "11810");
		props.setProperty("sequoiadb.space", "test");

		SequoiaDBClient client = new SequoiaDBClient();
		client.setProperties(props);
		
		try{
			client.init();
			client.getCollection("usertable");
		}catch(DBException e){
			e.printStackTrace();
		}
	}
}
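
For completeness, the sketch below shows how this binding can be driven through the YCSB DB interface outside of the benchmark harness, going one step beyond the bundled main() by also inserting and reading a record. It is only an illustration: the class name SequoiaDBClientExample, the host address and the field values are placeholders, and it assumes the YCSB core classes (com.yahoo.ycsb.StringByteIterator and friends) are on the classpath. The property names match those read in init() above.

import java.util.HashMap;
import java.util.Properties;

import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.StringByteIterator;
import com.yahoo.ycsb.db.SequoiaDBClient;

public class SequoiaDBClientExample {
	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.setProperty("sequoiadb.host", "localhost");      // placeholder coordinator host
		props.setProperty("sequoiadb.port", "11810");          // default coordinator port
		props.setProperty("sequoiadb.space", "ycsb");          // collection space used by the binding
		props.setProperty("sequoiadb.insertmode", "single");   // write each record immediately (bulk mode buffers)

		SequoiaDBClient client = new SequoiaDBClient();
		client.setProperties(props);
		client.init();

		// Insert one record and read it back through the YCSB DB interface.
		HashMap<String, ByteIterator> values = new HashMap<String, ByteIterator>();
		values.put("field0", new StringByteIterator("hello"));
		int rc = client.insert("usertable", "user1", values);
		System.out.println("insert returned " + rc);

		HashMap<String, ByteIterator> result = new HashMap<String, ByteIterator>();
		rc = client.read("usertable", "user1", null, result);
		System.out.println("read returned " + rc + ", fields: " + result.keySet());

		client.cleanup();
	}
}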

