apache beam와 word count 예제
apache beam이라는 오픈소스에 대하여 알아보기 위해서 간단한 예제인 word count example을 돌려 보았다.
주어진 예제에서 설명한것 처럼 아래와 같이 인풋 파일에는 일반 아무런 문장을 적고 아래 한줄을 실행하면 된다.
결과로는 파일이 리턴 되는데 counts-00000-of-00001 이런식으로 이름이 되어 있다. 파일의 크기가 크면 나뉘는듯 하다. 파일에는 문장에 있는 단어의 빈도가 각각 count 되어 있다. 해당 예제는 이전에 hadoop의 예제인 word count와 완전 동일하다. 차이점이라면 하둡 예제는 로컬에 hdfs 노드를 설정해야하고 여러 불편한 점이 있었는데 해당 예제는 오픈소스 자체에 내장이 되어 있어서 매우 간단하다.
wordcount_minimal.py의 중간에 아래와 같은 코드가 있다.
'|'는 'or' 인 bitwise operator 라 위의 코드 스타일과 약간 안맞다. 생각해보니 ocaml에서 matching 하는 연산을 배운 기억이 있다. 그런데 아무리 찾아도 python이 해당 연산을 지원하지는 않는 것 같았다. stackoverflow 글을 보니 apache beam에서 해당 연산을 override 하는듯 하다.
word count example
예제를 돌리는 것은 매우 간단하다.
주어진 예제에서 설명한것 처럼 아래와 같이 인풋 파일에는 일반 아무런 문장을 적고 아래 한줄을 실행하면 된다.
python -m apache_beam.examples.wordcount_minimal --input YOUR_INPUT_FILE --output counts
결과로는 파일이 리턴 되는데 counts-00000-of-00001 이런식으로 이름이 되어 있다. 파일의 크기가 크면 나뉘는듯 하다. 파일에는 문장에 있는 단어의 빈도가 각각 count 되어 있다. 해당 예제는 이전에 hadoop의 예제인 word count와 완전 동일하다. 차이점이라면 하둡 예제는 로컬에 hdfs 노드를 설정해야하고 여러 불편한 점이 있었는데 해당 예제는 오픈소스 자체에 내장이 되어 있어서 매우 간단하다.
word count example Code
wordcount_minimal.py 예제 코드는 길지 않아서 이해하기 매우 어려운 편은 아니다. 간단히 보면 input의 args들을 parsing한 후 파일을 통해서 pipeline을 생성 한다. 각 워드마다 카운트를 한 후 리턴을 하면 끝이다.
궁금증 1 : apache beam sdk method
wordcount_minimal.py의 중간에 아래와 같은 코드가 있다.
lines = p | ReadFromText(known_args.input) counts = ( lines | 'Split' >> ( beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)). with_output_types(unicode)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
'|'는 'or' 인 bitwise operator 라 위의 코드 스타일과 약간 안맞다. 생각해보니 ocaml에서 matching 하는 연산을 배운 기억이 있다. 그런데 아무리 찾아도 python이 해당 연산을 지원하지는 않는 것 같았다. stackoverflow 글을 보니 apache beam에서 해당 연산을 override 하는듯 하다.
궁금증 2 : 무슨 엔진이 해당 연산을 진행 하는지.
Apache Beam이라는 것은 Spark, Flink, Dataflow와 같은 분산처리 시스템에서의 파이프라인을 정의 하는 프로그래밍 모델이라고 되어 있다. 이는 이자체가 데이터를 처리하는게 아니라 다른애가 처리하도록 하는 장치라는 것이다.
나는 위의 예제를 간단하게 돌리기만 하였는데 누가 이를 돌리는지가 갑자기 궁금해졌다.
wordcount_minimal.py 파일을 다시 보면 아마 여기서 사용할것 같은 DirectRunner라는 부분이 있다. 주석을 보면 테스트나 로직을 검증하기 위한 small scale을 돌리는데에 적합하다고 작성되어 있다. 프로젝트 구조를 보면 DirectRunner이외에 Spark Flink등의 코드가 있는 것으로 보아서 동일한 연산을 지원하기 위한 코드인것으로 추측이 된다.
댓글
댓글 쓰기