diff --git a/README.md b/README.md index 872f771..454b84a 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ 数据收集,是监控系统一个最基本的功能,在Open-Falcon中,Agent采集到的数据,会先发送给Transfer组件。Transfer在接收到客户端发送的数据,做一些数据规整,检查之后,转发到多个后端系统去处理。在转发到每个后端业务系统的时候,Transfer会根据一致性哈希算法,进行数据分片,来达到后端业务系统的水平扩展。Transfer自身是无状态的,挂掉一台或者多台不会有任何影响。 -Transfer支持的业务后端,有三种,Judge、Graph、OpenTSDB(开源版本尚未开放此功能)。Judge是我们开发的高性能告警判定组件,Graph是我们开发的高性能数据存储、归档、查询组件,OpenTSDB是开源的时间序列数据存储服务。每个业务后端,都可以通过Transfer的配置文件来开启。 +Transfer支持的业务后端,有四种,Judge、Graph、OpenTSDB(开源版本尚未开放此功能)、DRRS(需要另外[安装](https://github.com/jdjr/drrs))。Judge是我们开发的高性能告警判定组件,Graph是我们开发的高性能数据存储、归档、查询组件,OpenTSDB是开源的时间序列数据存储服务,DRRS为京东金融集团杭州研发团队的同事开发的一个轻量级的分布式环形数据服务组件,用于监控数据的持久化和绘图。该组件作用于graph组件类似,并且能够在保证绘图效率的前提下实现秒级扩容。每个业务后端,都可以通过Transfer的配置文件来开启。 Transfer的数据来源,一般有四种: @@ -108,3 +108,15 @@ u want sending items via java jsonrpc client? turn to one java example: [jsonrpc - cluster: key-value形式的字典,表示后端的graph列表,其中key代表后端graph名字,value代表的是具体的ip:port(多个地址 用逗号隔开, transfer会将同一份数据 发送至各个地址) - clusterMigrating: key-value形式的字典,表示新扩容的后端的graph列表,其中key代表后端graph名字,value代表的是具体的ip:port(多个地址 用逗号隔开, transfer会将同一份数据 发送至各个地址) + drrs #启用此功能前请确保DRRS已被正确安装配置(https://github.com/jdjr/drrs),不能与graph同时使用 + - enable: true/false, 表示是否开启向DRRS发送数据 #不能和graph的enable同时为true + - useZk: 是否配置了zookeeper,若DRRS配置了多台master节点并且配置了zk,则配置为true + - dest: DRRS中master节点的地址,若没有配置zk,则这里需要配置master节点的地址,格式为ip:port + - replicas: 这是一致性hash算法需要的节点副本数量,建议不要变更,保持默认即可 + - maxIdle: 连接池相关配置,最大空闲连接数,建议保持默认 + - batch: 数据转发的批量大小,建议保持默认值 + - zk: zookeeper的相关配置信息,若useZk设置为true,则需要配置以下信息 + -ip: zk的ip地址,zk的端口需要保持默认的2181 + -addr: zk中DRRS配置信息的存放位置 + -timeout: zk的超时时间 + diff --git a/cfg.example.json b/cfg.example.json index bff706a..7cb07bf 100644 --- a/cfg.example.json +++ b/cfg.example.json @@ -39,5 +39,18 @@ }, "clusterMigrating": { } - } + }, + "drrs":{ + "enabled": false, + "useZk": false, + "dest" : "127.0.0.1:12300", + "replicas": 500, + "maxIdle" : 32, + "batch" : 200, + "zk": { + "ip" : "10.9.0.130", + "addr": "/drrs_master", + "timeout" : 10 + } + } } diff --git a/g/cfg.go b/g/cfg.go index 0acc87d..eb6a21b 100644 --- a/g/cfg.go +++ b/g/cfg.go @@ -53,6 +53,22 @@ type GraphConfig struct { ClusterMigrating2 map[string]*ClusterNode `json:"clusterMigrating2"` } +type ZkConfig struct { //drrs + Ip string `json:"ip"` + Addr string `json:"addr"` + Timeout int `json:"timeout"` +} + +type DrrsConfig struct { //drrs + Enabled bool `json:"enabled"` + UseZk bool `json:"useZk"` + Dest string `json:"dest"` + Replicas int `json:"replicas"` + MaxIdle int `json:"maxIdle"` + Batch int `json:"batch"` + Zk *ZkConfig `json:"zk"` +} + type GlobalConfig struct { Debug bool `json:"debug"` Http *HttpConfig `json:"http"` @@ -60,6 +76,7 @@ type GlobalConfig struct { Socket *SocketConfig `json:"socket"` Judge *JudgeConfig `json:"judge"` Graph *GraphConfig `json:"graph"` + Drrs *DrrsConfig `json:"drrs"` //drrs } var ( diff --git a/receiver/rpc/rpc_transfer.go b/receiver/rpc/rpc_transfer.go index cc286b6..507d39c 100644 --- a/receiver/rpc/rpc_transfer.go +++ b/receiver/rpc/rpc_transfer.go @@ -142,6 +142,9 @@ func RecvMetricValues(args []*cmodel.MetricValue, reply *cmodel.TransferResponse sender.Push2JudgeSendQueue(items) } + if cfg.Drrs.Enabled { //drrs + sender.Push2DrrsSendQueue(items) + } reply.Message = "ok" reply.Total = len(args) reply.Latency = (time.Now().UnixNano() - start.UnixNano()) / 1000000 diff --git a/sender/conn_pools.go b/sender/conn_pools.go index 8a79ac4..4fc3fc3 100644 --- a/sender/conn_pools.go +++ b/sender/conn_pools.go @@ -1,11 +1,109 @@ package sender import ( + "fmt" "github.com/open-falcon/transfer/g" cpool "github.com/open-falcon/transfer/sender/conn_pool" + "github.com/samuel/go-zookeeper/zk" nset "github.com/toolkits/container/set" + "github.com/jdjr/drrs/golang/sdk" + "log" + "net" + "time" ) +//监听zk中的master节点发生变化 +func watchZNode(ch <-chan zk.Event) { + cfg := g.Config() + drrsConfig := cfg.Drrs + for { + e := <-ch + if e.Type == zk.EventNodeChildrenChanged { + var master_list []string + c, _, err := zk.Connect([]string{drrsConfig.Zk.Ip}, time.Second*time.Duration(drrsConfig.Zk.Timeout)) + if err != nil { + drrs_master_list = nil + DrrsNodeRing = nil + log.Fatalln("[DRRS FATALL] watchZNode: ZK connection error: ", err) + } + children, stat, zkChannel, err := c.ChildrenW(drrsConfig.Zk.Addr) + if err != nil { + drrs_master_list = nil + DrrsNodeRing = nil + log.Fatalln("[DRRS FATALL] watchZNode: ZK get children error: ", err) + } + nzk := stat.NumChildren + if nzk <= 0 { + drrs_master_list = nil + DrrsNodeRing = nil + log.Fatalln("[DRRS FATALL] watchZNode: ZK contents error: ", zk.ErrNoChildrenForEphemerals) + } + for i := range children { + absAddr := fmt.Sprintf("%s/%s", drrsConfig.Zk.Addr, children[i]) + data_get, _, err := c.Get(absAddr) + if err != nil { + drrs_master_list = nil + DrrsNodeRing = nil + log.Fatalln("[DRRS FATALL] watchZNode: ZK get data error: ", err) + } + data := string(data_get) + if data == "" { + drrs_master_list = nil + DrrsNodeRing = nil + log.Fatalln("[DRRS FATALL] watchZNode: ZK data error: ", zk.ErrInvalidPath) + } + master_list = append(master_list, data) + } + drrs_master_list = master_list + DrrsNodeRing = newConsistentHashNodesRing(cfg.Drrs.Replicas, drrs_master_list) + go watchZNode(zkChannel) + break + } + } +} + +func initDrrsMasterList(drrsConfig *g.DrrsConfig) error { //drrs + if !drrsConfig.Enabled { + drrs_master_list = nil + return nil + } + if !drrsConfig.UseZk { + drrs_master_list = append(drrs_master_list, drrsConfig.Dest) + return nil + } + + c, _, err := zk.Connect([]string{drrsConfig.Zk.Ip}, time.Second*time.Duration(drrsConfig.Zk.Timeout)) + if err != nil { + drrs_master_list = nil + return err + } + children, stat, zkChannel, err := c.ChildrenW(drrsConfig.Zk.Addr) + if err != nil { + drrs_master_list = nil + return err + } + go watchZNode(zkChannel) + + nzk := stat.NumChildren + if nzk <= 0 { + drrs_master_list = nil + return zk.ErrNoChildrenForEphemerals + } + for i := range children { + absAddr := fmt.Sprintf("%s/%s", drrsConfig.Zk.Addr, children[i]) + data_get, _, err := c.Get(absAddr) + if err != nil { + return err + } + data := string(data_get) + if data == "" { + return zk.ErrInvalidPath + } + drrs_master_list = append(drrs_master_list, data) + } + return nil +} + func initConnPools() { cfg := g.Config() @@ -37,10 +135,40 @@ func initConnPools() { GraphMigratingConnPools = cpool.CreateSafeRpcConnPools(cfg.Graph.MaxConns, cfg.Graph.MaxIdle, cfg.Graph.ConnTimeout, cfg.Graph.CallTimeout, graphMigratingInstances.ToSlice()) } + + err := initDrrsMasterList(cfg.Drrs) //drrs + if err != nil { //drrs + log.Fatalln("init drrs zookeeper list acorrding to config file:", cfg, "fail:", err) + } + + if cfg.Drrs.Enabled { //drrs + if drrs_master_list != nil { + var addrs []*net.TCPAddr + for _, addr := range drrs_master_list { + //初始化drrs + tcpAddr, err := net.ResolveTCPAddr("tcp4", addr) + if err != nil { + log.Fatalln("config file:", cfg, "is not correct, cannot resolve drrs master tcp address. err:", err) + } + addrs = append(addrs, tcpAddr) + } + err := sdk.DRRSInit(addrs) + if err != nil { + log.Fatalln("[DRRS FATALL] StartSendTasks: DRRS init error: ", err) + return + } + } + } } func DestroyConnPools() { + cfg := g.Config() JudgeConnPools.Destroy() GraphConnPools.Destroy() GraphMigratingConnPools.Destroy() + if cfg.Drrs.Enabled { //drrs + if drrs_master_list != nil { + sdk.DRRSClose() + } + } } diff --git a/sender/drrs_falcon_client_for_transfer.go b/sender/drrs_falcon_client_for_transfer.go new file mode 100644 index 0000000..3406ff1 --- /dev/null +++ b/sender/drrs_falcon_client_for_transfer.go @@ -0,0 +1,235 @@ +package sender + +import ( + "fmt" + "github.com/jdjr/drrs/golang/drrsproto" + "github.com/jdjr/drrs/golang/sdk" + cmodel "github.com/open-falcon/common/model" + "math" + "strconv" + "time" +) + +// RRA.Point.Size +const ( + RRA1PointCnt = 720 // 1m一个点存12h + RRA5PointCnt = 576 // 5m一个点存2d + RRA20PointCnt = 504 // 20m一个点存7d + RRA180PointCnt = 766 // 3h一个点存3month + RRA720PointCnt = 730 // 12h一个点存1year + //begin + C_TIMEOUT = 5 // create timeout = 5 seconds + U_TIMEOUT = 5 // update timeout = 5 seconds + F_TIMEOUT = 10 // fetch timeout = 10 seconds +//end +) + +func generate_create_package_with_rrdfile(rrdFile *string, item *cmodel.GraphItem) (*drrsproto.DrrsCreate, error) { + var heads []*drrsproto.DrrsCreateHead + var rras []*drrsproto.DrrsCreateRra + var head *drrsproto.DrrsCreateHead + var rra *drrsproto.DrrsCreateRra + var err error + + dataSource := "metric" + dstSourceType := item.DsType + heartbeats := item.Heartbeat + minValue, _ := strconv.Atoi(item.Min) + maxValue, _ := strconv.Atoi(item.Max) + p_min := &minValue + p_max := &maxValue + if minValue == 0 { + p_min = nil + } + if maxValue == 0 { + p_max = nil + } + head, err = sdk.Make_drrs_create_head(&dataSource, &dstSourceType, &heartbeats, p_min, p_max) + if err != nil { + return nil, err + } + heads = append(heads, head) + + // 1分钟一个点存 12小时 + cf := "AVERAGE" + xff := 0.5 + pdp_cns := 1 + cdp_cns := RRA1PointCnt + rra, err = sdk.Make_drrs_create_rra(&cf, &xff, &pdp_cns, &cdp_cns) + if err != nil { + return nil, err + } + // 5m一个点存2d + rras = append(rras, rra) + cf = "AVERAGE" + xff = 0.5 + pdp_cns = 5 + cdp_cns = RRA5PointCnt + rra, err = sdk.Make_drrs_create_rra(&cf, &xff, &pdp_cns, &cdp_cns) + if err != nil { + return nil, err + } + rras = append(rras, rra) + cf = "MAX" + xff = 0.5 + pdp_cns = 5 + cdp_cns = RRA5PointCnt + rra, err = sdk.Make_drrs_create_rra(&cf, &xff, &pdp_cns, &cdp_cns) + if err != nil { + return nil, err + } + rras = append(rras, rra) + cf = "MIN" + xff = 0.5 + pdp_cns = 5 + cdp_cns = RRA5PointCnt + rra, err = sdk.Make_drrs_create_rra(&cf, &xff, &pdp_cns, &cdp_cns) + if err != nil { + return nil, err + } + rras = append(rras, rra) + // 20m一个点存7d + rras = append(rras, rra) + cf = "AVERAGE" + xff = 0.5 + pdp_cns = 20 + cdp_cns = RRA20PointCnt + rra, err = sdk.Make_drrs_create_rra(&cf, &xff, &pdp_cns, &cdp_cns) + if err != nil { + return nil, err + } + rras = append(rras, rra) + cf = "MAX" + xff = 0.5 + pdp_cns = 20 + cdp_cns = RRA20PointCnt + rra, err = sdk.Make_drrs_create_rra(&cf, &xff, &pdp_cns, &cdp_cns) + if err != nil { + return nil, err + } + rras = append(rras, rra) + cf = "MIN" + xff = 0.5 + pdp_cns = 20 + cdp_cns = RRA20PointCnt + rra, err = sdk.Make_drrs_create_rra(&cf, &xff, &pdp_cns, &cdp_cns) + if err != nil { + return nil, err + } + rras = append(rras, rra) + // 3小时一个点存3个月 + rras = append(rras, rra) + cf = "AVERAGE" + xff = 0.5 + pdp_cns = 180 + cdp_cns = RRA180PointCnt + rra, err = sdk.Make_drrs_create_rra(&cf, &xff, &pdp_cns, &cdp_cns) + if err != nil { + return nil, err + } + rras = append(rras, rra) + cf = "MAX" + xff = 0.5 + pdp_cns = 180 + cdp_cns = RRA180PointCnt + rra, err = sdk.Make_drrs_create_rra(&cf, &xff, &pdp_cns, &cdp_cns) + if err != nil { + return nil, err + } + rras = append(rras, rra) + cf = "MIN" + xff = 0.5 + pdp_cns = 180 + cdp_cns = RRA180PointCnt + rra, err = sdk.Make_drrs_create_rra(&cf, &xff, &pdp_cns, &cdp_cns) + if err != nil { + return nil, err + } + rras = append(rras, rra) + // 12小时一个点存1year + rras = append(rras, rra) + cf = "AVERAGE" + xff = 0.5 + pdp_cns = 720 + cdp_cns = RRA720PointCnt + rra, err = sdk.Make_drrs_create_rra(&cf, &xff, &pdp_cns, &cdp_cns) + if err != nil { + return nil, err + } + rras = append(rras, rra) + cf = "MAX" + xff = 0.5 + pdp_cns = 720 + cdp_cns = RRA720PointCnt + rra, err = sdk.Make_drrs_create_rra(&cf, &xff, &pdp_cns, &cdp_cns) + if err != nil { + return nil, err + } + rras = append(rras, rra) + cf = "MIN" + xff = 0.5 + pdp_cns = 720 + cdp_cns = RRA720PointCnt + rra, err = sdk.Make_drrs_create_rra(&cf, &xff, &pdp_cns, &cdp_cns) + if err != nil { + return nil, err + } + rras = append(rras, rra) + + now := time.Now() + startTime := now.Add(time.Duration(-24) * time.Hour) + startTime_U := startTime.Unix() + step := item.Step + createPkg, err := sdk.Make_drrs_create(rrdFile, &startTime_U, &step, heads, rras) + if err != nil { + return nil, err + } + return createPkg, nil +} + +func create(filename string, item *cmodel.GraphItem, addr string) error { + timeout := time.Second * time.Duration(C_TIMEOUT) + pkg, err := generate_create_package_with_rrdfile(&filename, item) + if err != nil { + return err + } + _, err = sdk.DRRSCreate(pkg, timeout, addr) + + return err +} + +func update(filename string, item *cmodel.GraphItem, addr string) error { + var updateVals []*drrsproto.DrrsUVal + + v := math.Abs(item.Value) + if v > 1e+300 || (v < 1e-300 && v > 0) { + return fmt.Errorf("Value of item is either too large or too small") + } + var values []string + var s_val string + if item.DsType == "DERIVE" || item.DsType == "COUNTER" { + s_val = fmt.Sprintf("%d", int(item.Value)) + } else { + s_val = fmt.Sprintf("%f", item.Value) + } + values = append(values, s_val) + + timestamp := item.Timestamp + updateVal, err := sdk.Make_drrs_update_value(×tamp, values) + if err != nil { + return err + } + updateVals = append(updateVals, updateVal) + + updatePkg, err := sdk.Make_drrs_update(&filename, nil, updateVals) + if err != nil { + return err + } + timeout := time.Second * time.Duration(U_TIMEOUT) + err_u := sdk.DRRSUpdate(updatePkg, timeout, addr) + return err_u +} + +func RrdFileName(md5 string) string { + return fmt.Sprintf("%s.rrd", md5) +} diff --git a/sender/node_rings.go b/sender/node_rings.go index 98a3f6e..726b2cf 100644 --- a/sender/node_rings.go +++ b/sender/node_rings.go @@ -10,6 +10,15 @@ func initNodeRings() { JudgeNodeRing = newConsistentHashNodesRing(cfg.Judge.Replicas, KeysOfMap(cfg.Judge.Cluster)) GraphNodeRing = newConsistentHashNodesRing(cfg.Graph.Replicas, KeysOfMap(cfg.Graph.Cluster)) + if cfg.Drrs.Enabled { + if drrs_master_list != nil { + DrrsNodeRing = newConsistentHashNodesRing(cfg.Drrs.Replicas, drrs_master_list) + } else { + DrrsNodeRing = nil + } + } else { + DrrsNodeRing = nil + } if cfg.Graph.Migrating && cfg.Graph.ClusterMigrating != nil { GraphMigratingNodeRing = newConsistentHashNodesRing(cfg.Graph.Replicas, KeysOfMap(cfg.Graph.ClusterMigrating)) } diff --git a/sender/send_queues.go b/sender/send_queues.go index 6380ee7..23cc936 100644 --- a/sender/send_queues.go +++ b/sender/send_queues.go @@ -12,6 +12,11 @@ func initSendQueues() { JudgeQueues[node] = Q } + if drrs_master_list != nil && len(drrs_master_list) > 0 { + Q := nlist.NewSafeListLimited(DefaultSendQueueMaxSize) + DrrsQueues["drrs_master"] = Q + } + for node, nitem := range cfg.Graph.Cluster2 { for _, addr := range nitem.Addrs { Q := nlist.NewSafeListLimited(DefaultSendQueueMaxSize) diff --git a/sender/send_tasks.go b/sender/send_tasks.go index d44ecc8..d42f8c4 100644 --- a/sender/send_tasks.go +++ b/sender/send_tasks.go @@ -21,12 +21,16 @@ func startSendTasks() { // init semaphore judgeConcurrent := cfg.Judge.MaxIdle graphConcurrent := cfg.Graph.MaxIdle + drrsConcurrent := cfg.Drrs.MaxIdle //drrs if judgeConcurrent < 1 { judgeConcurrent = 1 } if graphConcurrent < 1 { graphConcurrent = 1 } + if drrsConcurrent < 1 { //drrs + graphConcurrent = 1 + } // init send go-routines for node, _ := range cfg.Judge.Cluster { @@ -49,6 +53,97 @@ func startSendTasks() { } } } + + if cfg.Drrs.Enabled { //drrs + if drrs_master_list != nil { + //这里只有一个发送队列,保证zk中节点变化时数据不丢失 + queue := DrrsQueues["drrs_master"] + go forward2DrrsTask(queue, drrsConcurrent) + } + } +} + +func forward2DrrsTask(Q *list.SafeListLimited, concurrent int) { + batch := g.Config().Drrs.Batch // 一次发送,最多batch条数据 + sema := nsema.NewSemaphore(concurrent) + + for { + items := Q.PopBackBy(batch) + count := len(items) + if count == 0 { + time.Sleep(DefaultSendTaskSleepInterval) + continue + } + + //这里依然使用graphitem + graphItems := make([]*cmodel.GraphItem, count) + for i := 0; i < count; i++ { + graphItems[i] = items[i].(*cmodel.GraphItem) + } + + // 同步Call + 有限并发 进行发送 + //TODO 这里还是单条发送,等drrs优化后将来改成批量发送。 + sema.Acquire() + //TODO 这里先create再update,两次网络通信效率不高,待drrs优化update逻辑后进行修改 + go func(graphItems []*cmodel.GraphItem) { + defer sema.Release() + + for _, item := range graphItems { + + checksum := item.Checksum() + addr, err := DrrsNodeRing.GetNode(checksum) + if err != nil { + log.Println("[DRRS ERROR] DRRS GET NODE RING ERROR:", err) + continue + } + filename := RrdFileName(checksum) + err = create(filename, item, addr) + if err != nil { + //create出错,重试两次,看是不是master挂了,尝试ck分配新的master。 + ok := false + for i := 0; i < 2; i++ { + time.Sleep(time.Second * 10) + addr, err = DrrsNodeRing.GetNode(checksum) + if err != nil { + log.Println("[DRRS ERROR] DRRS GET NODE RING ERROR:", err) + break + } + err = create(filename, item, addr) + if err == nil { + ok = true + break + } + } + if !ok { + log.Println("[DRRS ERROR] rrd create error: ", err) + continue + } + } + err = update(filename, item, addr) + if err != nil { + //update出错,重试两次,看是不是master挂了,尝试ck分配新的master。 + ok := false + for i := 0; i < 2; i++ { + time.Sleep(time.Second * 10) + addr, err = DrrsNodeRing.GetNode(checksum) + if err != nil { + log.Println("[DRRS ERROR] DRRS GET NODE RING ERROR:", err) + break + } + err = update(filename, item, addr) + if err == nil { + ok = true + break + } + } + if !ok { + log.Println("[DRRS ERROR] rrd create error: ", err) + continue + } + } + } + }(graphItems) + } } // Judge定时任务, 将 Judge发送缓存中的数据 通过rpc连接池 发送到Judge diff --git a/sender/sender.go b/sender/sender.go index 9107e28..32c80fb 100644 --- a/sender/sender.go +++ b/sender/sender.go @@ -19,6 +19,7 @@ const ( var ( JudgeNodeRing *ConsistentHashNodeRing GraphNodeRing *ConsistentHashNodeRing + DrrsNodeRing *ConsistentHashNodeRing //drrs GraphMigratingNodeRing *ConsistentHashNodeRing ) @@ -27,6 +28,7 @@ var ( var ( JudgeQueues = make(map[string]*nlist.SafeListLimited) GraphQueues = make(map[string]*nlist.SafeListLimited) + DrrsQueues = make(map[string]*nlist.SafeListLimited) //drrs GraphMigratingQueues = make(map[string]*nlist.SafeListLimited) ) @@ -38,6 +40,8 @@ var ( GraphMigratingConnPools *cpool.SafeRpcConnPools ) +var drrs_master_list []string //drrs + // 初始化数据发送服务, 在main函数中调用 func Start() { initConnPools() @@ -147,6 +151,34 @@ func Push2GraphSendQueue(items []*cmodel.MetaData, migrating bool) { } } +//这里将所有的数据都打入到同一个drrs发送队列。因为有ck,ck会动态的增加删除节点,因此在create或update的时候再决定是哪个master节点进行转发。 +func Push2DrrsSendQueue(items []*cmodel.MetaData) { //drrs + + for _, item := range items { + drrsItem, err := convert2GraphItem(item) + if err != nil { + log.Println("E:", err) + continue + } + //pk := item.PK() + + // statistics. 为了效率,放到了这里,因此只有graph是enbale时才能trace + //proc.RecvDataTrace.Trace(pk, item) + //proc.RecvDataFilter.Filter(pk, item.Value, item) + + //drrs_master, err := DrrsNodeRing.GetNode(pk) + //if err != nil { + // log.Println("E:", err) + // continue + //} + errCnt := 0 + Q := DrrsQueues["drrs_master"] + if !Q.PushFront(drrsItem) { + errCnt += 1 + } + } +} + // 打到Graph的数据,要根据rrdtool的特定 来限制 step、counterType、timestamp func convert2GraphItem(d *cmodel.MetaData) (*cmodel.GraphItem, error) { item := &cmodel.GraphItem{}