Add DRRS support #4

14 changes: 13 additions & 1 deletion README.md
@@ -2,7 +2,7 @@

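Data collection is one of the most basic functions of a monitoring system. In Open-Falcon, data collected by the Agent is first sent to the Transfer component. After receiving data from clients, Transfer normalizes and validates it, then forwards it to multiple backend systems for processing. When forwarding to each backend, Transfer shards the data with a consistent-hashing algorithm so that the backends can scale horizontally. Transfer itself is stateless, so one or more instances can go down without affecting the service.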
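The consistent-hash sharding described above can be sketched as follows. This is a minimal illustration under stated assumptions, not Transfer's actual implementation: the `Ring` type, `NewRing`, and `Get` are hypothetical names, and CRC32 is an assumed hash function.

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sort"
	"strconv"
)

// Ring is a hypothetical consistent-hash ring; it is not Transfer's own type.
type Ring struct {
	hashes []uint32          // sorted hashes of all virtual nodes
	owner  map[uint32]string // virtual-node hash -> backend name
}

// NewRing places `replicas` virtual nodes on the ring for each backend.
func NewRing(replicas int, nodes []string) *Ring {
	r := &Ring{owner: make(map[uint32]string)}
	for _, n := range nodes {
		for i := 0; i < replicas; i++ {
			h := crc32.ChecksumIEEE([]byte(n + "#" + strconv.Itoa(i)))
			if _, dup := r.owner[h]; dup {
				continue // on a hash collision the first backend keeps the slot
			}
			r.hashes = append(r.hashes, h)
			r.owner[h] = n
		}
	}
	sort.Slice(r.hashes, func(a, b int) bool { return r.hashes[a] < r.hashes[b] })
	return r
}

// Get returns the backend that owns key: the first virtual node at or after
// the key's hash, wrapping around to the start of the ring.
func (r *Ring) Get(key string) string {
	if len(r.hashes) == 0 {
		return ""
	}
	h := crc32.ChecksumIEEE([]byte(key))
	i := sort.Search(len(r.hashes), func(i int) bool { return r.hashes[i] >= h })
	if i == len(r.hashes) {
		i = 0
	}
	return r.owner[r.hashes[i]]
}

func main() {
	ring := NewRing(500, []string{"graph-00", "graph-01", "graph-02"})
	fmt.Println(ring.Get("host01/cpu.idle"))
}
```

With this layout, removing a backend only reassigns the keys that backend owned; keys owned by the remaining backends stay where they were.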

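Transfer supports three kinds of backends: Judge, Graph, and OpenTSDB (not yet available in the open-source version). Judge is the high-performance alarm-evaluation component we developed, Graph is the high-performance data storage, archiving, and query component we developed, and OpenTSDB is an open-source time-series data storage service. Each backend can be enabled through Transfer's configuration file.
Transfer supports four kinds of backends: Judge, Graph, OpenTSDB (not yet available in the open-source version), and DRRS (which must be [installed](https://github.com/jdjr/drrs) separately). Judge is the high-performance alarm-evaluation component we developed, Graph is the high-performance data storage, archiving, and query component we developed, and OpenTSDB is an open-source time-series data storage service. DRRS is a lightweight distributed ring data service developed by colleagues on the Hangzhou R&D team of JD Finance Group, used to persist and graph monitoring data. Its role is similar to that of the Graph component, and it can be scaled out within seconds while preserving graphing efficiency. Each backend can be enabled through Transfer's configuration file.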

Transfer typically receives data from four kinds of sources:

@@ -108,3 +108,15 @@ u want sending items via java jsonrpc client? turn to one java example: [jsonrpc
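- cluster: a key-value dictionary listing the backend graph instances; each key is the name of a backend graph and each value is its ip:port (separate multiple addresses with commas; transfer sends the same data to every address)
- clusterMigrating: a key-value dictionary listing the backend graph instances newly added for scale-out; each key is the name of a backend graph and each value is its ip:port (separate multiple addresses with commas; transfer sends the same data to every address)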

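drrs # Before enabling this, make sure DRRS is installed and configured correctly (https://github.com/jdjr/drrs); it cannot be used together with graph
- enabled: true/false, whether to send data to DRRS # must not be true at the same time as graph's enabled
- useZk: whether ZooKeeper is used; set this to true if DRRS is deployed with multiple master nodes and ZooKeeper
- dest: address of the DRRS master node in ip:port format; required when ZooKeeper is not used
- replicas: number of node replicas used by the consistent-hashing algorithm; keeping the default is recommended
- maxIdle: connection-pool setting, the maximum number of idle connections; keeping the default is recommended
- batch: batch size for forwarding data; keeping the default is recommended
- zk: ZooKeeper settings; required when useZk is true
    - ip: IP address of ZooKeeper; the ZooKeeper port must stay at the default 2181
    - addr: path in ZooKeeper where the DRRS configuration is stored
    - timeout: ZooKeeper timeout, in seconds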
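The constraints in the list above (DRRS and graph are mutually exclusive, `dest` is needed without ZooKeeper, and the `zk` settings are needed with it) could be checked at startup. The following is only a sketch of such a check: `drrsOptions` and `checkDrrs` are hypothetical names and are not part of this pull request.

```go
package main

import (
	"errors"
	"fmt"
)

// drrsOptions mirrors the options listed above; the type is hypothetical.
type drrsOptions struct {
	Enabled bool
	UseZk   bool
	Dest    string
	ZkIP    string
	ZkAddr  string
}

// checkDrrs is a hypothetical helper that applies the rules from the README.
func checkDrrs(graphEnabled bool, d drrsOptions) error {
	if !d.Enabled {
		return nil // nothing to check when DRRS is switched off
	}
	if graphEnabled {
		return errors.New("drrs and graph must not both be enabled")
	}
	if d.UseZk {
		if d.ZkIP == "" || d.ZkAddr == "" {
			return errors.New("useZk is true but zk.ip or zk.addr is empty")
		}
		return nil
	}
	if d.Dest == "" {
		return errors.New("useZk is false but dest is empty")
	}
	return nil
}

func main() {
	err := checkDrrs(true, drrsOptions{Enabled: true, Dest: "127.0.0.1:12300"})
	fmt.Println(err)
}
```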

15 changes: 14 additions & 1 deletion cfg.example.json
@@ -39,5 +39,18 @@
        },
        "clusterMigrating": {
        }
    }
    },
    "drrs": {
        "enabled": false,
        "useZk": false,
        "dest": "127.0.0.1:12300",
        "replicas": 500,
        "maxIdle": 32,
        "batch": 200,
        "zk": {
            "ip": "10.9.0.130",
            "addr": "/drrs_master",
            "timeout": 10
        }
    }
}
17 changes: 17 additions & 0 deletions g/cfg.go
@@ -53,13 +53,30 @@ type GraphConfig struct {
	ClusterMigrating2 map[string]*ClusterNode `json:"clusterMigrating2"`
}

// ZkConfig holds the ZooKeeper settings used to discover DRRS master nodes.
type ZkConfig struct {
	Ip      string `json:"ip"`
	Addr    string `json:"addr"`
	Timeout int    `json:"timeout"` // in seconds
}

// DrrsConfig holds the settings of the DRRS backend.
type DrrsConfig struct {
	Enabled  bool      `json:"enabled"`
	UseZk    bool      `json:"useZk"`
	Dest     string    `json:"dest"`
	Replicas int       `json:"replicas"`
	MaxIdle  int       `json:"maxIdle"`
	Batch    int       `json:"batch"`
	Zk       *ZkConfig `json:"zk"`
}

type GlobalConfig struct {
	Debug  bool          `json:"debug"`
	Http   *HttpConfig   `json:"http"`
	Rpc    *RpcConfig    `json:"rpc"`
	Socket *SocketConfig `json:"socket"`
	Judge  *JudgeConfig  `json:"judge"`
	Graph  *GraphConfig  `json:"graph"`
	Drrs   *DrrsConfig   `json:"drrs"` // DRRS backend
}
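Because `Drrs` is a pointer, a configuration file written before this change (one with no `"drrs"` section) decodes to a nil `*DrrsConfig`. The sketch below illustrates that behavior with `encoding/json`. The `config` type, `parse`, and `drrsEnabled` are hypothetical stand-ins, and only a subset of the fields is copied.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ZkConfig and DrrsConfig copy a subset of the field tags from the diff.
type ZkConfig struct {
	Ip      string `json:"ip"`
	Addr    string `json:"addr"`
	Timeout int    `json:"timeout"`
}

type DrrsConfig struct {
	Enabled bool      `json:"enabled"`
	UseZk   bool      `json:"useZk"`
	Dest    string    `json:"dest"`
	Zk      *ZkConfig `json:"zk"`
}

// config is a hypothetical, trimmed-down stand-in for GlobalConfig.
type config struct {
	Drrs *DrrsConfig `json:"drrs"`
}

// parse is a hypothetical helper that decodes a configuration document.
func parse(raw string) (*config, error) {
	var c config
	if err := json.Unmarshal([]byte(raw), &c); err != nil {
		return nil, err
	}
	return &c, nil
}

// drrsEnabled treats a missing "drrs" section as disabled instead of
// dereferencing a nil pointer.
func drrsEnabled(c *config) bool {
	return c != nil && c.Drrs != nil && c.Drrs.Enabled
}

func main() {
	for _, raw := range []string{
		`{"debug": true}`,
		`{"drrs": {"enabled": true, "dest": "127.0.0.1:12300"}}`,
	} {
		c, err := parse(raw)
		if err != nil {
			fmt.Println("parse error:", err)
			continue
		}
		fmt.Println(drrsEnabled(c))
	}
}
```

Checking the pointer before reading `Enabled` keeps older configuration files working after an upgrade.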

var (
3 changes: 3 additions & 0 deletions receiver/rpc/rpc_transfer.go
@@ -142,6 +142,9 @@ func RecvMetricValues(args []*cmodel.MetricValue, reply *cmodel.TransferResponse
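		sender.Push2JudgeSendQueue(items)
	}

	// Forward to DRRS only when the section is present and enabled; cfg.Drrs
	// is nil for configuration files that have no "drrs" section.
	if cfg.Drrs != nil && cfg.Drrs.Enabled {
		sender.Push2DrrsSendQueue(items)
	}
	reply.Message = "ok"
	reply.Total = len(args)
	reply.Latency = (time.Now().UnixNano() - start.UnixNano()) / 1000000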
128 changes: 128 additions & 0 deletions sender/conn_pools.go
@@ -1,11 +1,109 @@
package sender

import (
	"fmt"
	"log"
	"net"
	"time"

	"github.com/jdjr/drrs/golang/sdk"
	"github.com/open-falcon/transfer/g"
	cpool "github.com/open-falcon/transfer/sender/conn_pool"
	"github.com/samuel/go-zookeeper/zk"
	nset "github.com/toolkits/container/set"
)

// watchZNode waits for the DRRS master nodes registered in ZooKeeper to
// change, then rebuilds drrs_master_list and DrrsNodeRing and re-arms the watch.
// Every failure is fatal: log.Fatalln exits the process, so no cleanup follows it.
func watchZNode(ch <-chan zk.Event) {
	cfg := g.Config()
	drrsConfig := cfg.Drrs
	for {
		e, ok := <-ch
		if !ok {
			// The watch channel was closed without a children-changed event;
			// returning avoids spinning on a closed channel.
			return
		}
		if e.Type != zk.EventNodeChildrenChanged {
			continue
		}

		var master_list []string
		c, _, err := zk.Connect([]string{drrsConfig.Zk.Ip}, time.Second*time.Duration(drrsConfig.Zk.Timeout))
		if err != nil {
			log.Fatalln("[DRRS FATAL] watchZNode: ZK connection error:", err)
		}
		children, stat, zkChannel, err := c.ChildrenW(drrsConfig.Zk.Addr)
		if err != nil {
			log.Fatalln("[DRRS FATAL] watchZNode: ZK get children error:", err)
		}
		if stat.NumChildren <= 0 {
			log.Fatalln("[DRRS FATAL] watchZNode: ZK contents error:", zk.ErrNoChildrenForEphemerals)
		}
		for _, child := range children {
			absAddr := fmt.Sprintf("%s/%s", drrsConfig.Zk.Addr, child)
			data_get, _, err := c.Get(absAddr)
			if err != nil {
				log.Fatalln("[DRRS FATAL] watchZNode: ZK get data error:", err)
			}
			data := string(data_get)
			if data == "" {
				log.Fatalln("[DRRS FATAL] watchZNode: ZK data error:", zk.ErrInvalidPath)
			}
			master_list = append(master_list, data)
		}
		drrs_master_list = master_list
		DrrsNodeRing = newConsistentHashNodesRing(cfg.Drrs.Replicas, drrs_master_list)
		// A watch fires only once, so start a new watcher on the fresh channel.
		go watchZNode(zkChannel)
		return
	}
}

// initDrrsMasterList fills drrs_master_list either from the dest setting or,
// when useZk is true, from the master nodes registered in ZooKeeper.
func initDrrsMasterList(drrsConfig *g.DrrsConfig) error {
	// A missing "drrs" section is treated the same as a disabled one.
	if drrsConfig == nil || !drrsConfig.Enabled {
		drrs_master_list = nil
		return nil
	}
	if !drrsConfig.UseZk {
		drrs_master_list = append(drrs_master_list, drrsConfig.Dest)
		return nil
	}
	if drrsConfig.Zk == nil {
		drrs_master_list = nil
		return fmt.Errorf("drrs.useZk is true but the zk section is missing")
	}

	c, _, err := zk.Connect([]string{drrsConfig.Zk.Ip}, time.Second*time.Duration(drrsConfig.Zk.Timeout))
	if err != nil {
		drrs_master_list = nil
		return err
	}
	children, stat, zkChannel, err := c.ChildrenW(drrsConfig.Zk.Addr)
	if err != nil {
		drrs_master_list = nil
		return err
	}
	go watchZNode(zkChannel)

	if stat.NumChildren <= 0 {
		drrs_master_list = nil
		return zk.ErrNoChildrenForEphemerals
	}
	for _, child := range children {
		absAddr := fmt.Sprintf("%s/%s", drrsConfig.Zk.Addr, child)
		data_get, _, err := c.Get(absAddr)
		if err != nil {
			drrs_master_list = nil
			return err
		}
		data := string(data_get)
		if data == "" {
			drrs_master_list = nil
			return zk.ErrInvalidPath
		}
		drrs_master_list = append(drrs_master_list, data)
	}
	return nil
}
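Both `watchZNode` and `initDrrsMasterList` walk the children of the configured znode and read one master address from each. As a rough sketch, that shared step could be factored into a helper that takes the read operation as a parameter, which also lets it run without a ZooKeeper server. `collectMasters` and `getFunc` are hypothetical names, not part of this pull request or of the go-zookeeper API.

```go
package main

import (
	"errors"
	"fmt"
)

// getFunc abstracts "read the data stored at a znode path" so that this
// sketch can run without ZooKeeper.
type getFunc func(path string) ([]byte, error)

// collectMasters is a hypothetical helper that reads one master address from
// each child of base and fails on the first missing or empty entry.
func collectMasters(base string, children []string, get getFunc) ([]string, error) {
	if len(children) == 0 {
		return nil, errors.New("no master nodes registered under " + base)
	}
	masters := make([]string, 0, len(children))
	for _, child := range children {
		path := base + "/" + child
		data, err := get(path)
		if err != nil {
			return nil, err
		}
		if len(data) == 0 {
			return nil, errors.New("empty master address at " + path)
		}
		masters = append(masters, string(data))
	}
	return masters, nil
}

func main() {
	// An in-memory map stands in for ZooKeeper in this sketch.
	store := map[string]string{
		"/drrs_master/m1": "10.0.0.1:12300",
		"/drrs_master/m2": "10.0.0.2:12300",
	}
	get := func(path string) ([]byte, error) {
		v, ok := store[path]
		if !ok {
			return nil, errors.New("no such znode: " + path)
		}
		return []byte(v), nil
	}
	masters, err := collectMasters("/drrs_master", []string{"m1", "m2"}, get)
	fmt.Println(masters, err)
}
```

Returning the list instead of appending to a global lets the caller replace `drrs_master_list` only after every address has been read successfully.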

func initConnPools() {
	cfg := g.Config()

@@ -37,10 +135,40 @@ func initConnPools() {
		GraphMigratingConnPools = cpool.CreateSafeRpcConnPools(cfg.Graph.MaxConns, cfg.Graph.MaxIdle,
			cfg.Graph.ConnTimeout, cfg.Graph.CallTimeout, graphMigratingInstances.ToSlice())
	}

	err := initDrrsMasterList(cfg.Drrs)
	if err != nil {
		log.Fatalln("init drrs zookeeper list according to config file:", cfg, "fail:", err)
	}

	// Initialize the DRRS SDK only when DRRS is configured, enabled, and has
	// at least one master address.
	if cfg.Drrs != nil && cfg.Drrs.Enabled && drrs_master_list != nil {
		var addrs []*net.TCPAddr
		for _, addr := range drrs_master_list {
			// Resolve each DRRS master address before handing it to the SDK.
			tcpAddr, err := net.ResolveTCPAddr("tcp4", addr)
			if err != nil {
				log.Fatalln("config file:", cfg, "is not correct, cannot resolve drrs master tcp address. err:", err)
			}
			addrs = append(addrs, tcpAddr)
		}
		if err := sdk.DRRSInit(addrs); err != nil {
			log.Fatalln("[DRRS FATAL] initConnPools: DRRS init error:", err)
		}
	}
}

func DestroyConnPools() {
	cfg := g.Config()
	JudgeConnPools.Destroy()
	GraphConnPools.Destroy()
	GraphMigratingConnPools.Destroy()
	// Close the DRRS SDK only if it was initialized in initConnPools.
	if cfg.Drrs != nil && cfg.Drrs.Enabled && drrs_master_list != nil {
		sdk.DRRSClose()
	}
}
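The address-resolution loop in `initConnPools` exits the process on the first bad address. As an alternative sketch, the same step can return an error and leave the decision to the caller. `resolveMasters` is a hypothetical helper, while `net.ResolveTCPAddr` is the standard-library call already used in the diff.

```go
package main

import (
	"fmt"
	"net"
)

// resolveMasters is a hypothetical helper that resolves every ip:port string
// and reports the first address that cannot be resolved.
func resolveMasters(addrs []string) ([]*net.TCPAddr, error) {
	out := make([]*net.TCPAddr, 0, len(addrs))
	for _, a := range addrs {
		tcpAddr, err := net.ResolveTCPAddr("tcp4", a)
		if err != nil {
			return nil, fmt.Errorf("cannot resolve drrs master %q: %v", a, err)
		}
		out = append(out, tcpAddr)
	}
	return out, nil
}

func main() {
	addrs, err := resolveMasters([]string{"127.0.0.1:12300"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(addrs[0].String())
}
```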