Apache Nifi - Kafka to HBase
·
DataPipeline/Nifi
[ 아키텍처 ]Kafka -> Nifi -> HBase -> Python으로 데이터 Fetch   [ Kafka ]카프카의 토픽명과 컨슈머그룹명을 정해 데이터를 받는다.  [ Nifi ]아래와 같은 프로세스 플로우로 구성할 수 있다. ConsumerKafka에 토픽명과 그룹명을 설정값으로 입력한다UpdateAttribue와 ReplaceText는 해당 데이터가 나이파이에 적제되는 시간을 입력하기 위함이므로 생략가능하다.PutHBaseJson에 HBase TableName, Row Identifier, Column Family값을 입력한다.네임스페이스가 있다면 NS:TableName과 값이 입력한다.Row Identifier값은 로우키가 되므로 유니크한 값으로 설정한다. 예시에선 $(createdAt}_$..
Apache Nifi - Kafka to HDFS
·
DataPipeline/Nifi
Kafka에서 데이터를 받아 HDFS에 json, csv형태로 입력받는다.저장 후 Hive, Spark 등으로 데이터를 처리할 수 있으며실시간 데이터 인풋을 Nifi를 통해 확인할 수 있는 장점이 있다.  1. [ ConsumeKafka ] 컨슈머로 연결하여 브로커에서 보내는 메세지를 소비Properties : Kafka Brokers : 172.31.11.11:9092, 172.31.22.22:9092, 172.31.33.33:9092Topic Name(s) : your-topic-nameGroup ID : your-kafka-group-id  2. [ UpdateAttribute ]들어오는 카프카데이터에 createdAt를 추가하기 위한 프로퍼티filename은 HDFS저장시 파일명을 시간으로 지정하..
Apache Nifi - 카프카 데이터 분산처리 ( Distribute Kafka Data )
·
DataPipeline/Nifi
1. Nifi Processor MergeContent MergeContent프로세서를 이용해 Input processor(ex.Kafka, File)에서 들어오는 데이터를적절한 크기로 Merge한 다음에 다음 단계로 넘겨주는 Flow를 기대했다.그러나 MergeContent는 이미 받은 데이터를 복제한 후 병합하는 형식으로,기대했던 결과를 가져오지는 못했다. 예를 들어 100의 데이터가 들어온다면 100개의 데이터를 차곡차곡 쌓았다가하나의 데이터로 병합하여 결국 1개의 데이터가 들어와야 하는것을 결과로 예상했지만,MergeContent는 100개의 데이터를 일단 Nifi 저장 후,그 데이터들을 병합한 데이터셋 복제복을 만들어 다음 프로세스로 넘겨준다.결국 병합된 1개의 데이터셋을 전달하지만, 이중 저장..
Apache Nifi - Queued 설정
·
DataPipeline/Nifi
Queued 우측클릭->구성설정 버튼을 클릭하면 아래와 같이 설정할 수 있는 창이나온다.  FlowFile ExpirationFlowFile Expiration은 특정기간에 쌓인 데이터들 중 처리할 수 없는 데이터를 자동으로 제거할 수 있는 개념이다.예를 들어 큐의 데이터 볼륨이 초과할 것으로 예상됬을 때,유통기한을 우선 순위자와 함께 사용하여 우선 순위가 가장 높은 데이터를 먼저 처리한 후일정 기간(예: 1시간) 내에 처리할 수 없는 것은 모두 삭제할 수 있다.  만료 기간은 데이터가 NiFi 인스턴스에 들어간 시간을 기준으로 한다.즉, 특정 연결의 FlowFile Expiration가 '1시간'으로 설정되어 있고NiFi 인스턴스에 1시간 동안 있었던 파일이 해당 연결에 도달하면 만료된다.  기본값 0..
Apache Nifi - 설치 [ Install in cloudera manager ]
·
DataPipeline/Nifi
[ CFM배포를 설치 ] $ cd /opt/cloudera/csd $ wget http://archive.cloudera.com/CFM/csd/1.0.0.0/NIFI-1.9.0.1.0.0.0-90.jar $ wget http://archive.cloudera.com/CFM/csd/1.0.0.0/NIFICA-1.9.0.1.0.0.0-90.jar $ wget http://archive.cloudera.com/CFM/csd/1.0.0.0/NIFIREGISTRY-0.3.0.1.0.0.0-90.jar $ chown cloudera-scm:cloudera-scm NIFI*.jar $ chmod 644 NIFI*.jar $ service cloudera-scm-server restart [ 클라우데라 매니저 서비스 ..
wave35
'DataPipeline/Nifi' 카테고리의 글 목록