AWS 기술 블로그
AWS DataZone에서 OpenLineage 기반의 View 테이블 데이터 계보 그리기
배경
관계형 데이터베이스에서 View 테이블은 실제 데이터를 저장하지 않고, 기본 테이블의 데이터를 기반으로 한 가상의 테이블입니다. View 테이블의 데이터 계보는 데이터가 어떤 기본 테이블에서 비롯되었고, 어떤 과정을 거쳐 최종적으로 View 테이블에 도달했는지를 명확히 파악하는 데 필수적입니다. 이를 통해 데이터의 출처와 변환 과정을 명확히 이해하고, 데이터의 신뢰성을 보장할 수 있습니다. 또한, 데이터 계보는 View 테이블이 잘못된 기본 테이블이나 쿼리를 참조하고 있을 경우 이를 신속히 발견하여 수정할 수 있도록 돕습니다. 이는 데이터 정확성과 시스템 안정성을 유지하는 데 중요한 역할을 합니다.
View 테이블은 기본 테이블의 변경 사항에 민감합니다. 기본 테이블의 스키마나 데이터가 변경되면 해당 View 테이블도 영향을 받을 수 있습니다. 데이터 계보를 통해 이러한 변경이 View 테이블과 관련된 다른 시스템이나 프로세스에 어떤 영향을 미칠지 사전에 분석할 수 있습니다. 이를 통해 예기치 않은 오류를 예방하고 유지보수를 효율적으로 수행할 수 있습니다.
결국, View 테이블의 데이터 계보는 데이터의 신뢰성, 품질, 투명성, 그리고 시스템 안정성을 보장하며, 효율적인 데이터 관리와 비즈니스 의사결정에 중요한 역할을 합니다. 데이터 계보는 조직 내 데이터 흐름을 투명하게 보여주며, 데이터 거버넌스를 강화하여 데이터 사용 및 보안에 대한 정보에 입각한 결정을 내릴 수 있도록 돕습니다. 이를 통해 비즈니스 의사결정과 시스템 운영의 효율성을 높이는 데 크게 기여할 수 있습니다.
솔루션 개요
이번 포스팅에서는 HAQM DataZone의 데이터 계보 기능을 활용하여 관계형 데이터베이스에서 View 테이블에 대한 데이터 계보를 그리는 방법에 대해서 설명합니다. HAQM DataZone 서비스에서는 DataZone 라이프 사이클 내에서 발생한 게시와 구독 이벤트에 대한 데이터 계보만 표현되며, 그 외의 경우에는 사용자가 데이터 계보를 직접 작성해야 합니다. 이를 위해 AWS Lambda를 사용하여 View 테이블에 대한 데이터 계보를 OpenLineage 표준에 맞게 HAQM DataZone에 업데이트하는 방법에 대해서 설명합니다.
관계형 데이터베이스에서 View 테이블에 대한 데이터 계보를 그리면 아래와 같습니다. 이 그림을 보면 오른쪽 노드에 위치한 View Table은 왼쪽 Input Table 5개를 사용해서 만들어 진 것을 알 수 있습니다.
Input Table이나 Output Table에 대한 컬럼 정보나 Comment 정보를 알고 싶으면 각각의 노드에 있는 열 정보에서 아래와 같이 확인할 수 있습니다.
솔루션 아키텍처
전체적인 아키텍처는 Prodcution 계정과 Governance 계정으로 구성되며, 두 계정은 AWS Transit Gateway를 통해 연결되어 네트워크 통신이 가능하도록 설정됩니다. 먼저, Prodcution계정에서는 데이터의 저장 및 메타데이터 생성 작업이 이루어집니다. 관계형 데이터베이스(RDS)는 데이터를 저장하고, 사용자의 요청에 따라 뷰 테이블(View Tables)을 생성합니다.
이후 AWS Glue Crawler가 RDS 데이터를 스캔하여 메타데이터를 자동으로 생성하며, 이 메타데이터는 Glue Data Catalog에 저장됩니다. Governance계정에서는 View Table에 대한 데이터 계보 작성과 사용자 접근 제어가 이루어집니다. Lambda 함수는 Business Data Catalog에서 View Table 자산을 조회 한 후 조회한 자산 이름(View Table 이름)으로 관계형 데이터베이스(RDS)에 View Table 생성 시 사용한 쿼리문을 조회합니다. 이후 이를 이용해 Input Table과 Output Table 이름을 조회 한 후 이를 기반으로 View 테이블에 대한 데이터 계보를 작성하게 됩니다.
사용자는 Data Portal을 통해 이러한 메타데이터와 계보 정보를 검색하고 필요한 데이터 자산에 접근할 수 있습니다. Data Portal은 SSO(Single Sign-On) 기능을 제공하는 AWS IAM Identity Center와 연동되어 사용자 인증을 처리하며, 이를 통해 안전한 데이터 접근이 가능합니다. Governance계정 내에서는 Step Functions가 워크플로우를 관리하며, AWS Secrets Manager를 통해 민감한 정보를 안전하게 저장하고 관리합니다. 또한 Lambda 및 기타 서비스에서 발생하는 로그는 HAQM CloudWatch Logs에 기록되어 모니터링과 분석에 활용됩니다.
사전 준비 사항
다음과 같은 사항이 사전에 준비되어야 합니다.
- HAQM RDS for PostgreSQL에 스키마 및 테이블 생성되어 있어야 함
- HAQM DataZone에 Table과 View Table이 자산으로 등록되어 있어야 함
RDBMS View Table 계보 작성 단계 요약
- 단계 1: Data Portal 자산이 View Table인지 확인
- 단계 2: 데이터베이스에 View Table 생성 정의 쿼리
- 단계 3: OpenLineage SQL Parser을 이용해 Input/Output 테이블 분석
- 단계 4: 데이터 계보 작성 및 업데이트
단계 1: Data Portal 자산이 View Table인지 확인
HAQM DataZone에 자산이 등록되면 아래 그림과 같이 자산 유형에 따라 Glue Table이나 Glue View 유형으로 구분되어집니다. 이중 Glue View가 View Table 자산을 의미합니다.
AWS DataZone에서 등록 된 자산 정보를 조회하는 코드를 아래와 같이 사용합니다.
먼저 자산을 검색해서 자산 유형이 Glue View 인 자산 ID을 가져옵니다. Glue View 유형을 검색하기 위해서는 type_identifiers
값을 “amazon.datazone.GlueViewAssetType
”으로 조회하면 됩니다.
import boto3
import botocore
from botocore.exceptions import ClientError
# 자산 중Glue View 형식인(amazon.datazone.GlueViewAssetType) 자산만 가져오기
asset_ids = get_asset_ids(project_id, config.type_identifiers)
def get_asset_ids(project_id, type_identifiers):
"""
특정 프로젝트의 자산(asset) ID들을 조회하는 함수
Args:
project_id: DataZone 프로젝트 ID
type_identifiers: 필터링할 자산 유형 식별자 목록
Returns:
asset_ids: 조회된 자산 ID 목록
"""
# 조회된 자산 ID를 저장할 리스트
asset_ids = []
# Pagination을 위한 다음 토큰
next_token = None
try:
# 모든 페이지를 순회하며 자산 조회
while True:
# DataZone search API 호출을 위한 파라미터 설정
search_params = {
'domainIdentifier': config.domainId, # DataZone 도메인 ID
'owningProjectIdentifier': project_id, # 프로젝트 ID
'searchScope': 'ASSET', # 검색 범위를 자산으로 한정
'maxResults': 50 # 한 번에 가져올 최대 결과 수
}
# 다음 페이지가 있는 경우 토큰 추가
if next_token:
search_params['nextToken'] = next_token
# DataZone API 호출하여 자산 검색
response = datazone.search(**search_params)
# 검색된 각 항목에 대해 처리
for item in response.get('items', []):
asset_item = item.get('assetItem')
# 자산 항목이 존재하고, 필요한 필드가 있으며
# 지정된 자산 유형에 해당하는 경우에만 처리
if (
asset_item and
'identifier' in asset_item and
'typeIdentifier' in asset_item and
asset_item['typeIdentifier'] in type_identifiers
):
asset_ids.append(asset_item['identifier'])
# 다음 페이지 토큰 확인
next_token = response.get('nextToken')
# 더 이상 가져올 데이터가 없으면 종료
if not next_token:
break
except Exception as e:
# 오류 발생 시 로그 출력 후 예외 재발생
print(f"Error fetching asset IDs: {str(e)}")
raise
return asset_ids
단계 2: 데이터베이스에 View Table 생성 정의 쿼리
데이터 계보를 그리기 위해서는 View Table을 생성할 때 어떤 input table과 output table이 사용되었는지 알아야 합니다. 이를 위해서 “pg_get_viewdef” PostgreSQL 시스템 함수를 이용해 View Table 정의문을 요청합니다. 단계 1에서 얻어 온 자산 ID을 이용해 View Table 이름이 포함 된 상세 자산 정보를 아래와 같이 조회합니다.
import boto3
import botocore
from botocore.exceptions import ClientError
# 자산 ID을 이용해 자산 상세 정보를 조회
asset_info = get_asset(asset_id)
def get_asset(asset_id):
"""
Datazone에서 자산 상세 정보를 조회하는 함수입니다.
Parameters:
asset_id (str): 조회할 자산의 ID
Returns:
dict: 자산의 상세 정보가 포함된 딕셔너리
"""
asset = {} # 자산 정보를 저장할 빈 딕셔너리
# Datazone API를 통해 자산 정보를 조회합니다.
response = datazone.get_asset(
domainIdentifier=config.domainId,
identifier=asset_id
)
# 조회된 자산 정보를 딕셔너리에 저장합니다.
asset['formsOutput'] = response['formsOutput']
asset['id'] = response['id']
asset['name'] = response['name']
# 자산 설명이 있는 경우 저장합니다.
if 'description' in response:
asset['description'] = response['description']
# 자산 용어 사전에 대한 정보가 있는 경우 저장합니다.
if 'glossaryTerms' in response:
asset['glossaryTerms'] = response['glossaryTerms']
# 자산이 목록에 포함되어 있는지 여부를 저장합니다.
if 'listing' in response:
asset['listing'] = True
else:
asset['listing'] = False
# 외부 식별자가 있는 경우, AWS Glue ARN 패턴을 분석하여 데이터베이스, 스키마, 테이블 정보를 추출합니다.
if response.get('externalIdentifier'):
arn_pattern = re.compile(r"^arn:aws:glue:(?P<region>[^:\n]*):(?P<account>[^:\n]*):table/(?P<database>[^:\/\n]*)/(?P<table>[^:\/\n.]*).(?P<ignore>[^:\/\n]*)")
match = arn_pattern.match(response['externalIdentifier'])
if match:
match_path = match.group('table').split('_')
if len(match_path) >= 3:
asset['database'] = match_path[0]
asset['schema'] = match_path[1]
asset['table'] = '_'.join(match_path[2:])
return asset
얻어온 View Table 이름을 사용해 “pg_get_viewdef” PostgreSQL 시스템 함수를 아래와 같이 호출합니다.
# wms.v_wms_stock_out_r view 테이블 생성 정의 쿼리문 요청
SELECT pg_get_viewdef('wms.v_wms_stock_out_r', true);
위의 시스템 함수를 호출하면 아래와 같은 View Table 생성 정의문을 얻어올 수 있습니다.
CREATE OR REPLACE VIEW wms.v_wms_stock_out_r AS
SELECT
wms_stock.id AS stock_id,
wms_stock.product_code,
wms_stock.quantity,
wms_warehouse.warehouse_name,
wms_movement.movement_date,
wms_movement.movement_type,
wms_inventory.current_stock,
wms_supplier.supplier_name,
wms_location.storage_location
FROM
wms_stock
INNER JOIN
wms_warehouse ON wms_stock.warehouse_id = wms_warehouse.id
INNER JOIN
wms_movement ON wms_stock.movement_id = wms_movement.id
INNER JOIN
wms_inventory ON wms_stock.inventory_id = wms_inventory.id
INNER JOIN
wms_supplier ON wms_stock.supplier_id = wms_supplier.id
INNER JOIN
wms_location ON wms_stock.location_id = wms_location.id
WHERE
wms_movement.movement_type = 'OUT';
단계 3: OpenLineage SQL Parser을 이용해 Input/Output 테이블 분석
단계 2에서 얻어온 View Table 생성 정의문에서 Input Table 이름과 Output Table 이름을 얻어와야 합니다. 이를 위해서 OpenLineage SQL Parser 파이썬 패키지를 사용합니다.
# OpenLineage SQL Parser 파이썬 패키지 설치
pip install openlineage-sql
# OpenLineage SQL Parser을 이용해 Input/Output 테이블 분석 코드
import boto3
from openlineage_sql import parse
from psycopg2.sql import SQL, Identifier, Literal
from config.config import config
from view_table_lineage_utils import *
# view 테이블에 대한 정의를 요청하는 쿼리 요청
sql_query = """
SELECT pg_get_viewdef({}, true);
"""
view_table_definition = run_query(config.db_secret_name,
SQL(sql_query).format(
Literal(query_table_name)
)
)
# view 테이블에 대한 정의가 없는 경우 fail로 처리
if view_table_definition == None:
return 'fail'
else:
print(f"[INFO] {query_table_name} 테이블에 대한 계보 정보를 조회합니다.")
# 쿼리문 결과를 openlineage_sql을 이용해 input table과 output table을 구별한다.
sql = f"""
CREATE OR REPLACE VIEW '{query_table_name}'
AS
"""
# 쿼리 결과에서 정의 부분만 가져와 위 쿼리문과 합친다.
sql = sql + view_table_definition[0]['pg_get_viewdef']
# openlineage_sql을 이용해 파싱한다.
meta = parse([sql])
in_tables = meta.in_tables
out_table = meta.out_tables[0]
위 OpenLineage SQL Parser을 이용해 Input/Output 테이블 분석 코드를 실행하게 되면 아래와 같이 Input Table 이름과 Output Table 이름을 얻을 수 있습니다.
input table: [wms.wms_inventory, wms.wms_location, wms.wms_movement, wms.wms_stock, wms.wms_supplier, wms.wms_warehouse]
output table: [wms.v_wms_stock_out_r]
단계 4: 데이터 계보 작성 및 업데이트
HAQM DataZone에서는 OpenLineage 호환 이벤트를 사용하여 데이터 계보를 그릴 수 있습니다. HAQM DataZone 데이터 계보는 아래와 같이 표현 됩니다. 데이터 계보를 그릴 때 마다 Job이 만들어지게 되고 이 Job을 기준으로 Input node와 output node를 지정해야 합니다.
OpenLineage 이벤트 패킷 구성은 아래와 같습니다.
- eventTime: OpenLineage 이벤트가 발생한 시점을 나타내는 필드입니다. 이는 ISO 8601 형식의 타임스탬프로 기록되며, 이벤트가 생성된 정확한 시간을 포함합니다.
- eventType: 특정 이벤트의 유형을 나타냅니다. 예를 들어, 데이터 처리 작업이 시작되었는지, 완료되었는지, 실패했는지를 정의합니다. 주요 이벤트 유형에는 START, COMPLETE, FAIL, ABORT 등이 있으며, 이는 작업 실행의 상태를 나타냅니다.
- job: 데이터 계보 처리 작업을 정의하며, 고유한 이름과 네임스페이스로 식별됩니다.
- run: 특정 Job이 실행되는 인스턴스를 나타내며, 시작 및 완료 시간과 같은 정보를 포함합니다. 각 Run은 고유한 ID(UUID)로 식별되며, 이는 Job의 동적 실행을 추적하는 데 사용됩니다.
- Inputs/Outputs: Inputs는 작업 실행 중 사용된 데이터셋(소스)을 나타내며, Outputs는 작업 결과로 생성된 데이터셋(대상)을 나타냅니다. 각 데이터셋은 네임스페이스와 이름으로 식별되며, 데이터 흐름과 변환 과정을 추적하는 데 중요한 역할을 합니다
- facets: Job, Run, Dataset 등의 엔티티에 부가적인 메타데이터를 추가할 수 있는 확장 가능한 필드입니다. 특정 모델이나 프로세스를 더 세부적으로 표현하기 위해 사용되며, 커스터마이징이 가능합니다. 예를 들어, 데이터셋의 스키마 정보나 변환 방식 등을 포함할 수 있습니다.
AWS DataZone에서 Input 테이블과 Output 테이블로 등록 된 자산 정보를 조회하는 코드를 아래와 같이 사용합니다.
import boto3
import botocore
from botocore.exceptions import ClientError
# 리니지 노드의 Source Identifier를 사용하여 해당하는 자산(Asset)의 ID를 찾는 함수
def get_asset_id_by_source_identifier(source_identifier):
# source_identifier에서 첫번째 '/' 이후의 경로만 추출
name = '/'.join(source_identifier.split('/')[1:])
# 모든 프로젝트 목록을 가져옴
projects = list_projects()
for project in projects:
project_id = project['id']
# 각 프로젝트 내에서 name과 일치하는 자산을 검색
assets = search_asset_in_project(project_id, name)
if assets:
for asset in assets:
# 자산의 식별자(identifier) 추출
asset_id = asset['assetItem']['identifier']
# 자산의 상세 정보 조회
asset_info = get_asset(asset_id)
# AssetCommonDetailsForm 양식에서 정보 필터링
filtered_forms = [form for form in asset_info['formsOutput'] if form['formName'] == 'AssetCommonDetailsForm']
content = json.loads(filtered_forms[0]['content'])
# sourceIdentifier가 일치하는 경우 해당 asset_id 반환
if content.get('sourceIdentifier') == source_identifier:
return asset_id
# 일치하는 자산을 찾지 못한 경우 None 반환
return None
# 자산 ID를 사용하여 자산 정보와 컬럼 설명을 조회하는 함수
def get_asset_with_description(asset_id):
# HAQM DataZone API를 사용하여 자산 정보 조회
response = datazone.get_asset(
domainIdentifier=config.domainId,
identifier=asset_id
)
# Glue 관련 양식(GlueViewForm 또는 GlueTableForm) 찾기
glue_form_index = next((index for (index, d) in enumerate(response['formsOutput'])
if d['formName'] in ['GlueViewForm', 'GlueTableForm']), None)
glue_data = response['formsOutput'][glue_form_index]['content']
# 컬럼 비즈니스 메타데이터 양식 찾기
business_metadata_form_index = next((index for (index, d) in enumerate(response['formsOutput'])
if d['formName'] == 'ColumnBusinessMetadataForm'), None)
# 비즈니스 메타데이터 양식이 없는 경우
if business_metadata_form_index is None:
print("ColumnBusinessMetadataForm가 없습니다.")
data_dict = json.loads(glue_data)
return data_dict['columns']
# 비즈니스 메타데이터 정보 추출
business_metadata = response['formsOutput'][business_metadata_form_index]['content']
# Glue 데이터와 비즈니스 메타데이터를 결합하여 반환
return enhance_columns(glue_data, business_metadata)
위 OpenLineage 이벤트 패킷 구성을 기반으로 Inputs 테이블 정보를 아래와 같이 추가합니다.
# 입력 테이블들의 정보를 lineage_event_packet에 추가하는 로직
for input_table in input_tables:
# 소스 식별자 형식 변환
source_identifier = f"[{config.lineage_node_namespace}] odw.{input_table}"
# OpenLineage 형식에 맞는 입력 테이블 메타데이터 구성
input_name = {
"namespace": "postgresql", # 데이터베이스 타입
"name": source_identifier, # 테이블 식별자
"facets": { # OpenLineage 스키마 정보
"schema": {
"_producer": "http://github.com/OpenLineage/OpenLineage/tree/0.10.0/integration/airflow",
"_schemaURL": "http://openlineage.io/spec/1-0-2/OpenLineage.json#/definitions/SchemaDatasetFacet",
"fields": [] # 컬럼 정보를 담을 빈 배열
}
}
}
# 소스 식별자를 이용하여 에셋 식별자 조회
asset_identifier = get_asset_id_by_source_identifier(f"{config.lineage_node_namespace}" + "/" + source_identifier)
# 자산이 포털에 등록되어 있지 않은 경우
if asset_identifier is None:
print(f"[INFO] {source_identifier} Asset이 포털에 등록되지 않았습니다. Default 값으로 node 정보를 업데이트 합니다.")
lineage_event_packet['inputs'].append(input_name)
# 자산이 포털에 등록되어 있는 경우
else:
# 자산의 상세 컬럼 정보 조회
asset_columns = get_asset_with_description(asset_identifier)
# OpenLineage 형식에 맞게 컬럼 속성명 변경
# dataType -> type, columnName -> name으로 변경
for column in asset_columns:
column['type'] = column.pop('dataType')
column['name'] = column.pop('columnName')
# 변환된 컬럼 정보를 스키마에 추가
input_name['facets']['schema']['fields'] = asset_columns
# 완성된 입력 테이블 정보를 lineage_event_packet에 추가
lineage_event_packet['inputs'].append(input_name)
위 OpenLineage 이벤트 패킷 구성을 기반으로 Outputs 테이블 정보를 아래와 같이 추가합니다.
# 출력 테이블 정보를 lineage_event_packet에 추가하는 로직
source_identifier = f"[{config.lineage_node_namespace}] odw.{output_table}"
# OpenLineage 형식에 맞는 출력 테이블 메타데이터 구성
output_name = {
"namespace": "postgresql", # 데이터베이스 타입
"name": source_identifier, # 테이블 식별자
"facets": { # OpenLineage 스키마 정보
"schema": {
"_producer": "http://github.com/OpenLineage/OpenLineage/tree/0.10.0/integration/airflow",
"_schemaURL": "http://openlineage.io/spec/1-0-2/OpenLineage.json#/definitions/SchemaDatasetFacet",
"fields": [] # 컬럼 정보를 담을 빈 배열
}
}
}
# 소스 식별자를 이용하여 에셋 식별자 조회
asset_identifier = get_asset_id_by_source_identifier(f"{config.lineage_node_namespace}" + "/" + source_identifier)
# 출력 테이블이 포털에 등록되어 있지 않은 경우
if asset_identifier is None:
print(f"[INFO] {source_identifier} output 노드가 포털에 등록되지 않았습니다. 자산에 대한 계보 표현을 Skip 합니다!")
return 'skip_log'
# 출력 테이블이 포털에 등록되어 있는 경우
else:
# 에셋의 상세 컬럼 정보 조회
asset_columns = get_asset_with_description(asset_identifier)
# OpenLineage 형식에 맞게 컬럼 속성명 변경
# dataType -> type, columnName -> name으로 변경
for column in asset_columns:
column['type'] = column.pop('dataType')
column['name'] = column.pop('columnName')
# 변환된 컬럼 정보를 스키마에 추가
output_name['facets']['schema']['fields'] = asset_columns
# 완성된 출력 테이블 정보를 lineage_event_packet에 추가
lineage_event_packet['outputs'].append(output_name)
OpenLineage 이벤트 패킷 구성이 완료되면 DataZone API을 이용해 데이터 계보를 업데이트 합니다.
try:
# DataZone API를 호출하여 계보 이벤트 포스팅
datazone.post_lineage_event(
domainIdentifier=config.domainId, # DataZone 도메인 ID
event=json.dumps(lineage_event_packet) # 계보 이벤트 데이터를 JSON 문자열로 변환
)
# 성공 시 로그 출력
# 출력 테이블의 namespace와 name을 포함하여 성공 메시지 표시
print(f"[SUCCESS] '{lineage_event_packet['outputs'][0]['namespace']}/{lineage_event_packet['outputs'][0]['name']}' 계보 이벤트가 성공적으로 포스팅되었습니다.")
except Exception as e:
# 오류 발생 시 로그 출력
# 실패한 출력 테이블 정보와 오류 내용을 포함하여 에러 메시지 표시
print(f"[ERROR] '{lineage_event_packet['outputs'][0]['namespace']}/{lineage_event_packet['outputs'][0]['name']}' 계보 이벤트 포스팅 중 오류 발생: {str(e)}")
작성된 데이터 계보를 확인합니다.
결론
이번 포스팅에서는 HAQM DataZone의 데이터 계보 기능을 활용하여 관계형 데이터베이스에서 View 테이블에 대한 데이터 계보를 그리는 방법에 대해서 알아보았습니다. 관계형 데이터베이스에서 View 테이블의 데이터 계보는 데이터 신뢰성과 시스템 안정성 유지에 필수적입니다. HAQM DataZone과 AWS Lambda를 활용한 데이터 계보 작성은 OpenLineage 표준 기반으로 데이터 출처, 변환 과정, 의존성을 명확히 시각화 할 수 있습니다. 이를 통해 기본 테이블 변경 시 영향 범위 분석이 가능해지며, 오류 발생 시 신속한 근본 원인 추적이 용이해질 수 있습니다. 다음 포스팅에서는 데이터 마트 구성에서 많이 사용하고 있는 Airflow에 대한 데이터 계보를 그리는 방법에 대해서 포스팅하도록 하겠습니다.