Open-Network-Insight Ingest Framework

Ingest data is captured or transferred into the Hadoop cluster, where it is transformed and loaded into the solution data stores.

Ingest Framework

Getting Started

Prerequisites

Configure Kafka

Adding Kafka Service:

The Ingest framework needs Kafka for real-time streaming. Add the Kafka service using Cloudera Manager. If you are using a Cloudera Manager version older than 5.4.1, you will need to add the Kafka parcel manually.

The ingest module uses a default message size of 999999 bytes. If you modify this size in the ingest configuration file, you will also need to update the following Kafka broker properties (see the sketch after this list):

  • message.max.bytes
  • replica.fetch.max.bytes
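A minimal sketch of the matching broker settings, assuming the default 999999-byte ingest message size (set these in the broker's server.properties or through the equivalent Cloudera Manager configuration):

  # Kafka broker settings -- keep both values >= the ingest message_size
  message.max.bytes=999999
  replica.fetch.max.bytes=999999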

Spark-Streaming Kafka support

Download the spark-streaming-kafka-0-8-assembly_2.11 jar file. This jar adds Spark Streaming + Kafka support and must be placed in the oni-ingest/oni directory, keeping the same file name.
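A minimal sketch of downloading the assembly jar from Maven Central into oni-ingest/oni; the 2.0.0 version below is only an illustrative assumption, so pick the release that matches your Spark installation:

  # illustrative version -- use the release that matches your Spark install
  VERSION=2.0.0
  wget -P oni-ingest/oni \
    "https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-8-assembly_2.11/${VERSION}/spark-streaming-kafka-0-8-assembly_2.11-${VERSION}.jar"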

Getting Started

Required Roles

The following roles are required on all the nodes where the Ingest Framework will be running.

Get the code:

 git clone https://github.com/Open-Network-Insight/oni-ingest.git

Ingest Configuration:

The file ingest_conf.json contains all the configuration required to start the ingest module.

  • dbname: Name of the Hive database where all the ingested data will be stored in Avro-Parquet format.
  • hdfs_app_path: Application path in HDFS where the pipelines will be stored (e.g. /user/application_user/).
  • kafka: Kafka and Zookeeper server information required to create and listen to topics and partitions.
  • pipelines: In this section you can add multiple configurations, either for the same pipeline or for different pipelines. The configuration name must be lowercase and contain no spaces (e.g. flow_internals).

Configuration example:

  "dbname" : "database name",
  "hdfs_app_path" : "hdfs application path",
  "kafka":{
        "kafka_server":"kafka ip",
        "kafka_port":"kafka port",
        "zookeper_server":"zk ip",
        "zookeper_port":"zk port",
        "message_size":999999
   },
  "pipelines":{
  
     "flow_internals":{
          "type":"flow",
          "collector_path":"/path_to_flow_collector",
          "local_staging":"/tmp/",
          "process_opt":""
      },
      "flow_externals":{
          "type":"flow",
          "collector_path":"/path_to_flow_collector",
          "local_staging":"/tmp/",
          "process_opt":""
      },
      "dns_server_1":{
          "type":"dns",
          "collector_path":"/path_to_dns_collector",
          "local_staging":"/tmp/",
          "pkt_num":"650000",
          "pcap_split_staging":"/tmp",    
          "process_opt":"-E separator=, -E header=y -E occurrence=f -T fields -e frame.time -e frame.time_epoch -e frame.len -e ip.src -e ip.dst -e dns.resp.name -e dns.resp.type -e dns.resp.class -e dns.flags.rcode -e dns.a 'dns.flags.response == 1'"
      }

Starting the Ingest

Running in Standalone Mode:

bash start_standalone_ingest.sh "pipeline_configuration" "num of workers"

Following the previous configuration example, starting the ingest module in standalone mode looks like this:

bash start_standalone_ingest.sh flow_internals 4

Running in Cluster Mode:

Running the Master: The master needs to run on the same server where the collector path is located.

python master_collector.py -t "pipeline_configuration" -w "number of workers"
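
For example, following the flow_internals pipeline configured above, the master could be started with four workers:

python master_collector.py -t flow_internals -w 4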

Running Workers: Each worker needs to run on a server where the required processing program is installed (e.g. nfdump). Each worker also needs to be identified with a specific id, and ids start at 0.

Example:

  1. worker_0, id = 0
  2. worker_1, id = 1

This id is required to attach the worker to its Kafka partition.

python worker.py -t "pipeline_configuration" -i "id of the worker (starts with 0)" --topic "my_topic"
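
For example, two workers for the flow_internals pipeline, attached to Kafka partitions 0 and 1 (the topic name below is illustrative; use the topic that belongs to your pipeline):

python worker.py -t flow_internals -i 0 --topic flow_internals_topic
python worker.py -t flow_internals -i 1 --topic flow_internals_topic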