1. 0.1Главная
  2. 0.2О нас
  3. 0.3Сервисы
  4. 0.4Кейсы
  5. 0.5Блог
  6. 0.6Контакт

Siemens Energy: событийно-управляемый конвейер данных с ИИ в AWS


Клиент: Siemens Energy · Подразделение данныхUnknown NodeОтрасль: Энергетика · ПроизводствоUnknown NodeУслуги: Облачная архитектура, бессерверная разработка, интеграция ИИ/МО, инфраструктура как код (IaC)Unknown NodeСервисы AWS: S3, S3 Access Grants, Lambda, Step Functions, EventBridge, API Gateway, Bedrock Data Automation, Amplify, SQS, CloudFormation (CDK на Python)

Unknown Node

Задача

Подразделение данных Siemens Energy управляет огромными объёмами данных измерений продукции, документами, изображениями, аудиозаписями и видеофайлами, генерируемыми в рамках глобальных производственных операций, на турбинах и другом оборудовании.

Команде была необходима система, способная:

  • Принимать файлы произвольного размера от килобайт до многогигабайтных наборов данных измерений и изображений высокого разрешения, без ограничений по размеру полезной нагрузки и без деградации производительности.
  • Автоматически извлекать богатые метаданные из каждого загруженного файла, независимо от модальности (текст, изображение, аудио, видео), с помощью ИИ. При этом, если определённые метаданные были известны заранее, должна была быть возможность объединить их с автоматически сгенерированными.
  • Обеспечивать контроль доступа на уровне разделов, чтобы различные команды и пользователи могли видеть и изменять данные только в пределах своей области, с интеграцией с существующим SSO-провайдером Microsoft Entra ID (Azure AD) компании Siemens Energy.
  • Масштабироваться до сотен тысяч файлов без ограничений управления жизненным циклом, характерных для таких инструментов, как OneDrive или SharePoint (потолок ~300 000 файлов).
  • Предоставлять веб-интерфейс для нетехнических пользователей для загрузки, просмотра, скачивания и предварительного просмотра файлов после аутентификации через корпоративный SSO.

Система должна была быть промышленного уровня, прошедшей нагрузочное тестирование и развёртываемой в нескольких средах (sandbox, development, UAT, production) с полной инфраструктурой как код (Infrastructure as Code, IaC).

Unknown Node

Решение

Мы спроектировали и реализовали Product Measurement Data Pipeline (PMDP), набор взаимосвязанных микросервисов и интерфейсов, полностью работающих в бессерверном режиме на AWS.

Обзор архитектуры

Платформа состоит из:

  1. File Manager API: RESTful API-сервис, обеспечивающий CRUD-операции с файлами, многочастную загрузку, генерацию предподписанных URL, операции с метаданными и контроль доступа на уровне разделов.
  2. Микросервис обогащения метаданных файлов: событийно-управляемый мультимодальный ИИ-конвейер, автоматически извлекающий структурированные метаданные из загруженных файлов с помощью Amazon Bedrock Data Automation.
  3. Веб-приложение: React-приложение, размещённое на AWS Amplify, предоставляющее интерфейс Storage Browser с интеграцией SSO Siemens Energy.
  4. Набор нагрузочных тестов: CLI-инструмент для валидации системы под нагрузкой с параллельными загрузками и скачиваниями файлов размером до нескольких гигабайт.

Вся инфраструктура описана в Python CDK, валидируется и автоматически развёртывается через CI/CD-конвейеры (self-hosted GitLab).

Unknown Node

Детальный разбор: как это работает

1. Партиционирование ключей объектов в стиле Hive

Каждый файл, загруженный через API, хранится в S3 с использованием схемы партиционирования в стиле Apache Hive. Это не просто организационная мера. Она обеспечивает высокопроизводительные запросы и напрямую связана с моделью контроля доступа.

# construct_object_key.py — Детерминированная генерация запрашиваемых ключей S3Unknown NodeUnknown Nodeclass Capability(str, Enum):Unknown Node EDAA = "edaa"Unknown Node CUSTOMER_FACING = "customer-facing"Unknown Node MANUFACTURING = "manufacturing"Unknown Node QUALITY = "quality"Unknown Node # ...Unknown NodeUnknown Nodedef construct_object_key(Unknown Node capability: Capability | str | None = None,Unknown Node file_name: str = "",Unknown Node sub_capability: str | None = None,Unknown Node year: str | None = None,Unknown Node month: str | None = None,Unknown Node day: str | None = None,Unknown Node) -> str:Unknown Node now = datetime.now()Unknown Node generated_hash = hashlib.md5(Unknown Node f"{capability}{sub_capability}{file_name}".encode()Unknown Node ).hexdigest()[:2]Unknown NodeUnknown Node return (Unknown Node f"capability={capability}/"Unknown Node f"sub-capability={sub_capability or 'unknown'}/"Unknown Node f"year={year or now.year}/"Unknown Node f"month={month or f'{now.month:02d}'}/"Unknown Node f"day={day or f'{now.day:02d}'}/"Unknown Node f"hash={generated_hash}/"Unknown Node f"{file_name}"Unknown Node )

Файл, загруженный как report.pdf с возможностью manufacturing 23 декабря 2025 года, получает следующий ключ:

capability=manufacturing/sub-capability=unknown/year=2025/month=12/day=23/hash=a3/report.pdf

Эта структура позволяет AWS Athena, AWS Glue или любому Hive-совместимому инструменту эффективно запрашивать озеро данных по возможности, диапазону дат или любой комбинации ключей разделов. Двухсимвольный хеш предотвращает «горячие точки» при интенсивной записи в S3.

2. S3 Access Grants с федерацией Entra ID

Вместо управления IAM-политиками для каждого пользователя мы реализовали S3 Access Grants, относительно новую функцию AWS, которая сопоставляет утверждения (claims) провайдера идентификации непосредственно с разрешениями на уровне префиксов S3.

Каждый пользователь или группа Entra ID сопоставляется с определённым разделом S3. При аутентификации пользователя утверждение oid из его JWT-токена используется для поиска соответствующей IAM-роли, которая затем принимается через sts:AssumeRoleWithWebIdentity. Роль ограничена разделом пользователя через Access Grant.

# access_grants.py — Изоляция разделов по пользователям через S3 Access GrantsUnknown NodeUnknown NodeCfnAccessGrant(Unknown Node self,Unknown Node f"AccessGrantUser{user_idx}",Unknown Node access_grants_location_id=user_location.ref,Unknown Node permission="READWRITE",Unknown Node grantee=CfnAccessGrant.GranteeProperty(Unknown Node grantee_identifier=user_role.role_arn,Unknown Node grantee_type="IAM",Unknown Node ),Unknown Node)

Lambda-интеграция затем обменивает токен Entra ID на ограниченные AWS-учётные данные при каждом запросе, с кешированием для избежания избыточных вызовов STS:

# get_cached_or_exchange_credentials.py — Обмен токенов с кешированиемUnknown NodeUnknown Nodedef get_cached_or_exchange_credentials(id_token: str) -> CredentialsTypeDef:Unknown Node key = _cache_key_from_token(id_token)Unknown Node now = time.time()Unknown Node with _CACHE_LOCK:Unknown Node entry = _CACHE.get(key)Unknown Node if entry and entry["expires_at"] > now + _SKEW_SECONDS:Unknown Node return entry["credentials"]Unknown NodeUnknown Node new_credentials = _exchange_and_assume_with_expiry(id_token)Unknown NodeUnknown Node with _CACHE_LOCK:Unknown Node _CACHE[key] = {Unknown Node "credentials": new_credentials,Unknown Node "expires_at": new_credentials["Expiration"].timestamp(),Unknown Node }Unknown Node return new_credentials

Это означает, что пользователь A в разделе manufacturing не может читать или записывать файлы в разделе quality пользователя B. Это обеспечивается на уровне S3, а не только на уровне приложения.

3. Мультимодальное обогащение метаданных с помощью ИИ

Когда файл попадает в S3, автоматически запускается событийно-управляемый конвейер. Система определяет тип файла, генерирует специализированный чертёж (blueprint) Bedrock Data Automation, запускает задачу извлечения и публикует структурированные результаты обратно в EventBridge.

Весь рабочий процесс оркестрируется Step Functions с использованием выражений JSONata:

# workflow.py — Оркестрация Step Functions с Bedrock Data AutomationUnknown NodeUnknown Nodedefinition_body = sfn.DefinitionBody.from_chainable(Unknown Node extract_event_dataUnknown Node .next(generate_blueprint) # Динамический чертёж на основе требований к метаданнымUnknown Node .next(start_data_automation_job) # Bedrock Data AutomationUnknown Node .next(wait_for_job_completion) # Асинхронное ожидание с task tokenUnknown Node .next(normalize_event_data) # Валидация вывода по схеме EventBridgeUnknown Node .next(publish_output_event) # Событие завершения в EventBridgeUnknown Node .next(sfn.Succeed(self, "FileMetadataEnrichmentCompletion"))Unknown Node)

Генератор чертежей динамически создаёт схемы извлечения на основе модальности файла и пользовательских требований к метаданным. Для изображений это означает обнаружение ограничивающих рамок и категоризация. Для документов — суммаризация и извлечение ключевых тезисов. Для аудио же это транскрипция и идентификация говорящих:

# bedrock_data_automation_blueprint_generator.pyUnknown NodeUnknown Nodedef generate_blueprint_schema(enrichments, blueprint_type):Unknown Node properties = {}Unknown NodeUnknown Node # Базовые свойства — извлекаются всегдаUnknown Node properties["data_classification"] = {Unknown Node "type": "string",Unknown Node "instruction": "The data classification level (public, internal, confidential, restricted)",Unknown Node }Unknown Node properties["summary"] = {Unknown Node "type": "string",Unknown Node "instruction": "A brief summary of the file content",Unknown Node }Unknown Node properties["keywords"] = {Unknown Node "type": "array",Unknown Node "items": {"type": "string"},Unknown Node "instruction": "Key terms and keywords extracted from the document",Unknown Node }Unknown NodeUnknown Node # Обогащения, специфичные для модальностиUnknown Node if enrichments.get("audio", {}).get("transcribe"):Unknown Node properties["transcript"] = {Unknown Node "type": "string",Unknown Node "instruction": "Full transcript of the audio content",Unknown Node }Unknown Node if enrichments.get("video", {}).get("scenes"):Unknown Node properties["scenes"] = {Unknown Node "type": "array",Unknown Node "items": {Unknown Node "type": "object",Unknown Node "properties": {Unknown Node "scene_number": {"type": "number"},Unknown Node "start_time": {"type": "number"},Unknown Node "end_time": {"type": "number"},Unknown Node "description": {"type": "string"},Unknown Node },Unknown Node },Unknown Node "instruction": "Scene changes detected in the video with timestamps",Unknown Node }Unknown NodeUnknown Node return {"class": f"{blueprint_type.capitalize()}Metadata", "properties": properties}

Обогащённые метаданные сохраняются рядом с исходным файлом в виде сопутствующего файла .metadata.json в том же Hive-партиционированном расположении, что делает их немедленно доступными для запросов.

4. Поддержка загрузки многогигабайтных файлов

API Gateway имеет ограничение полезной нагрузки в 10 МБ. Файлы измерений продукции могут весить гигабайты. Мы решили эту проблему с помощью многочастной загрузки на основе предподписанных URL, которая полностью обходит API Gateway для тяжёлых операций.

API рассчитывает оптимальные размеры частей, инициирует многочастную загрузку и возвращает предподписанные URL для каждой части. Клиент загружает данные напрямую в S3:

# multi_chunk.py — Оркестрация многочастной загрузки с предподписанными URLUnknown NodeUnknown Nodedef initiate_multi_chunk_upload_presigned(body):Unknown Node chunk_size, num_chunks = _calculate_chunk_size(request.fileSize)Unknown NodeUnknown Node response = s3.create_multipart_upload(Unknown Node Bucket=bucket, Key=key, ServerSideEncryption="aws:kms"Unknown Node )Unknown NodeUnknown Node presigned_urls = []Unknown Node for chunk_number in range(1, num_chunks + 1):Unknown Node presigned_url = s3.generate_presigned_url(Unknown Node "upload_part",Unknown Node Params={Unknown Node "Bucket": bucket, "Key": key,Unknown Node "UploadId": response["UploadId"],Unknown Node "PartNumber": chunk_number,Unknown Node },Unknown Node ExpiresIn=expires_in,Unknown Node )Unknown Node presigned_urls.append({Unknown Node "chunkNumber": chunk_number,Unknown Node "url": presigned_url,Unknown Node "startByte": (chunk_number - 1) * chunk_size,Unknown Node "endByte": min(chunk_number * chunk_size - 1, request.fileSize - 1),Unknown Node })Unknown NodeUnknown Node return {"uploadId": response["UploadId"], "chunks": presigned_urls}

На фронтенде веб-приложение обрабатывает это прозрачно. Маленькие файлы проходят через API, большие автоматически переключаются на многочастную загрузку:

// api.service.ts — Автоматический выбор стратегии загрузкиUnknown NodeUnknown Nodeasync upload(file: File, request: InitiateUploadRequest, onProgress?) {Unknown Node if (file.size > FILE_SIZE_THRESHOLD) {Unknown Node return this.multiChunkUploadService.uploadFile(file, request, { onProgress });Unknown Node }Unknown NodeUnknown Node const base64Content = await readFileAsBase64(file);Unknown Node return this.httpService.request('/files', 'POST', {Unknown Node body: { content: base64Content, fileName: request.fileName },Unknown Node });Unknown Node}

5. Веб-приложение

Фронтенд — это React-приложение, построенное на компоненте Storage Browser от AWS Amplify, настроенное с действиями, которые маршрутизируются через наш API, а не напрямую в S3. Это даёт нам полный контроль над контролем доступа, операциями с метаданными и стратегиями загрузки, обеспечивая при этом отполированный и привычный интерфейс управления файлами.

// storage-browser.provider.tsx — Кастомный Storage Browser с действиями через APIUnknown NodeUnknown Nodeconst { StorageBrowser } = createStorageBrowser({Unknown Node config: {Unknown Node registerAuthListener: async (onAuthStateChange) => {Unknown Node const authService = getAuthService();Unknown Node authService.registerAuthListener(onAuthStateChange);Unknown Node },Unknown Node listLocations: async ({ options }) => {Unknown Node return await apiService.getLocations({Unknown Node options: { pageSize: 30, nextToken: options?.nextToken },Unknown Node });Unknown Node },Unknown Node },Unknown Node actions: actionsBuilder.buildActions(),Unknown Node});

Пользователи входят в систему с учётными данными Entra ID Siemens Energy и сразу видят только те разделы, к которым у них есть доступ. Они могут загружать файлы любого размера, просматривать Hive-партиционированную структуру папок, предварительно просматривать документы и изображения, скачивать через предподписанные URL и редактировать метаданные. И всё, не покидая браузер.

6. Межаккаунтная событийно-управляемая архитектура

File Manager и микросервис обогащения метаданных работают в разных AWS-аккаунтах. Когда файл загружается, S3-бакет File Manager запускает Lambda, которая публикует событие FileMetadataEnrichmentRequest в межаккаунтную шину EventBridge.

# file_metadata_enrichment_processor.py — Межаккаунтная публикация событийUnknown NodeUnknown Nodedef put_event(bucket, key, size=None, etag=None, enrichments=None):Unknown Node detail = create_event_detail(bucket, key, size, etag, enrichments)Unknown NodeUnknown Node return events_client.put_events(Entries=[{Unknown Node "Source": "com.siemens-energy.pmdp.file-metadata-enrichment",Unknown Node "DetailType": "FileMetadataEnrichmentRequest",Unknown Node "Detail": json.dumps(detail),Unknown Node "EventBusName": EVENT_BUS_ARN, # Межаккаунтный ARNUnknown Node }])

На стороне обогащения правила EventBridge маршрутизируют события через SQS (с DLQ для отказоустойчивости) в EventBridge Pipe, который валидирует событие по реестру схем перед вызовом рабочего процесса Step Functions. События завершения и исключения пересылаются обратно в исходный аккаунт.

Эта развязанная архитектура означает, что микросервис обогащения может быть повторно использован любой командой в Siemens Energy. Им достаточно публиковать события в шину.

Unknown Node

Нагрузочное тестирование: доказательство масштабируемости

Система должна была обрабатывать огромные объёмы данных как на входе, так и на выходе. Некоторые из их файловых репозиториев могли занимать дни только для удаления. Мы создали специализированный CLI для нагрузочного тестирования, который генерирует файлы настраиваемых размеров (от 100 КБ до 5 ГБ+), загружает их параллельно, скачивает через предподписанные URL и проверяет целостность с помощью контрольных сумм MD5.

Результаты тестирования подтвердили:

  • Параллельные загрузки 20+ файлов одновременно, включая многогигабайтные файлы через многочастную загрузку
  • 100% успешность на сотнях файлов в рамках одного тестового прогона
  • Верификация скачивания с подтверждением побайтовой целостности после полного цикла через весь конвейер
  • Автоматическая очистка тестовых артефактов как из S3, так и из локального хранилища
Unknown Node

Инфраструктура как код: всё в CDK

Вся платформа, оба AWS-аккаунта, все сервисы, все IAM-роли, вся маршрутизация событий, описана в Python CDK. Конфигурация, специфичная для среды, управляется через Hydra/OmegaConf, что делает тривиальным создание новой среды или подключение новой команды.

# config.py — Типобезопасная конфигурация для каждой средыUnknown NodeUnknown Nodeconfig_environment = make_config(Unknown Node env=zf(cdk.Environment),Unknown Node environment=zf(Literal["sandbox", "dev", "uat", "prd"]),Unknown Node s3explorer=zf(config_s3explorer),Unknown Node access_grants=zf(config_access_grants),Unknown Node project_name=zf(str, default="mfg-product-measurement-data-pipeline"),Unknown Node file_metadata_enrichment_event_bus_arn=zf(str),Unknown Node)

CI/CD для веб-приложения использует федерацию GitLab OIDC. Никаких долгоживущих учётных данных, никаких секретов для ротации. CDK-стек создаёт OIDC-провайдер, роль для развёртывания и S3-бакет для исходного кода Amplify в одном конструкте.

Unknown Node

Результаты

  • Поддержка размера файлов: без ограничений (протестировано до 5 ГБ+)
  • Параллельность загрузки: 20+ одновременных загрузок
  • Извлечение метаданных: автоматическое для документов, изображений, аудио и видео
  • Контроль доступа: изоляция разделов по пользователям и группам через S3 Access Grants
  • Среды: 4 (sandbox, development, UAT, production) из единой кодовой базы CDK
  • Интеграция идентификации: Microsoft Entra ID SSO с федерацией OIDC
  • Инфраструктура: 100% инфраструктура как код (Python CDK)
Unknown Node

Технологический стек

  • Вычисления: AWS Lambda (Python 3.14, ARM64)
  • Оркестрация: AWS Step Functions (JSONata)
  • ИИ/МО: Amazon Bedrock Data Automation
  • Хранилище: Amazon S3 (Intelligent-Tiering, шифрование KMS, Transfer Acceleration)
  • API: Amazon API Gateway (REST) с Lambda Powertools + Swagger
  • События: Amazon EventBridge, EventBridge Pipes, SQS
  • Идентификация: Microsoft Entra ID, федерация OIDC, S3 Access Grants
  • Фронтенд: React, Vite, AWS Amplify, Amplify UI Storage Browser
  • IaC: AWS CDK (Python), Hydra/OmegaConf
  • CI/CD: GitLab CI
Unknown Node

Почему Tone Singleton

Этот проект потребовал глубокой экспертизы по всему стеку AWS от проектирования IAM-политик низкого уровня и межаккаунтной маршрутизации событий до интеграции передовой технологии Bedrock Data Automation и кастомизации Amplify Storage Browser.

Мы стремились выйти за рамки первоначальных спецификаций и рекомендовать самые современные инновации облака AWS. Результатом стала система, в которой загрузка файла запускает ИИ-конвейер, обогащающий его структурированными метаданными, сохраняющий в запрашиваемый раздел и делающий немедленно доступным через веб-интерфейс, и всё это без каких-либо действий пользователя, кроме перетаскивания файла.

Многим организациям необходимо создавать надёжные, современные облачные приложения на AWS — системы, которые справляются с реальными масштабами, интегрируются с корпоративными провайдерами идентификации и используют ИИ там, где это действительно важно. Вы одна из них? Давайте поговорим.


Больше статей

Nextnet: Научная ИИ платформа на AWS CDK

AWS CDK и грамотный выбор сервисов это высокая скорость разработки, развертывания в разных средах и интеграции ИИ при сохранении уровня безопасности.

вторник, 22 июля 2025 г.

Контакт

Канторы
Tone Singleton SPR BV
Rue Henri Werriestraat, 6 (box 7)
1090 Brussels
Belgium
Tone Singleton Ptd Ltd
60 Paya Lebar Road #06-28
Paya Lebar Square
409051 Singapore


  • LinkedIn
Условия использованияПолитика конфиденциальности