diff --git a/packages/hook/src/patch/HttpServer.ts b/packages/hook/src/patch/HttpServer.ts index 1c5e8d18..bd3312d1 100644 --- a/packages/hook/src/patch/HttpServer.ts +++ b/packages/hook/src/patch/HttpServer.ts @@ -1,12 +1,13 @@ 'use strict'; -import { Patcher, getRandom64 } from 'pandora-metrics'; +import { getRandom64, MetricLevel, MetricName, MetricsClientUtil, Patcher } from 'pandora-metrics'; import { extractPath } from '../utils/Utils'; import { HEADER_TRACE_ID } from '../utils/Constants'; import { parse as parseUrl } from 'url'; import { parse as parseQS, ParsedUrlQuery } from 'querystring'; import * as http from 'http'; import { IncomingMessage } from 'http'; +import parseInt = require('lodash/fp/parseInt'); const debug = require('debug')('Pandora:Hook:HttpServerPatcher'); @@ -14,6 +15,22 @@ export type bufferTransformer = (buffer, req?: IncomingMessage) => object | stri export type requestFilter = (req) => boolean; + +const MetricsStat = { + /** HTTP */ + HTTP_REQUEST: 'middleware.http.request', + + HTTP_REQUEST_PATH: 'middleware.http.request.path', + + HTTP_GROUP: 'http', + + HTTP_PATH: 'path', + + HTTP_ERROR_CODE: 400, + + HTTP_ILLEGAL_PATH: 'illegal_path' +}; + export class HttpServerPatcher extends Patcher { constructor(options?: { @@ -37,7 +54,7 @@ export class HttpServerPatcher extends Patcher { } getTraceId(req) { - return req.headers[HEADER_TRACE_ID] || getRandom64(); + return req.headers[ HEADER_TRACE_ID ] || getRandom64(); } createSpan(tracer, tags) { @@ -83,7 +100,8 @@ export class HttpServerPatcher extends Patcher { return false; } - _beforeExecute(tracer, req, res) {} + _beforeExecute(tracer, req, res) { + } beforeFinish(span, res) { span.setTag('http.status_code', { @@ -124,7 +142,7 @@ export class HttpServerPatcher extends Patcher { getFullUrl(req: IncomingMessage): string { if (!req) return ''; - const secure = (req.connection).encrypted || req.headers['x-forwarded-proto'] === 'https'; + const secure = (req.connection).encrypted || req.headers[ 'x-forwarded-proto' ] === 'https'; return 'http' + (secure ? 's' : '') + '://' + req.headers.host + @@ -141,7 +159,7 @@ export class HttpServerPatcher extends Patcher { return function wrappedCreateServer(this: any, requestListener) { if (requestListener) { - const listener = traceManager.bind(function(req, res) { + const listener = traceManager.bind(function (req, res) { const requestFilter = options.requestFilter || self.requestFilter; if (requestFilter(req)) { @@ -179,7 +197,7 @@ export class HttpServerPatcher extends Patcher { return function wrappedRequestEmit(this: IncomingMessage, event) { if (event === 'data') { - const chunk = arguments[1] || []; + const chunk = arguments[ 1 ] || []; chunks.push(chunk); } @@ -189,7 +207,7 @@ export class HttpServerPatcher extends Patcher { }); } - tracer.named(`HTTP-${tags['http.method'].value}:${tags['http.url'].value}`); + tracer.named(`HTTP-${tags[ 'http.method' ].value}:${tags[ 'http.url' ].value}`); tracer.setCurrentSpan(span); function onFinishedFactory(eventName) { @@ -236,6 +254,29 @@ export class HttpServerPatcher extends Patcher { } afterFinish(span, res) { - // overwrite + this.reportMetrics({ + rt: span.duration, + resultCode: res.statusCode + }); + } + + reportMetrics(ctx) { + let responseCode = ctx.resultCode; + if (!responseCode) { + return; + } + + let global = new MetricName(MetricsStat.HTTP_REQUEST, {}, MetricLevel.NORMAL); + + let client = MetricsClientUtil.getMetricsClient(); + + let globalCompass = client.getFastCompass(MetricsStat.HTTP_GROUP, global); + + if (MetricsStat.HTTP_ERROR_CODE > parseInt(responseCode)) { + globalCompass.record(ctx.rt, 'success'); + } else { + globalCompass.record(ctx.rt, 'error'); + } + } } diff --git a/packages/metrics/src/MetricsClient.ts b/packages/metrics/src/MetricsClient.ts index 6d433839..61928574 100644 --- a/packages/metrics/src/MetricsClient.ts +++ b/packages/metrics/src/MetricsClient.ts @@ -1,12 +1,10 @@ -import {Metric, MetricName, MetricsRegistry, MetricType} from './common/index'; -import {ProxyCreateMessage, ProxyUpdateMessage} from './domain'; -import {MetricsConstants} from './MetricsConstants'; -import {EnvironmentUtil, Environment} from 'pandora-env'; -import {MetricsMessengerClient} from './util/MessengerUtil'; -import {Proxiable, Gauge, Counter, Histogram, Meter, Timer} from './client/index'; -import {AbstractIndicator} from './indicator/AbstractIndicator'; -import {MetricSet} from './common'; -import {IMetricsRegistry} from './common/MetricsRegistry'; +import { IMetricsRegistry, Metric, MetricName, MetricSet, MetricsRegistry, MetricType } from './common'; +import { ProxyCreateMessage, ProxyUpdateMessage } from './domain'; +import { MetricsConstants } from './MetricsConstants'; +import { Environment, EnvironmentUtil } from 'pandora-env'; +import { MetricsMessengerClient } from './util/MessengerUtil'; +import { Counter, FastCompass, Gauge, Histogram, Meter, Proxiable, Timer } from './client'; +import { AbstractIndicator } from './indicator/AbstractIndicator'; export class MetricsClient extends AbstractIndicator { @@ -62,13 +60,13 @@ export class MetricsClient extends AbstractIndicator { * @param {} name * @param {Proxiable} metric */ - register(group: string, name: MetricName | string, metric: Proxiable | Metric ) { + register(group: string, name: MetricName | string, metric: Proxiable | Metric) { this.debug(`Register: wait register a metrics name = ${name}`); let newName = this.buildName(name); // 把应用名加上 newName = newName.tagged('appName', this.getAppName()); - if(!metric.type) { + if (!metric.type) { metric.type = MetricType.GAUGE; } @@ -78,7 +76,7 @@ export class MetricsClient extends AbstractIndicator { // Gauge 比较特殊,是实际的类,而服务端才是一个代理,和其他 metric 都不同,不需要 proxy if ((metric).proxyMethod && (metric).proxyMethod.length) { for (let method of (metric).proxyMethod) { - metric[method] = (...args) => { + metric[ method ] = (...args) => { this.debug(`Invoke: invoke name = ${newName.getNameKey()}, type = ${metric.type}, method = ${method}, value = ${args}`); this.report({ action: MetricsConstants.EVT_METRIC_UPDATE, @@ -128,7 +126,7 @@ export class MetricsClient extends AbstractIndicator { }) { this.debug(`Invoke: invoked, key = ${args.metricKey} `); let metric = this.allMetricsRegistry.getMetric(MetricName.parseKey(args.metricKey)); - if(metric && metric.type === args.type) { + if (metric && metric.type === args.type) { return await Promise.resolve((>metric).getValue()); } else { this.debug(`Invoke: can not find metric(${args.metricKey}) or type different`); @@ -140,7 +138,7 @@ export class MetricsClient extends AbstractIndicator { */ protected registerDownlink() { this.debug(`Register: down link eventKey = ${this.getClientDownlinkKey()}`); - this.messengerClient.query(this.getClientDownlinkKey(), async(message, reply) => { + this.messengerClient.query(this.getClientDownlinkKey(), async (message, reply) => { try { reply && reply(await this.invoke(message)); } catch (err) { @@ -179,8 +177,14 @@ export class MetricsClient extends AbstractIndicator { return histogram; } + getFastCompass(group: string, name: MetricName | string) { + const fastCompass = new FastCompass(); + this.register(group, name, fastCompass); + return fastCompass; + } + private buildName(name: MetricName | string): MetricName { - if(typeof name === 'string') { + if (typeof name === 'string') { name = MetricName.build(name); } diff --git a/packages/metrics/src/MetricsServerManager.ts b/packages/metrics/src/MetricsServerManager.ts index 4c1cd12d..8344f03c 100644 --- a/packages/metrics/src/MetricsServerManager.ts +++ b/packages/metrics/src/MetricsServerManager.ts @@ -1,22 +1,24 @@ -import {MessengerClient} from 'pandora-messenger'; -import {MetricsMessengerServer} from './util/MessengerUtil'; -import {MetricsConstants} from './MetricsConstants'; +import { MessengerClient } from 'pandora-messenger'; +import { MetricsMessengerServer } from './util/MessengerUtil'; +import { MetricsConstants } from './MetricsConstants'; import { - MetricsRegistry, + BaseGauge, + ICounter, Metric, MetricFilter, + MetricName, MetricsManager, + MetricsRegistry, MetricType, - MetricName, - BaseGauge, - ICounter, } from './common/index'; -import {ProxyCreateMessage, ProxyUpdateMessage} from './domain'; -import {AbstractIndicator} from './indicator/AbstractIndicator'; -import {IMeter} from './common/metrics/Meter'; -import {IHistogram} from './common/metrics/Histogram'; -import {ITimer} from './common/metrics/Timer'; -import {IMetricsRegistry} from './common/MetricsRegistry'; +import { ProxyCreateMessage, ProxyUpdateMessage } from './domain'; +import { AbstractIndicator } from './indicator/AbstractIndicator'; +import { IMeter } from './common/metrics/Meter'; +import { IHistogram } from './common/metrics/Histogram'; +import { ITimer } from './common/metrics/Timer'; +import { IMetricsRegistry } from './common/MetricsRegistry'; +import { IFastCompass } from './common/metrics/FastCompass'; + const util = require('util'); export class MetricsServerManager extends AbstractIndicator implements MetricsManager { @@ -44,7 +46,7 @@ export class MetricsServerManager extends AbstractIndicator implements MetricsMa static instance; static getInstance() { - if(!this.instance) { + if (!this.instance) { this.instance = new MetricsServerManager(); } return this.instance; @@ -86,9 +88,9 @@ export class MetricsServerManager extends AbstractIndicator implements MetricsMa protected buildReportLink(client) { // 处理接受的数据 client.on(this.getClientUplinkKey(client._APP_NAME, client._CLIENT_ID), (data) => { - if(data.action === MetricsConstants.EVT_METRIC_CREATE) { + if (data.action === MetricsConstants.EVT_METRIC_CREATE) { this.registerMetric(data); - } else if(data.action === MetricsConstants.EVT_METRIC_UPDATE) { + } else if (data.action === MetricsConstants.EVT_METRIC_UPDATE) { this.updateMetric(data); } }); @@ -106,7 +108,7 @@ export class MetricsServerManager extends AbstractIndicator implements MetricsMa let storeMetricsArr = this.metricsAndClientMap.get(remove_id); this.removeMetricInRegistry(this.allMetricsRegistry, storeMetricsArr); // remove in group map - for(let registry of this.metricRegistryMap.values()) { + for (let registry of this.metricRegistryMap.values()) { this.removeMetricInRegistry(registry, storeMetricsArr); } // remove from metricsAndClientMap @@ -115,7 +117,7 @@ export class MetricsServerManager extends AbstractIndicator implements MetricsMa } private removeMetricInRegistry(registry, storeMetricsArr) { - for(let key of (storeMetricsArr || [])) { + for (let key of (storeMetricsArr || [])) { registry.remove(key); } } @@ -130,7 +132,7 @@ export class MetricsServerManager extends AbstractIndicator implements MetricsMa let metric; let metricName = MetricName.parseKey(data.name); - if(!this.metricsAndClientMap.has(data.clientId)) { + if (!this.metricsAndClientMap.has(data.clientId)) { this.metricsAndClientMap.set(data.clientId, []); } @@ -138,7 +140,7 @@ export class MetricsServerManager extends AbstractIndicator implements MetricsMa let storeMetricsArr = this.metricsAndClientMap.get(data.clientId); storeMetricsArr.push(data.name); - switch(data.type) { + switch (data.type) { case 'GAUGE': metric = this.createGaugeProxy(metricName); break; @@ -154,6 +156,9 @@ export class MetricsServerManager extends AbstractIndicator implements MetricsMa case 'HISTOGRAM': metric = this.allMetricsRegistry.histogram(metricName); break; + case 'FASTCOMPASS': + metric = this.allMetricsRegistry.fastCompass(metricName); + break; default: metric = this.createGaugeProxy(metricName); } @@ -176,11 +181,11 @@ export class MetricsServerManager extends AbstractIndicator implements MetricsMa // 找指标 let metricName = MetricName.parseKey(data.name); let metric = this.allMetricsRegistry.getMetric(metricName); - if(metric) { + if (metric) { this.debug(`Invoke: find metric(${data.name}), type = ${metric.type}`); - if(metric.type === data.type) { + if (metric.type === data.type) { this.debug(`Invoke: type equal and call ${data.method}(${data.value})`); - metric[data.method].apply(metric, data.value); + metric[ data.method ].apply(metric, data.value); } } else { this.debug(`Invoke: can't find msetric(${data.name})`); @@ -216,7 +221,7 @@ export class MetricsServerManager extends AbstractIndicator implements MetricsMa * @returns {Promise} */ async invoke(args?: any) { - for(let client of this.getClients()) { + for (let client of this.getClients()) { let result = await new Promise((resolve) => { this.debug(`Invoke: eventKey(${this.getClientDownlinkKey((client)._APP_NAME, (client)._CLIENT_ID)}), args = ${args}`); client.send(this.getClientDownlinkKey((client)._APP_NAME, (client)._CLIENT_ID), args, (err, result) => { @@ -225,7 +230,7 @@ export class MetricsServerManager extends AbstractIndicator implements MetricsMa }, MetricsConstants.CLIENT_TIME_OUT); }); - if(result !== null && result !== undefined) { + if (result !== null && result !== undefined) { return result; } } @@ -249,7 +254,7 @@ export class MetricsServerManager extends AbstractIndicator implements MetricsMa destroy() { this.enabled = false; - for(let client of this.metricsClients.values()) { + for (let client of this.metricsClients.values()) { client.close(); } } @@ -260,7 +265,7 @@ export class MetricsServerManager extends AbstractIndicator implements MetricsMa } let newName; - if(typeof name === 'string') { + if (typeof name === 'string') { newName = MetricName.build(name); } else { newName = name; @@ -310,6 +315,11 @@ export class MetricsServerManager extends AbstractIndicator implements MetricsMa return metricRegistry.getTimers(filter); } + getFastCompasses(group: string, filter: MetricFilter = MetricFilter.ALL) { + let metricRegistry = this.getMetricRegistryByGroup(group); + return metricRegistry.getFastCompasses(filter); + } + getMetrics(group: string): Map { let metricRegistry: IMetricsRegistry = this.metricRegistryMap.get(group); if (metricRegistry) { @@ -320,7 +330,7 @@ export class MetricsServerManager extends AbstractIndicator implements MetricsMa getCategoryMetrics(group: string, filter: MetricFilter = MetricFilter.ALL): Map> { const metricRegistry = this.metricRegistryMap.get(group); - const result: Map> = new Map(); + const result: Map> = new Map(); result.set(MetricType.GAUGE, metricRegistry.getGauges(filter)); result.set(MetricType.COUNTER, metricRegistry.getCounters(filter)); @@ -332,7 +342,7 @@ export class MetricsServerManager extends AbstractIndicator implements MetricsMa } getAllCategoryMetrics(filter: MetricFilter = MetricFilter.ALL): Map> { - const result: Map> = new Map(); + const result: Map> = new Map(); const allMetricsRegistry = this.getAllMetricsRegistry(); result.set(MetricType.GAUGE, allMetricsRegistry.getGauges(filter)); @@ -367,14 +377,14 @@ export class MetricsServerManager extends AbstractIndicator implements MetricsMa let result = new Map(); - for (let [group, metricRegistry] of this.metricRegistryMap.entries()) { + for (let [ group, metricRegistry ] of this.metricRegistryMap.entries()) { result.set(group, metricRegistry.getMetricNames()); } return result; } getMeter(group: string, name: MetricName): IMeter { - const meter = this.getMetricRegistryByGroup(group).meter(name); + const meter = this.getMetricRegistryByGroup(group).meter(name); this.allMetricsRegistry.register(name, meter); return meter; } @@ -397,6 +407,12 @@ export class MetricsServerManager extends AbstractIndicator implements MetricsMa return timer; } + getFastCompass(group: string, name: MetricName): IFastCompass { + const fastcompass = (this.getMetricRegistryByGroup(group)).fastCompass(name); + this.allMetricsRegistry.register(name, fastcompass); + return fastcompass; + } + destory() { this.messengerServer.close(); for (let client of this.metricsClients.values()) { diff --git a/packages/metrics/src/client/MetricsManagerClient.ts b/packages/metrics/src/client/MetricsManagerClient.ts index 23fb8601..4d88c48e 100644 --- a/packages/metrics/src/client/MetricsManagerClient.ts +++ b/packages/metrics/src/client/MetricsManagerClient.ts @@ -1,5 +1,5 @@ -import {MetricName, MetricType} from '../common/index'; -import {Counter, Histogram, Meter, Timer} from './MetricsProxy'; +import {MetricName, MetricType} from '../common'; +import {Counter, Histogram, Meter, Timer, FastCompass} from './MetricsProxy'; import {MetricsProcessChannel} from './MetricsProcessChannel'; import {Proxiable} from './domain'; @@ -42,4 +42,10 @@ export class MetricsManagerClient { return histogram; } + static getFastCompass(group: string, name: MetricName | string) { + const fastCompass = new FastCompass(); + this.register(group, name, fastCompass); + return fastCompass; + } + } diff --git a/packages/metrics/src/client/MetricsProxy.ts b/packages/metrics/src/client/MetricsProxy.ts index cafb27e4..187f4c0c 100644 --- a/packages/metrics/src/client/MetricsProxy.ts +++ b/packages/metrics/src/client/MetricsProxy.ts @@ -44,3 +44,13 @@ export class Meter implements Metric, Proxiable { } } + + +export class FastCompass implements Metric, Proxiable { + type = MetricType.FASTCOMPASS; + proxyMethod = ['record']; + + record(duration, subCategory) { + + } +} diff --git a/packages/metrics/src/collect/CompactMetricsCollector.ts b/packages/metrics/src/collect/CompactMetricsCollector.ts index 47722da8..0595c182 100644 --- a/packages/metrics/src/collect/CompactMetricsCollector.ts +++ b/packages/metrics/src/collect/CompactMetricsCollector.ts @@ -1,5 +1,5 @@ import {MetricsCollector} from './MetricsCollector'; -import {BucketCounter, MetricName} from '../common/index'; +import { BucketCounter, IFastCompass, MetricName } from '../common/index'; import {MetricObject} from './MetricObject'; import {ITimer} from '../common/metrics/Timer'; import {IHistogram} from '../common/metrics/Histogram'; @@ -69,4 +69,53 @@ export class CompactMetricsCollector extends MetricsCollector { .addMetricWithSuffix(name, 'm1', this.convertRate(meter.getOneMinuteRate()), timestamp); } + collectFastCompass(name: MetricName, fastCompass: IFastCompass, timestamp: number) { + let bucketInterval = fastCompass.getBucketInterval(); + + let start = this.getNormalizedStartTime(timestamp, bucketInterval); + let totalCount = 0; + let totalRt = 0; + let successCount = 0; + let hitCount = -1; + + let countPerCategory = fastCompass.getMethodCountPerCategory(start); + for (let [key, value] of countPerCategory.entries()) { + if (value.has(start)) { + this.addMetricWithSuffix(name, key + '_bucket_count', value.get(start), start, + MetricObject.MetricType.DELTA, bucketInterval); + totalCount += value.get(start); + if ('success' === key) { + successCount += value.get(start); + } + if ('hit' === key) { + hitCount = value.get(start); + successCount += value.get(start); + } + } else { + this.addMetricWithSuffix(name, key + '_bucket_count', 0, start, + MetricObject.MetricType.DELTA, bucketInterval); + } + } + + for (let value of fastCompass.getMethodRtPerCategory(start).values()) { + if (value.has(start)) { + totalRt += value.get(start); + } + } + this.addMetricWithSuffix(name, 'bucket_count', totalCount, start, + MetricObject.MetricType.DELTA, bucketInterval); + this.addMetricWithSuffix(name, 'bucket_sum', totalRt, start, + MetricObject.MetricType.DELTA, bucketInterval); + this.addMetricWithSuffix(name, 'qps', this.rate(totalCount, bucketInterval), start, + MetricObject.MetricType.GAUGE, bucketInterval); + this.addMetricWithSuffix(name, 'rt', this.rate(totalRt, totalCount), start, + MetricObject.MetricType.GAUGE, bucketInterval); + this.addMetricWithSuffix(name, 'success_rate', this.ratio(successCount, totalCount), start, + MetricObject.MetricType.GAUGE, bucketInterval); + if (hitCount >= 0) { + this.addMetricWithSuffix(name, 'hit_rate', this.ratio(hitCount, successCount), start, + MetricObject.MetricType.GAUGE, bucketInterval); + } + } + } diff --git a/packages/metrics/src/collect/NormalMetricsCollector.ts b/packages/metrics/src/collect/NormalMetricsCollector.ts index efd5703b..7b4c80e4 100644 --- a/packages/metrics/src/collect/NormalMetricsCollector.ts +++ b/packages/metrics/src/collect/NormalMetricsCollector.ts @@ -1,8 +1,6 @@ -import {MetricsCollector} from './MetricsCollector'; -import {MetricName, ITimer, IHistogram, ICounter, IMeter} from '../common/index'; -import {MetricObject} from './MetricObject'; -import {BucketCounter} from '../common'; -import {Snapshot} from '../common/domain'; +import { MetricsCollector } from './MetricsCollector'; +import { BucketCounter, ICounter, IFastCompass, IHistogram, IMeter, ITimer, MetricName, Snapshot } from '../common'; +import { MetricObject } from './MetricObject'; export class NormalMetricsCollector extends MetricsCollector { @@ -65,4 +63,53 @@ export class NormalMetricsCollector extends MetricsCollector { // instant count this.addInstantCountMetric(meter.getInstantCount(), name, meter.getInstantCountInterval(), timestamp); } + + collectFastCompass(name: MetricName, fastCompass: IFastCompass, timestamp: number) { + let bucketInterval = fastCompass.getBucketInterval(); + + let start = this.getNormalizedStartTime(timestamp, bucketInterval); + let totalCount = 0; + let totalRt = 0; + let successCount = 0; + let hitCount = -1; + + let countPerCategory = fastCompass.getMethodCountPerCategory(start); + for (let [ key, value ] of countPerCategory.entries()) { + if (value.has(start)) { + this.addMetricWithSuffix(name, key + '_bucket_count', value.get(start), start, + MetricObject.MetricType.DELTA, bucketInterval); + totalCount += value.get(start); + if ('success' === key) { + successCount += value.get(start); + } + if ('hit' === key) { + hitCount = value.get(start); + successCount += value.get(start); + } + } else { + this.addMetricWithSuffix(name, key + '_bucket_count', 0, start, + MetricObject.MetricType.DELTA, bucketInterval); + } + } + + for (let value of fastCompass.getMethodRtPerCategory(start).values()) { + if (value.has(start)) { + totalRt += value.get(start); + } + } + this.addMetricWithSuffix(name, 'bucket_count', totalCount, start, + MetricObject.MetricType.DELTA, bucketInterval); + this.addMetricWithSuffix(name, 'bucket_sum', totalRt, start, + MetricObject.MetricType.DELTA, bucketInterval); + this.addMetricWithSuffix(name, 'qps', this.rate(totalCount, bucketInterval), start, + MetricObject.MetricType.GAUGE, bucketInterval); + this.addMetricWithSuffix(name, 'rt', this.rate(totalRt, totalCount), start, + MetricObject.MetricType.GAUGE, bucketInterval); + this.addMetricWithSuffix(name, 'success_rate', this.ratio(successCount, totalCount), start, + MetricObject.MetricType.GAUGE, bucketInterval); + if (hitCount >= 0) { + this.addMetricWithSuffix(name, 'hit_rate', this.ratio(hitCount, successCount), start, + MetricObject.MetricType.GAUGE, bucketInterval); + } + } } diff --git a/packages/metrics/src/common/MetricBuilder.ts b/packages/metrics/src/common/MetricBuilder.ts index 0cf7d034..f7341bc1 100644 --- a/packages/metrics/src/common/MetricBuilder.ts +++ b/packages/metrics/src/common/MetricBuilder.ts @@ -7,6 +7,7 @@ import {BaseTimer} from './metrics/Timer'; import {BucketCounter} from './metrics/BucketCounter'; import {ReservoirType} from './Reservoir'; import {Metric} from './domain'; +import { BaseFastCompass } from './metrics/FastCompass'; export class MetricBuilder { @@ -52,4 +53,13 @@ export class MetricBuilder { } }; + static FASTCOMPASSES = { + newMetric(name: MetricName) { + return new BaseFastCompass(MetricBuilder.config.period(name.getMetricLevel())); + }, + isInstance(metric: Metric) { + return metric instanceof BaseFastCompass; + } + }; + } diff --git a/packages/metrics/src/common/MetricType.ts b/packages/metrics/src/common/MetricType.ts index 74e7786c..72ca2de4 100644 --- a/packages/metrics/src/common/MetricType.ts +++ b/packages/metrics/src/common/MetricType.ts @@ -5,4 +5,5 @@ export let MetricType = { HISTOGRAM: 'HISTOGRAM', METER: 'METER', TIMER: 'TIMER', + FASTCOMPASS: 'FASTCOMPASS' }; diff --git a/packages/metrics/src/common/MetricsManager.ts b/packages/metrics/src/common/MetricsManager.ts index 95d87e22..aa0e1152 100644 --- a/packages/metrics/src/common/MetricsManager.ts +++ b/packages/metrics/src/common/MetricsManager.ts @@ -7,6 +7,7 @@ import {IMetricsRegistry} from './MetricsRegistry'; import {MetricFilter} from './MetricFilter'; import {BaseGauge} from './metrics/Gauge'; import {Metric} from './domain'; +import { IFastCompass } from './metrics/FastCompass'; export interface MetricsManager { /** @@ -49,6 +50,8 @@ export interface MetricsManager { */ getTimer(group: string, name: MetricName): ITimer; + getFastCompasses(group: string, filter?: MetricFilter): Map; + /** * Register a customized metric to specified group. * @param group: the group name of MetricRegistry @@ -97,6 +100,15 @@ export interface MetricsManager { getTimers(group: string, filter?: MetricFilter): Map; + /** + * Create a {@link BaseFastCompass} metric in give group, name, and type + * if not exist, an instance will be created. + * @param group the group of MetricRegistry + * @param name the name of the metric + * @return an instance of {@link BaseFastCompass} + */ + getFastCompass(group: string, name: MetricName): IFastCompass; + /** * A map of metric names to metrics. * diff --git a/packages/metrics/src/common/MetricsRegistry.ts b/packages/metrics/src/common/MetricsRegistry.ts index 79e06241..ba34bde6 100644 --- a/packages/metrics/src/common/MetricsRegistry.ts +++ b/packages/metrics/src/common/MetricsRegistry.ts @@ -1,33 +1,54 @@ -import {MetricName} from './MetricName'; -import {MetricSet} from './MetricSet'; -import {BaseGauge} from './metrics/Gauge'; -import {ICounter} from './metrics/Counter'; -import {IHistogram} from './metrics/Histogram'; -import {IMeter} from './metrics/Meter'; -import {ITimer} from './metrics/Timer'; -import {MetricBuilder} from './MetricBuilder'; -import {MetricFilter} from './MetricFilter'; -import {MetricType} from './MetricType'; -import {ReservoirType} from './Reservoir'; -import {Metric} from './domain'; +import { MetricName } from './MetricName'; +import { MetricSet } from './MetricSet'; +import { BaseGauge } from './metrics/Gauge'; +import { ICounter } from './metrics/Counter'; +import { IHistogram } from './metrics/Histogram'; +import { IMeter } from './metrics/Meter'; +import { ITimer } from './metrics/Timer'; +import { MetricBuilder } from './MetricBuilder'; +import { MetricFilter } from './MetricFilter'; +import { MetricType } from './MetricType'; +import { ReservoirType } from './Reservoir'; +import { Metric } from './domain'; +import { IFastCompass } from './metrics/FastCompass'; + const debug = require('debug')('pandora:metrics-common:registry'); export interface IMetricsRegistry { register(name: MetricName, metric: Metric): Metric; + registerAll(prefix, metrics?: MetricSet); + getKeys(); + counter(name: MetricName): ICounter; + histogram(name: MetricName): IHistogram; + meter(name: MetricName): IMeter; + timer(name: MetricName): ITimer; + + fastCompass(name: MetricName): IFastCompass; + getMetric(name: MetricName): Metric; + getMetrics(metricType?: string, filter?: MetricFilter); + getGauges(filter?: MetricFilter): Map>; + getCounters(filter?: MetricFilter): Map; + getHistograms(filter?: MetricFilter): Map; + getMeters(filter?: MetricFilter): Map; + getTimers(filter?: MetricFilter): Map; + + getFastCompasses(filter?: MetricFilter): Map; + getMetricNames(); + remove(metricsKey: string); } @@ -46,7 +67,7 @@ export class MetricsRegistry implements IMetricsRegistry { this.registerAll(name, metric); } else { debug('------------> metrics is normal:', name.getNameKey()); - if(!metric.type) { + if (!metric.type) { metric.type = MetricType.GAUGE; } @@ -63,8 +84,8 @@ export class MetricsRegistry implements IMetricsRegistry { metrics = prefix; prefix = MetricName.EMPTY; } - for (let {name, metric} of metrics.getMetrics()) { - if(typeof name === 'string') { + for (let { name, metric } of metrics.getMetrics()) { + if (typeof name === 'string') { name = MetricName.parseKey(name); } @@ -96,8 +117,12 @@ export class MetricsRegistry implements IMetricsRegistry { return this.getOrAdd(name, MetricBuilder.TIMERS); } + fastCompass(name: MetricName): IFastCompass { + return this.getOrAdd(name, MetricBuilder.FASTCOMPASSES); + } + protected getOrAdd(name: MetricName, builder, type?: ReservoirType) { - if(this.metricsSet.has(name.getNameKey())) { + if (this.metricsSet.has(name.getNameKey())) { return this.metricsSet.get(name.getNameKey()).metric; } else { // add @@ -109,7 +134,7 @@ export class MetricsRegistry implements IMetricsRegistry { const key = name.getNameKey(); debug(`find metric in registry name = ${key}, metrics num = ${this.metricsSet.size}`); - if(this.metricsSet.has(key)) { + if (this.metricsSet.has(key)) { return this.metricsSet.get(key).metric; } else { return null; @@ -118,19 +143,19 @@ export class MetricsRegistry implements IMetricsRegistry { getMetrics(metricType?: string, filter?: MetricFilter) { const filterMap: Map = new Map(); - if(!filter) { + if (!filter) { filter = MetricFilter.ALL; } - if(metricType) { - for(let [key, value] of this.metricsSet.entries()) { + if (metricType) { + for (let [ key, value ] of this.metricsSet.entries()) { debug(key + ' => ' + value.metric.type, metricType); - if(value.metric.type === metricType && filter.matches(value.name, value.metric)) { + if (value.metric.type === metricType && filter.matches(value.name, value.metric)) { filterMap.set(key, value.metric); } } } else { - for(let [key, value] of this.metricsSet.entries()) { + for (let [ key, value ] of this.metricsSet.entries()) { filterMap.set(key, value.metric); } } @@ -158,9 +183,13 @@ export class MetricsRegistry implements IMetricsRegistry { return > this.getMetrics(MetricType.TIMER, filter); } + getFastCompasses(filter?: MetricFilter): Map { + return > this.getMetrics(MetricType.FASTCOMPASS, filter); + } + getMetricNames() { let names = []; - for(let metricObject of this.metricsSet.values()) { + for (let metricObject of this.metricsSet.values()) { names.push(metricObject.name); } return names; diff --git a/packages/metrics/src/common/index.ts b/packages/metrics/src/common/index.ts index 349e71b0..23360495 100644 --- a/packages/metrics/src/common/index.ts +++ b/packages/metrics/src/common/index.ts @@ -8,6 +8,7 @@ export * from './metrics/Gauge'; export * from './metrics/Histogram'; export * from './metrics/Meter'; export * from './metrics/Timer'; +export * from './metrics/FastCompass'; export * from './MetricName'; export * from './MetricSet'; export * from './MetricsRegistry'; diff --git a/packages/metrics/src/common/metrics/FastCompass.ts b/packages/metrics/src/common/metrics/FastCompass.ts new file mode 100644 index 00000000..4ad2a157 --- /dev/null +++ b/packages/metrics/src/common/metrics/FastCompass.ts @@ -0,0 +1,154 @@ +import { MetricType } from '../MetricType'; +import { Metric } from '../domain'; +import { BucketCounter } from './BucketCounter'; + +export interface IFastCompass extends Metric { + + /** + * 记录一次方法调用的RT和子类别,子类别应当是正交的,不能有重叠 + * 例如 成功/失败 + * record a method invocation with execution time and sub-categories + * @param duration must be milliseconds + * @param subCategory all the sub-categories should be orthogonal, + * which will be added up to the total number of method invocations + */ + record(duration, subCategory); + + + /** + * 对于每个子类别,返回每个统计间隔的方法调用总次数 + * return method count per bucket per category + * @return + */ + getMethodCountPerCategory(startTime?): Map>; + + + /** + * 对于每个子类别,返回每个统计间隔的执行总时间和次数,按位分离操作放到下一层进行 + * return method execution time and count per bucket per category + * @return + */ + getMethodRtPerCategory(startTime?): Map>; + + /** + * 对于每个子类别,返回每个统计间隔的执行总时间和次数,按位分离操作放到下一层进行 + * return method execution time and count per bucket per category + * @return + */ + getCountAndRtPerCategory(startTime): Map>; + + /** + * 获取统计间隔 + * @return the bucket interval + */ + getBucketInterval(); +} + + +/** + * 控制总次数的bit数, 理论统计上限为 2 ^ (64 -38 -1) = 33554432 + * This magic number divide a long into two parts, + * where the higher part is used to record the total number of method invocations, + * and the lower part is used to record the total method execution time. + * The max number of count per collecting interval will be 2 ^ (64 -38 -1) = 33554432 + */ +const COUNT_OFFSET = 38; + +/** + * 次数统计的累加基数,和rt相加得到实际更新到LongAdder的数 + * The base number of count that is added to total rt, + * to derive a number which will be added to {@link LongAdder} + */ +const COUNT_BASE = 1 << 38; + +/** + * 总数和此数进行二进制与得到总rt统计 + * The base number is used to do BITWISE AND operation with the value of {@link LongAdder} + * to derive the total number of execution time + */ +const RT_BITWISE_AND_BASE = (1 << 38) - 1; + +const MAX_SUBCATEGORY_SIZE = 20; + +const DEFAULT_BUCKET_COUNT = 10; + + +/** + * 通过1个LongAdder来同时完成count和rt的累加操作 + * Java里面1个Long有64个bit, 除去最高位表示符号的1个bit,还有63个bit可以使用 + * 在一个不超过60s统计周期内,方法调用的总次数和总次数其实完全用不到63个bit + * 所以可以将这两个统计项放到一个long里面来表示 + * 这里高位的25个bit表示统计周期内调用总次数,后38位表示总rt + */ +export class BaseFastCompass implements IFastCompass { + type = MetricType.FASTCOMPASS; + + bucketInterval; + numberOfBuckets; + maxCategoryCount; + subCategories: Map; + + constructor(bucketInterval, numberOfBuckets = DEFAULT_BUCKET_COUNT, maxCategoryCount = MAX_SUBCATEGORY_SIZE) { + this.bucketInterval = bucketInterval; + this.numberOfBuckets = numberOfBuckets; + this.maxCategoryCount = maxCategoryCount; + this.subCategories = new Map(); + } + + + record(duration: number, subCategory: string) { + if (duration < 0 || subCategory == null) { + return; + } + if (!this.subCategories.has(subCategory)) { + if (this.subCategories.size >= this.maxCategoryCount) { + // ignore if maxCategoryCount is exceeded, no exception will be thrown + return; + } + this.subCategories.set(subCategory, new BucketCounter(this.bucketInterval, this.numberOfBuckets, false)); + } + let data = COUNT_BASE + duration; + this.subCategories.get(subCategory).update(data); + } + + getMethodCountPerCategory(startTime = 0): Map> { + let countPerCategory: Map> = new Map(); + for (let [ key, value ] of this.subCategories.entries()) { + let bucketCount: Map = new Map(); + for (let [ innerKey, innerValue ] of value.getBucketCounts(startTime).entries()) { + bucketCount.set(innerKey, innerValue >> COUNT_OFFSET); + } + countPerCategory.set(key, bucketCount); + } + return countPerCategory; + } + + getMethodRtPerCategory(startTime = 0) { + let rtPerCategory: Map> = new Map(); + for (let [ key, value ] of this.subCategories.entries()) { + let bucketCount: Map = new Map(); + for (let [ innerKey, innerValue ] of value.getBucketCounts(startTime).entries()) { + bucketCount.set(innerKey, innerValue & RT_BITWISE_AND_BASE); + } + rtPerCategory.set(key, bucketCount); + } + + return rtPerCategory; + } + + getBucketInterval() { + return this.bucketInterval; + } + + getCountAndRtPerCategory(startTime = 0) { + let countAndRtPerCategory: Map> = new Map(); + for (let [ key, value ] of this.subCategories.entries()) { + let bucketCount: Map = new Map(); + for (let [ innerKey, innerValue ] of value.getBucketCounts(startTime).entries()) { + bucketCount.set(innerKey, innerValue); + } + countAndRtPerCategory.set(key, bucketCount); + } + return countAndRtPerCategory; + } +} diff --git a/packages/metrics/src/endpoint/impl/MetricsEndPoint.ts b/packages/metrics/src/endpoint/impl/MetricsEndPoint.ts index 56d2458b..e22e70d7 100644 --- a/packages/metrics/src/endpoint/impl/MetricsEndPoint.ts +++ b/packages/metrics/src/endpoint/impl/MetricsEndPoint.ts @@ -1,10 +1,8 @@ -import {EndPoint} from '../EndPoint'; -import {MetricsServerManager} from '../../MetricsServerManager'; -import {MetricFilter, MetricName, IMetricsRegistry, Metric} from '../../common/index'; -import {MetricObject} from '../../collect/MetricObject'; -import {MetricsManager} from '../../common/MetricsManager'; -import {NormalMetricsCollector} from '../../collect/NormalMetricsCollector'; -import {MetricsInjectionBridge} from '../../util/MetricsInjectionBridge'; +import { EndPoint } from '../EndPoint'; +import { MetricsServerManager } from '../../MetricsServerManager'; +import { IMetricsRegistry, Metric, MetricFilter, MetricName, MetricsManager } from '../../common'; +import { MetricObject, MetricsInjectionBridge, NormalMetricsCollector } from '../..'; + const debug = require('debug')('pandora:metrics:MetricEndPoint'); export class AppNameFilter implements MetricFilter { @@ -17,10 +15,10 @@ export class AppNameFilter implements MetricFilter { matches(name: MetricName, metric: Metric): boolean { let tags = name.getTags() || {}; - if(!tags['appName']) { + if (!tags[ 'appName' ]) { return true; } else { - return tags['appName'] === this.appName; + return tags[ 'appName' ] === this.appName; } } } @@ -34,20 +32,20 @@ export class MetricsEndPoint extends EndPoint { async listMetrics(group?: string, appName?: string): Promise<{}> { let filter; - if(appName) { + if (appName) { filter = new AppNameFilter(appName); } if (this.manager.isEnabled()) { let resultMap = {}; for (let groupName of this.manager.listMetricGroups()) { - if(!group || (group && groupName === group)) { + if (!group || (group && groupName === group)) { let registry = this.manager.getMetricRegistryByGroup(groupName); let results: Array = await this.buildMetricRegistry(registry, filter); - resultMap[groupName] = results.map((o) => { + resultMap[ groupName ] = results.map((o) => { let result = o.toJSON(); // list 接口过滤掉 value 和 timestamp - delete result['value']; - delete result['timestamp']; + delete result[ 'value' ]; + delete result[ 'timestamp' ]; return result; }); } @@ -76,35 +74,40 @@ export class MetricsEndPoint extends EndPoint { Array.from(registry.getGauges().keys()).forEach((key, index) => { debug(`collect gauge key = ${key}`); - collector.collectGauge(MetricName.parseKey(key), results[index], timestamp); + collector.collectGauge(MetricName.parseKey(key), results[ index ], timestamp); }); - for (let [key, counter] of registry.getCounters().entries()) { + for (let [ key, counter ] of registry.getCounters().entries()) { debug(`collect counter key = ${key}`); collector.collectCounter(MetricName.parseKey(key), counter, timestamp); } - for (let [key, histogram] of registry.getHistograms().entries()) { + for (let [ key, histogram ] of registry.getHistograms().entries()) { debug(`collect histogram key = ${key}`); collector.collectHistogram(MetricName.parseKey(key), histogram, timestamp); } - for (let [key, meter] of registry.getMeters().entries()) { + for (let [ key, meter ] of registry.getMeters().entries()) { debug(`collect meter key = ${key}`); collector.collectMeter(MetricName.parseKey(key), meter, timestamp); } - for (let [key, timer] of registry.getTimers().entries()) { + for (let [ key, timer ] of registry.getTimers().entries()) { debug(`collect timer key = ${key}`); collector.collectTimer(MetricName.parseKey(key), timer, timestamp); } + for (let [ key, fastcompass ] of registry.getFastCompasses().entries()) { + debug(`collect fastcompass key = ${key}`); + collector.collectFastCompass(MetricName.parseKey(key), fastcompass, timestamp); + } + return collector.build(); } async getMetricsByGroup(groupName: string, appName?: string): Promise> { let filter; - if(appName) { + if (appName) { filter = new AppNameFilter(appName); } let registry = this.manager.getMetricRegistryByGroup(groupName); @@ -119,7 +122,7 @@ export class MetricsEndPoint extends EndPoint { } protected getCollector() { - return this.config['collector'] || NormalMetricsCollector; + return this.config[ 'collector' ] || NormalMetricsCollector; } } diff --git a/packages/metrics/test/unit/MetricsServerManager.test.ts b/packages/metrics/test/unit/MetricsServerManager.test.ts index a90997f4..4b594785 100644 --- a/packages/metrics/test/unit/MetricsServerManager.test.ts +++ b/packages/metrics/test/unit/MetricsServerManager.test.ts @@ -1,8 +1,8 @@ import {MetricsServerManager} from '../../src/MetricsServerManager'; import {MetricsClient} from '../../src/MetricsClient'; import {expect} from 'chai'; -import {Counter as CounterProxy, Gauge as GaugeProxy, Timer as TimerProxy, Histogram as HistogramProxy, Meter as MeterProxy} from '../../src/client/index'; -import {MetricName, BaseCounter, BaseGauge, BaseHistogram, BaseMeter, BaseTimer} from '../../src/common/index'; +import {Counter as CounterProxy, Gauge as GaugeProxy, Timer as TimerProxy, Histogram as HistogramProxy, Meter as MeterProxy, FastCompass as FastCompassProxy} from '../../src/client/index'; +import {MetricName, BaseCounter, BaseGauge, BaseHistogram, BaseMeter, BaseTimer, BaseFastCompass} from '../../src/common/index'; import {MetricsConstants} from '../../src/MetricsConstants'; describe('/test/unit/MetricsServerManager.test.ts', () => { @@ -33,6 +33,7 @@ describe('/test/unit/MetricsServerManager.test.ts', () => { expect(server.getCounters('empty').size).to.equal(0); expect(server.getTimers('empty').size).to.equal(0); expect(server.getMeters('empty').size).to.equal(0); + expect(server.getFastCompasses('empty').size).to.equal(0); }); it('create a new client and register it', () => { @@ -86,6 +87,9 @@ describe('/test/unit/MetricsServerManager.test.ts', () => { let meter = new MeterProxy(); client.register('test_extra', MetricName.build('test.qps.meter'), meter); + + let fastCompass = new FastCompassProxy(); + client.register('test_extra', MetricName.build('test.qps.fastCompass'), fastCompass); }); @@ -124,8 +128,11 @@ describe('/test/unit/MetricsServerManager.test.ts', () => { const meter = server.getMeter('middleware', MetricName.build('reporter.test.meter')); expect(meter).to.be.an.instanceof(BaseMeter); + const fastCompass = server.getFastCompass('middleware', MetricName.build('reporter.test.fastCompass')); + expect(fastCompass).to.be.an.instanceof(BaseFastCompass); + expect(server.listMetricNamesByGroup().size > 0).to.be.true; - expect(server.listMetricNamesByGroup().get('middleware').length).to.equal(4); + expect(server.listMetricNamesByGroup().get('middleware').length).to.equal(5); expect(server.getAllCategoryMetrics().size).to.equal(5); }); diff --git a/packages/metrics/test/unit/common/metrics/FastCompass.test.ts b/packages/metrics/test/unit/common/metrics/FastCompass.test.ts new file mode 100644 index 00000000..5414c243 --- /dev/null +++ b/packages/metrics/test/unit/common/metrics/FastCompass.test.ts @@ -0,0 +1,56 @@ +import { expect } from 'chai'; +import { BaseFastCompass } from '../../../../src/common/metrics/FastCompass'; + +describe('/test/unit/metrics/common/metrics/FastCompass.test.ts', () => { + + it('testFastCompass', async () => { + + let fastCompass = new BaseFastCompass(10, 10, 10); + fastCompass.record(2, 'success'); + fastCompass.record(4, 'error'); + fastCompass.record(3, 'success'); + + // verify count + expect(fastCompass.getMethodCountPerCategory().has('success')).to.ok; + expect(Array.from(fastCompass.getMethodCountPerCategory(0).get('success').values())[ 0 ]).to.equal(2); + expect(fastCompass.getMethodCountPerCategory().has('error')).to.ok; + expect(Array.from(fastCompass.getMethodCountPerCategory(0).get('error').values())[ 0 ]).to.equal(1); + + + // verify rt + expect(fastCompass.getMethodRtPerCategory().has('success')).to.ok; + expect(Array.from(fastCompass.getMethodRtPerCategory(0).get('success').values())[ 0 ]).to.equal(5); + expect(fastCompass.getMethodRtPerCategory().has('error')).to.ok; + expect(Array.from(fastCompass.getMethodRtPerCategory(0).get('error').values())[ 0 ]).to.equal(4); + + + // total count + let totalCount = Array.from(fastCompass.getMethodCountPerCategory(0).get('success').values())[ 0 ] + + Array.from(fastCompass.getMethodCountPerCategory(0).get('error').values())[ 0 ]; + expect(totalCount).to.equal(3); + // average rt + let avgRt = (Array.from(fastCompass.getMethodRtPerCategory(0).get('success').values())[ 0 ] + + Array.from(fastCompass.getMethodRtPerCategory(0).get('error').values())[ 0 ]) / totalCount; + expect(avgRt).to.equal(3); + // verify count and rt + expect(fastCompass.getCountAndRtPerCategory().has('success')).to.ok; + expect(Array.from(fastCompass.getCountAndRtPerCategory(0).get('success').values())[ 0 ]).to.equal((2 << 38) + 5); + expect(fastCompass.getCountAndRtPerCategory().has('error')).to.ok; + expect(Array.from(fastCompass.getCountAndRtPerCategory(0).get('error').values())[ 0 ]).to.equal((1 << 38) + 4); + }); + + it('testBinaryAdd', () => { + let a1 = (1 << 38) + 10; + let a2 = (1 << 38) + 20; + expect((a1 + a2) >> 38).to.equal(2); + }); + + it('testMaxSubCategoryCount', () => { + let fastCompass = new BaseFastCompass(60, 10, 2); + fastCompass.record(10, 'success'); + fastCompass.record(20, 'error1'); + fastCompass.record(15, 'error2'); + + expect(Array.from(fastCompass.getMethodRtPerCategory().keys()).length).to.equal(2); + }); +});