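Caffeine is a high-performance, scalable, memory-efficient Java caching library. It evolved from Google's Guava Cache and provides near-optimal hit rates.
Caffeine has the following characteristics:
Because of these characteristics, Caffeine has become the default choice for local caching in many projects. As more projects integrated it, a series of business-driven requirements followed.
One everyday requirement is to estimate the memory footprint of a Caffeine instance in real time and to adjust cache parameters dynamically. The existing memory analysis tool MAT works on heap dump files, so it cannot give real-time results. This was one of the motivations for the project.
Given this background, we combined Caffeine's existing features with custom changes to part of its source code and carried the work forward as the Caffeine visualization project.
The Caffeine visualization project currently supports the following features: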
Integration of Caffeine framework features
Caffeine visualization framework
Business layer: cache object management
Conventional way to create a Caffeine instance:

    static Cache<String, List<String>> accountWhiteCache = Caffeine.newBuilder()
            .expireAfterWrite(VivoConfigManager.getInteger("trade.account.white.list.cache.ttl", 10), TimeUnit.MINUTES)
            .recordStats()
            .maximumSize(VivoConfigManager.getInteger("trade.account.white.list.cache.size", 100))
            .build();

Creating a Caffeine instance with instance naming support:

    static Cache<String, List<String>> accountWhiteCache = Caffeine.newBuilder()
            .applyName("accountWhiteCache")
            .expireAfterWrite(VivoConfigManager.getInteger("trade.account.white.list.cache.ttl", 10), TimeUnit.MINUTES)
            .recordStats()
            .maximumSize(VivoConfigManager.getInteger("trade.account.white.list.cache.size", 100))
            .build();
    public final class Caffeine<K, V> {

        /** Name of this Caffeine instance. */
        String instanceName;

        /** Registry of named cache instances, keyed by instance name. */
        static final Map<String, Cache> cacheInstanceMap = new ConcurrentHashMap<>();

        @NonNull
        public <K1 extends K, V1 extends V> Cache<K1, V1> build() {
            requireWeightWithWeigher();
            requireNonLoadingCache();

            @SuppressWarnings("unchecked")
            Caffeine<K1, V1> self = (Caffeine<K1, V1>) this;
            Cache<K1, V1> localCache = isBounded()
                    ? new BoundedLocalCache.BoundedLocalManualCache<>(self)
                    : new UnboundedLocalCache.UnboundedLocalManualCache<>(self);

            // Register the cache only when it was given an instance name
            if (StringUtils.isNotEmpty(localCache.getInstanceName())) {
                cacheInstanceMap.put(localCache.getInstanceName(), localCache);
            }
            return localCache;
        }
    }
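Later snippets call `Caffeine.getCacheByInstanceName` and `listCacheInstance`, but the article does not show them. A minimal sketch of what such lookups might look like, assuming they are plain reads of the registry map, follows. The class name `NamedCacheRegistry` is hypothetical, and the value type is `Object` only so the sketch stays JDK-only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the static registry kept inside the customized Caffeine class. */
final class NamedCacheRegistry {
    // Keyed by instance name; Object stands in for the real Cache type.
    private static final Map<String, Object> INSTANCES = new ConcurrentHashMap<>();

    private NamedCacheRegistry() {}

    /** Registers a cache under its name; unnamed caches are skipped, as in build(). */
    static void register(String instanceName, Object cache) {
        if (instanceName != null && !instanceName.isEmpty()) {
            INSTANCES.put(instanceName, cache);
        }
    }

    /** Returns the cache registered under the name, or null if none exists. */
    static Object getCacheByInstanceName(String instanceName) {
        return instanceName == null ? null : INSTANCES.get(instanceName);
    }

    /** Returns a snapshot of all registered instance names. */
    static List<String> listCacheInstance() {
        return new ArrayList<>(INSTANCES.keySet());
    }
}
```

With a map-based registry like this, two caches built with the same name overwrite each other, so instance names would need to be unique within one application.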
Business layer: memory usage estimation

    // Note: this Nashorn-internal class ships with JDK 8; Nashorn was removed in JDK 15.
    import jdk.nashorn.internal.ir.debug.ObjectSizeCalculator;

    public abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef<K, V>
            implements LocalCache<K, V> {

        final ConcurrentHashMap<Object, Node<K, V>> data;

        @Override
        public long getMemoryUsed() {
            // Estimate the memory footprint of the cached data
            return ObjectSizeCalculator.getObjectSize(data);
        }
    }
Business layer: data reporting mechanism

    public static StatsData getCacheStats(String instanceName) {
        Cache cache = Caffeine.getCacheByInstanceName(instanceName);
        CacheStats cacheStats = cache.stats();

        StatsData statsData = new StatsData();
        statsData.setInstanceName(instanceName);
        statsData.setTimeStamp(System.currentTimeMillis() / 1000);

        // Size and memory usage
        statsData.setMemoryUsed(String.valueOf(cache.getMemoryUsed()));
        statsData.setEstimatedSize(String.valueOf(cache.estimatedSize()));

        // Hit, miss, and load statistics
        statsData.setRequestCount(String.valueOf(cacheStats.requestCount()));
        statsData.setHitCount(String.valueOf(cacheStats.hitCount()));
        statsData.setHitRate(String.valueOf(cacheStats.hitRate()));
        statsData.setMissCount(String.valueOf(cacheStats.missCount()));
        statsData.setMissRate(String.valueOf(cacheStats.missRate()));
        statsData.setLoadCount(String.valueOf(cacheStats.loadCount()));
        statsData.setLoadSuccessCount(String.valueOf(cacheStats.loadSuccessCount()));
        statsData.setLoadFailureCount(String.valueOf(cacheStats.loadFailureCount()));
        statsData.setLoadFailureRate(String.valueOf(cacheStats.loadFailureRate()));

        // Current policy settings; each one is reported only if the cache was built with it
        Optional<Eviction> optionalEviction = cache.policy().eviction();
        optionalEviction.ifPresent(eviction ->
                statsData.setMaximumSize(String.valueOf(eviction.getMaximum())));

        Optional<Expiration> optionalExpiration = cache.policy().expireAfterWrite();
        optionalExpiration.ifPresent(expiration ->
                statsData.setExpireAfterWrite(String.valueOf(expiration.getExpiresAfter(TimeUnit.SECONDS))));

        optionalExpiration = cache.policy().expireAfterAccess();
        optionalExpiration.ifPresent(expiration ->
                statsData.setExpireAfterAccess(String.valueOf(expiration.getExpiresAfter(TimeUnit.SECONDS))));

        optionalExpiration = cache.policy().refreshAfterWrite();
        optionalExpiration.ifPresent(expiration ->
                statsData.setRefreshAfterWrite(String.valueOf(expiration.getExpiresAfter(TimeUnit.SECONDS))));

        return statsData;
    }
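When reading the reported rates on a dashboard, it helps to recall how they follow from the raw counters. The sketch below is illustrative only and is not Caffeine's own code. It mirrors the behavior of `CacheStats.hitRate()` and `missRate()`, including the convention that a cache with no requests reports a hit rate of 1.0 and a miss rate of 0.0. The class name `RateSketch` is made up for this example.

```java
/** Illustrative only: how hit and miss rates follow from the raw counters. */
final class RateSketch {
    private RateSketch() {}

    /** hitCount / requestCount; a cache that has served no requests reports 1.0. */
    static double hitRate(long hitCount, long missCount) {
        long requestCount = hitCount + missCount;
        return requestCount == 0 ? 1.0 : (double) hitCount / requestCount;
    }

    /** missCount / requestCount; a cache that has served no requests reports 0.0. */
    static double missRate(long hitCount, long missCount) {
        long requestCount = hitCount + missCount;
        return requestCount == 0 ? 0.0 : (double) missCount / requestCount;
    }
}
```

Because an idle instance shows a perfect hit rate, a trend chart is easier to interpret when the request count is plotted alongside the rate.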
    public static void sendReportData() {
        try {
            if (!VivoConfigManager.getBoolean("memory.caffeine.data.report.switch", true)) {
                return;
            }

            // 1. Get the names of all cache instances
            Method listCacheInstanceMethod = HANDLER_MANAGER_CLASS.getMethod("listCacheInstance");
            @SuppressWarnings("unchecked")
            List<String> instanceNames = (List<String>) listCacheInstanceMethod.invoke(null);
            if (CollectionUtils.isEmpty(instanceNames)) {
                return;
            }

            String appName = System.getProperty("app.name");
            String localIp = getLocalIp();
            String localPort = String.valueOf(NetPortUtils.getWorkPort());

            ReportData reportData = new ReportData();
            InstanceData instanceData = new InstanceData();
            instanceData.setAppName(appName);
            instanceData.setIp(localIp);
            instanceData.setPort(localPort);

            // 2. Collect monitoring data for each cache instance
            Method getCacheStatsMethod = HANDLER_MANAGER_CLASS.getMethod("getCacheStats", String.class);
            Map<String, StatsData> statsDataMap = new HashMap<>();
            instanceNames.forEach(instanceName -> {
                try {
                    StatsData statsData = (StatsData) getCacheStatsMethod.invoke(null, instanceName);
                    statsDataMap.put(instanceName, statsData);
                } catch (Exception e) {
                    // Skip this instance but keep reporting the others
                    logger.warn("Caffeine stats collection failed for instance {}", instanceName, e);
                }
            });

            // 3. Build the report object
            reportData.setInstanceData(instanceData);
            reportData.setStatsDataMap(statsDataMap);

            // 4. Send the HTTP POST request
            HttpPost httpPost = new HttpPost(getReportDataUrl());
            httpPost.setConfig(requestConfig);
            StringEntity stringEntity = new StringEntity(JSON.toJSONString(reportData));
            stringEntity.setContentType("application/json");
            httpPost.setEntity(stringEntity);

            HttpResponse response = httpClient.execute(httpPost);
            String result = EntityUtils.toString(response.getEntity(), "UTF-8");
            EntityUtils.consume(response.getEntity());
            logger.info("Caffeine data report succeeded, URL {} params {} result {}",
                    getReportDataUrl(), JSON.toJSONString(reportData), result);
        } catch (Throwable throwable) {
            logger.error("Caffeine data report failed, URL {} ", getReportDataUrl(), throwable);
        }
    }
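The article does not show how `sendReportData` is triggered. One plausible approach, sketched here under that assumption, is a fixed-rate task on a `ScheduledExecutorService`. The class name `ReportScheduler`, the thread name, and the period are all hypothetical. The wrapper matters because `scheduleAtFixedRate` stops rescheduling a task once a run throws.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical scheduler for periodic reporting; not part of the original source. */
final class ReportScheduler {
    private ReportScheduler() {}

    /**
     * Runs the task at a fixed rate on a single daemon thread.
     * The task is wrapped so that a failure in one run does not cancel later runs.
     */
    static ScheduledExecutorService start(Runnable task, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "caffeine-report");
            t.setDaemon(true); // do not keep the JVM alive just for reporting
            return t;
        });
        Runnable safeTask = () -> {
            try {
                task.run();
            } catch (Throwable ignored) {
                // An uncaught exception would suppress all subsequent executions.
            }
        };
        scheduler.scheduleAtFixedRate(safeTask, 0, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

A caller might start it once at application startup, for example `ReportScheduler.start(() -> sendReportData(), 60_000)`, and call `shutdownNow()` on the returned executor during shutdown.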
Business layer: dynamic configuration delivery

    public static ExecutionResponse dispose(ExecutionRequest request) {
        ExecutionResponse executionResponse = new ExecutionResponse();
        executionResponse.setCmdType(CmdTypeEnum.INSTANCE_CONFIGURE.getCmd());
        executionResponse.setInstanceName(request.getInstanceName());

        String instanceName = request.getInstanceName();
        Cache cache = Caffeine.getCacheByInstanceName(instanceName);

        // Set the maximum number of entries
        if (null != request.getMaximumSize() && request.getMaximumSize() > 0) {
            Optional<Eviction> optionalEviction = cache.policy().eviction();
            optionalEviction.ifPresent(eviction -> eviction.setMaximum(request.getMaximumSize()));
        }

        // Set the expire-after-write duration
        if (null != request.getExpireAfterWrite() && request.getExpireAfterWrite() > 0) {
            Optional<Expiration> optionalExpiration = cache.policy().expireAfterWrite();
            optionalExpiration.ifPresent(expiration ->
                    expiration.setExpiresAfter(request.getExpireAfterWrite(), TimeUnit.SECONDS));
        }

        // Set the expire-after-access duration
        if (null != request.getExpireAfterAccess() && request.getExpireAfterAccess() > 0) {
            Optional<Expiration> optionalExpiration = cache.policy().expireAfterAccess();
            optionalExpiration.ifPresent(expiration ->
                    expiration.setExpiresAfter(request.getExpireAfterAccess(), TimeUnit.SECONDS));
        }

        // Set the refresh-after-write duration
        if (null != request.getRefreshAfterWrite() && request.getRefreshAfterWrite() > 0) {
            Optional<Expiration> optionalExpiration = cache.policy().refreshAfterWrite();
            optionalExpiration.ifPresent(expiration ->
                    expiration.setExpiresAfter(request.getRefreshAfterWrite(), TimeUnit.SECONDS));
        }

        executionResponse.setCode(0);
        executionResponse.setMsg("success");
        return executionResponse;
    }
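The handler above repeats the same guard four times: apply a setting only when the request carries a non-null, positive value. A small helper can express that rule once. This is a sketch only; `ConfigApplySketch` and `applyIfPositive` are hypothetical names, and it assumes the request fields are boxed `Long` values, which the article does not confirm.

```java
import java.util.function.LongConsumer;

/** Hypothetical helper that centralizes the "non-null and positive" guard. */
final class ConfigApplySketch {
    private ConfigApplySketch() {}

    /**
     * Invokes the setter only when the requested value is present and positive.
     * Returns true if the setter was invoked.
     */
    static boolean applyIfPositive(Long requested, LongConsumer setter) {
        if (requested == null || requested <= 0) {
            return false;
        }
        setter.accept(requested);
        return true;
    }
}
```

Returning a boolean would also let the response state which settings were actually changed, instead of always answering "success".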
Business layer: clearing cached data

    /**
     * Invalidates cached values.
     *
     * @param request the invalidation request
     * @return the execution result
     */
    public static ExecutionResponse invalidate(ExecutionRequest request) {
        ExecutionResponse executionResponse = new ExecutionResponse();
        executionResponse.setCmdType(CmdTypeEnum.INSTANCE_INVALIDATE.getCmd());
        executionResponse.setInstanceName(request.getInstanceName());

        try {
            // Look up the cache instance
            String instanceName = request.getInstanceName();
            Cache cache = Caffeine.getCacheByInstanceName(instanceName);

            // Clear either the whole instance or the entry for one key
            Object cacheKeyObj = request.getCacheKey();
            if (Objects.isNull(cacheKeyObj)) {
                // No key given: clear all entries
                cache.invalidateAll();
            } else {
                // Clear the entry for the given key, converted to the cache's key type
                if (Objects.equals(request.getCacheKeyType(), 2)) {
                    cache.invalidate(Long.valueOf(request.getCacheKey().toString()));
                } else if (Objects.equals(request.getCacheKeyType(), 3)) {
                    cache.invalidate(Integer.valueOf(request.getCacheKey().toString()));
                } else {
                    cache.invalidate(request.getCacheKey().toString());
                }
            }

            executionResponse.setCode(0);
            executionResponse.setMsg("success");
        } catch (Exception e) {
            executionResponse.setCode(-1);
            executionResponse.setMsg("fail");
        }
        return executionResponse;
    }
Business layer: querying cached data

    public static ExecutionResponse inspect(ExecutionRequest request) {
        ExecutionResponse executionResponse = new ExecutionResponse();
        executionResponse.setCmdType(CmdTypeEnum.INSTANCE_INSPECT.getCmd());
        executionResponse.setInstanceName(request.getInstanceName());

        String instanceName = request.getInstanceName();
        Cache cache = Caffeine.getCacheByInstanceName(instanceName);

        // Look up the value with the key converted to the cache's key type
        Object cacheValue;
        if (Objects.equals(request.getCacheKeyType(), 2)) {
            cacheValue = cache.getIfPresent(Long.valueOf(request.getCacheKey().toString()));
        } else if (Objects.equals(request.getCacheKeyType(), 3)) {
            cacheValue = cache.getIfPresent(Integer.valueOf(request.getCacheKey().toString()));
        } else {
            cacheValue = cache.getIfPresent(request.getCacheKey().toString());
        }

        // Return an empty string when the key is absent, otherwise the value as JSON
        if (Objects.isNull(cacheValue)) {
            executionResponse.setData("");
        } else {
            executionResponse.setData(JSON.toJSONString(cacheValue));
        }
        return executionResponse;
    }
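Both the invalidate and inspect handlers convert the incoming key according to a key type code before touching the cache. The sketch below pulls that conversion into one place. Only the codes 2 (Long) and 3 (Integer) come from the handlers above; the class and method names are hypothetical, and any other code is treated as String, as in the original fallback branch.

```java
/** Hypothetical helper that centralizes the key conversion used by the handlers. */
final class CacheKeySketch {
    private CacheKeySketch() {}

    /**
     * Converts the raw key to the key type the cache was built with.
     * Code 2 maps to Long and code 3 to Integer; anything else falls back to String.
     * Throws NumberFormatException if a numeric code is paired with non-numeric text.
     */
    static Object convertKey(Object rawKey, Integer keyType) {
        String text = rawKey.toString();
        if (Integer.valueOf(2).equals(keyType)) {
            return Long.valueOf(text);
        }
        if (Integer.valueOf(3).equals(keyType)) {
            return Integer.valueOf(text);
        }
        return text;
    }
}
```

The conversion is needed because key lookup relies on `equals`: a `Long` 42, an `Integer` 42, and the string "42" are three different keys, so a lookup with the wrong type silently misses.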
Communication layer: listener service

    public class ServerManager {

        private Server jetty;

        /**
         * Creates the Jetty server.
         *
         * @throws Exception if the server cannot be created
         */
        public ServerManager() throws Exception {
            int port = NetPortUtils.getAvailablePort();
            jetty = new Server(port);

            ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
            context.setContextPath("/");
            context.addServlet(ClientServlet.class, "/caffeine");
            jetty.setHandler(context);
        }

        /**
         * Starts the Jetty server.
         *
         * @throws Exception if the server fails to start
         */
        public void start() throws Exception {
            jetty.start();
        }
    }

    public class ClientServlet extends HttpServlet {

        private static final Logger logger = LoggerFactory.getLogger(ClientServlet.class);

        @Override
        protected void doGet(HttpServletRequest req, HttpServletResponse resp)
                throws ServletException, IOException {
            super.doGet(req, resp);
        }

        @Override
        protected void doPost(HttpServletRequest req, HttpServletResponse resp)
                throws ServletException, IOException {
            ExecutionResponse executionResponse = null;
            String requestJson = null;
            try {
                // Read the request parameters
                String contextPath = req.getContextPath();
                String servletPath = req.getServletPath();
                String requestUri = req.getRequestURI();
                requestJson = IOUtils.toString(req.getInputStream(), StandardCharsets.UTF_8);

                // Handle the command; dispatch uses reflection to avoid class-dependency issues
                ExecutionRequest executionRequest = JSON.parseObject(requestJson, ExecutionRequest.class);
                executionResponse = DisposeCenter.dispatch(executionRequest);
            } catch (Exception e) {
                logger.error("vivo-memory failed to handle request {} ", requestJson, e);
            }

            if (null == executionResponse) {
                executionResponse = new ExecutionResponse();
                executionResponse.setCode(-1);
                executionResponse.setMsg("processing error");
            }

            // Assemble the response message
            resp.setContentType("application/json; charset=utf-8");
            PrintWriter out = resp.getWriter();
            out.println(JSON.toJSONString(executionResponse));
            out.flush();
        }
    }
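The servlet hands every request to `DisposeCenter.dispatch`, which the article does not show and which, per the comment, uses reflection. As a simpler stand-in for illustration, the sketch below routes by command type through a lookup table. This is not the author's implementation: the class name, the command strings, and the use of plain `String` payloads are all assumptions.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical command dispatcher; command names and String payloads are placeholders. */
final class DispatchSketch {
    private static final Map<String, Function<String, String>> HANDLERS = new ConcurrentHashMap<>();

    private DispatchSketch() {}

    /** Registers the handler responsible for one command type. */
    static void register(String cmdType, Function<String, String> handler) {
        HANDLERS.put(cmdType, handler);
    }

    /** Routes the payload to the handler for cmdType, or returns null for an unknown command. */
    static String dispatch(String cmdType, String payload) {
        Function<String, String> handler = cmdType == null ? null : HANDLERS.get(cmdType);
        return handler == null ? null : handler.apply(payload);
    }
}
```

Returning null for an unknown command fits the servlet above, which already turns a null response into an error reply with code -1.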
Communication layer: heartbeat design

    /**
     * Sends heartbeat data.
     */
    public static void sendHeartBeatData() {
        try {
            if (!VivoConfigManager.getBoolean("memory.caffeine.heart.report.switch", true)) {
                return;
            }

            // 1. Build the heartbeat data
            String appName = System.getProperty("app.name");
            String localIp = getLocalIp();
            String localPort = String.valueOf(NetPortUtils.getWorkPort());

            HeartBeatData heartBeatData = new HeartBeatData();
            heartBeatData.setAppName(appName);
            heartBeatData.setIp(localIp);
            heartBeatData.setPort(localPort);
            heartBeatData.setTimeStamp(System.currentTimeMillis() / 1000);

            // 2. Send the HTTP POST request
            HttpPost httpPost = new HttpPost(getHeartBeatUrl());
            httpPost.setConfig(requestConfig);
            StringEntity stringEntity = new StringEntity(JSON.toJSONString(heartBeatData));
            stringEntity.setContentType("application/json");
            httpPost.setEntity(stringEntity);

            HttpResponse response = httpClient.execute(httpPost);
            String result = EntityUtils.toString(response.getEntity(), "UTF-8");
            EntityUtils.consume(response.getEntity());
            logger.info("Caffeine heartbeat report succeeded, URL {} params {} result {}",
                    getHeartBeatUrl(), JSON.toJSONString(heartBeatData), result);
        } catch (Throwable throwable) {
            logger.error("Caffeine heartbeat report failed, URL {} ", getHeartBeatUrl(), throwable);
        }
    }
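The article shows only the client side of the heartbeat. On the receiving side, a common way to use such data is to remember the latest timestamp per application instance and treat an instance as offline once no heartbeat has arrived within a timeout. The sketch below illustrates that idea under stated assumptions: `HeartbeatTracker`, the key format, and the timeout are hypothetical, and timestamps are in seconds to match `System.currentTimeMillis() / 1000` in the client code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical server-side bookkeeping for heartbeats; not shown in the original source. */
final class HeartbeatTracker {
    // Latest heartbeat time in epoch seconds, keyed by "appName@ip:port".
    private final Map<String, Long> lastSeenSeconds = new ConcurrentHashMap<>();

    private static String key(String appName, String ip, String port) {
        return appName + "@" + ip + ":" + port;
    }

    /** Records a heartbeat, keeping the newest timestamp if beats arrive out of order. */
    void record(String appName, String ip, String port, long epochSeconds) {
        lastSeenSeconds.merge(key(appName, ip, port), epochSeconds, Long::max);
    }

    /** An instance is alive if its latest heartbeat is no older than the timeout. */
    boolean isAlive(String appName, String ip, String port, long nowSeconds, long timeoutSeconds) {
        Long last = lastSeenSeconds.get(key(appName, ip, port));
        return last != null && nowSeconds - last <= timeoutSeconds;
    }
}
```

A timeout of a few heartbeat periods tolerates an occasional lost request without flagging a healthy instance as offline.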
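The vivo technology team has shared its experience with Caffeine several times; see the official-account article 《如何把 Caffeine Cache 用得如丝般顺滑》 ("How to use Caffeine Cache as smoothly as silk"). This article builds on that usage experience and customizes Caffeine further to address the pain points encountered in practice.
The Caffeine visualization project is now deployed in several core business scenarios and has been running stably. The most-used features are project-level global management of Caffeine instances and, for a single instance, memory usage estimation and cache hit-rate trend analysis.
For example, per-instance memory estimation helps gauge the relationship between the configured number of cache entries and actual memory usage, and the overall hit-rate trend helps judge whether the cache parameters are set reasonably.
We hope this article offers some new ideas for cache usage and monitoring.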