HAQM Neptune은 AWS에서 제공하는 완전관리형 그래프 데이터베이스 서비스입니다. 이 서비스는 고도로 연결된 데이터셋을 효율적으로 저장하고 쿼리할 수 있도록 설계되었으며, 주로 소셜 네트워킹, 추천 엔진, 지식 그래프, 생명과학 연구 등의 분야에서 활용됩니다. Neptune은 업계 표준인 Property Graph와 RDF(Resource Description Framework)를 모두 지원하며, 각각 Apache TinkerPop Gremlin과 OpenCypher 그리고 SPARQL 쿼리 언어를 통해 데이터에 접근할 수 있습니다.
HAQM Neptune DB와 Neptune Analytics

Neptune Analytics는 HAQM Neptune의 확장 기능으로, 그래프 데이터에 대한 고급 분석 기능을 제공합니다. 이 서비스는 대규모 그래프 데이터셋에 대한 배치 분석을 수행할 수 있으며, 그래프 알고리즘을 실행하여 복잡한 패턴을 발견하고 인사이트를 도출할 수 있습니다. 특히 중심성 분석, 커뮤니티 감지, 경로 찾기 등의 고급 그래프 알고리즘을 효율적으로 실행할 수 있는 환경을 제공합니다.
실제 비즈니스 환경에서는 두 서비스를 함께 활용하여 더욱 강력한 그래프 기반 솔루션을 구축할 수 있습니다. 예를 들어, 소셜 네트워크 분석에서 Neptune은 사용자 간의 관계 데이터를 저장하고 실시간 쿼리를 처리하는 데 사용되며, Neptune Analytics는 영향력 있는 사용자 식별이나 커뮤니티 패턴 분석과 같은 복잡한 분석 작업을 수행하는 데 활용됩니다.
데이터 동기화 방안
HAQM Neptune과 Neptune Analytics 간의 효율적인 데이터 동기화는 그래프 데이터베이스의 분석 성능을 최적화하는 데 핵심적인 요소입니다. 본 블로그에서는 두 서비스 간의 데이터 동기화 방안에 대해 간략히 설명하고 이 중 스트림 기반 동기화에 대해 상세히 설명합니다.

HAQM Neptune 데이터베이스에서 Neptune Analytics로의 배치 데이터 동기화는 Neptune Export 서비스를 활용하여 Neptune 데이터베이스 내 데이터를 CSV/JSON 형식으로 추출한 후, HAQM S3를 중간 저장소로 활용하고, Neptune Analytics의 벌크 로드 기능을 통해 S3 내 데이터를 Bulk Load 를 통해 Neptune Analytics 로 적재하는 방식으로 구현할 수 있습니다.
HAQM Neptune 데이터베이스의 변경 사항을 Neptune Analytics로 준 실시간 동기화하기 위해서는 데이터베이스 내 변경 데이터를 지속적으로 외부와 공유할 수 있도록 Neptune Streams를 활성화하고, AWS Lambda 함수와 기타 필요한 서비스와의 연계를 위해 IAM 역할을 적절하게 구성하여 스트림 레코드를 Neptune Analytics 엔드포인트로 주기적으로 전달하는 파이프라인을 EventBridge 를 통해 구현합니다. 또한 마지막으로 성공한 데이터 동기화 작업의 EventID 를 S3 에 저장하고 이를 이용해 이후 동기화 작업의 중복을 피하고 효율적인 동기화 작업을 수행할 수 있도록 합니다.
그럼 이제 스트림 동기화 방안에 대해 예제 코드와 함께 자세하게 살펴보도록 하겠습니다.
스트림 동기화 방안의 상세 절차
Neptune Stream 활성화
Neptune Database 내 데이터의 변경사항을 캡처할 수 있도록 Neptune Stream을 활성화합니다. Neptune Stream 을 활성화하는 방법은 다음과 같습니다:
-
- Neptune Database 의 Cluster Parameter 를 사용하여 neptune_streams 파라미터를 1로 설정합니다.
- Database 클러스터의 모든 인스턴스를 재부팅하여 파리미터 변경 사항을 해당 인스턴스에 적용합니다.
- 스트림이 활성화된 후 소스 DB 클러스터에서 최소한 하나의 추가 또는 삭제 작업을 수행하여 변경 스트림에 데이터 포인트를 채웁니다.
아래의 형식표와 같이 여러 필드로 나누어진 스트림 데이터를 통해 데이터의 변경 사항 ( ADD, REMOVE) 을 인식하고 공유할 수 있습니다.
필드 이름 |
설명 |
가능한 값 |
lastEventId |
마지막 변경 이벤트의 ID |
{ "commitNum": 숫자, "opNum": 숫자 } |
lastTrxTimestamp |
마지막 트랜잭션 커밋 시간 |
Unix epoch 기준 밀리초 |
format |
직렬화 형식 |
PG_JSON |
records |
변경 기록 배열 |
배열 형태로 여러 레코드 포함 |
totalRecords |
응답에 포함된 총 레코드 수 |
숫자 |
records.commitTimestamp |
트랜잭션 커밋 시간 |
Unix epoch 기준 밀리초 |
records.eventId |
변경 이벤트 ID |
{ "commitNum": 숫자, "opNum": 숫자 } |
records.data |
직렬화된 변경 데이터 |
Gremlin, SPARQL, OpenCypher 데이터 |
op |
변경 연산 |
ADD , REMOVE 등 |
isLastOp |
트랜잭션의 마지막 연산 여부 |
true 또는 존재하지 않음 |
records.data.id |
요소 ID |
문자열 |
records.data.type |
요소 유형 |
예: vl (Vertex Label) |
records.data.key |
속성 키 |
문자열 |
records.data.value |
속성 값 |
{ "value": 값, "dataType": 데이터 타입 } |
Neptune Analytics 설정
- Neptune Analytics 그래프를 생성하기 위해 관리 콘솔을 통해 손쉽게 생성할 수있으며, 이는 해당 그래프에 접속할 수 있는 단일 엔드포인트를 제공합니다.
- 생성된 Neptune Analytics 그래프 내 그래프 식별자를 기록하고 이를 이후 Lambda 함수에서 활용합니다.
S3 Bucket 생성
- 주기적으로 수행되는 동기화 작업에서 이전에 성공한 작업 이후부터 중복을 방지하고 보다 효울적으로 동기화 작업을 수행하기 위해 이전에 성공적으로 수행된 동기화 작업의 EventID(Checkpoint) 를 저장하기 위한 S3 bucket 을 생성합니다.
- 생성된 bucket 이름을 기록해 둡니다.
Lambda 함수 구현
Neptune Streams의 변경사항을 읽고 이를 기반으로 이전에 성공한 동기화 작업의 EventID 를 이용해 그 이후로 동기화 작업이 진행될 수 있도록 체크하고 Neptune Analytics로 OpenCypher 를 이용해 데이터를 적용할 수 있도록 변환하는 Lambda 함수를 생성합니다.
import json
import boto3
import logging
import time
from botocore.exceptions import ClientError
from typing import Dict, List, Optional
from botocore.config import Config
#로깅 구성
logger = logging.getLogger()
logger.setLevel(logging.INFO)
#AWS 클라이언트 초기화
session = boto3.Session()
class NeptuneStreamProcessor:
def __init__(self):
#소스, 분석 및 S3를 위해 Neptune 클라이언트 초기화
self.neptune_client = boto3.client('neptunedata', region_name=[리전 이름], endpoint_url=[NEPTUNE STREAM ENDPOINT URL])
self.analytics_client = boto3.client('neptune-graph', region_name=[리전 이름])
self.s3_client = boto3.client('s3', region_name=[리전 이름], config=Config(connect_timeout=5, read_timeout=10,retries={'max_attempts': 3}))
#엔드포인트 구성
self.source_endpoint = [NEPTUNE DATABASE ENDPOINT]
self.analytics_endpoint = [NEPTUNE ANALYTICS ENDPOINT]
#S3 체크포인트 구성
self.checkpoint_bucket = 'neptune-stream-checkpoints'
self.checkpoint_key = 'last-processed-position.json'
def get_last_checkpoint(self) -> Optional[Dict]:
#S3에서 마지막으로 처리된 레코드 위치를 가져옴
try:
response = self.s3_client.get_object(
Bucket=self.checkpoint_bucket,
Key=self.checkpoint_key
)
checkpoint = json.loads(response['Body'].read().decode('utf-8'))
logger.info(f"Retrieved checkpoint: {checkpoint}")
return checkpoint
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey' or e.response['Error']['Code'] == '404':
logger.info("No previous checkpoint found, starting from beginning")
return None
else:
logger.error(f"Error retrieving checkpoint: {str(e)}")
return None
def save_checkpoint(self, record: Dict) -> None:
#마지막으로 처리된 레코드 위치를 S3에 저장
try:
#각 레코드에서 commitNum 및 opNum을 직접 추출
eventid=record.get('eventId', [])
checkpoint = {
'commitNum': eventid.get('commitNum'),
'opNum': eventid.get('opNum'),
#commitNum:opNum 형식으로 eventId를 저장
'eventId': f"{eventid.get('commitNum')}:{eventid.get('opNum')}",
'timestamp': record.get('commitTimestampInMillis')
}
self.s3_client.put_object(
Bucket=self.checkpoint_bucket,
Key=self.checkpoint_key,
Body=json.dumps(checkpoint),
ContentType='application/json'
)
logger.info(f"Saved checkpoint: {checkpoint}")
except ClientError as e:
logger.error(f"Error saving checkpoint: {str(e)}")
return None
def ensure_checkpoint_bucket_exists(self) -> None:
#체크포인트 버킷이 존재하는지 확인하고, 존재하지 않으면 생성
logger.info(f"point 1")
try:
logger.info(self.checkpoint_bucket)
self.s3_client.head_bucket(Bucket=self.checkpoint_bucket)
logger.info(f"Checkpoint bucket {self.checkpoint_bucket} exists")
except ClientError as e:
if e.response['Error']['Code'] == '404' or e.response['Error']['Code'] == 'NoSuchBucket':
logger.info(f"Creating checkpoint bucket {self.checkpoint_bucket}")
try:
#적절한 구성으로 버킷을 생성
self.s3_client.create_bucket(
Bucket=self.checkpoint_bucket,
CreateBucketConfiguration={'LocationConstraint': 'us-west-2'}
)
#내구성을 높이기 위해 버전 관리를 활성화
self.s3_client.put_bucket_versioning(
Bucket=self.checkpoint_bucket,
VersioningConfiguration={'Status': 'Enabled'}
)
except ClientError as ce:
logger.error(f"Failed to create checkpoint bucket: {str(ce)}")
else:
logger.error(f"Error checking checkpoint bucket: {str(e)}")
except Exception as e:
logger.error(f"Unexpected error accessing S3: {str(e)}")
return None
def get_stream_records(self) -> List[Dict]:
#가능한 경우 마지막 체크포인트부터 시작하여 Neptune 스트림에서 레코드를 가져옴
try:
#읽기/쓰기를 시도하기 전에 버킷이 있는지 확인
self.ensure_checkpoint_bucket_exists()
#마지막 체크포인트를 가져옴
checkpoint = self.get_last_checkpoint()
if checkpoint and 'eventId' in checkpoint:
event_id = checkpoint.get('eventId')
#eventID에서 commitNum 및 opNum 추출
if isinstance(event_id, str) and ':' in event_id:
parts = event_id.split(':')
if len(parts) == 2:
try:
#문자열을 정수로 변환
commit_num = int(parts[0])
op_num = int(parts[1])
except ValueError:
#변환 실패 시 예외 처리
logger.info(f"Error: Could not convert parts of event_id '{event_id}' to integers")
#올바른 매개변수를 사용
logger.info(f"Resuming stream from commit {commit_num}, op {op_num}")
response = self.neptune_client.get_propertygraph_stream(
limit=100,
iteratorType='AFTER_SEQUENCE_NUMBER',
commitNum=commit_num,
opNum=op_num
)
else:
#EventID 형식이 예상과 다른 경우
logger.warning(f"Invalid eventId format: {checkpoint.get('eventId')}, starting from beginning")
response = self.neptune_client.get_propertygraph_stream(
limit=100,
iteratorType='TRIM_HORIZON'
)
else:
#체크포인트가 없으면 처음부터 시작
logger.info("Starting stream from the beginning (TRIM_HORIZON)")
response = self.neptune_client.get_propertygraph_stream(
limit=100,
iteratorType='TRIM_HORIZON'
)
records = response.get('records', [])
logger.info(f"Retrieved {len(records)} records from stream")
return records
except ClientError as e:
logger.error(f"Error fetching stream records: {str(e)}")
raise
def transform_to_cypher(self, record: Dict) -> str:
#Neptune 스트림 레코드를 OpenCypher 쿼리로 변환
try:
op_type = record['op']
data = record['data']
value = data.get('value', {})
#레코드 유형을 기반으로 Cypher 쿼리를 생성
if op_type == 'ADD':
if data.get('type') == 'vl':
query = f"""CREATE (n:{value.get('value')} {{`~id`: '{data.get('id')}'}}) """
return query
elif data.get('type') == 'vp':
query = f"""MATCH (n) WHERE id(n) = '{data.get('id')}' SET n.{data.get('key')} = '{value.get('value')}' """
return query
elif data.get('type') == 'e':
query = f"""MATCH (n), (m) WHERE id(n) = '{data.get('from')}' AND id(m) = '{data.get('to')}' CREATE (n)-[r:{value.get('value')}]->(m) """
return query
elif data.get('type') == 'ep':
query = f"""MATCH (n)-[r]->(m) WHERE id(n) = '{data.get('id')}' SET r.{data.get('key')} = '{value.get('value')}' """
return query
elif op_type == 'REMOVE':
if data.get('type') == 'vl':
query = f"""MATCH (n) WHERE id(n) = '{data.get('id')}' DETACH DELETE n """
return query
elif data.get('type') == 'vp':
query = f"""MATCH (n) WHERE id(n) = '{data.get('id')}' REMOVE n.{data.get('key')} """
return query
elif data.get('type') == 'e':
query = f"""MATCH (n)-[r]->(m) WHERE id(r) = '{data.get('id')}' DELETE r """
return query
elif data.get('type') == 'ep':
query = f"""MATCH (n)-[r]->(m) WHERE id(r) = '{data.get('id')}' REMOVE r.{data.get('key')} """
return query
except Exception as e:
logger.error(f"Error translating record {record}: {str(e)}", exc_info=True)
return None
def execute_cypher_query(self, query: str):
#Neptune Analytics에서 OpenCypher 쿼리 실행
try:
response = self.analytics_client.execute_query(
queryString=query,
graphIdentifier='g-jz3p4msxz7',
language='OPEN_CYPHER'
)
return response
except ClientError as e:
logger.error(f"Error executing Cypher query: {str(e)}")
raise
def lambda_handler(event, context):
#증분 처리를 포함한 주요 Lambda 핸들러 함수
try:
processor = NeptuneStreamProcessor()
logger.info("Initializing Neptune Stream Processor")
#Neptune 스트림에서 레코드 가져오기(체크포인트 처리 포함)
records = processor.get_stream_records()
success_count = 0
error_count = 0
last_processed_record = None
logger.info("cpmplete get_stream_records")
#각 레코드를 처리
for record in records:
try:
#Cypher 쿼리로 변환
logger.info(f"Processing record: {record.get('eventId')}")
cypher_query = processor.transform_to_cypher(record)
if cypher_query:
#Neptune Analytics에서 쿼리 실행
#레코드를 성공적으로 처리한 경우에만 체크포인트를 저장
processor.execute_cypher_query(cypher_query)
success_count += 1
last_processed_record = record
if last_processed_record:
processor.save_checkpoint(last_processed_record)
except Exception as e:
error_count += 1
logger.error(f"Error processing record: {str(e)}")
continue
logger.info(f"Stream processing completed: {success_count} successful, {error_count} failed")
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Stream processing completed',
'successful_records': success_count,
'failed_records': error_count
})
}
except Exception as e:
logger.error(f"Lambda execution failed: {str(e)}", exc_info=True)
return {
'statusCode': 500,
'body': json.dumps({
'message': f'Error: {str(e)}'
})
}
해당 Lambda 함수에서는 다음 작업을 순차적으로 수행하도록 구현합니다:
- 우선 소스가 되는 Neptune Database의 변경 데이터를 캡처한 Neptune Stream에서 변경 레코드를 읽습니다.
- S3에 저장되어 있는 이전에 성공적으로 수행된 작업의 ID(EventID) 를 이용해 그 이후의 데이터만을 동기화할 수 있도록 체크합니다.
- Neptune Stream 에서 읽은 변경 사항을 Neptune Analytics 에 적용할 수 있도록 해당 형식(OpenCypher)으로 변환합니다.
- Neptune Analytics API를 사용하여 변경 사항을 Neptune Analytics 에 적용합니다.
- 마지막 성공 EventID 를 S3 에 저장한 후, 다음의 변경 사항을 적용할 때 해당 EventID 이용해 중복 처리를 방지하고 마지막 적용 시점 이후의 변경에 대해 수행할 수 있도록 합니다.
또한 Lambda 함수의 원할한 수행을 위해선 관련 서비스와의 설정 및 실행 역할에 대한 다음 권한이 필요합니다:
오류 처리 및 복원력 구현 (옵션)
- Lambda 함수에 재시도 로직을 구현하여 일시적인 오류를 처리합니다.
- Dead Letter Queue (DLQ)를 설정하여 처리하지 못한 레코드를 저장합니다.
모니터링 및 로깅
- CloudWatch를 사용하여 Lambda 함수와 Neptune Stream의 성능을 모니터링합니다.
- CloudWatch log 를 통해 동기화 프로세스를 추적하고 문제를 진단합니다. 다음은 해당 Lambda 함수의 실행 로그를 CloudWatch log 를 통해 확인한 예입니다. 해당 로그를 통해 스트림 내 데이터를 확인할 수 있습니다.

지속적인 동기화
- Lambda 함수가 지속적으로 Neptune Stream을 폴링하고 변경 사항을 처리할 수 있도록 EventBridge 를 통해 스케줄링 하고 수행 간격을 1분으로 설정합니다.
- 해당 lambda 함수를 실행할 수 있는 권한이 있는 역할을 EventBridge 스케줄에 부여합니다.
이 방식을 통해 Neptune Database의 변경 사항을 준 실시간으로 Neptune Analytics에 반영할 수 있으며, 분석 작업을 위한 최신 데이터를 항상 유지할 수 있습니다.
다음과 같이 Neptune Workbench (Jupyter Notebook) 를 통해 Neptune Database 에서의 데이터 변경사항이 1분 이내에 Neptune Analytics 에 동기화되는 것을 학인할 수 있습니다.

결론
HAQM Neptune 데이터베이스와 Neptune Analytics 간의 스트림 데이터 동기화는 Change Data Capture(CDC) 기반의 준 실시간 복제 메커니즘을 통해 메인 데이터베이스의 변경사항을 지속적으로 분석 엔진으로 전달함으로써, 그래프 데이터의 실시간 분석, 이상 탐지, 패턴 인식 등의 고급 분석 작업을 수행할 수 있게 하며, 이는 특히 금융 거래 모니터링, 소셜 네트워크 분석, IoT 데이터 처리와 같은 시간에 민감한 비즈니스 시나리오에서 데이터의 일관성을 유지하면서도 신속한 의사결정을 가능하게 하는 핵심적인 기술 솔루션으로 보다 경쟁력이 있는 데이터 기반 서비스를 구현할 수 있습니다.