Apache Sqoop - 구분자 설정 ( Hive )
·
DataPipeline/Sqoop
Sqoop의 Default 구분자는 ',' 이며, RDB 데이터를 입력받을 시 Column의 Text데이터가 "안녕하세요, 저는 어쩌고, ..." 와 같은 ','가 포함되었다면 구분자를 변경해야 한다. 아래 설정은 \t (탭)으로 구분자를 설정하며 Oozie XML -> Sqoop -> Hive Table에 넣는 과정을 담는다. [ Oozie XML ] 아래 설정을 추가 한다. --fields-terminated-by "\t" [ Hive Table Create ] 생성시 Delimited fields를 정의한다. ( 또는 업데이트 ) CREATE TABLE temp.table_temp ( id BIGINT, name VARCHAR(30), pay DECIMAL(10,2) tag VARCHAR(200) ..
Apache Hive - Encoding 과 Decoding
·
DataPipeline/Hive
'20528978에 대하여 인코딩 디코딩. [ 인코딩 ] select base64(encode('20528978', 'UTF-8')) 결과 값 : MjA1Mjg5Nzg= [ 디코딩 ] select decode(unbase64('MjA1Mjg5Nzg='), 'UTF-8') 결과 값 : 20528978 [ 다양한 형식 ] select decode(unbase64('MjA1Mjg5Nzg='), 'ISO-8859-1'); select decode(unbase64('MjA1Mjg5Nzg='), 'UTF-16LE'); 변환해주는 웹사이트 https://www.base64encode.org/d
Apache Nifi - Kafka to HDFS
·
DataPipeline/Nifi
Kafka에서 데이터를 받아 HDFS에 json, csv형태로 입력받는다.저장 후 Hive, Spark 등으로 데이터를 처리할 수 있으며실시간 데이터 인풋을 Nifi를 통해 확인할 수 있는 장점이 있다.  1. [ ConsumeKafka ] 컨슈머로 연결하여 브로커에서 보내는 메세지를 소비Properties : Kafka Brokers : 172.31.11.11:9092, 172.31.22.22:9092, 172.31.33.33:9092Topic Name(s) : your-topic-nameGroup ID : your-kafka-group-id  2. [ UpdateAttribute ]들어오는 카프카데이터에 createdAt를 추가하기 위한 프로퍼티filename은 HDFS저장시 파일명을 시간으로 지정하..
Apache Kafka - Simple Python Code
·
DataPipeline/Kafka
[ Ubuntu ] * 사전설치 Python3 sudo apt-get upgrade # Pip설치 sudo apt-get install python3-pip #Kafka Lib 설치 sudo pip3 install kafka-python [ 테스트 코드 ] from kafka import KafkaProducer import time producer = KafkaProducer( bootstrap_servers=['172.31.11.11:9092', '172.31.22.222:9092', '172.31.33.33:9092'] ) for i in range(1,50): producer.send('Topic-Name', str.encode('kafka:-%d' % i) ) time.sleep(0.5)
Zookeeper - CLI명령어와 ACL
·
DataPipeline/Zookeeper
[ CLI 명령어 ]zookeeper/bin/zkCli.sh에서 CLI 접속create /znode "first-app"ls /znodeget /znodeset /znode "date-updated"rmr /znode  [ ACL ]znode접근시 Authentication is not valid : /znode-ex 라는 메세지가 뜬다면인증권한을 확인할 필요가 있다. ACL(Action Control List)은 znode에 액세스 하기위한 인증제어이며 5가지의 종류로 구분된다.ACL 권한 종료CREATE - 생성 : ZNode의 자식 노드를 생성할 수 있는 권한READ - 읽기 : ZNode의 데이터를 읽고 자식 노드를 나열할 수 있는 권한WRITE - 쓰기 : ZNode의 데이터를 설정하거나 수정할..
Apache Kafka - 토픽삭제가 안되는 경우
·
DataPipeline/Kafka
[ 토픽삭제 안되는 경우 ] server.properties 파일의 delete.topic.enable=true임에도 토픽이 삭제되지 않는경우 1. 카프카 dir.log파일과 관련 주키퍼로그 파일 삭제 2. 카프카 브로커를 재시작한다. 참조 : https://stackoverflow.com/questions/23976670/when-how-does-a-topic-marked-for-deletion-get-finally-removed : https://stackoverflow.com/questions/44564606/how-can-i-remove-kafka-topics-marked-for-deletion
Apache Phoenix - CDH 피닉스 초기 설정 [Init Configuration]
·
DataPipeline/Phoenix
[ CDH Phoenix 초기설정 ] 1. HBase 서비스 탭 2. hbase-site.xml에 대한 HBase 서비스 고급 구성 스니펫(안전 밸브) 검색 [1] Secondary Index 피닉스의 Secondary Index를 사용하기 위해 설정값 추가 이름 : hbase.regionserver.wal.codec 값 : org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec 피닉스 인덱싱에 관한 참조 : https://phoenix.apache.org/secondary_indexing.html [2] 사용자정의 함수 사용 사용자 정의함수를 사용하도록 다음 속성을 설정 이름 : phoenix.functions.allowUserDefinedFunct..
Apache Nifi - 카프카 데이터 분산처리 ( Distribute Kafka Data )
·
DataPipeline/Nifi
1. Nifi Processor MergeContent MergeContent프로세서를 이용해 Input processor(ex.Kafka, File)에서 들어오는 데이터를적절한 크기로 Merge한 다음에 다음 단계로 넘겨주는 Flow를 기대했다.그러나 MergeContent는 이미 받은 데이터를 복제한 후 병합하는 형식으로,기대했던 결과를 가져오지는 못했다. 예를 들어 100의 데이터가 들어온다면 100개의 데이터를 차곡차곡 쌓았다가하나의 데이터로 병합하여 결국 1개의 데이터가 들어와야 하는것을 결과로 예상했지만,MergeContent는 100개의 데이터를 일단 Nifi 저장 후,그 데이터들을 병합한 데이터셋 복제복을 만들어 다음 프로세스로 넘겨준다.결국 병합된 1개의 데이터셋을 전달하지만, 이중 저장..
wave35
'DataPipeline' 카테고리의 글 목록 (9 Page)