基于memcached for java 实现通用分布式缓存和集群分布式缓存解析

无情 阅读:256 2021-03-31 21:36:28 评论:0

本文参考借鉴:http://guazi.iteye.com/blog/107164


前提:基于memcached client for java 的基础进行的二次封装,实现缓存存储的两种模式:通用分布式缓存和集群分布式缓存。以下是对于memcached client for Java 二次封装的UML图。


对于memcached的客户端初始化在CacheFactory中通过读取配置文件cacheConfig.xml完成。通用分布式缓存,只是一个简单的封装,利用memcached client for java提供的分布式支持来实现,这里主要说一下clusterCache的实现思想:对存入的缓存对象的key值进行一次hash,找到对应的服务器存入,然后根据一定的规则再次进行hash,找到另外一个不同的服务器存入,取缓存时,先对要取的key值进行一次hash,找到主服务器,如果获取失败或者获取到的值为null,就对key进行再次hash,找到其从服务器,从这台服务器取缓存结果(如果取到结果就异步的更新到主服务器),这样就形成了主从式集群缓存。特点是:没有绝对的主节点和从节点,正常情况下所有服务器共同承担缓存服务器,在一台服务器出现异常时其他服务器共同承担增加的访问压力。

拓扑结构如下:


源代码

package com.yx.cache; 
 
public interface Cache<T> { 
	/** 
	 * 获取缓存中的数据 
	 *  
	 * @param key 
	 * @return 
	 */ 
	T get(String key); 
 
	/** 
	 * 把数据放入缓存 如果存在与key对应的值,则返回失败 
	 *  
	 * @param key 
	 * @param value 
	 * @return 
	 */ 
	boolean add(String key, T value); 
 
	/** 
	 * 把数据放入缓存 如果存在与key对应的值,则覆盖原有的值 
	 *  
	 * @param key 
	 * @param value 
	 * @return 
	 */ 
	boolean set(String key, T value); 
 
	/** 
	 * 缓存更新 如果不存在与key对应的缓存值,则不更新 
	 *  
	 * @param key 
	 * @param value 
	 * @return 
	 */ 
	boolean update(String key, T value); 
 
	/** 
	 * 删除缓存 
	 *  
	 * @param key 
	 * @return 
	 */ 
	boolean delete(String key); 
} 

通用分布式缓存实现类:

package com.yx.cache; 
 
import com.danga.MemCached.MemCachedClient; 
 
public class CommonCache<T> implements Cache<T> { 
	private static MemCachedClient memCachedClient = null; 
 
	private String base = null; 
 
	CommonCache(Class<T> t, MemCachedClient client) { 
		memCachedClient = client; 
		base = t.getSimpleName() + "-"; 
	} 
 
	public T get(String key) { 
		return (T) memCachedClient.get(base + key); 
	} 
 
	public boolean set(String key, T value) { 
		return memCachedClient.set(base + key, value); 
	} 
 
	@Override 
	public boolean update(String key, T value) { 
		return memCachedClient.replace(base + key, value); 
	} 
 
	@Override 
	public boolean delete(String key) { 
		return memCachedClient.delete(base + key); 
	} 
 
	@Override 
	public boolean add(String key, T value) { 
		return memCachedClient.add(base + key, value); 
	} 
} 


集群分布式缓存实现类

package com.yx.cache; 
 
import com.danga.MemCached.MemCachedClient; 
import com.schooner.MemCached.SchoonerSockIOPool; 
import com.yx.cache.util.HashCodeUtil; 
import com.yx.task.ThreadPoolManager; 
 
public class ClusterCache<T> implements Cache<T> { 
	private static MemCachedClient memCachedClient = null; 
 
	private static ThreadPoolManager taskManager = ThreadPoolManager 
			.getInstance("cache"); 
 
	private String base = null; 
 
	private SchoonerSockIOPool pool = SchoonerSockIOPool.getInstance(); 
 
	ClusterCache(Class<T> t, MemCachedClient client) { 
		memCachedClient = client; 
		base = "i-" + t.getSimpleName() + "-"; 
	} 
 
	@Override 
	public T get(String key) { 
		T value = null; 
		if (key == null) { 
			return null; 
		} 
		key = base + key; 
		if (pool.getServers().length < 2) { 
			value = (T) memCachedClient.get(key); 
		} else { 
			int hashCode = HashCodeUtil.getHash(key); 
 
			value = (T) memCachedClient.get(key, hashCode); 
			if (value == null) { 
				hashCode = this.getRehashCode(key, hashCode); 
 
				value = (T) memCachedClient.get(key, hashCode); 
				if (value != null) {// 如果在另外一台服务器上取到了缓存,则恢复第一台服务器 
					UpdateTask task = new UpdateTask(key, value); 
					taskManager.submit(task); 
				} 
 
			} 
		} 
		return value; 
	} 
 
	@Override 
	public boolean set(String key, T value) { 
		if (key == null) { 
			return false; 
		} 
		key = base + key; 
		boolean result = false; 
		if (pool.getServers().length < 2) { 
			result = memCachedClient.set(key, value); 
		} else { 
			int hashCode = HashCodeUtil.getHash(key); 
 
			result = memCachedClient.set(key, value, hashCode); 
			// if (result) { 
			hashCode = getRehashCode(key, hashCode); 
			memCachedClient.set(key, value, hashCode); 
 
			// } 
		} 
		return result; 
	} 
 
	private int getRehashCode(String key, int oldHashcode) { 
		String host = pool.getHost(key, oldHashcode); 
		int rehashTries = 0; 
		// if (result) { 
		int hashCode = HashCodeUtil.getHash(rehashTries + key); 
		while (host.equals(pool.getHost(key, hashCode))) { 
			rehashTries++; 
			hashCode = HashCodeUtil.getHash(rehashTries + key); 
		} 
		return hashCode; 
	} 
 
	@Override 
	public boolean update(String key, T value) { 
		if (key == null) { 
			return false; 
		} 
		key = base + key; 
		boolean result = false; 
		if (pool.getServers().length < 2) { 
			result = memCachedClient.replace(key, value); 
		} else { 
			int hashCode = HashCodeUtil.getHash(key); 
 
			result = memCachedClient.replace(key, value, hashCode); 
			// if (result) { 
			hashCode = getRehashCode(key, hashCode); 
			memCachedClient.replace(key, value, hashCode); 
 
			// } 
		} 
		return result; 
	} 
 
	@Override 
	public boolean delete(String key) { 
		if (key == null) { 
			return false; 
		} 
		key = base + key; 
		boolean result = false; 
		if (pool.getServers().length < 2) { 
			result = memCachedClient.delete(key); 
		} else { 
			int hashCode = HashCodeUtil.getHash(key); 
 
			result = memCachedClient.delete(key, hashCode, null); 
			// if (result) { 
			hashCode = this.getRehashCode(key, hashCode); 
 
			memCachedClient.delete(key, hashCode, null); 
 
			// } 
		} 
		return result; 
	} 
 
	@Override 
	public boolean add(String key, T value) { 
		if (key == null) { 
			return false; 
		} 
		key = base + key; 
		boolean result = false; 
		if (pool.getServers().length < 2) { 
			result = memCachedClient.add(key, value); 
		} else { 
			int hashCode = HashCodeUtil.getHash(key); 
 
			result = memCachedClient.add(key, value, hashCode); 
			// if (result) { 
			hashCode = getRehashCode(key, hashCode); 
			memCachedClient.add(key, value, hashCode); 
 
			// } 
		} 
		return result; 
	} 
 
	static class UpdateTask implements Runnable { 
 
		private String key; 
		private Object value; 
 
		UpdateTask(String key, Object value) { 
			this.key = key; 
			this.value = value; 
		} 
 
		@Override 
		public void run() { 
			memCachedClient.set(key, value, HashCodeUtil.getHash(key)); 
		} 
 
	} 
} 


基于工厂模式创建memcached 存储模式(通用模式还是集群模式)

package com.yx.cache; 
 
import java.util.ArrayList; 
import java.util.List; 
import java.util.Map; 
import java.util.concurrent.ConcurrentHashMap; 
 
import com.danga.MemCached.MemCachedClient; 
import com.danga.MemCached.SockIOPool; 
import com.yx.cache.util.ConfigUtil; 
 
public class CacheFactory { 
	private static MemCachedClient memCachedClient = null; 
 
	@SuppressWarnings("rawtypes") 
	private static final Map<String, Cache> map = new ConcurrentHashMap<String, Cache>(); 
 
	static { 
		String serverStr = ConfigUtil.getConfigValue("servers", ""); 
 
		List<String> servers = new ArrayList<String>(); 
		for (String s : serverStr.split(",")) { 
			s = s.trim(); 
			if (!"".equals(s)) { 
				servers.add(s); 
			} 
		} 
		if (servers.size() < 1) { 
			throw new RuntimeException("cache 初始化失败!"); 
		} 
		SockIOPool pool = SockIOPool.getInstance(); 
		pool.setServers(servers.toArray(new String[] {})); 
		pool.setFailover(Boolean.valueOf(ConfigUtil.getConfigValue("failover", 
				"true"))); 
		pool.setInitConn(Integer.valueOf(ConfigUtil.getConfigValue("initConn", 
				"100"))); 
		pool.setMinConn(Integer.valueOf(ConfigUtil.getConfigValue("minConn", 
				"25"))); 
		pool.setMaxConn(Integer.valueOf(ConfigUtil.getConfigValue("maxConn", 
				"250"))); 
		pool.setMaintSleep(Integer.valueOf(ConfigUtil.getConfigValue( 
				"maintSleep", "30"))); 
		pool.setNagle(Boolean.valueOf(ConfigUtil.getConfigValue("nagle", 
				"false")));// 关闭nagle算法 
		pool.setSocketTO(Integer.valueOf(ConfigUtil.getConfigValue("socketTO", 
				"3000"))); 
		pool.setAliveCheck(Boolean.valueOf(ConfigUtil.getConfigValue( 
				"aliveCheck", "true"))); 
		pool.setHashingAlg(Integer.valueOf(ConfigUtil.getConfigValue( 
				"hashingAlg", "0"))); 
		pool.setSocketConnectTO(Integer.valueOf(ConfigUtil.getConfigValue( 
				"socketConnectTO", "3000"))); 
		String wStr = ConfigUtil.getConfigValue("weights", ""); 
		List<Integer> weights = new ArrayList<Integer>(); 
		for (String s : wStr.split(",")) { 
			s = s.trim(); 
			if (!"".equals(s)) { 
				weights.add(Integer.valueOf(s)); 
 
			} 
		} 
		if (weights.size() == servers.size()) { 
			pool.setWeights(weights.toArray(new Integer[] {})); 
		} 
		pool.initialize(); 
		memCachedClient = new MemCachedClient(); 
 
	} 
 
	public static <T> Cache<T> getCommonCache(Class<T> t) { 
		Cache<T> cache = map.get(t.getName()); 
		if (cache == null) { 
			cache = createCommonCache(t); 
		} 
		return cache; 
	} 
 
	public static <T> Cache<T> getClusterCache(Class<T> t) { 
		Cache<T> cache = map.get("i-" + t.getName()); 
		if (cache == null) { 
			cache = createClusterCache(t); 
		} 
		return cache; 
	} 
 
	private static synchronized <T> Cache<T> createCommonCache(Class<T> t) { 
		Cache<T> cache = map.get(t.getName()); 
		if (cache == null) { 
			cache = new CommonCache<T>(t, memCachedClient); 
			map.put(t.getName(), cache); 
		} 
		return cache; 
	} 
 
	private static synchronized <T> Cache<T> createClusterCache(Class<T> t) { 
		Cache<T> cache = map.get(t.getName()); 
		if (cache == null) { 
			cache = new ClusterCache<T>(t, memCachedClient); 
			map.put(t.getName(), cache); 
		} 
		return cache; 
	} 
} 


读取配置文件工具类封装和生成Hash代码工具类

ConfigUtil.Java和HashCodeUtil.java

package com.yx.cache.util; 
 
import java.io.IOException; 
import java.io.InputStream; 
import java.util.HashMap; 
import java.util.Iterator; 
import java.util.Map; 
 
import org.dom4j.Document; 
import org.dom4j.DocumentException; 
import org.dom4j.Element; 
import org.dom4j.io.SAXReader; 
 
public class ConfigUtil { 
 
	private static final String CONFILE = "cacheConfig.xml"; 
	private static final Map<String, String> map = new HashMap<String, String>(); 
 
	static { 
		SAXReader saxReader = new SAXReader(); 
		InputStream ins = ConfigUtil.class.getClassLoader() 
				.getResourceAsStream(CONFILE); 
		try { 
			if (ins != null) { 
				Document doc = saxReader.read(ins); 
				Element root = doc.getRootElement(); 
				Iterator<Element> iter = root.elementIterator(); 
				while (iter.hasNext()) { 
					Element e = iter.next(); 
					map.put(e.getName(), e.getTextTrim()); 
				} 
			} 
		} catch (DocumentException e) { 
			// TODO Auto-generated catch block 
			e.printStackTrace(); 
			throw new RuntimeException("找不到配置文件:" + CONFILE); 
		} finally { 
			try { 
				if (ins != null) { 
					ins.close(); 
				} else { 
					throw new RuntimeException("找不到配置文件:" + CONFILE); 
				} 
			} catch (IOException e) { 
				// TODO Auto-generated catch block 
				e.printStackTrace(); 
			} 
		} 
	} 
 
	public static String getConfigValue(String key, String defaultValue) { 
		String tmp = map.get(key); 
		return isEmpty(tmp) ? defaultValue : tmp; 
	} 
 
	public static void main(String[] args) { 
		System.out.println(map); 
	} 
 
	private static boolean isEmpty(String str) { 
		if (str == null || "".equals(str)) { 
			return true; 
		} 
		return false; 
	} 
} 


package com.yx.cache.util; 
 
import java.security.MessageDigest; 
import java.security.NoSuchAlgorithmException; 
import java.util.zip.CRC32; 
 
import com.schooner.MemCached.SchoonerSockIOPool; 
 
public class HashCodeUtil { 
 
	public static final int NATIVE_HASH = 0; // native String.hashCode(); 
	public static final int OLD_COMPAT_HASH = 1; // original compatibility 
	public static final int NEW_COMPAT_HASH = 2; // new CRC32 based 
	public static final int CONSISTENT_HASH = 3; // MD5 Based -- Stops 
 
	private static int hashingAlg = SchoonerSockIOPool.getInstance() 
			.getHashingAlg(); 
 
	/** 
	 * Returns a bucket to check for a given key. 
	 *  
	 * @param key 
	 *            String key cache is stored under 
	 * @return int bucket 
	 */ 
	public static final int getHash(String key) { 
 
		switch (hashingAlg) { 
		case NATIVE_HASH: 
			return key.hashCode(); 
		case OLD_COMPAT_HASH: 
			return origCompatHashingAlg(key); 
		case NEW_COMPAT_HASH: 
			return newCompatHashingAlg(key); 
		case CONSISTENT_HASH: 
			return md5HashingAlg(key); 
		default: 
			// use the native hash as a default 
			hashingAlg = NATIVE_HASH; 
			return key.hashCode(); 
		} 
	} 
 
	private static int origCompatHashingAlg(String key) { 
		int hash = 0; 
		char[] cArr = key.toCharArray(); 
 
		for (int i = 0; i < cArr.length; ++i) { 
			hash = (hash * 33) + cArr[i]; 
		} 
 
		return hash; 
	} 
 
	private static int newCompatHashingAlg(String key) { 
		CRC32 checksum = new CRC32(); 
		checksum.update(key.getBytes()); 
		int crc = (int) checksum.getValue(); 
		return (crc >> 16) & 0x7fff; 
	} 
 
	private static int md5HashingAlg(String key) { 
		MessageDigest md5 = MD5.get(); 
		md5.reset(); 
		md5.update(key.getBytes()); 
		byte[] bKey = md5.digest(); 
		int res = ((bKey[3] & 0xFF) << 24) | ((bKey[2] & 0xFF) << 16) 
				| ((bKey[1] & 0xFF) << 8) | (bKey[0] & 0xFF); 
		return res; 
	} 
 
	private static ThreadLocal<MessageDigest> MD5 = new ThreadLocal<MessageDigest>() { 
		@Override 
		protected final MessageDigest initialValue() { 
			try { 
				return MessageDigest.getInstance("MD5"); 
			} catch (NoSuchAlgorithmException e) { 
				throw new IllegalStateException(" no md5 algorythm found"); 
			} 
		} 
	}; 
 
} 


对于集群分布式缓存还缺少一个工具类ThreadPoolManage.java

package com.yx.task; 
 
import java.util.HashMap; 
import java.util.Map; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ThreadPoolExecutor; 
 
/** 
 * @author liuyuxiao 
 * @Date 2011-5-30 下午04:34:16 
 */ 
public class ThreadPoolManager { 
 
	private static final Map<String, ThreadPoolManager> map = new HashMap<String, ThreadPoolManager>(); 
 
	final int CORE_SIZE = 5; 
 
	private ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors 
			.newFixedThreadPool(CORE_SIZE); 
 
	public void submit(Runnable task) { 
		executor.submit(task); 
	} 
 
	public boolean finished() { 
		return executor.getCompletedTaskCount() == executor.getTaskCount(); 
	} 
 
	private ThreadPoolManager() { 
 
	} 
 
	public static synchronized ThreadPoolManager getInstance(String key) { 
		ThreadPoolManager t = map.get(key); 
		if (t == null) { 
			t = new ThreadPoolManager(); 
			map.put(key, t); 
		} 
		return t; 
	} 
} 

对于集群缓存模式和通用缓存模式测试:

package com.yx.cache.test; 
 
import com.yx.cache.Cache; 
import com.yx.cache.CacheFactory; 
 
public class TestCommonCache { 
 
	/** 
	 * @param args 
	 */ 
	public static void main(String[] args) { 
 
		Cache<String> cache = CacheFactory.getCommonCache(String.class); 
		int count = 0; 
		for (int i = 0; i < 100; i++) { 
 
			// cache.set("" + i, "Hello!" + i); 
 
			String result = cache.get("" + i); 
			// System.out.println(String.format("set( %d ): %s", i, success)); 
			if (result == null) { 
				count++; 
			} 
			System.out.println(String.format("get( %d ): %s", i, result)); 
		} 
 
		System.out.println(count); 
		// for (int i = 0; i < 500; i++) { 
		// MemTask task = new MemTask(); 
		// Thread t = new Thread(task); 
		// t.start(); 
		// } 
 
	} 
} 

package com.yx.cache.test; 
 
import com.yx.cache.Cache; 
import com.yx.cache.CacheFactory; 
 
public class TestClusterCache { 
 
	public static void main(String[] args) { 
		Cache<String> cache = CacheFactory.getClusterCache(String.class); 
		int count = 0; 
		for (int i = 0; i < 100; i++) { 
 
			// cache.set("" + i, "Hello!" + i); 
 
			String result = cache.get("" + i); 
			// System.out.println(String.format("set( %d ): %s", i, success)); 
			if (result == null) { 
				count++; 
			} 
			System.out.println(String.format("get( %d ): %s", i, result)); 
		} 
	} 
} 





声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

发表评论
搜索
排行榜
KIKK导航

KIKK导航

关注我们