使用 per4j 做一个接口访问频次监视系统 (A)

生产环境中,有大规模的面向服务的架构和分布式应用,怎么样监控到这些服务接口的调用频次以及接口的性能表现呢?

per4j 日志组件是个不错的选择,具体使用可以参考这篇文章:使用Perf4J进行性能分析和监控

per4j 自身带的日志分析功能结合 Google Chart API 是个不错的可视化方案;可是有 GFW,服务器部署是个问题,难道自己搭建代理吗?境外服务器?

per4j的日志格式简单的分析之后,我决定自己在per4j的基础上做一个监视系统。大致思路是:

  • 自定义一个log4jappender,将per4j日志内容按记录点的名字开始时间消耗时长写入到一张日志表;
  • 然后使用 nodejs 搭建一个简单的提供统计接口的服务,并使用chartjs 展示统计结果;

具体的步骤如下:

创建日志表

我这里使用提 MySQL数据库,考虑到日志记录的效率,我将日志表的存储引擎设置为 ARCHIVE;因日志量庞大,我将日志按日期写入31张表(一个月最多有31天);

CREATE TABLE `performance_log_{1-31}` (  
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `tag` varchar(32) NOT NULL,
  `start` bigint(20) NOT NULL,
  `duration` int(11) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=ARCHIVE;
解析per4j的日志格式

per4j的日志格式是这样的:

start[开始时间戳] time[消耗时长] tag[记录点] message[消息]  

使用正则方式匹配出开始时间戳消耗市场记录点三个数据

public class Lite {

    private String name;
    private long start;
    private int duration;

    private static final Pattern pattern = Pattern.compile("^start\\["+ "(\\d+)" +"\\]\\s+time\\["+ "(\\d+)" +"\\]\\s+tag\\["+ "(\\w+)" +"\\]");
………
………
    public static Lite from(String message) {
        Matcher matcher = pattern.matcher(message);
        if(matcher.find()) {
            Long start = Long.parseLong(matcher.group(1));
            int duration = Integer.parseInt(matcher.group(2));
            String name = matcher.group(3);
            return new Lite(name, start, duration);
        } else {
            return null;
        }
    }
自定义一个log4j的 database appender
package pfm.log4j;

import org.apache.commons.dbcp2.BasicDataSource;  
import org.apache.log4j.AppenderSkeleton;  
import org.apache.log4j.Logger;  
import org.apache.log4j.spi.LoggingEvent;

import javax.sql.DataSource;  
import java.sql.Connection;  
import java.sql.PreparedStatement;  
import java.sql.SQLException;  
import java.util.*;

public class DailyDBAppender extends AppenderSkeleton {

    private final Vector<Lite> buffer;

    private int bufferSize = 10;

    private String url;
    private String username;
    private String password;
    private String driverClass;

    private int initialSize = 5;
    private int maxTotal = 50;
    private int maxIdle = 8;
    private int maxWaitMillis = 5000;
    private int minIdle = 30000;

    private DataSource dataSource;

    private static Logger logger = Logger.getLogger(DailyDBAppender.class);

    public DailyDBAppender() {
        buffer = new Vector<>();
        Timer timer = new Timer();
        timer.schedule( new FlushTimerTask() , 10000, 3000 );
    }

    protected void append(LoggingEvent event) {

        String message = (String) event.getMessage();
        Lite lite = Lite.from(message);
        if(lite == null) {
            logger.warn("could not parse: + '"+ message +"'.");
            return ;
        }

        buffer.add(lite);
        if( buffer.size() >= bufferSize ) {
            (new Thread(this::flush)).start();
        }
    }

    private void flush() {

        if (dataSource == null) {
            dataSource = initDataSource(url,
                    driverClass,
                    username,
                    password,
                    initialSize,
                    maxTotal,
                    maxIdle,
                    maxWaitMillis,
                    minIdle);
        }
        Calendar calendar = Calendar.getInstance();
        synchronized (buffer) {
            try (Connection connection = dataSource.getConnection()) {
                int dayOfMonth = calendar.get(Calendar.DAY_OF_MONTH);
                String tableName = "performance_log_" + dayOfMonth;
                String sql = "insert into "+ tableName +" (tag, start, duration) values (?, ?, ?)";
                try (PreparedStatement statement =
                             connection.prepareStatement(sql)) {

                    int size = buffer.size();
                    Iterator<Lite> iterator = buffer.iterator();

                    while(iterator.hasNext()) {
                        Lite lite = iterator.next();
                        statement.setString(1, lite.getName());
                        statement.setLong(2, lite.getStart());
                        statement.setInt(3, lite.getDuration());

                        statement.executeUpdate();

                        iterator.remove();
                    }

                    logger.info("flush "+ size +" lites to db.\t\t\t\t\t\t\t\t\t" + buffer.size());
                }

            } catch (SQLException e) {
                e.printStackTrace();
                logger.warn("could not write log message to db. " + e.getMessage());
            }
        }

    }

    public void close() {  }

    public boolean requiresLayout() {
        return false;
    }

    //这里省略掉数据库连接的属性和连接池相关的参数的 set 方法

    private DataSource initDataSource(String url,
                                      String driverClass,
                                      String username,
                                      String password,
                                      int initialSize,
                                      int maxTotal,
                                      int maxIdle,
                                      int maxWaitMillis,
                                      int minIdle) {
        BasicDataSource dataSource = new BasicDataSource();
        dataSource.setDriverClassName(driverClass);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        dataSource.setUrl(url);
        dataSource.setInitialSize(initialSize);
        dataSource.setMaxTotal(maxTotal);
        dataSource.setMaxIdle(maxIdle);
        dataSource.setMaxWaitMillis(maxWaitMillis);
        dataSource.setMinIdle(minIdle);
        return dataSource;
    }

    private final class FlushTimerTask extends TimerTask {

        @Override
        public void run() {
            logger.debug("start to flush lites to db.");
            if(buffer.size() > 0) {
                flush();
            }
        }
    }
}

使用了commons-dbcp2数据库连接池;

增加配置到log4j.xml文件中
<appender name="dailydb" class="pfm.log4j.DailyDBAppender">  
        <param name="username" value="username" />
        <param name="password" value="password" />
        <param name="url" value="jdbc:mysql://localhost:3306/pfmlogdb" />
        <param name="driverClass" value="com.mysql.jdbc.Driver" />
</appender>  
<logger name="org.perf4j.TimingLogger" additivity="false">  
    <level value="INFO" />
    <appender-ref ref="dailydb" />
</logger>  
per4j注解嵌入到记录点

我的要监视的接口程序是使用springmvc编写的,将记录点加到Controller层;

@Controller
public class SignController {

    @Profiled(tag = "login")
    @RequestMapping(value = "/login", method = RequestMethod.PUT)
    @ResponseBody
    public Response login(@RequestBody LoginRequest request) {
        ……
        ……
    }

    @Profiled(tag = "register")
    @RequestMapping(value = "/register", method = RequestMethod.POST)
    @ResponseBody
    public Response register(@RequestBody RegisterRequest request) {
        ……
        ……
    }

另外要将per4j的日志切面记录类以切面自动代理的方式配置到spring容器中;applicationContext.xml文件中增加下面的配置:

<aop:aspectj-autoproxy>  
    <aop:include name="timingAspect" />
</aop:aspectj-autoproxy>  
<bean id="timingAspect" class="org.perf4j.slf4j.aop.TimingAspect" />  

将接口程序运行起来后看到如下日志:

[2016-04-01 02:07:54,722] [INFO ] [Timer-0] pfm.log4j.DailyDBAppender - flush 3 lites to db.                                    0
[2016-04-01 02:08:00,715] [INFO ] [Timer-0] pfm.log4j.DailyDBAppender - flush 1 lites to db.
……

至此,即说明日志已经成功写入到数据库中;

mysql> select * from performance_log_1 limit 3;  
+----+-------------------+---------------+----------+
| id | tag               | start         | duration |
+----+-------------------+---------------+----------+
|  1 | login   | 1459440022770 |       31 |
|  2 | get_snapshot | 1459440022871 |       40 |
|  3 | get_snapshot     | 1459440022932 |       23 |
+----+-------------------+---------------+----------+
3 rows in set (0.01 sec)  

http://www.infoq.com/cn/articles/perf4j/

https://github.com/manbusky/log4j-daily-appender

后续(B)篇继续接口访问频次的可视化方案的实现;

漫步

A lazy programmer!