DataPipeline/Nifi

Apache Nifi - Kafka to HDFS

wave35 2023. 3. 22. 21:33

 

Kafka에서 데이터를 받아 HDFS에 json, csv형태로 입력받는다.

저장 후 Hive, Spark 등으로 데이터를 처리할 수 있으며

실시간 데이터 인풋을 Nifi를 통해 확인할 수 있는 장점이 있다.

 

 

1. [ ConsumeKafka ] 

컨슈머로 연결하여 브로커에서 보내는 메세지를 소비

Properties : 

Kafka Brokers : 172.31.11.11:9092, 172.31.22.22:9092, 172.31.33.33:9092

Topic Name(s) : your-topic-name

Group ID : your-kafka-group-id

 

 

2. [ UpdateAttribute ]

들어오는 카프카데이터에 createdAt를 추가하기 위한 프로퍼티

filename은 HDFS저장시 파일명을 시간으로 지정하기 위함

Property : 

createdAt : ${now():format("yyyy-MM-dd:HH:mm:ss")}

filename : st_${now():format("yyyyMMddHHmmssSS")}_${random():mod(99):plus(1000)}

 

 

3. [ ReplaceText ]

2.에서 지정한 createdAt값을 Json으로 입력받는 Kafka데이터 맨뒤에 추가한다.

 

Property :

Search Value : (?s:(^.*)}$)

Replacement Value : $1,"createdAt":"${createdAt}"}

Replacement Strategy : Regex Replace

 

 

4. [ PutHDFS ]

해당되는 파일의 Path를 입력한다.

Property :

Hadoop Configuration Resources : /etc/hadoop/conf.cloudera.hdfs/core-site.xml,

/etc/hadoop/conf.cloudera.hdfs/hdfs-site.xml

Directory : /user/nifi/yourpath

 


PutEmail을 추가하여 Nifi에러시 Email을 받게 설정할 수도 있다.