1. 0.1Startpagina
  2. 0.2Over ons
  3. 0.3Diensten
  4. 0.4Casestudies
  5. 0.5Blog
  6. 0.6Contact

Siemens Energy: event-driven, AI-aangedreven datapipeline op AWS


Siemens Energy casestudie heroafbelding

Klant: Siemens Energy · Data-afdeling
Sector: Energie · Productie
Diensten: Cloudarchitectuur, serverloze ontwikkeling, AI/ML-integratie, Infrastructure as Code (IaC)
AWS-services: S3, S3 Access Grants, Lambda, Step Functions, EventBridge, API Gateway, Bedrock Data Automation, Amplify, SQS, CloudFormation (CDK in Python)


De uitdaging

De data-afdeling van Siemens Energy beheert enorme hoeveelheden productmeetdata, zoals documenten, afbeeldingen, audio-opnames en videobestanden die gegenereerd zijn door wereldwijde productieactiviteiten, turbines en andere installaties en apparatuur.

Het team had een systeem nodig dat:

  • Bestanden van willekeurige grootte kon verwerken, van kilobytes tot multi-gigabyte meetdatasets en beelden met hoge resolutie zonder payloadlimieten te bereiken of prestaties te verminderen.
  • Automatisch rijke metadata kon extraheren uit elk geüpload bestand, ongeacht de modaliteit (tekst, afbeelding, audio, video), met behulp van AI. Wanneer bepaalde metadata vooraf bekend waren, moest het mogelijk zijn deze samen te voegen met de automatisch gegenereerde metadata.
  • Toegangscontrole op partitieniveau kon afdwingen, zodat verschillende teams en gebruikers alleen data binnen hun aangewezen bereik konden zien en wijzigen, geïntegreerd met de bestaande Microsoft Entra ID (Azure AD) SSO-identiteitsprovider van Siemens Energy.
  • Kon opschalen naar honderdduizenden bestanden zonder de beperkingen van het levenscyclusbeheer van tools zoals OneDrive of SharePoint (~300.000 bestandenplafond).
  • Een webgebaseerde interface kon bieden voor niet-technische gebruikers om bestanden te uploaden, bladeren, downloaden en bekijken na authenticatie met bedrijfsbrede SSO.

Het systeem moest productierijp zijn, stresstests doorstaan en uitrolbaar zijn in meerdere omgevingen (sandbox, development, UAT, productie) met volledige Infrastructure as Code (IaC).


De oplossing

We hebben de Product Measurement Data Pipeline (PMDP) ontworpen en opgeleverd, een suite van onderling verbonden microservices en interfaces, volledig serverloos draaiend op AWS.

Architectuuroverzicht

Het platform bestaat uit:

  1. File Manager API: een RESTful API-dienst voor CRUD-operaties op bestanden, multi-chunk uploads, generatie van vooraf ondertekende URL’s, metadata-operaties en partitiegebaseerde toegangscontrole.
  2. Microservice voor metadataverrijking: een event-driven, multimodale AI-pipeline die automatisch gestructureerde metadata extraheert uit geüploade bestanden met Amazon Bedrock Data Automation.
  3. Webapplicatie: een op AWS Amplify gehoste React-applicatie met een Storage Browser UI en Siemens Energy SSO-integratie.
  4. Stresstestsuite: een CLI-middel voor het valideren van het systeem onder belasting met gelijktijdige uploads en downloads van bestanden tot meerdere gigabytes.

Alle infrastructuur is gedefinieerd in Python CDK, gevalideerd en automatisch uitgerold via CI/CD-pipelines (self-hosted GitLab).


Diepgaand: hoe het werkt

1. Hive-stijl objectsleutelpartitionering

Elk bestand dat via de API wordt geüpload, wordt opgeslagen in S3 met een Apache Hive-stijl partitioneringsschema. Dit is niet alleen organisatorisch. Het maakt hoogperformante queries mogelijk en is direct gekoppeld aan het toegangscontrolemodel.

# construct_object_key.py — Deterministische, bevraagbare S3-sleutelgeneratie class Capability(str, Enum): EDAA = "edaa" CUSTOMER_FACING = "customer-facing" MANUFACTURING = "manufacturing" QUALITY = "quality" # ... def construct_object_key( capability: Capability | str | None = None, file_name: str = "", sub_capability: str | None = None, year: str | None = None, month: str | None = None, day: str | None = None, ) -> str: now = datetime.now() generated_hash = hashlib.md5( f"{capability}{sub_capability}{file_name}".encode() ).hexdigest()[:2] return ( f"capability={capability}/" f"sub-capability={sub_capability or 'unknown'}/" f"year={year or now.year}/" f"month={month or f'{now.month:02d}'}/" f"day={day or f'{now.day:02d}'}/" f"hash={generated_hash}/" f"{file_name}" )

Een bestand geüpload als report.pdf onder de manufacturing-capability op 23 december 2025 wordt:

capability=manufacturing/sub-capability=unknown/year=2025/month=12/day=23/hash=a3/report.pdf

Deze structuur stelt AWS Athena, AWS Glue of elke Hive-compatibele tool in staat om het data lake efficiënt te bevragen op capability, datumbereik of elke combinatie van partitiesleutels. De tweetekenige hash voorkomt hotspots bij intensieve schrijfbewerkingen in S3.

2. S3 Access Grants met Entra ID-federatie

In plaats van IAM-beleid per gebruiker te beheren, hebben we S3 Access Grants geïmplementeerd — een relatief nieuwe AWS-functie die claims van identiteitsproviders rechtstreeks koppelt aan S3-prefix-niveau-machtigingen.

Elke Entra ID-gebruiker of -groep wordt gekoppeld aan een specifieke S3-partitie. Wanneer een gebruiker zich authenticeert, wordt de oid-claim uit het JWT-token gebruikt om de bijbehorende IAM-rol op te zoeken, die vervolgens wordt aangenomen via sts:AssumeRoleWithWebIdentity. De rol is beperkt tot de partitie van de gebruiker via een Access Grant.

# access_grants.py — Per-gebruiker partitie-isolatie via S3 Access Grants CfnAccessGrant( self, f"AccessGrantUser{user_idx}", access_grants_location_id=user_location.ref, permission="READWRITE", grantee=CfnAccessGrant.GranteeProperty( grantee_identifier=user_role.role_arn, grantee_type="IAM", ), )

De Lambda-integratie wisselt vervolgens het Entra ID-token in voor beperkte AWS-referenties bij elk verzoek, met caching om overbodige STS-aanroepen te vermijden:

# get_cached_or_exchange_credentials.py — Tokenuitwisseling met caching def get_cached_or_exchange_credentials(id_token: str) -> CredentialsTypeDef: key = _cache_key_from_token(id_token) now = time.time() with _CACHE_LOCK: entry = _CACHE.get(key) if entry and entry["expires_at"] > now + _SKEW_SECONDS: return entry["credentials"] new_credentials = _exchange_and_assume_with_expiry(id_token) with _CACHE_LOCK: _CACHE[key] = { "credentials": new_credentials, "expires_at": new_credentials["Expiration"].timestamp(), } return new_credentials

Dit betekent dat Gebruiker A in de manufacturing-partitie geen bestanden kan lezen of schrijven in de quality-partitie van Gebruiker B. Dit wordt afgedwongen op S3-niveau, niet alleen op applicatieniveau.

3. Multimodale AI-metadataverrijking

Wanneer een bestand in S3 terechtkomt, start automatisch een event-driven pipeline. Het systeem detecteert het bestandstype, genereert een op maat gemaakte Bedrock Data Automation-blueprint, voert de extractietaak uit en publiceert de gestructureerde resultaten terug naar EventBridge.

De volledige workflow wordt georkestreerd door Step Functions met JSONata-expressies:

# workflow.py — Step Functions-orkestratie met Bedrock Data Automation definition_body = sfn.DefinitionBody.from_chainable( extract_event_data .next(generate_blueprint) # Dynamisch blueprint op basis van metadatavereisten .next(start_data_automation_job) # Bedrock Data Automation .next(wait_for_job_completion) # Asynchroon wachten met task token .next(normalize_event_data) # EventBridge Schema-gevalideerde output .next(publish_output_event) # EventBridge voltooiingsevent .next(sfn.Succeed(self, "FileMetadataEnrichmentCompletion")) )

De blueprintgenerator maakt dynamische extractieschema’s aan op basis van de bestandsmodaliteit en de door de gebruiker opgegeven metadatavereisten. Een afbeelding krijgt bounding box-detectie en categorisatie. Een document krijgt een samenvatting en extractie van kernpunten. Audio krijgt transcriptie en sprekeridentificatie:

# bedrock_data_automation_blueprint_generator.py def generate_blueprint_schema(enrichments, blueprint_type ): properties = {} # Basiseigenschappen — worden altijd geëxtraheerd properties["data_classification"] = { "type": "string", "instruction": "The data classification level (public, internal, confidential, restricted)", } properties["summary"] = { "type": "string", "instruction": "A brief summary of the file content", } properties["keywords"] = { "type": "array", "items": {"type": "string"}, "instruction": "Key terms and keywords extracted from the document", } # Modaliteitsspecifieke verrijkingen if enrichments.get("audio", {}).get("transcribe"): properties["transcript"] = { "type": "string", "instruction": "Full transcript of the audio content", } if enrichments.get("video", {}).get("scenes"): properties["scenes"] = { "type": "array", "items": { "type": "object", "properties": { "scene_number": {"type": "number"}, "start_time": {"type": "number"}, "end_time": {"type": "number"}, "description": {"type": "string"}, }, }, "instruction": "Scene changes detected in the video with timestamps", } return {"class": f"{blueprint_type.capitalize()}Metadata", "properties": properties}

De verrijkte metadata wordt naast het originele bestand opgeslagen als een .metadata.json-sidecarbestand op dezelfde Hive-gepartitioneerde locatie, waardoor het direct bevraagbaar is.

4. Ondersteuning voor multi-gigabyte uploads

API Gateway heeft een payloadlimiet van 10 MB. Productmeetbestanden kunnen gigabytes groot zijn. We hebben dit opgelost met een op vooraf ondertekende URL’s gebaseerde multipart-uploadstroom die API Gateway volledig omzeilt voor het zware werk.

De API berekent optimale chunkgroottes, initieert een multipart-upload en retourneert vooraf ondertekende URL’s voor elke chunk. De client uploadt rechtstreeks naar S3:

# multi_chunk.py — Orkestratie van multipart-upload met vooraf ondertekende URL's def initiate_multi_chunk_upload_presigned(body): chunk_size, num_chunks = _calculate_chunk_size(request.fileSize) response = s3.create_multipart_upload( Bucket=bucket, Key=key, ServerSideEncryption="aws:kms" ) presigned_urls = [] for chunk_number in range(1, num_chunks + 1): presigned_url = s3.generate_presigned_url( "upload_part", Params={ "Bucket": bucket, "Key": key, "UploadId": response["UploadId"], "PartNumber": chunk_number, }, ExpiresIn=expires_in, ) presigned_urls.append({ "chunkNumber": chunk_number, "url": presigned_url, "startByte": (chunk_number - 1) * chunk_size, "endByte": min(chunk_number * chunk_size - 1, request.fileSize - 1), }) return {"uploadId": response["UploadId"], "chunks": presigned_urls}

Aan de frontendzijde handelt de webapplicatie dit transparant af — kleine bestanden gaan via de API, grote bestanden schakelen automatisch over naar multipart:

// api.service.ts — Automatische selectie van uploadstrategie async upload(file: File, request: InitiateUploadRequest, onProgress?) { if (file.size > FILE_SIZE_THRESHOLD) { return this.multiChunkUploadService.uploadFile(file, request, { onProgress }); } const base64Content = await readFileAsBase64(file); return this.httpService.request('/files', 'POST', { body: { content: base64Content, fileName: request.fileName }, }); }

5. De webapplicatie

De frontend is een React-applicatie gebouwd op de Storage Browser-component van AWS Amplify, aangepast met acties die via onze API worden gerouteerd in plaats van rechtstreeks naar S3. Dit geeft ons volledige controle over toegangscontrole, metadata-operaties en uploadstrategieën, terwijl het een gepolijste, vertrouwde bestandsbeheerervaring biedt.

// storage-browser.provider.tsx — Aangepaste Storage Browser met API-gestuurde acties const { StorageBrowser } = createStorageBrowser({ config: { registerAuthListener: async (onAuthStateChange) => { const authService = getAuthService(); authService.registerAuthListener(onAuthStateChange); }, listLocations: async ({ options }) => { return await apiService.getLocations({ options: { pageSize: 30, nextToken: options?.nextToken }, }); }, }, actions: actionsBuilder.buildActions(), });

Gebruikers loggen in met hun Siemens Energy Entra ID-referenties en zien onmiddellijk alleen de partities waartoe ze toegang hebben. Ze kunnen bestanden van elke grootte uploaden, door de Hive-gepartitioneerde mappenstructuur bladeren, documenten en afbeeldingen inline bekijken, downloaden via vooraf ondertekende URL’s en metadata bewerken — allemaal zonder de browser te verlaten.

6. Cross-account event-driven architectuur

De File Manager en de microservice voor metadataverrijking draaien in afzonderlijke AWS-accounts. Wanneer een bestand wordt geüpload, triggert de S3-bucket van de File Manager een Lambda die een FileMetadataEnrichmentRequest-event publiceert naar een cross-account EventBridge-bus.

# file_metadata_enrichment_processor.py — Cross-account eventpublicatie def put_event(bucket, key, size=None, etag=None, enrichments=None): detail = create_event_detail(bucket, key, size, etag, enrichments) return events_client.put_events(Entries=[{ "Source": "com.siemens-energy.pmdp.file-metadata-enrichment", "DetailType": "FileMetadataEnrichmentRequest", "Detail": json.dumps(detail), "EventBusName": EVENT_BUS_ARN, # Cross-account ARN }])

Aan de verrijkingszijde routeren EventBridge-regels events via SQS (met DLQ voor veerkracht) naar een EventBridge Pipe die het event valideert tegen een schemaregister voordat de Step Functions-workflow wordt aangeroepen. Voltooiings- en uitzonderingsevents worden teruggestuurd naar het oorspronkelijke account.

Deze ontkoppelde architectuur betekent dat de verrijkingsmicroservice hergebruikt kan worden door elk team bij Siemens Energy. Ze hoeven alleen events naar de bus te publiceren.


Stresstesten: bewijs op schaal

Het systeem moest enorme hoeveelheden data aankunnen, zowel inkomend als uitgaand. Sommige van hun bestandsrepositories konden dagen in beslag nemen om simpelweg te worden verwijderd. We hebben een speciale stresstest-CLI gebouwd die bestanden van configureerbare groottes genereert (van 100 KB tot 5 GB+), ze gelijktijdig uploadt, downloadt via vooraf ondertekende URL’s en de integriteit verifieert met MD5-checksums.

Testruns valideerden:

  • Gelijktijdige uploads van 20+ bestanden tegelijk, inclusief multi-gigabyte payloads via multipart
  • 100% slagingspercentage over honderden bestanden in een enkele testrun
  • Downloadverificatie die byte-voor-byte integriteit bevestigt na een volledige rondgang door de hele pipeline
  • Automatische opschoning van testartefacten uit zowel S3 als lokale opslag

Infrastructure as Code: alles in CDK

Het volledige platform, beide AWS-accounts, alle services, alle IAM-rollen, alle eventroutering zijn gedefinieerd in Python CDK. Omgevingsspecifieke configuratie wordt beheerd via Hydra/OmegaConf, waardoor het triviaal is om een nieuwe omgeving op te zetten of een nieuw team aan te sluiten.

# config.py — Typeveilige, omgevingsspecifieke configuratie config_environment = make_config( env=zf(cdk.Environment), environment=zf(Literal["sandbox", "dev", "uat", "prd"]), s3explorer=zf(config_s3explorer), access_grants=zf(config_access_grants), project_name=zf(str, default="mfg-product-measurement-data-pipeline"), file_metadata_enrichment_event_bus_arn=zf(str), )

CI/CD voor de webapplicatie gebruikt GitLab OIDC-federatie. Geen langlevende referenties, geen secrets om te roteren. De CDK-stack provisioneert de OIDC-provider, de deploymentrol en de Amplify-bronbucket in één construct.


Resultaten

  • Bestandsgrootte-ondersteuning: onbeperkt (getest tot 5 GB+)
  • Upload-gelijktijdigheid: 20+ gelijktijdige uploads
  • Metadata-extractie: automatisch voor documenten, afbeeldingen, audio en video
  • Toegangscontrole: per-gebruiker en per-groep partitie-isolatie via S3 Access Grants
  • Omgevingen: 4 (sandbox, development, UAT, productie) vanuit één CDK-codebase
  • Identiteitsintegratie: Microsoft Entra ID SSO met OIDC-federatie
  • Infrastructuur: 100% Infrastructure as Code (Python CDK)

Technologiestack

  • Compute: AWS Lambda (Python 3.14, ARM64)
  • Orkestratie: AWS Step Functions (JSONata)
  • AI/ML: Amazon Bedrock Data Automation
  • Opslag: Amazon S3 (Intelligent-Tiering, KMS-encryptie, Transfer Acceleration)
  • API: Amazon API Gateway (REST) met Lambda Powertools + Swagger
  • Events: Amazon EventBridge, EventBridge Pipes, SQS
  • Identiteit: Microsoft Entra ID, OIDC-federatie, S3 Access Grants
  • Frontend: React, Vite, AWS Amplify, Amplify UI Storage Browser
  • IaC: AWS CDK (Python), Hydra/OmegaConf
  • CI/CD: GitLab CI

Conclusie

Dit project vereiste diepgaande expertise over de gehele AWS-stack, van IAM-beleidsontwerp op laag niveau en cross-account eventroutering tot geavanceerde Bedrock Data Automation-integratie en aanpassing van de Amplify Storage Browser.

We hebben ons best gedaan om verder te gaan dan de oorspronkelijke specificaties en de nieuwste en beste AWS-cloudinnovaties aan te bevelen. Het resultaat was een systeem waarbij het uploaden van een bestand een AI-pipeline triggert die het verrijkt met gestructureerde metadata, het opslaat in een bevraagbare partitie en het direct beschikbaar maakt via een webinterface. Allemaal zonder dat de gebruiker iets anders hoeft te doen dan slepen en neerzetten.

Veel organisaties moeten robuuste, moderne cloudapplicaties bouwen op AWS; systemen die echte schaal aankunnen, integreren met enterprise-identiteitsproviders en AI inzetten waar het ertoe doet. Bent u er een van? Laten we praten.


Meer artikelen

Nextnet: Wetenschappelijk AI met AWS CDK

Hoe AWS CDK en een weloverwogen keuze voor AWS-diensten snelle ontwikkeling, consistente implementaties en AI-integratie mogelijk maakten.

dinsdag 22 juli 2025

Contact

Hauptquartier
Tone Singleton SPR BV
Rue Henri Werriestraat, 6 (box 7)
1090 Brussels
Belgium
Tone Singleton Ptd Ltd
60 Paya Lebar Road #06-28
Paya Lebar Square
409051 Singapore


  • LinkedIn
Algemene VoorwaardenPrivacy Policy