Azure AI Search 인스턴스 마이그레이션 자동화 시스템

Azure AI Search 인스턴스 마이그레이션 자동화 시스템

개요

이 프로젝트는 Azure AI Search 인스턴스의 스케일업이 필요한 상황에서 기존 인덱스를 새로운 인스턴스로 완전히 마이그레이션하는 자동화 시스템입니다. 기존 인스턴스의 인덱스 수 제한(50개)에 도달하여 스케일업이 필요했지만, 기존 인덱스를 업그레이드할 수 없어 새로운 인스턴스를 생성하고 모든 인덱스를 안전하게 마이그레이션해야 하는 상황을 해결합니다.

핵심 특징

  • 완전 자동화: 수동 개입 없이 모든 인덱스를 자동으로 마이그레이션
  • 안전한 마이그레이션: 기존 인덱스 구조와 데이터를 완벽하게 보존
  • 배치 처리: 대용량 데이터를 효율적으로 처리하기 위한 배치 업로드
  • 오류 처리: 마이그레이션 과정에서 발생할 수 있는 오류에 대한 안전한 처리
  • 진행 상황 모니터링: 실시간으로 마이그레이션 진행 상황을 추적

문제 해결

기존 문제점

  1. 인덱스 수 제한: 기존 인스턴스의 50개 인덱스 제한에 도달
  2. 업그레이드 불가: 기존 인덱스를 직접 업그레이드할 수 없는 Azure 제약
  3. 수동 마이그레이션: 수십 개의 인덱스를 수동으로 마이그레이션해야 하는 부담
  4. 데이터 손실 위험: 마이그레이션 과정에서 데이터 손실 가능성
  5. 다운타임: 마이그레이션 중 서비스 중단 위험

해결 효과

  • 무중단 마이그레이션: 서비스 중단 없이 안전한 마이그레이션 수행
  • 완전 자동화: 수동 작업을 완전히 제거하여 인적 오류 방지
  • 데이터 무결성: 모든 인덱스 구조와 데이터를 완벽하게 보존
  • 효율적인 처리: 배치 처리로 대용량 데이터를 효율적으로 마이그레이션
  • 진행 상황 추적: 실시간 모니터링으로 마이그레이션 상태 파악

구현 방식

1. 마이그레이션 시스템 아키텍처

클라이언트 초기화 및 설정

from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient

class AzureSearchMigrationManager:
    """Azure AI Search 인스턴스 마이그레이션을 관리하는 클래스"""
    
    def __init__(self, source_endpoint, source_key, target_endpoint, target_key):
        self.source_endpoint = source_endpoint
        self.source_key = source_key
        self.target_endpoint = target_endpoint
        self.target_key = target_key
        
        # 클라이언트 초기화
        self.source_index_client = SearchIndexClient(
            endpoint=source_endpoint, 
            credential=AzureKeyCredential(source_key)
        )
        self.target_index_client = SearchIndexClient(
            endpoint=target_endpoint, 
            credential=AzureKeyCredential(target_key)
        )
    
    def initialize_clients(self):
        """소스 및 대상 서비스의 클라이언트를 초기화하는 함수"""
        print("Azure AI Search 클라이언트 초기화 중...")
        
        # 소스 서비스 연결 테스트
        try:
            source_indexes = list(self.source_index_client.list_indexes())
            print(f"소스 서비스 연결 성공: {len(source_indexes)}개 인덱스 발견")
        except Exception as e:
            raise Exception(f"소스 서비스 연결 실패: {e}")
        
        # 대상 서비스 연결 테스트
        try:
            target_indexes = list(self.target_index_client.list_indexes())
            print(f"대상 서비스 연결 성공: {len(target_indexes)}개 인덱스 존재")
        except Exception as e:
            raise Exception(f"대상 서비스 연결 실패: {e}")
        
        return True

2. 인덱스 발견 및 분석 시스템

인덱스 목록 수집

def discover_source_indexes(self):
    """소스 서비스의 모든 인덱스를 발견하고 분석하는 함수"""
    print("소스 서비스에서 모든 인덱스를 가져오는 중...")
    
    source_indexes = self.source_index_client.list_indexes()
    index_info = []
    
    for source_index in source_indexes:
        index_name = source_index.name
        
        # 인덱스 상세 정보 수집
        try:
            source_client = SearchClient(
                endpoint=self.source_endpoint, 
                index_name=index_name, 
                credential=AzureKeyCredential(self.source_key)
            )
            
            # 문서 수 확인
            document_count = source_client.get_document_count()
            
            index_info.append({
                'name': index_name,
                'fields_count': len(source_index.fields),
                'document_count': document_count,
                'has_vector_search': source_index.vector_search is not None,
                'has_semantic_search': source_index.semantic_search is not None
            })
            
            print(f"인덱스 '{index_name}': {document_count}개 문서, {len(source_index.fields)}개 필드")
            
        except Exception as e:
            print(f"인덱스 '{index_name}' 정보 수집 실패: {e}")
            index_info.append({
                'name': index_name,
                'error': str(e)
            })
    
    return index_info

def analyze_migration_requirements(self, index_info):
    """마이그레이션 요구사항을 분석하는 함수"""
    total_documents = sum(info.get('document_count', 0) for info in index_info if 'document_count' in info)
    total_indexes = len(index_info)
    
    print(f"\n=== 마이그레이션 요구사항 분석 ===")
    print(f"총 인덱스 수: {total_indexes}개")
    print(f"총 문서 수: {total_documents:,}개")
    print(f"벡터 검색 인덱스: {sum(1 for info in index_info if info.get('has_vector_search'))}개")
    print(f"시맨틱 검색 인덱스: {sum(1 for info in index_info if info.get('has_semantic_search'))}개")
    
    # 예상 마이그레이션 시간 계산 (문서당 0.1초 가정)
    estimated_time_minutes = (total_documents * 0.1) / 60
    print(f"예상 마이그레이션 시간: {estimated_time_minutes:.1f}분")
    
    return {
        'total_indexes': total_indexes,
        'total_documents': total_documents,
        'estimated_time_minutes': estimated_time_minutes
    }

3. 안전한 인덱스 생성 시스템

인덱스 존재 여부 확인 및 생성

def create_target_index_safely(self, source_index):
    """대상 인덱스를 안전하게 생성하는 함수"""
    source_index_name = source_index.name
    
    # 대상 인덱스가 이미 존재하는지 확인
    try:
        existing_index = self.target_index_client.get_index(source_index_name)
        print(f"대상 인덱스 '{source_index_name}'가 이미 존재합니다. 건너뜁니다.")
        return False, existing_index
    except Exception:
        # 인덱스가 존재하지 않을 경우에만 생성
        pass
    
    # 대상 인덱스 생성
    print(f"대상 인덱스 '{source_index_name}' 생성 중...")
    try:
        # 소스 인덱스의 모든 설정을 그대로 복사
        target_index = self.target_index_client.create_index(source_index)
        print(f"대상 인덱스 '{source_index_name}' 생성 완료.")
        
        # 생성된 인덱스 정보 출력
        self._print_index_info(target_index)
        
        return True, target_index
        
    except Exception as e:
        print(f"대상 인덱스 생성 중 오류 발생: {e}")
        return False, None

def _print_index_info(self, index):
    """인덱스 정보를 출력하는 함수"""
    print(f"  - 필드 수: {len(index.fields)}개")
    print(f"  - 벡터 검색: {'있음' if index.vector_search else '없음'}")
    print(f"  - 시맨틱 검색: {'있음' if index.semantic_search else '없음'}")
    print(f"  - 스코어링 프로파일: {len(index.scoring_profiles) if index.scoring_profiles else 0}개")

4. 효율적인 데이터 마이그레이션 시스템

배치 처리 기반 데이터 마이그레이션

def migrate_index_data(self, source_index_name, batch_size=1000):
    """인덱스 데이터를 배치 처리로 마이그레이션하는 함수"""
    print(f"인덱스 '{source_index_name}'의 데이터 마이그레이션 시작...")
    
    # 클라이언트 생성
    source_client = SearchClient(
        endpoint=self.source_endpoint, 
        index_name=source_index_name, 
        credential=AzureKeyCredential(self.source_key)
    )
    target_client = SearchClient(
        endpoint=self.target_endpoint, 
        index_name=source_index_name, 
        credential=AzureKeyCredential(self.target_key)
    )
    
    documents = []
    total_documents = 0
    batch_count = 0
    
    try:
        # 소스 인덱스에서 모든 문서 가져오기
        print(f"  소스 인덱스에서 문서 수집 중...")
        results = source_client.search(search_text="*", include_total_count=True)
        
        for result in results:
            documents.append(result)
            total_documents += 1
            
            # 배치 크기에 도달하면 업로드
            if len(documents) >= batch_size:
                batch_count += 1
                success = self._upload_batch(target_client, documents, batch_count)
                if not success:
                    return False
                documents = []  # 배치 초기화
        
        # 마지막 배치 처리
        if documents:
            batch_count += 1
            success = self._upload_batch(target_client, documents, batch_count)
            if not success:
                return False
        
        print(f"인덱스 '{source_index_name}' 데이터 마이그레이션 완료. (총 {total_documents:,}개 문서)")
        return True
        
    except Exception as e:
        print(f"인덱스 '{source_index_name}' 데이터 마이그레이션 중 오류 발생: {e}")
        return False

def _upload_batch(self, target_client, documents, batch_number):
    """문서 배치를 업로드하는 함수"""
    try:
        upload_result = target_client.upload_documents(documents=documents)
        
        # 업로드 결과 확인
        failed_documents = [r for r in upload_result if not r.succeeded]
        if failed_documents:
            print(f"배치 {batch_number} 업로드 중 오류 발생:")
            for failed in failed_documents:
                print(f"  - 문서 ID: {failed.key}, 오류: {failed.error_message}")
            return False
        else:
            print(f"배치 {batch_number} 업로드 성공. ({len(documents)}개 문서)")
            return True
            
    except Exception as e:
        print(f"배치 {batch_number} 업로드 중 예외 발생: {e}")
        return False

5. 완전 자동화된 마이그레이션 파이프라인

통합 마이그레이션 실행

def execute_full_migration(self, batch_size=1000, skip_existing=True):
    """전체 마이그레이션을 실행하는 메인 함수"""
    print("=== Azure AI Search 인스턴스 마이그레이션 시작 ===")
    
    # 1. 클라이언트 초기화
    self.initialize_clients()
    
    # 2. 소스 인덱스 발견
    index_info = self.discover_source_indexes()
    
    # 3. 마이그레이션 요구사항 분석
    requirements = self.analyze_migration_requirements(index_info)
    
    # 4. 마이그레이션 실행
    migration_results = []
    successful_migrations = 0
    failed_migrations = 0
    
    print(f"\n=== 마이그레이션 실행 시작 ===")
    
    for index_info_item in index_info:
        if 'error' in index_info_item:
            print(f"인덱스 '{index_info_item['name']}' 건너뜀 (오류: {index_info_item['error']})")
            failed_migrations += 1
            continue
        
        index_name = index_info_item['name']
        
        try:
            # 소스 인덱스 정보 가져오기
            source_index = self.source_index_client.get_index(index_name)
            
            # 대상 인덱스 생성
            created, target_index = self.create_target_index_safely(source_index)
            
            if not created and skip_existing:
                print(f"인덱스 '{index_name}' 건너뜀 (이미 존재)")
                continue
            
            # 데이터 마이그레이션
            migration_success = self.migrate_index_data(index_name, batch_size)
            
            if migration_success:
                successful_migrations += 1
                migration_results.append({
                    'index_name': index_name,
                    'status': 'success',
                    'document_count': index_info_item.get('document_count', 0)
                })
            else:
                failed_migrations += 1
                migration_results.append({
                    'index_name': index_name,
                    'status': 'failed'
                })
                
        except Exception as e:
            print(f"인덱스 '{index_name}' 처리 중 오류 발생: {e}")
            failed_migrations += 1
            migration_results.append({
                'index_name': index_name,
                'status': 'error',
                'error': str(e)
            })
    
    # 5. 마이그레이션 결과 요약
    self._print_migration_summary(migration_results, successful_migrations, failed_migrations)
    
    return migration_results

def _print_migration_summary(self, results, successful, failed):
    """마이그레이션 결과 요약을 출력하는 함수"""
    print(f"\n=== 마이그레이션 완료 요약 ===")
    print(f"총 처리된 인덱스: {len(results)}개")
    print(f"성공: {successful}개")
    print(f"실패: {failed}개")
    print(f"성공률: {(successful / len(results) * 100):.1f}%")
    
    # 성공한 인덱스들의 총 문서 수
    total_migrated_documents = sum(
        result.get('document_count', 0) 
        for result in results 
        if result['status'] == 'success'
    )
    print(f"마이그레이션된 총 문서 수: {total_migrated_documents:,}개")
    
    # 실패한 인덱스 목록
    failed_indexes = [result['index_name'] for result in results if result['status'] != 'success']
    if failed_indexes:
        print(f"\n실패한 인덱스:")
        for index_name in failed_indexes:
            print(f"  - {index_name}")

6. 사용 예시 및 실행

마이그레이션 실행

# 마이그레이션 매니저 초기화
migration_manager = AzureSearchMigrationManager(
    source_endpoint="https://<source-service>.search.windows.net",
    source_key="<source-admin-key>",
    target_endpoint="https://<target-service>.search.windows.net",
    target_key="<target-admin-key>"
)

# 전체 마이그레이션 실행
results = migration_manager.execute_full_migration(
    batch_size=1000,  # 배치 크기
    skip_existing=True  # 기존 인덱스 건너뛰기
)

# 결과 확인
for result in results:
    if result['status'] == 'success':
        print(f"✅ {result['index_name']}: {result['document_count']:,}개 문서 마이그레이션 완료")
    else:
        print(f"❌ {result['index_name']}: 마이그레이션 실패")

리팩토링 제안

1. 고급 오류 처리 및 복구 시스템

재시도 메커니즘과 오류 복구

import time
from typing import List, Dict, Any, Optional
from dataclasses import dataclass

@dataclass
class MigrationConfig:
    """마이그레이션 설정을 관리하는 데이터 클래스"""
    batch_size: int = 1000
    max_retries: int = 3
    retry_delay: float = 1.0
    skip_existing: bool = True
    validate_after_migration: bool = True
    backup_before_migration: bool = False

class AdvancedMigrationManager(AzureSearchMigrationManager):
    """고급 오류 처리와 복구 기능을 가진 마이그레이션 매니저"""
    
    def __init__(self, source_endpoint, source_key, target_endpoint, target_key, config: MigrationConfig):
        super().__init__(source_endpoint, source_key, target_endpoint, target_key)
        self.config = config
        self.migration_log = []
        self.failed_migrations = []
    
    def migrate_index_with_retry(self, source_index_name: str) -> bool:
        """재시도 메커니즘을 사용하여 인덱스를 마이그레이션하는 함수"""
        for attempt in range(self.config.max_retries):
            try:
                print(f"마이그레이션 시도 {attempt + 1}/{self.config.max_retries}: {source_index_name}")
                
                success = self.migrate_index_data(source_index_name, self.config.batch_size)
                
                if success:
                    if self.config.validate_after_migration:
                        validation_success = self._validate_migration(source_index_name)
                        if not validation_success:
                            print(f"마이그레이션 검증 실패: {source_index_name}")
                            if attempt < self.config.max_retries - 1:
                                time.sleep(self.config.retry_delay * (2 ** attempt))  # 지수 백오프
                                continue
                            return False
                    
                    print(f"마이그레이션 성공: {source_index_name}")
                    return True
                else:
                    if attempt < self.config.max_retries - 1:
                        print(f"마이그레이션 실패, 재시도 대기 중... ({self.config.retry_delay * (2 ** attempt)}초)")
                        time.sleep(self.config.retry_delay * (2 ** attempt))
                    
            except Exception as e:
                print(f"마이그레이션 시도 {attempt + 1} 중 오류: {e}")
                if attempt < self.config.max_retries - 1:
                    time.sleep(self.config.retry_delay * (2 ** attempt))
        
        # 모든 재시도 실패
        self.failed_migrations.append({
            'index_name': source_index_name,
            'attempts': self.config.max_retries,
            'last_error': str(e) if 'e' in locals() else 'Unknown error'
        })
        return False
    
    def _validate_migration(self, index_name: str) -> bool:
        """마이그레이션 후 데이터 무결성을 검증하는 함수"""
        try:
            source_client = SearchClient(
                endpoint=self.source_endpoint, 
                index_name=index_name, 
                credential=AzureKeyCredential(self.source_key)
            )
            target_client = SearchClient(
                endpoint=self.target_endpoint, 
                index_name=index_name, 
                credential=AzureKeyCredential(self.target_key)
            )
            
            # 문서 수 비교
            source_count = source_client.get_document_count()
            target_count = target_client.get_document_count()
            
            if source_count != target_count:
                print(f"문서 수 불일치: 소스 {source_count}, 대상 {target_count}")
                return False
            
            # 샘플 문서 비교 (처음 10개)
            source_docs = list(source_client.search(search_text="*", top=10))
            target_docs = list(target_client.search(search_text="*", top=10))
            
            for i, (source_doc, target_doc) in enumerate(zip(source_docs, target_docs)):
                if source_doc.get('id') != target_doc.get('id'):
                    print(f"문서 ID 불일치 (위치 {i}): 소스 {source_doc.get('id')}, 대상 {target_doc.get('id')}")
                    return False
            
            print(f"마이그레이션 검증 성공: {index_name}")
            return True
            
        except Exception as e:
            print(f"마이그레이션 검증 중 오류: {e}")
            return False

2. 병렬 처리 및 성능 최적화

멀티스레딩을 활용한 병렬 마이그레이션

import concurrent.futures
from threading import Lock
import queue

class ParallelMigrationManager(AdvancedMigrationManager):
    """병렬 처리를 지원하는 마이그레이션 매니저"""
    
    def __init__(self, source_endpoint, source_key, target_endpoint, target_key, config: MigrationConfig, max_workers: int = 3):
        super().__init__(source_endpoint, source_key, target_endpoint, target_key, config)
        self.max_workers = max_workers
        self.progress_lock = Lock()
        self.completed_count = 0
        self.total_count = 0
    
    def execute_parallel_migration(self) -> List[Dict[str, Any]]:
        """병렬 처리를 사용하여 마이그레이션을 실행하는 함수"""
        print(f"=== 병렬 마이그레이션 시작 (최대 {self.max_workers}개 워커) ===")
        
        # 소스 인덱스 발견
        index_info = self.discover_source_indexes()
        self.total_count = len(index_info)
        
        # 병렬 마이그레이션 실행
        migration_results = []
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # 각 인덱스에 대해 마이그레이션 작업 제출
            future_to_index = {
                executor.submit(self._migrate_single_index_parallel, index_item): index_item
                for index_item in index_info
            }
            
            # 완료된 작업 처리
            for future in concurrent.futures.as_completed(future_to_index):
                index_item = future_to_index[future]
                try:
                    result = future.result()
                    migration_results.append(result)
                    
                    with self.progress_lock:
                        self.completed_count += 1
                        progress = (self.completed_count / self.total_count) * 100
                        print(f"진행률: {self.completed_count}/{self.total_count} ({progress:.1f}%) - {index_item['name']}")
                        
                except Exception as e:
                    print(f"인덱스 '{index_item['name']}' 처리 중 예외: {e}")
                    migration_results.append({
                        'index_name': index_item['name'],
                        'status': 'error',
                        'error': str(e)
                    })
        
        # 결과 요약
        self._print_migration_summary(migration_results, 
                                    sum(1 for r in migration_results if r['status'] == 'success'),
                                    sum(1 for r in migration_results if r['status'] != 'success'))
        
        return migration_results
    
    def _migrate_single_index_parallel(self, index_item: Dict[str, Any]) -> Dict[str, Any]:
        """단일 인덱스를 병렬로 마이그레이션하는 함수"""
        index_name = index_item['name']
        
        if 'error' in index_item:
            return {
                'index_name': index_name,
                'status': 'skipped',
                'reason': f"정보 수집 오류: {index_item['error']}"
            }
        
        try:
            # 소스 인덱스 정보 가져오기
            source_index = self.source_index_client.get_index(index_name)
            
            # 대상 인덱스 생성
            created, target_index = self.create_target_index_safely(source_index)
            
            if not created and self.config.skip_existing:
                return {
                    'index_name': index_name,
                    'status': 'skipped',
                    'reason': '이미 존재하는 인덱스'
                }
            
            # 데이터 마이그레이션
            migration_success = self.migrate_index_with_retry(index_name)
            
            if migration_success:
                return {
                    'index_name': index_name,
                    'status': 'success',
                    'document_count': index_item.get('document_count', 0)
                }
            else:
                return {
                    'index_name': index_name,
                    'status': 'failed',
                    'reason': '마이그레이션 실패'
                }
                
        except Exception as e:
            return {
                'index_name': index_name,
                'status': 'error',
                'error': str(e)
            }

3. 모니터링 및 로깅 시스템

상세한 로깅 및 모니터링

import logging
import json
from datetime import datetime
from typing import Dict, Any

class MigrationLogger:
    """마이그레이션 과정을 로깅하는 클래스"""
    
    def __init__(self, log_file: str = "migration.log"):
        self.logger = logging.getLogger('AzureSearchMigration')
        self.logger.setLevel(logging.INFO)
        
        # 파일 핸들러
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(logging.INFO)
        
        # 콘솔 핸들러
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        
        # 포맷터
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        
        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)
        
        self.migration_start_time = None
        self.migration_stats = {
            'total_indexes': 0,
            'successful_migrations': 0,
            'failed_migrations': 0,
            'total_documents': 0,
            'total_batches': 0
        }
    
    def start_migration(self, total_indexes: int):
        """마이그레이션 시작을 로깅하는 함수"""
        self.migration_start_time = datetime.now()
        self.migration_stats['total_indexes'] = total_indexes
        
        self.logger.info(f"마이그레이션 시작: {total_indexes}개 인덱스")
        self.logger.info(f"시작 시간: {self.migration_start_time}")
    
    def log_index_migration_start(self, index_name: str, document_count: int):
        """인덱스 마이그레이션 시작을 로깅하는 함수"""
        self.logger.info(f"인덱스 마이그레이션 시작: {index_name} ({document_count:,}개 문서)")
    
    def log_batch_upload(self, index_name: str, batch_number: int, batch_size: int):
        """배치 업로드를 로깅하는 함수"""
        self.migration_stats['total_batches'] += 1
        self.logger.info(f"배치 업로드 완료: {index_name} - 배치 {batch_number} ({batch_size}개 문서)")
    
    def log_index_migration_complete(self, index_name: str, success: bool, document_count: int = 0):
        """인덱스 마이그레이션 완료를 로깅하는 함수"""
        if success:
            self.migration_stats['successful_migrations'] += 1
            self.migration_stats['total_documents'] += document_count
            self.logger.info(f"인덱스 마이그레이션 성공: {index_name} ({document_count:,}개 문서)")
        else:
            self.migration_stats['failed_migrations'] += 1
            self.logger.error(f"인덱스 마이그레이션 실패: {index_name}")
    
    def log_migration_error(self, index_name: str, error: str):
        """마이그레이션 오류를 로깅하는 함수"""
        self.logger.error(f"마이그레이션 오류 - {index_name}: {error}")
    
    def end_migration(self):
        """마이그레이션 완료를 로깅하는 함수"""
        end_time = datetime.now()
        duration = end_time - self.migration_start_time if self.migration_start_time else None
        
        self.logger.info("=== 마이그레이션 완료 ===")
        self.logger.info(f"완료 시간: {end_time}")
        if duration:
            self.logger.info(f"총 소요 시간: {duration}")
        
        # 통계 정보 로깅
        stats = self.migration_stats
        self.logger.info(f"총 인덱스: {stats['total_indexes']}개")
        self.logger.info(f"성공: {stats['successful_migrations']}개")
        self.logger.info(f"실패: {stats['failed_migrations']}개")
        self.logger.info(f"총 문서: {stats['total_documents']:,}개")
        self.logger.info(f"총 배치: {stats['total_batches']}개")
        
        if stats['total_indexes'] > 0:
            success_rate = (stats['successful_migrations'] / stats['total_indexes']) * 100
            self.logger.info(f"성공률: {success_rate:.1f}%")
    
    def save_migration_report(self, results: List[Dict[str, Any]], report_file: str = "migration_report.json"):
        """마이그레이션 보고서를 JSON 파일로 저장하는 함수"""
        report = {
            'migration_summary': {
                'start_time': self.migration_start_time.isoformat() if self.migration_start_time else None,
                'end_time': datetime.now().isoformat(),
                'duration_minutes': (datetime.now() - self.migration_start_time).total_seconds() / 60 if self.migration_start_time else None,
                'statistics': self.migration_stats
            },
            'detailed_results': results
        }
        
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        
        self.logger.info(f"마이그레이션 보고서 저장: {report_file}")

4. 웹 대시보드 및 API 서비스

FastAPI 기반 마이그레이션 관리 API

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List, Optional, Dict, Any
import uvicorn

app = FastAPI(title="Azure AI Search Migration Manager API")

class MigrationRequest(BaseModel):
    source_endpoint: str
    source_key: str
    target_endpoint: str
    target_key: str
    batch_size: int = 1000
    max_workers: int = 3
    skip_existing: bool = True

class MigrationStatus(BaseModel):
    migration_id: str
    status: str  # running, completed, failed
    progress: float
    current_index: Optional[str]
    completed_indexes: int
    total_indexes: int
    start_time: str
    estimated_completion: Optional[str]

# 전역 상태 관리
migration_statuses: Dict[str, MigrationStatus] = {}

@app.post("/migration/start")
async def start_migration(request: MigrationRequest, background_tasks: BackgroundTasks):
    """마이그레이션을 시작하는 엔드포인트"""
    import uuid
    
    migration_id = str(uuid.uuid4())
    
    # 마이그레이션 상태 초기화
    migration_statuses[migration_id] = MigrationStatus(
        migration_id=migration_id,
        status="running",
        progress=0.0,
        current_index=None,
        completed_indexes=0,
        total_indexes=0,
        start_time=datetime.now().isoformat(),
        estimated_completion=None
    )
    
    # 백그라운드에서 마이그레이션 실행
    background_tasks.add_task(
        execute_migration_background,
        migration_id,
        request
    )
    
    return {"migration_id": migration_id, "status": "started"}

@app.get("/migration/{migration_id}/status")
async def get_migration_status(migration_id: str):
    """마이그레이션 상태를 조회하는 엔드포인트"""
    if migration_id not in migration_statuses:
        raise HTTPException(status_code=404, detail="Migration not found")
    
    return migration_statuses[migration_id]

@app.get("/migration/{migration_id}/results")
async def get_migration_results(migration_id: str):
    """마이그레이션 결과를 조회하는 엔드포인트"""
    if migration_id not in migration_statuses:
        raise HTTPException(status_code=404, detail="Migration not found")
    
    status = migration_statuses[migration_id]
    if status.status != "completed":
        raise HTTPException(status_code=400, detail="Migration not completed yet")
    
    # 결과 파일에서 데이터 로드 (실제 구현에서는 데이터베이스 사용)
    try:
        with open(f"migration_{migration_id}_results.json", "r") as f:
            results = json.load(f)
        return results
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="Results not found")

@app.get("/migration/list")
async def list_migrations():
    """모든 마이그레이션 목록을 조회하는 엔드포인트"""
    return list(migration_statuses.values())

async def execute_migration_background(migration_id: str, request: MigrationRequest):
    """백그라운드에서 마이그레이션을 실행하는 함수"""
    try:
        # 마이그레이션 매니저 초기화
        config = MigrationConfig(
            batch_size=request.batch_size,
            skip_existing=request.skip_existing
        )
        
        migration_manager = ParallelMigrationManager(
            source_endpoint=request.source_endpoint,
            source_key=request.source_key,
            target_endpoint=request.target_endpoint,
            target_key=request.target_key,
            config=config,
            max_workers=request.max_workers
        )
        
        # 마이그레이션 실행
        results = migration_manager.execute_parallel_migration()
        
        # 상태 업데이트
        migration_statuses[migration_id].status = "completed"
        migration_statuses[migration_id].progress = 100.0
        migration_statuses[migration_id].completed_indexes = len(results)
        
        # 결과 저장
        with open(f"migration_{migration_id}_results.json", "w") as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
            
    except Exception as e:
        migration_statuses[migration_id].status = "failed"
        print(f"마이그레이션 실패: {e}")

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

학습 내용

기술적 인사이트

1. 대규모 데이터 마이그레이션의 복잡성

  • 배치 처리의 중요성: 대용량 데이터를 효율적으로 처리하기 위한 배치 단위 처리
  • 메모리 관리: 전체 데이터를 메모리에 로드하지 않고 스트리밍 방식으로 처리
  • 오류 처리: 부분적 실패에 대한 안전한 처리 및 복구 메커니즘

2. Azure AI Search 인덱스 구조의 복잡성

  • 벡터 검색 설정: HNSW 알고리즘과 벡터 프로파일의 정확한 복제
  • 시맨틱 검색 설정: 제목, 키워드, 내용 필드의 우선순위 설정 보존
  • 스코어링 프로파일: 검색 결과 순위에 영향을 주는 설정의 완전한 이전

3. 자동화의 가치

  • 인적 오류 방지: 수동 작업에서 발생할 수 있는 실수를 완전히 제거
  • 일관성 보장: 모든 인덱스에 대해 동일한 방식으로 마이그레이션 수행
  • 확장성: 새로운 인덱스가 추가되어도 동일한 프로세스로 처리 가능

비즈니스 가치

1. 비용 절감 효과

  • 다운타임 최소화: 서비스 중단 없이 마이그레이션 수행으로 비즈니스 손실 방지
  • 인력 비용 절약: 수동 마이그레이션에 필요한 대량의 인력 투입 불필요
  • 오류 수정 비용 절약: 자동화를 통한 오류 방지로 사후 수정 비용 절약

2. 운영 효율성 증대

  • 예측 가능한 마이그레이션: 자동화된 프로세스로 마이그레이션 시간 예측 가능
  • 모니터링 강화: 실시간 진행 상황 추적으로 문제 조기 발견
  • 표준화된 프로세스: 일관된 마이그레이션 프로세스로 운영 표준화

3. 확장성과 유연성

  • 스케일 대응: 인덱스 수 증가에 대한 자동 대응
  • 환경 이전: 개발/운영 환경 간의 안전한 인덱스 이전
  • 백업 및 복구: 정기적인 백업을 통한 데이터 보호

향후 발전 방향

1. 고도화된 마이그레이션 전략

  • 점진적 마이그레이션: 서비스 중단 없이 점진적으로 트래픽 이전
  • 실시간 동기화: 소스와 대상 인덱스 간의 실시간 데이터 동기화
  • 롤백 기능: 문제 발생 시 이전 상태로 안전하게 롤백

2. 지능형 모니터링 시스템

  • 성능 모니터링: 마이그레이션 후 성능 지표 자동 비교
  • 데이터 품질 검증: 마이그레이션된 데이터의 품질 자동 검증
  • 알림 시스템: 문제 발생 시 즉시 알림 및 대응 방안 제시

3. 통합 플랫폼 구축

  • 웹 대시보드: 시각적 마이그레이션 관리 인터페이스
  • API 생태계: 다양한 도구와의 연동을 위한 RESTful API
  • CI/CD 통합: 개발 파이프라인과의 자동 통합

이 Azure AI Search 인스턴스 마이그레이션 자동화 시스템을 통해 대규모 검색 서비스의 안전하고 효율적인 인프라 이전을 실현할 수 있습니다. 특히 완전 자동화된 프로세스와 배치 처리 기반의 효율적인 데이터 마이그레이션을 통해 비즈니스 연속성을 보장하면서도 운영 효율성을 크게 향상시킨 혁신적인 솔루션입니다.