
Conquering Inventory Chaos: Building Modern Event-Driven Systems

Adri Shahri
#inventory #real-time #event-driven #cqrs #redpanda #postgres #debezium #kubernetes #golang #redis #kafka #microservices

In today’s fast-paced digital commerce landscape, traditional inventory management approaches are breaking under the strain of omnichannel retail, rapid fulfillment demands, and complex supply chains. This comprehensive guide explores building a modern, event-driven inventory management system (IMS) that can scale to meet these challenges while maintaining data consistency and real-time capabilities.

The Evolution of Inventory Management

    
timeline
  title Evolution of Inventory Systems
  1960s : Manual Systems : Paper Ledgers
          : Physical counting
          : Basic record keeping
          : Weekly reconciliation
  1980s : Early Digital : Mainframe Systems
          : Basic databases
          : Batch processing
          : Monthly reporting
  1990s : Client-Server : Distributed Systems
          : Local databases
          : Daily updates
          : Barcode scanning
  2000s : Enterprise Systems : ERP Integration
          : Centralized databases
          : Real-time updates
          : RFID tracking
  2010s : Cloud Solutions : SaaS & PaaS
          : Mobile integration
          : API-first approach
          : Multi-channel
          : ML predictions
  2020s : Event-Driven : Microservices
          : Real-time analytics
          : Omnichannel
          : CQRS patterns
          : Edge computing
          : AI-driven

  

1. Manual Era (1960s)

2. Digital Dawn (1980s)

3. Client-Server Revolution (1990s)

4. Enterprise Integration (2000s)

5. Cloud Transformation (2010s)

6. Modern Architecture (2020s)

Key Challenges in Modern Inventory Management

Modern inventory management faces several critical challenges that require sophisticated solutions. These challenges have evolved with the increasing complexity of global supply chains, omnichannel retail, and customer expectations for real-time accuracy.

1. Data Accuracy and Real-time Synchronization

Maintaining accurate inventory data is more challenging, and more critical, than ever. Businesses must track inventory across multiple channels, locations, and systems while ensuring data consistency and real-time updates.

    
mindmap
  root((Data
  Accuracy))
      Real-time Updates
          Multi-channel Sales
          Returns Processing
          Stock Adjustments
          Warehouse Movements
      Multi-source Sync
          Store Systems
          E-commerce Platform
          Warehouse Management
          Supplier Integration
      Error Prevention
          Validation Rules
          Automated Checks
          Reconciliation
          Audit Trails

  


2. Scale and Performance

As businesses grow, their inventory management systems must handle increasing volumes of data and transactions while maintaining performance and reliability.

    
mindmap
  root((Scale
  Challenges))
      High Volume
          Millions of SKUs
          Multiple Locations
          Concurrent Users
          Peak Processing
      Geographic Distribution
          Multiple Regions
          Data Replication
          Latency Management
          Legal Compliance
      System Performance
          Response Time
          Processing Speed
          Resource Usage
          Cost Efficiency

  


3. Integration Complexity

Modern inventory systems must integrate with a diverse ecosystem of applications, partners, and services, each with its own protocols, data formats, and requirements.

    
mindmap
  root((Integration
  Complexity))
      Legacy Systems
          ERP Integration
          Custom Protocols
          Data Migration
          System Constraints
      Partner Networks
          Supplier Systems
          Logistics Partners
          Payment Providers
          Analytics Tools
      New Channels
          Mobile Apps
          Social Commerce
          Marketplaces
          Voice Commerce

  


4. Business Complexity

Modern inventory management must accommodate complex business rules, workflows, and requirements while maintaining flexibility and adaptability.

    
mindmap
  root((Business
  Complexity))
      Product Management
          Multiple SKUs
          Variants Handling
          Bundle Products
          Digital Products
      Location Management
          Warehouses
          Retail Stores
          Dark Stores
          Partner Locations
      Inventory Rules
          Safety Stock
          Reorder Points
          Allocation Rules
          Reservation Logic

  


5. Compliance and Security

Inventory management systems must meet stringent security requirements and comply with various regulations while protecting sensitive business data.

    
mindmap
  root((Compliance &
  Security))
      Regulations
          Data Protection
          Financial Rules
          Industry Standards
          Local Laws
      Auditing
          Transaction Logs
          Change History
          Access Records
          System Events
      Security
          Access Control
          Data Encryption
          Threat Detection
          Incident Response

  


Impact of These Challenges

These challenges are interconnected and their impact extends across the entire organization:

  1. Operational Impact

    • Increased operational complexity
    • Higher resource requirements
    • Need for specialized skills
    • Process inefficiencies
    • Risk of errors and issues
  2. Business Impact

    • Customer satisfaction affected by data accuracy
    • Revenue loss from stockouts or overselling
    • Higher operational costs
    • Reduced agility and competitiveness
    • Compliance risks and penalties
  3. Technical Impact

    • System performance challenges
    • Integration difficulties
    • Maintenance complexity
    • Security vulnerabilities
    • Scalability limitations

Addressing these challenges calls for a modern, event-driven architecture: one that keeps data accurate in real time, scales horizontally, integrates cleanly with surrounding systems, and treats auditability and security as first-class concerns. The remainder of this guide designs and builds such a system.

System Requirements

1. Functional Requirements

    
classDiagram
  class InventorySystem {
      +addStock(item: Item, quantity: number)
      +removeStock(item: Item, quantity: number)
      +adjustStock(item: Item, adjustment: number)
      +moveStock(item: Item, from: Location, to: Location)
      +reserveStock(item: Item, quantity: number)
      +confirmReservation(reservation: Reservation)
      +cancelReservation(reservation: Reservation)
      +getStockLevel(item: Item)
      +getStockHistory(item: Item)
      +getForecast(item: Item)
  }

  class StockLevel {
      +number available
      +number reserved
      +number inTransit
      +number backorder
      +number reorderPoint
      +number safetyStock
  }

  class Item {
      +string id
      +string sku
      +string name
      +string description
      +Category category
  }

  class Location {
      +string id
      +string name
      +string type
      +Address address
  }

  class Reservation {
      +string id
      +Item item
      +number quantity
      +string status
      +Date expiresAt
  }

  InventorySystem ..> StockLevel : returns
  InventorySystem ..> Reservation : manages
  InventorySystem --> Item : manages
  InventorySystem --> Location : manages
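
Translated into code, the diagram above maps onto a single service interface. The following Go sketch mirrors the diagram's names; it is illustrative only, and the types are simplified stand-ins for the domain models defined later in this guide:

// inventory.go (illustrative sketch of the functional requirements)
package main

import "time"

type Item struct {
    ID, SKU, Name, Description string
}

type Location struct {
    ID, Name, Type string
}

type StockLevelSummary struct {
    Available, Reserved, InTransit, Backorder int
    ReorderPoint, SafetyStock                 int
}

type Reservation struct {
    ID        string
    Item      Item
    Quantity  int
    Status    string
    ExpiresAt time.Time
}

// InventorySystem captures the commands and queries from the diagram.
type InventorySystem interface {
    AddStock(item Item, quantity int) error
    RemoveStock(item Item, quantity int) error
    AdjustStock(item Item, adjustment int) error
    MoveStock(item Item, from, to Location) error
    ReserveStock(item Item, quantity int) (Reservation, error)
    ConfirmReservation(r Reservation) error
    CancelReservation(r Reservation) error
    GetStockLevel(item Item) (StockLevelSummary, error)
    GetStockHistory(item Item) ([]StockLevelSummary, error)
    GetForecast(item Item) (float64, error) // placeholder return type
}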


  

2. Non-Functional Requirements

    
graph TB
  subgraph "Performance Requirements"
      P1[Response Time] --> P2[< 100ms for 99th percentile]
      P1 --> P3[< 1s for batch operations]
      P4[Throughput] --> P5[10K transactions/second]
      P4 --> P6[1M events/day]
  end

  subgraph "Scalability Requirements"
      S1[Horizontal Scaling] --> S2[Auto-scaling support]
      S1 --> S3[Multi-region deployment]
      S4[Data Growth] --> S5[1TB/month]
      S4 --> S6[3 years retention]
  end

  subgraph "Reliability Requirements"
      R1[Availability] --> R2[99.99% uptime]
      R1 --> R3[24/7 operation]
      R4[Fault Tolerance] --> R5[No single point of failure]
      R4 --> R6[Automatic failover]
  end

  subgraph "Security Requirements"
      SE1[Authentication] --> SE2[OAuth 2.0/OIDC]
      SE1 --> SE3[Role-based access]
      SE4[Data Protection] --> SE5[Encryption at rest]
      SE4 --> SE6[TLS in transit]
  end


  

1. Performance Requirements

  1. Response Time

    • API endpoints must respond within 100ms for the 99th percentile
    • Batch operations must complete within 1 second
    • Real-time event processing latency under 50ms
  2. Throughput

    • Support 10,000 transactions per second
    • Handle 1 million events per day
    • Process 100 concurrent requests
  3. Resource Utilization

    • CPU usage below 70% under normal load
    • Memory usage below 80% of allocated resources
    • Network bandwidth optimization with compression

2. Scalability Requirements

  1. Horizontal Scaling

    • Auto-scaling based on load metrics
    • Support for multi-region deployment
    • Zero-downtime scaling operations
  2. Data Growth

    • Handle 1TB of new data per month
    • Support 3 years of historical data
    • Efficient data archival process
  3. Load Handling

    • Scale to 1000+ concurrent users
    • Handle 10x traffic spikes
    • Support global distribution

3. Reliability Requirements

  1. Availability

    • 99.99% uptime guarantee
    • 24/7 operation support
    • Planned maintenance windows
  2. Fault Tolerance

    • No single point of failure
    • Automatic failover mechanisms
    • Data replication across zones
  3. Data Consistency

    • Eventually consistent read operations
    • Strongly consistent write operations
    • Versioning for conflict resolution (see the sketch below)
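
To make the versioning requirement concrete, here is a hedged sketch of optimistic locking against a stock_levels table. The column names match the StockLevel model introduced later in this guide, but the helper itself is hypothetical:

// optimisticlock.go (hypothetical helper; table and column names follow
// the StockLevel model used later in this guide)
package main

import (
    "context"
    "database/sql"
    "errors"
    "fmt"
)

// ErrVersionConflict signals that another writer updated the row first.
var ErrVersionConflict = errors.New("stock_levels: version conflict")

func updateStockOptimistic(ctx context.Context, db *sql.DB, itemID string, newQty, readVersion int) error {
    // The UPDATE only matches if the version we read is still current,
    // turning lost updates into detectable conflicts.
    res, err := db.ExecContext(ctx,
        `UPDATE stock_levels
            SET quantity = $1, version = version + 1, updated_at = now()
          WHERE item_id = $2 AND version = $3`,
        newQty, itemID, readVersion)
    if err != nil {
        return fmt.Errorf("update stock: %w", err)
    }

    rows, err := res.RowsAffected()
    if err != nil {
        return err
    }
    if rows == 0 {
        return ErrVersionConflict // caller should re-read and retry
    }
    return nil
}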

4. Security Requirements

  1. Authentication & Authorization

    • OAuth 2.0/OIDC implementation
    • Role-based access control (RBAC)
    • Multi-factor authentication support
  2. Data Protection

    • Encryption at rest (AES-256)
    • TLS 1.3 for data in transit
    • Regular security audits
  3. Compliance

    • GDPR compliance
    • SOC 2 certification
    • Audit logging

5. Maintainability Requirements

  1. Monitoring

    • Real-time system metrics
    • Automated alerting
    • Performance analytics
  2. Deployment

    • Automated CI/CD pipeline
    • Blue-green deployment support
    • Rollback capabilities
  3. Documentation

    • API documentation
    • System architecture docs
    • Operational procedures

System Architecture

1. Component Architecture

The component architecture integrates several key aspects including service mesh, security, and observability:

  1. Service Mesh Integration

    • Istio control plane for service management
    • Envoy proxies for service-to-service communication
    • Traffic management with dynamic routing and load balancing
    • Service-level security with mTLS
    • Distributed tracing and metrics collection
  2. Security Controls

    • OAuth 2.0/OIDC for authentication
    • Role-based access control (RBAC)
    • Network policies and segmentation
    • Data encryption at rest and in transit
    • Audit logging and compliance monitoring
  3. Observability Stack

    • Prometheus for metrics collection
    • Loki for log aggregation
    • Tempo for distributed tracing
    • Grafana for visualization
    • AlertManager for incident management
    
graph TB
  subgraph "Frontend Layer"
      UI[Web UI]
      Mobile[Mobile Apps]
      B2B[B2B Portal]
  end

  subgraph "API Gateway Layer"
      Gateway[API Gateway]
      Auth[Authentication]
      RateLimit[Rate Limiter]
  end

  subgraph "Service Mesh Layer"
      IC[Istio Control]
      Proxy1[Envoy Proxy 1]
      Proxy2[Envoy Proxy 2]
      Proxy3[Envoy Proxy 3]
  end

  subgraph "Command Services"
      Stock[Stock Service]
      Location[Location Service]
      Reserve[Reservation Service]
      Order[Order Service]
  end

  subgraph "Event Processing"
      RP[Redpanda Cluster]
      SR[Schema Registry]
      DLQ[Dead Letter Queue]
      CDC[Change Data Capture]
  end

  subgraph "Query Services"
      Analytics[Analytics Service]
      Report[Reporting Service]
      Search[Search Service]
      Cache[Redis Cache]
  end

  subgraph "Storage Layer"
      PG[(PostgreSQL)]
      ES[(Elasticsearch)]
      Metrics[(Prometheus)]
  end

  UI --> Gateway
  Mobile --> Gateway
  B2B --> Gateway
  Gateway --> Auth
  Gateway --> RateLimit

  Auth --> IC
  IC --> Proxy1
  IC --> Proxy2
  IC --> Proxy3

  Proxy1 --> Stock
  Proxy2 --> Location
  Proxy3 --> Reserve

  Stock --> RP
  Location --> RP
  Reserve --> RP
  Order --> RP

  RP --> Analytics
  RP --> Report
  RP --> Search
  RP --> DLQ

  Stock --> PG
  Location --> PG
  Reserve --> PG
  Order --> PG

  PG --> CDC
  CDC --> RP

  Analytics --> Cache
  Report --> ES
  Search --> ES

  Stock --> Metrics
  Location --> Metrics
  Reserve --> Metrics
  Order --> Metrics


  

2. Data Flow Architecture

    
sequenceDiagram
  participant C as Client
  participant G as API Gateway
  participant M as Service Mesh
  participant S as Stock Service
  participant R as Redpanda
  participant P as PostgreSQL
  participant Q as Query Service
  participant Ca as Cache
  participant O as Observability

  C->>G: Request stock update
  G->>M: Route request
  M->>S: Forward with mTLS

  rect rgb(240, 240, 240)
      Note over S,P: Transaction Boundary
      S->>P: Begin transaction
      S->>P: Update stock
      S->>R: Publish event
      S->>P: Commit transaction
  end

  R-->>Q: Consume event
  Q->>Ca: Update cache
  Q-->>C: Send update notification

  M->>O: Send telemetry
  S->>O: Send metrics
  P->>O: Send database metrics
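
In code, the transaction boundary in the diagram looks roughly like the sketch below. Field and table names are assumptions, and the publisher is the one implemented later in the service section. Publishing inside the boundary means a failed produce rolls the write back; the opposite race (event published, commit fails) is one reason the architecture also captures committed changes through Debezium CDC:

// txboundary.go (sketch; field and table names are assumptions)
package main

import (
    "context"
    "database/sql"
    "time"

    "github.com/google/uuid"
)

type stockTxService struct {
    db        *sql.DB
    publisher *StockEventPublisher
}

func (s *stockTxService) handleStockUpdate(ctx context.Context, itemID uuid.UUID, delta int) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback() // no-op after a successful commit

    if _, err := tx.ExecContext(ctx,
        `UPDATE stock_levels
            SET quantity = quantity + $1, version = version + 1, updated_at = now()
          WHERE item_id = $2`, delta, itemID); err != nil {
        return err
    }

    // Publish before commit, as in the diagram: a failed produce
    // aborts the transaction.
    event := StockEvent{
        ID:        uuid.New().String(),
        Type:      "STOCK_UPDATED",
        ItemID:    itemID.String(),
        Quantity:  delta,
        Timestamp: time.Now(),
    }
    if err := s.publisher.PublishStockEvent(ctx, event); err != nil {
        return err
    }

    return tx.Commit()
}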


  

3. Deployment Architecture

    
graph TB
  subgraph "Kubernetes Cluster"
      subgraph "Ingress Layer"
          Ingress[Ingress Controller]
          Cert[Cert Manager]
      end

      subgraph "Application Layer"
          API[API Gateway]
          Stock[Stock Service]
          Query[Query Service]
      end

      subgraph "Data Layer"
          RP[Redpanda StatefulSet]
          PG[PostgreSQL StatefulSet]
          Redis[Redis StatefulSet]
      end

      subgraph "Monitoring"
          Prom[Prometheus]
          Graf[Grafana]
          Alert[AlertManager]
      end
  end

  subgraph "External Services"
      DNS[DNS]
      CDN[CDN]
      Backup[Backup Storage]
  end

  DNS --> Ingress
  CDN --> Ingress
  Ingress --> API
  API --> Stock
  API --> Query
  Stock --> RP
  Stock --> PG
  Query --> Redis
  PG --> Backup
  RP --> Backup


  

4. Infrastructure Components

4.1 Local Kubernetes Setup with Minikube

For this implementation, we’ll use Minikube as our local Kubernetes cluster. Minikube provides a lightweight way to run Kubernetes locally for development and testing.

# Start Minikube with sufficient resources for our stack
minikube start --cpus 4 --memory 8192 --disk-size 20g

# Enable necessary addons
minikube addons enable metrics-server
minikube addons enable ingress

# Verify the cluster is running
kubectl cluster-info

Expected output:

Kubernetes control plane is running at https://192.168.49.2:8443
CoreDNS is running at https://192.168.49.2:8443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy

For development, point your shell at Minikube’s Docker daemon so locally built images are immediately visible to the cluster:

# Configure shell to use Minikube's Docker daemon
eval $(minikube docker-env)

4.2 Redpanda Configuration

First, let’s create a dedicated namespace for Redpanda:

kubectl create namespace redpanda

Add the Redpanda Helm repository and update it:

helm repo add redpanda https://charts.redpanda.com
helm repo update

Create a file named redpanda-values.yaml with a minimal configuration for local development:

# redpanda-values.yaml
statefulset:
  initContainers:
    setDataDirOwnership:
      enabled: true
  replicas: 1

resources:
  cpu:
    cores: 1
  memory:
    container:
      max: 1.5Gi
    redpanda:
      reserveMemory: 1Gi

storage:
  persistentVolume:
    enabled: true
    size: 4Gi
    storageClass: "standard" # Use Minikube's default storage class

auth:
  sasl:
    enabled: false
  tls:
    enabled: false

console:
  enabled: true
  auth:
    enabled: false

Install Redpanda using Helm:

helm install redpanda redpanda/redpanda \
  --namespace redpanda \
  --values redpanda-values.yaml

Verify the installation:

# Check if pods are running
kubectl get pods -n redpanda

# Expected output:
NAME                                READY   STATUS    RESTARTS   AGE
redpanda-0                         1/1     Running   0          2m
redpanda-console-7b9c8b6f7-x9j2v   1/1     Running   0          2m

Set up port forwarding for local access:

# For Kafka API
kubectl port-forward -n redpanda redpanda-0 9092:9092 &

# For Schema Registry
kubectl port-forward -n redpanda redpanda-0 8081:8081 &

# For Redpanda Console
kubectl port-forward -n redpanda svc/redpanda-console 8080:8080 &

Test connectivity using rpk:

# Create a test topic
rpk topic create test-topic \
  --brokers localhost:9092

# List topics
rpk topic list \
  --brokers localhost:9092

# Expected output:
TOPIC       PARTITIONS  REPLICAS
test-topic  1          1

For production deployments, you should use a more comprehensive configuration:

# redpanda-prod-values.yaml
statefulset:
  initContainers:
    setDataDirOwnership:
      enabled: true
  replicas: 3

resources:
  cpu:
    cores: 2
  memory:
    container:
      max: 8Gi
    redpanda:
      reserveMemory: 6Gi

storage:
  persistentVolume:
    enabled: true
    size: 100Gi
    storageClass: "ceph-filesystem" # Use appropriate storage class for your cloud provider

auth:
  sasl:
    enabled: true
    users:
      - name: admin
        password: ${REDPANDA_ADMIN_PASSWORD}
  tls:
    enabled: true

monitoring:
  enabled: true
  serviceMonitor:
    enabled: true

console:
  enabled: true
  auth:
    enabled: true

4.3 PostgreSQL Configuration

# postgres-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: postgres-config
  namespace: database
data:
  postgresql.conf: |
    listen_addresses = '*'
    max_connections = 100
    shared_buffers = 256MB
    wal_level = logical
    max_wal_senders = 10
    max_replication_slots = 10
    wal_sender_timeout = 60s
    max_worker_processes = 10
    track_commit_timestamp = on
  pg_hba.conf: |
    local   all             all                                     trust
    host    all             all             127.0.0.1/32            trust
    host    all             all             ::1/128                 trust
    host    all             all             0.0.0.0/0               md5
    host    replication     all             0.0.0.0/0               md5

---
# postgres-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: postgres
  namespace: database
spec:
  serviceName: postgres
  replicas: 1
  selector:
    matchLabels:
      app: postgres
  template:
    metadata:
      labels:
        app: postgres
    spec:
      containers:
        - name: postgres
          image: postgres:15
          # Point postgres at the mounted config files; the stock image
          # otherwise ignores /etc/postgresql/*.conf
          args:
            - "-c"
            - "config_file=/etc/postgresql/postgresql.conf"
            - "-c"
            - "hba_file=/etc/postgresql/pg_hba.conf"
          ports:
            - containerPort: 5432
              name: postgres
          env:
            - name: POSTGRES_USER
              valueFrom:
                secretKeyRef:
                  name: postgres-secrets
                  key: username
            - name: POSTGRES_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: postgres-secrets
                  key: password
            - name: POSTGRES_DB
              value: inventory
            - name: PGDATA
              value: /var/lib/postgresql/data/pgdata
          volumeMounts:
            - name: postgres-data
              mountPath: /var/lib/postgresql/data
            - name: postgres-config
              mountPath: /etc/postgresql/postgresql.conf
              subPath: postgresql.conf
            - name: postgres-config
              mountPath: /etc/postgresql/pg_hba.conf
              subPath: pg_hba.conf
          resources:
            requests:
              memory: 2Gi
              cpu: 1000m
            limits:
              memory: 4Gi
              cpu: 2000m
  volumeClaimTemplates:
    - metadata:
        name: postgres-data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 100Gi

4.4 Debezium Configuration

# debezium-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: debezium-config
  namespace: debezium
data:
  application.properties: |
    bootstrap.servers=redpanda.redpanda.svc.cluster.local:9092
    group.id=inventory-connect-cluster
    offset.storage.topic=connect-offsets
    config.storage.topic=connect-configs
    status.storage.topic=connect-status

    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://redpanda.redpanda.svc.cluster.local:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://redpanda.redpanda.svc.cluster.local:8081

    rest.port=8083
    rest.advertised.host.name=debezium
    plugin.path=/kafka/connect

---
# debezium-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: debezium
  namespace: debezium
spec:
  replicas: 2
  selector:
    matchLabels:
      app: debezium
  template:
    metadata:
      labels:
        app: debezium
    spec:
      containers:
        - name: debezium
          image: debezium/connect:2.3
          ports:
            - containerPort: 8083
          env:
            - name: BOOTSTRAP_SERVERS
              value: redpanda.redpanda.svc.cluster.local:9092
            - name: GROUP_ID
              value: inventory-connect-cluster
            - name: CONFIG_STORAGE_TOPIC
              value: connect-configs
            - name: OFFSET_STORAGE_TOPIC
              value: connect-offsets
            - name: STATUS_STORAGE_TOPIC
              value: connect-status
          volumeMounts:
            - name: debezium-config
              mountPath: /kafka/config/connect-distributed.properties
              subPath: application.properties
          resources:
            requests:
              memory: 1Gi
              cpu: 500m
            limits:
              memory: 2Gi
              cpu: 1000m
      volumes:
        - name: debezium-config
          configMap:
            name: debezium-config
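
The manifests above start a Connect cluster, but a Postgres source connector still has to be registered through Connect’s REST API on port 8083. The sketch below posts a standard Debezium Postgres connector configuration; the connector name, table list, and credentials are placeholders:

// registerconnector.go (sketch; connector name, tables, and credentials
// are placeholders)
package main

import (
    "bytes"
    "fmt"
    "io"
    "log"
    "net/http"
)

func main() {
    // pgoutput is Postgres' built-in logical decoding plugin, which
    // pairs with the wal_level = logical setting configured above.
    payload := []byte(`{
      "name": "inventory-connector",
      "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres.database.svc.cluster.local",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "changeme",
        "database.dbname": "inventory",
        "topic.prefix": "inventory",
        "plugin.name": "pgoutput",
        "table.include.list": "public.stock_levels,public.stock_movements"
      }
    }`)

    // Assumes: kubectl port-forward -n debezium deploy/debezium 8083:8083
    resp, err := http.Post("http://localhost:8083/connectors",
        "application/json", bytes.NewReader(payload))
    if err != nil {
        log.Fatalf("register connector: %v", err)
    }
    defer resp.Body.Close()

    body, _ := io.ReadAll(resp.Body)
    fmt.Printf("status=%s body=%s\n", resp.Status, body)
}

Connect answers 201 Created on success and 409 Conflict if a connector with that name is already registered.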

5. Service Mesh Architecture

The service mesh provides a dedicated infrastructure layer for handling service-to-service communication in our distributed inventory system. We use Istio as our service mesh implementation to manage, secure, and observe microservices interactions.

5.1 Core Components

    
graph TB
  subgraph "Control Plane"
      IC[Istio Control]
      style IC fill:#f9f,stroke:#333
      subgraph "Management"
          Config[Configuration API]
          Discovery[Service Discovery]
          Security[Security Management]
      end
  end

  subgraph "Data Plane"
      subgraph "Service Pod A"
          App1[Application]
          Proxy1[Envoy Proxy]
      end

      subgraph "Service Pod B"
          App2[Application]
          Proxy2[Envoy Proxy]
      end
  end

  IC --> Proxy1
  IC --> Proxy2
  Proxy1 <--> Proxy2
  App1 --> Proxy1
  App2 --> Proxy2


  


6. Security Architecture

Our security architecture implements defense in depth, ensuring protection at every layer of the system:

6.1 Security Layers

    
graph TB
  subgraph "External Security"
      WAF[Web Application Firewall]
      DDOS[DDoS Protection]
      API[API Gateway]
  end

  subgraph "Identity & Access"
      Auth[Authentication]
      RBAC[Role-Based Access]
      OAuth[OAuth2/OIDC]
  end

  subgraph "Data Security"
      Encrypt[Encryption]
      Mask[Data Masking]
      Keys[Key Management]
  end

  subgraph "Network Security"
      MTLS[Mutual TLS]
      SEG[Network Segmentation]
      FW[Network Policies]
  end

  WAF --> API
  API --> Auth
  Auth --> OAuth
  OAuth --> RBAC
  RBAC --> Encrypt
  Encrypt --> MTLS


  


7. Monitoring and Observability

Our observability implementation provides comprehensive visibility across all system components:

    
graph TB
  subgraph "Data Collection"
      Metrics[Prometheus Metrics]
      Logs[Loki Logs]
      Traces[Tempo Traces]
  end

  subgraph "Processing"
      Rules[Alert Rules]
      Aggregation[Log Aggregation]
      Analysis[Trace Analysis]
  end

  subgraph "Visualization"
      Grafana[Grafana Dashboards]
      Alerts[Alert Manager]
      Reports[Custom Reports]
  end

  Metrics --> Rules
  Logs --> Aggregation
  Traces --> Analysis

  Rules --> Grafana
  Aggregation --> Grafana
  Analysis --> Grafana

  Rules --> Alerts


  

Service Implementation

1. Service Architecture

    
graph TB
  subgraph "Stock Service"
      API[HTTP API Layer]
      Logic[Business Logic]
      Events[Event Handler]
      Cache[Cache Layer]
      DB[Database Layer]
  end

  subgraph "External Systems"
      RP[Redpanda]
      PG[PostgreSQL]
      Redis[Redis Cache]
      Prom[Prometheus]
  end

  API --> Logic
  Logic --> Events
  Logic --> Cache
  Logic --> DB

  Events --> RP
  DB --> PG
  Cache --> Redis
  Logic --> Prom


  

2. Service Components

2.1 Configuration

// config.go
package main

type Config struct {
    DBConfig         PostgresConfig
    RedpandaConfig   RedpandaConfig
    CacheConfig      CacheConfig
    MetricsConfig    MetricsConfig
    ServerConfig     ServerConfig
}

type PostgresConfig struct {
    Host         string
    Port         int
    Database     string
    User         string
    Password     string
    MaxConns     int
    MinConns     int
    MaxIdleTime  string
}

type RedpandaConfig struct {
    Brokers         []string
    SchemaRegistry  string
    ConsumerGroup   string
    CommandTopic    string
    EventTopic      string
    FailureTopic    string
}

type CacheConfig struct {
    Address     string
    Password    string
    DB          int
    TTL         string
}

type MetricsConfig struct {
    Enabled     bool
    Port        int
    Path        string
}

type ServerConfig struct {
    Port            int
    ReadTimeout     string
    WriteTimeout    string
    ShutdownTimeout string
}
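
A hedged sketch of populating this struct from the environment: DB_HOST, REDPANDA_BROKERS, and REDIS_ADDR follow the Deployment manifest shown later in this guide, while LoadConfig itself and the remaining defaults are illustrative:

// loadconfig.go (hypothetical helper)
package main

import (
    "os"
    "strings"
)

func LoadConfig() Config {
    return Config{
        DBConfig: PostgresConfig{
            Host:     os.Getenv("DB_HOST"),
            Port:     5432,        // illustrative default
            Database: "inventory", // illustrative default
        },
        RedpandaConfig: RedpandaConfig{
            Brokers: strings.Split(os.Getenv("REDPANDA_BROKERS"), ","),
        },
        CacheConfig: CacheConfig{
            Address: os.Getenv("REDIS_ADDR"),
        },
    }
}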

2.2 Domain Models

// models.go
package main

import (
    "time"
    "github.com/google/uuid"
)

type StockItem struct {
    ID              uuid.UUID   `json:"id"`
    SKU             string      `json:"sku"`
    Name            string      `json:"name"`
    Description     string      `json:"description"`
    CategoryID      uuid.UUID   `json:"categoryId"`
    CreatedAt       time.Time   `json:"createdAt"`
    UpdatedAt       time.Time   `json:"updatedAt"`
}

type StockLevel struct {
    ID              uuid.UUID   `json:"id"`
    ItemID          uuid.UUID   `json:"itemId"`
    LocationID      uuid.UUID   `json:"locationId"`
    Quantity        int         `json:"quantity"`
    Reserved        int         `json:"reserved"`
    ReorderPoint    int         `json:"reorderPoint"`
    SafetyStock     int         `json:"safetyStock"`
    Version         int         `json:"version"`
    CreatedAt       time.Time   `json:"createdAt"`
    UpdatedAt       time.Time   `json:"updatedAt"`
}

type StockMovement struct {
    ID              uuid.UUID   `json:"id"`
    ItemID          uuid.UUID   `json:"itemId"`
    FromLocationID  *uuid.UUID  `json:"fromLocationId"`
    ToLocationID    *uuid.UUID  `json:"toLocationId"`
    Quantity        int         `json:"quantity"`
    Type            string      `json:"type"`
    Status          string      `json:"status"`
    ReferenceID     *uuid.UUID  `json:"referenceId"`
    CreatedAt       time.Time   `json:"createdAt"`
    CompletedAt     *time.Time  `json:"completedAt"`
}

2.3 Event Handling

    
sequenceDiagram
  participant C as Client
  participant H as HTTP Handler
  participant S as Service
  participant E as Event Handler
  participant R as Redpanda
  participant D as Database

  C->>H: POST /api/stock
  H->>S: Process Command

  S->>D: Begin Transaction
  S->>D: Update Stock
  S->>E: Create Event
  E->>R: Publish Event
  S->>D: Commit Transaction

  H-->>C: Response

  R-->>E: Consume Event
  E->>S: Process Event
  S->>D: Update State


  

2.3.1 Redpanda Connection Configuration

// redpanda.go
package main

import (
    "context"
    "fmt"
    "math"
    "time"

    "github.com/twmb/franz-go/pkg/kgo"
)

type RedpandaConfig struct {
    Brokers        []string
    SchemaRegistry string
    ClientID       string
    GroupID        string
    Topics         Topics
}

type Topics struct {
    StockCommands string
    StockEvents   string
    StockFailures string
}

func NewRedpandaClient(cfg RedpandaConfig) (*kgo.Client, error) {
    client, err := kgo.NewClient(
        kgo.SeedBrokers(cfg.Brokers...),
        kgo.ClientID(cfg.ClientID),
        kgo.ConsumerGroup(cfg.GroupID),
        kgo.ConsumeTopics(cfg.Topics.StockCommands),
        kgo.DisableAutoCommit(),
        kgo.RequiredAcks(kgo.AllISRAcks()),
        kgo.RetryBackoffFn(func(attempt int) time.Duration {
            return time.Millisecond * time.Duration(math.Min(float64(attempt*200), 1000))
        }),
    )
    if err != nil {
        return nil, fmt.Errorf("create redpanda client: %w", err)
    }

    // Test connection
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    if err := client.Ping(ctx); err != nil {
        client.Close() // don't leak the client on a failed ping
        return nil, fmt.Errorf("ping redpanda: %w", err)
    }

    return client, nil
}

// Example configuration
var config = RedpandaConfig{
    Brokers:        []string{"localhost:9092"},
    SchemaRegistry: "http://localhost:8081",
    ClientID:       "stock-service",
    GroupID:        "stock-service-group",
    Topics: Topics{
        StockCommands: "inventory.stock.commands",
        StockEvents:   "inventory.stock.events",
        StockFailures: "inventory.stock.failures",
    },
}
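
Startup wiring is then a few lines; this sketch simply fails fast when the brokers are unreachable:

// main.go (startup wiring sketch)
package main

import "log"

func main() {
    client, err := NewRedpandaClient(config)
    if err != nil {
        log.Fatalf("connect to redpanda: %v", err)
    }
    defer client.Close()

    // The client is shared by the publisher and subscriber shown in
    // the next sections.
}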

2.3.2 Publisher Implementation

// publisher.go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/google/uuid"
    "github.com/twmb/franz-go/pkg/kgo"
    "github.com/twmb/franz-go/pkg/sr"
    "go.uber.org/zap"
)

type StockEventPublisher struct {
    client  *kgo.Client
    schema  *sr.Client
    logger  *zap.Logger
    metrics *Metrics
}

type StockEvent struct {
    ID        string    `json:"id"`
    Type      string    `json:"type"`
    ItemID    string    `json:"itemId"`
    Quantity  int       `json:"quantity"`
    Location  string    `json:"location"`
    Timestamp time.Time `json:"timestamp"`
    Metadata  map[string]string `json:"metadata,omitempty"`
}

func (p *StockEventPublisher) PublishStockEvent(ctx context.Context, event StockEvent) error {
    start := time.Now()
    defer func() {
        p.metrics.eventLatency.WithLabelValues("publish").Observe(
            time.Since(start).Seconds(),
        )
    }()

    data, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("marshal event: %w", err)
    }

    record := &kgo.Record{
        Topic: "inventory.stock.events",
        Key:   []byte(event.ItemID),
        Value: data,
        Headers: []kgo.RecordHeader{
            {Key: "event_type", Value: []byte(event.Type)},
            {Key: "timestamp", Value: []byte(event.Timestamp.Format(time.RFC3339))},
        },
    }

    results := p.client.ProduceSync(ctx, record)
    if err := results.FirstErr(); err != nil {
        p.logger.Error("failed to publish event",
            zap.Error(err),
            zap.String("event_id", event.ID),
            zap.String("event_type", event.Type))
        return fmt.Errorf("produce event: %w", err)
    }

    p.logger.Info("event published successfully",
        zap.String("event_id", event.ID),
        zap.String("event_type", event.Type),
        zap.String("item_id", event.ItemID),
        zap.Int("quantity", event.Quantity))

    return nil
}

// Example usage (assumes a package-level publisher wired at startup):
var publisher *StockEventPublisher

func ExamplePublishEvent() {
    event := StockEvent{
        ID:        uuid.New().String(),
        Type:      "STOCK_UPDATED",
        ItemID:    "item-123",
        Quantity:  100,
        Location:  "warehouse-1",
        Timestamp: time.Now(),
        Metadata: map[string]string{
            "user":   "system",
            "reason": "restock",
        },
    }

    if err := publisher.PublishStockEvent(context.Background(), event); err != nil {
        log.Printf("Failed to publish event: %v", err)
        return
    }

    // Expected log output:
    // {"level":"info","ts":"2025-03-17T11:23:45.678Z","caller":"publisher/publisher.go:89",
    // "msg":"event published successfully","event_id":"550e8400-e29b-41d4-a716-446655440000",
    // "event_type":"STOCK_UPDATED","item_id":"item-123","quantity":100}
}

2.3.3 Subscriber Implementation

// subscriber.go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/twmb/franz-go/pkg/kgo"
    "go.uber.org/zap"
)

type StockEventSubscriber struct {
    client     *kgo.Client
    handler    StockEventHandler
    logger     *zap.Logger
    metrics    *Metrics
    shutdownCh chan struct{}
}

type StockEventHandler interface {
    HandleStockEvent(context.Context, StockEvent) error
}

func (s *StockEventSubscriber) Start(ctx context.Context) error {
    s.logger.Info("starting stock event subscriber",
        zap.Strings("topics", []string{"inventory.stock.events"}))

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-s.shutdownCh:
            return nil
        default:
            fetches := s.client.PollFetches(ctx)
            if errs := fetches.Errors(); len(errs) > 0 {
                s.handleErrors(errs)
                continue
            }

            s.processFetches(ctx, fetches)
        }
    }
}

func (s *StockEventSubscriber) processFetches(ctx context.Context, fetches kgo.Fetches) {
    fetches.EachPartition(func(p kgo.FetchTopicPartition) {
        p.EachRecord(func(record *kgo.Record) {
            start := time.Now()

            var event StockEvent
            if err := json.Unmarshal(record.Value, &event); err != nil {
                s.logger.Error("failed to unmarshal event",
                    zap.Error(err),
                    zap.String("topic", record.Topic),
                    zap.Int32("partition", record.Partition),
                    zap.Int64("offset", record.Offset))
                return
            }

            if err := s.handler.HandleStockEvent(ctx, event); err != nil {
                s.logger.Error("failed to handle event",
                    zap.Error(err),
                    zap.String("event_id", event.ID),
                    zap.String("event_type", event.Type))
                return
            }

            // Observe latency only after the handler has finished, so
            // the histogram reflects real processing time.
            s.metrics.eventLatency.WithLabelValues("consume").Observe(
                time.Since(start).Seconds(),
            )

            s.logger.Info("event processed successfully",
                zap.String("event_id", event.ID),
                zap.String("event_type", event.Type),
                zap.String("item_id", event.ItemID),
                zap.Duration("processing_time", time.Since(start)))
        })
    })

    // Auto-commit is disabled on the client, so commit explicitly once
    // the batch has been handled.
    if err := s.client.CommitUncommittedOffsets(ctx); err != nil {
        s.logger.Error("failed to commit offsets", zap.Error(err))
    }
}

func (s *StockEventSubscriber) handleErrors(errs []kgo.FetchError) {
    for _, e := range errs {
        s.logger.Error("fetch error",
            zap.String("topic", e.Topic),
            zap.Int32("partition", e.Partition),
            zap.Error(e.Err))
    }
}

// Example handler implementation:
type StockUpdateHandler struct {
    repository *StockRepository
    logger     *zap.Logger
}

func (h *StockUpdateHandler) HandleStockEvent(ctx context.Context, event StockEvent) error {
    h.logger.Info("handling stock event",
        zap.String("event_id", event.ID),
        zap.String("event_type", event.Type))

    switch event.Type {
    case "STOCK_UPDATED":
        return h.handleStockUpdate(ctx, event)
    case "STOCK_RESERVED":
        return h.handleStockReservation(ctx, event)
    default:
        return fmt.Errorf("unknown event type: %s", event.Type)
    }
}

// Example usage and logs (repo, logger, and redpandaClient are assumed
// to be initialized at startup):
func ExampleSubscriber() {
    handler := &StockUpdateHandler{
        repository: repo,
        logger:     logger,
    }

    subscriber := &StockEventSubscriber{
        client:  redpandaClient,
        handler: handler,
        logger:  logger,
    }

    if err := subscriber.Start(context.Background()); err != nil {
        log.Fatalf("Failed to start subscriber: %v", err)
    }

    // Expected log output:
    // {"level":"info","ts":"2025-03-17T11:23:45.678Z","caller":"subscriber/subscriber.go:112",
    // "msg":"starting stock event subscriber","topics":["inventory.stock.events"]}
    //
    // {"level":"info","ts":"2025-03-17T11:23:46.789Z","caller":"subscriber/subscriber.go:156",
    // "msg":"event processed successfully","event_id":"550e8400-e29b-41d4-a716-446655440000",
    // "event_type":"STOCK_UPDATED","item_id":"item-123","processing_time":"0.123s"}
}

2.3.4 Example Logs and Metrics

Here’s what you might see in your logs and metrics when the system is running:

# Connection Logs
[2025-03-17T11:20:00Z] INFO  Connecting to Redpanda brokers: [localhost:9092]
[2025-03-17T11:20:00Z] INFO  Connected to Redpanda cluster successfully
[2025-03-17T11:20:00Z] INFO  Schema Registry connected at http://localhost:8081

# Publisher Logs
[2025-03-17T11:23:45Z] INFO  Publishing stock update event
[2025-03-17T11:23:45Z] DEBUG Event payload: {"id":"550e8400-e29b-41d4-a716-446655440000","type":"STOCK_UPDATED",...}
[2025-03-17T11:23:45Z] INFO  Event published successfully (topic=inventory.stock.events, partition=0, offset=1234)

# Subscriber Logs
[2025-03-17T11:23:45Z] INFO  Starting stock event subscriber
[2025-03-17T11:23:45Z] INFO  Subscribed to topics: [inventory.stock.events]
[2025-03-17T11:23:46Z] INFO  Received event: STOCK_UPDATED (id=550e8400-e29b-41d4-a716-446655440000)
[2025-03-17T11:23:46Z] DEBUG Processing event for item-123: quantity=100, location=warehouse-1
[2025-03-17T11:23:46Z] INFO  Event processed successfully (processing_time=123ms)

# Error Logs
[2025-03-17T11:25:00Z] ERROR Failed to connect to broker: connection refused
[2025-03-17T11:25:01Z] INFO  Retrying connection (attempt 1/3)
[2025-03-17T11:25:10Z] ERROR Failed to process event: validation error (event_id=550e8400-e29b-41d4-a716-446655440000)

# Metrics (Prometheus format)
# HELP redpanda_events_total Total number of events processed
# TYPE redpanda_events_total counter
redpanda_events_total{type="published"} 1234
redpanda_events_total{type="consumed"} 1200

# HELP redpanda_event_processing_duration_seconds Event processing duration
# TYPE redpanda_event_processing_duration_seconds histogram
redpanda_event_processing_duration_seconds_bucket{le="0.1"} 980
redpanda_event_processing_duration_seconds_bucket{le="0.5"} 1150
redpanda_event_processing_duration_seconds_bucket{le="1.0"} 1200

# HELP redpanda_connection_status Current connection status
# TYPE redpanda_connection_status gauge
redpanda_connection_status{broker="localhost:9092"} 1

3. Testing Strategy

    
mindmap
  root((Testing Strategy))
      Unit Tests
          Repository Tests
          Service Tests
          Handler Tests
          Event Tests
      Integration Tests
          API Tests
          Event Flow Tests
          Database Tests
      Performance Tests
          Load Tests
          Stress Tests
          Scalability Tests
      Monitoring
          Metrics
          Logging
          Tracing

  
// stock_test.go
package main

import (
    "context"
    "reflect"
    "testing"

    "github.com/google/uuid"
)

func TestStockService(t *testing.T) {
    // Set up test environment
    ctx := context.Background()
    cfg := testConfig()

    // Create test dependencies
    db := setupTestDB(t)
    redpanda := setupTestRedpanda(t)
    cache := setupTestCache(t)

    // Create service
    svc := NewStockService(cfg, db, redpanda, cache)

    // Run tests
    t.Run("GetStock", func(t *testing.T) {
        // Test cases
        tests := []struct {
            name    string
            itemID  uuid.UUID
            want    *StockLevel
            wantErr bool
        }{
            {
                name:    "existing item",
                itemID:  testItemID,
                want:    testStock,
                wantErr: false,
            },
            {
                name:    "non-existing item",
                itemID:  uuid.New(),
                want:    nil,
                wantErr: true,
            },
        }

        // Run test cases
        for _, tt := range tests {
            t.Run(tt.name, func(t *testing.T) {
                got, err := svc.GetStock(ctx, tt.itemID)
                if (err != nil) != tt.wantErr {
                    t.Errorf("GetStock() error = %v, wantErr %v", err, tt.wantErr)
                    return
                }
                if !reflect.DeepEqual(got, tt.want) {
                    t.Errorf("GetStock() = %v, want %v", got, tt.want)
                }
            })
        }
    })

    t.Run("UpdateStock", func(t *testing.T) {
        // Test cases for stock updates
    })

    t.Run("EventHandling", func(t *testing.T) {
        // Test cases for event handling
    })
}

4. Deployment Configuration

# stock-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: stock-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: stock-service
  template:
    metadata:
      labels:
        app: stock-service
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8080"
        prometheus.io/path: "/metrics"
    spec:
      containers:
        - name: stock-service
          image: inventory/stock-service:latest
          ports:
            - containerPort: 8080
          env:
            - name: DB_HOST
              valueFrom:
                configMapKeyRef:
                  name: inventory-config
                  key: db_host
            - name: REDPANDA_BROKERS
              valueFrom:
                configMapKeyRef:
                  name: inventory-config
                  key: redpanda_brokers
            - name: REDIS_ADDR
              valueFrom:
                configMapKeyRef:
                  name: inventory-config
                  key: redis_addr
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
          volumeMounts:
            - name: config
              mountPath: /app/config
              readOnly: true
      volumes:
        - name: config
          configMap:
            name: stock-service-config

---
apiVersion: v1
kind: Service
metadata:
  name: stock-service
spec:
  selector:
    app: stock-service
  ports:
    - port: 80
      targetPort: 8080
  type: ClusterIP

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: stock-service
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: stock-service
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80

5. Monitoring and Logging

5.1 Metrics Configuration

// metrics.go
package main

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

type Metrics struct {
    stockOperations    *prometheus.CounterVec
    stockLevels        *prometheus.GaugeVec
    operationDuration  *prometheus.HistogramVec
    cacheHitRate       *prometheus.GaugeVec
    eventLatency       *prometheus.HistogramVec
}

func NewMetrics(reg prometheus.Registerer) *Metrics {
    // promauto.With(reg) registers each collector as it is created, so
    // no separate MustRegister call is needed (doing both would panic
    // with a duplicate-registration error).
    factory := promauto.With(reg)

    return &Metrics{
        stockOperations: factory.NewCounterVec(
            prometheus.CounterOpts{
                Name: "stock_operations_total",
                Help: "Total number of stock operations",
            },
            []string{"operation", "status"},
        ),

        stockLevels: factory.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: "stock_levels",
                Help: "Current stock levels by item",
            },
            []string{"item_id", "location_id"},
        ),

        operationDuration: factory.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    "stock_operation_duration_seconds",
                Help:    "Duration of stock operations",
                Buckets: prometheus.DefBuckets,
            },
            []string{"operation"},
        ),

        cacheHitRate: factory.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: "cache_hit_rate",
                Help: "Cache hit rate for stock operations",
            },
            []string{"operation"},
        ),

        eventLatency: factory.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    "event_processing_duration_seconds",
                Help:    "Duration of event processing",
                Buckets: prometheus.DefBuckets,
            },
            []string{"event_type"},
        ),
    }
}
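
Wiring these collectors into request handling can be centralized in one helper. The instrument function below is a sketch, not part of any library; it assumes the Metrics value built by NewMetrics:

// instrument.go (sketch)
package main

import "time"

// instrument records an operation's outcome and duration around any
// unit of work, e.g. instrument(m, "add_stock", func() error { ... }).
func instrument(m *Metrics, op string, fn func() error) error {
    start := time.Now()
    err := fn()

    status := "success"
    if err != nil {
        status = "error"
    }
    m.stockOperations.WithLabelValues(op, status).Inc()
    m.operationDuration.WithLabelValues(op).Observe(time.Since(start).Seconds())
    return err
}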

5.2 Structured Logging

// logger.go
package main

import (
    "context"

    "go.opentelemetry.io/otel/trace"
    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
)

func NewLogger(env string) (*zap.Logger, error) {
    var config zap.Config

    if env == "production" {
        config = zap.NewProductionConfig()
        config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
    } else {
        config = zap.NewDevelopmentConfig()
        config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
    }

    config.OutputPaths = []string{"stdout"}
    config.ErrorOutputPaths = []string{"stderr"}

    return config.Build(
        zap.AddCaller(),
        zap.AddStacktrace(zapcore.ErrorLevel),
    )
}

// Example usage in service (assumes OpenTelemetry tracing is set up as
// in the next section)
func (s *StockService) logOperation(ctx context.Context, operation string, fields ...zap.Field) {
    defaultFields := []zap.Field{
        zap.String("operation", operation),
        zap.String("service", "stock-service"),
    }

    if sc := trace.SpanContextFromContext(ctx); sc.HasTraceID() {
        defaultFields = append(defaultFields, zap.String("traceId", sc.TraceID().String()))
    }

    s.logger.Info("Operation executed",
        append(defaultFields, fields...)...)
}

5.3 Distributed Tracing

// tracing.go
package main

import (
    "context"
    "fmt"
    "log"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/codes"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)

func initTracer(serviceName string) (func(), error) {
    exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(
        jaeger.WithEndpoint("http://jaeger-collector:14268/api/traces"),
    ))
    if err != nil {
        return nil, fmt.Errorf("create jaeger exporter: %w", err)
    }

    // The provider lives in the SDK package, not the API's trace package
    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exporter),
        sdktrace.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceNameKey.String(serviceName),
        )),
    )

    otel.SetTracerProvider(tp)
    return func() {
        if err := tp.Shutdown(context.Background()); err != nil {
            log.Printf("Error shutting down tracer provider: %v", err)
        }
    }, nil
}

// Example usage in service
func (s *StockService) UpdateStock(ctx context.Context, req UpdateStockRequest) error {
    ctx, span := s.tracer.Start(ctx, "UpdateStock")
    defer span.End()

    span.SetAttributes(
        attribute.String("item.id", req.ItemID.String()),
        attribute.Int("quantity", req.Quantity),
    )

    // Business logic here
    if err := s.doUpdate(ctx, req); err != nil {
        span.RecordError(err)
        span.SetStatus(codes.Error, err.Error())
        return err
    }

    return nil
}

6. Performance Optimization

6.1 Caching Strategy

    
flowchart TB
  subgraph "Cache Strategy"
      direction TB
      A[Request] --> B{Cache Hit?}
      B -->|Yes| C[Return Cached]
      B -->|No| D[Query DB]
      D --> E[Update Cache]
      E --> F[Return Fresh]
  end

  subgraph "Cache Invalidation"
      direction TB
      G[Write Operation] --> H[Update DB]
      H --> I[Publish Event]
      I --> J[Invalidate Cache]
  end

  subgraph "Cache Warming"
      direction TB
      K[Background Job] --> L[Get Popular Items]
      L --> M[Prefetch Data]
      M --> N[Update Cache]
  end


  
// cache.go
package main

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
    "go.uber.org/zap"
)

// ErrCacheMiss signals that the key was not present in Redis.
var ErrCacheMiss = errors.New("cache miss")

type CacheManager struct {
    redis   *redis.Client
    metrics *Metrics
    logger  *zap.Logger
}

func (c *CacheManager) Get(ctx context.Context, key string, value interface{}) error {
    start := time.Now()
    defer func() {
        c.metrics.operationDuration.WithLabelValues("cache_get").Observe(
            time.Since(start).Seconds(),
        )
    }()

    data, err := c.redis.Get(ctx, key).Bytes()
    if err != nil {
        if err == redis.Nil {
            c.metrics.cacheHitRate.WithLabelValues("get").Set(0)
            return ErrCacheMiss
        }
        return fmt.Errorf("get from redis: %w", err)
    }

    if err := json.Unmarshal(data, value); err != nil {
        return fmt.Errorf("unmarshal cached value: %w", err)
    }

    c.metrics.cacheHitRate.WithLabelValues("get").Set(1)
    return nil
}

func (c *CacheManager) Set(ctx context.Context, key string, value interface{}, ttl time.Duration) error {
    data, err := json.Marshal(value)
    if err != nil {
        return fmt.Errorf("marshal value: %w", err)
    }

    if err := c.redis.Set(ctx, key, data, ttl).Err(); err != nil {
        return fmt.Errorf("set in redis: %w", err)
    }

    return nil
}

func (c *CacheManager) Invalidate(ctx context.Context, pattern string) error {
    iter := c.redis.Scan(ctx, 0, pattern, 0).Iterator()
    var keys []string

    for iter.Next(ctx) {
        keys = append(keys, iter.Val())
    }

    if err := iter.Err(); err != nil {
        return fmt.Errorf("scan keys: %w", err)
    }

    if len(keys) > 0 {
        if err := c.redis.Del(ctx, keys...).Err(); err != nil {
            return fmt.Errorf("delete keys: %w", err)
        }
    }

    return nil
}

// Cache warming
func (c *CacheManager) WarmCache(ctx context.Context, items []string) error {
    for _, itemID := range items {
        if err := c.warmItem(ctx, itemID); err != nil {
            c.logger.Error("warm cache failed",
                zap.String("itemId", itemID),
                zap.Error(err))
            continue
        }
    }
    return nil
}

func (c *CacheManager) warmItem(ctx context.Context, itemID string) error {
    // Implementation for warming up a single item
    return nil
}
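
A cache-aside read path built on this manager might look like the following sketch; the repository lookup (loadStockLevel) is hypothetical:

// cachedreads.go (sketch; the repository lookup is hypothetical)
package main

import (
    "context"
    "errors"
    "time"

    "go.uber.org/zap"
)

type stockReader struct {
    cache  *CacheManager
    logger *zap.Logger
    // loadStockLevel stands in for the real PostgreSQL repository call.
    loadStockLevel func(ctx context.Context, itemID string) (*StockLevel, error)
}

func (r *stockReader) GetStockLevel(ctx context.Context, itemID string) (*StockLevel, error) {
    key := "stock:" + itemID

    var level StockLevel
    err := r.cache.Get(ctx, key, &level)
    if err == nil {
        return &level, nil // cache hit
    }
    if !errors.Is(err, ErrCacheMiss) {
        return nil, err // real Redis failure, not just a miss
    }

    // Miss: read from the database and repopulate the cache for the
    // next reader (the "Query DB -> Update Cache" path in the chart).
    fresh, err := r.loadStockLevel(ctx, itemID)
    if err != nil {
        return nil, err
    }
    if err := r.cache.Set(ctx, key, fresh, 5*time.Minute); err != nil {
        r.logger.Warn("cache set failed", zap.Error(err))
    }
    return fresh, nil
}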

6.2 Database Optimization

-- Partitioning for stock_movements: PostgreSQL requires the partition
-- scheme to be declared at table-creation time, so we build a
-- partitioned twin of the table (LIKE copies its column definitions)
CREATE TABLE stock_movements_partitioned (
    LIKE stock_movements INCLUDING DEFAULTS
) PARTITION BY RANGE (created_at);

-- Create monthly partitions
CREATE TABLE stock_movements_y2025m01 PARTITION OF stock_movements_partitioned
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

-- Create indexes on partitions
CREATE INDEX idx_movements_y2025m01_item
    ON stock_movements_y2025m01 (item_id);

-- Materialized view for common queries
CREATE MATERIALIZED VIEW mv_stock_summary AS
SELECT
    i.id as item_id,
    i.sku,
    i.name,
    l.id as location_id,
    l.name as location_name,
    sl.quantity,
    sl.reserved,
    sl.quantity - sl.reserved as available,
    sl.reorder_point,
    sl.safety_stock
FROM
    items i
    JOIN stock_levels sl ON i.id = sl.item_id
    JOIN locations l ON sl.location_id = l.id
WITH DATA;

-- REFRESH ... CONCURRENTLY (used below) requires a unique index on the view
CREATE UNIQUE INDEX idx_mv_stock_summary_item_location
    ON mv_stock_summary (item_id, location_id);

-- Refresh materialized view
CREATE OR REPLACE FUNCTION refresh_stock_summary()
RETURNS trigger AS $$
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY mv_stock_summary;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trigger_refresh_stock_summary
AFTER INSERT OR UPDATE OR DELETE ON stock_levels
FOR EACH STATEMENT
EXECUTE FUNCTION refresh_stock_summary();

6.3 Load Testing

// loadtest/main.go
package main

import (
    "os"
    "time"

    vegeta "github.com/tsenart/vegeta/v12/lib"
)

func main() {
    // Configure test
    rate := vegeta.Rate{Freq: 100, Per: time.Second}
    duration := 5 * time.Minute
    targeter := vegeta.NewStaticTargeter(vegeta.Target{
        Method: "GET",
        URL:    "http://stock-service/api/stock/item-1",
    })

    // Run test
    attacker := vegeta.NewAttacker()
    var metrics vegeta.Metrics
    for res := range attacker.Attack(targeter, rate, duration, "Stock Service Test") {
        metrics.Add(res)
    }
    metrics.Close()

    // Report results
    reporter := vegeta.NewTextReporter(&metrics)
    reporter.Report(os.Stdout)
}

7. Error Handling

7.1 Circuit Breaker

// circuitbreaker.go
package main

import (
    "errors"
    "sync"
    "time"
)

// State models the breaker's condition: closed (normal operation) or
// open (rejecting calls after repeated failures).
type State int

const (
    StateClosed State = iota
    StateOpen
)

// ErrCircuitOpen is returned while the breaker is rejecting calls.
var ErrCircuitOpen = errors.New("circuit breaker is open")

type CircuitBreaker struct {
    mu          sync.RWMutex
    failures    int
    lastFailure time.Time
    state       State
    threshold   int
    timeout     time.Duration
}

func (cb *CircuitBreaker) Execute(operation func() error) error {
    if !cb.canExecute() {
        return ErrCircuitOpen
    }

    err := operation()
    cb.updateState(err)
    return err
}

func (cb *CircuitBreaker) canExecute() bool {
    cb.mu.RLock()
    defer cb.mu.RUnlock()

    if cb.state == StateClosed {
        return true
    }

    if cb.state == StateOpen &&
        time.Since(cb.lastFailure) >= cb.timeout {
        return true
    }

    return false
}

func (cb *CircuitBreaker) updateState(err error) {
    cb.mu.Lock()
    defer cb.mu.Unlock()

    if err != nil {
        cb.failures++
        cb.lastFailure = time.Now()

        if cb.failures >= cb.threshold {
            cb.state = StateOpen
        }
    } else {
        cb.failures = 0
        cb.state = StateClosed
    }
}
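
Usage is a thin wrapper around any outbound call. In this sketch the breaker guards Redis writes; the threshold and timeout values are arbitrary choices:

// breakerusage.go (sketch; threshold and timeout are assumptions)
package main

import (
    "context"
    "errors"
    "time"
)

// cacheBreaker guards Redis calls; its state persists across requests.
var cacheBreaker = &CircuitBreaker{threshold: 5, timeout: 30 * time.Second}

func setWithBreaker(ctx context.Context, cache *CacheManager, key string, level *StockLevel) error {
    err := cacheBreaker.Execute(func() error {
        return cache.Set(ctx, key, level, 5*time.Minute)
    })
    if errors.Is(err, ErrCircuitOpen) {
        // Redis has failed repeatedly: skip the cache and let reads
        // fall through to PostgreSQL until the breaker's timeout elapses.
        return nil
    }
    return err
}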

7.2 Retry Mechanism

// retry.go
package main

import (
    "context"
    "fmt"
    "math"
    "time"
)

func retry(ctx context.Context, operation func() error, opts RetryOptions) error {
    var lastErr error
    for attempt := 0; attempt < opts.MaxAttempts; attempt++ {
        if err := operation(); err != nil {
            lastErr = err
            if !opts.ShouldRetry(err) {
                return err
            }

            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(opts.BackoffFunc(attempt)):
                continue
            }
        }
        return nil
    }
    return fmt.Errorf("max retries exceeded: %w", lastErr)
}

type RetryOptions struct {
    MaxAttempts  int
    BackoffFunc  func(attempt int) time.Duration
    ShouldRetry  func(err error) bool
}

func ExponentialBackoff(base time.Duration) func(int) time.Duration {
    return func(attempt int) time.Duration {
        return base * time.Duration(math.Pow(2, float64(attempt)))
    }
}
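
A brief usage sketch tying retry to the stock service; the policy values below are assumptions:

// retryusage.go (sketch)
package main

import (
    "context"
    "errors"
    "time"
)

func updateWithRetry(ctx context.Context, svc *StockService, req UpdateStockRequest) error {
    return retry(ctx, func() error {
        return svc.UpdateStock(ctx, req)
    }, RetryOptions{
        MaxAttempts: 5,
        BackoffFunc: ExponentialBackoff(100 * time.Millisecond),
        // Assumption: cancellation is permanent; other errors are
        // treated as transient and retried.
        ShouldRetry: func(err error) bool {
            return !errors.Is(err, context.Canceled)
        },
    })
}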

7.3 Dead Letter Queue

// dlq.go
package main

import (
    "context"
    "strconv"
    "time"

    "github.com/twmb/franz-go/pkg/kgo"
    "go.uber.org/zap"
)

// FailedMessage carries an event that could not be processed plus the
// context needed to triage it later.
type FailedMessage struct {
    ID            string
    Data          []byte
    Error         error
    OriginalTopic string
    RetryCount    int
}

type DeadLetterQueue struct {
    redpanda *kgo.Client
    logger   *zap.Logger
}

func (dlq *DeadLetterQueue) HandleFailedMessage(ctx context.Context, msg FailedMessage) error {
    record := &kgo.Record{
        Topic: "inventory.failures",
        Key:   []byte(msg.ID),
        Value: msg.Data,
        Headers: []kgo.RecordHeader{
            {Key: "error", Value: []byte(msg.Error.Error())},
            {Key: "original_topic", Value: []byte(msg.OriginalTopic)},
            {Key: "retry_count", Value: []byte(strconv.Itoa(msg.RetryCount))},
            {Key: "failed_at", Value: []byte(time.Now().Format(time.RFC3339))},
        },
    }

    results := dlq.redpanda.ProduceSync(ctx, record)
    return results.FirstErr()
}

func (dlq *DeadLetterQueue) ProcessFailures(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            if err := dlq.processFailedMessages(ctx); err != nil {
                dlq.logger.Error("process failures",
                    zap.Error(err))
                time.Sleep(time.Second * 5)
            }
        }
    }
}

func (dlq *DeadLetterQueue) processFailedMessages(ctx context.Context) error {
    // Implementation for processing failed messages
    return nil
}

2.4 Schema Registry and Event Schema Management

The Schema Registry provides a centralized schema management system that ensures data consistency and compatibility across our event-driven architecture. We use Apache Avro for schema definition due to its compact serialization and strong schema evolution capabilities.

2.4.1 Schema Organization

Our schemas are organized by domain and version:

schemas/
├── stock/
│   ├── commands/
│   │   ├── v1_stock_update.avsc
│   │   └── v1_stock_reserve.avsc
│   ├── events/
│   │   ├── v1_stock_updated.avsc
│   │   └── v1_stock_reserved.avsc
│   └── models/
│       ├── v1_stock_item.avsc
│       └── v1_stock_level.avsc
└── cdc/
    └── v1_stock_change.avsc

2.4.2 Core Schema Definitions

// schemas/stock/models/v1_stock_item.avsc
{
  "type": "record",
  "name": "StockItem",
  "namespace": "com.inventory.stock",
  "doc": "Represents a stock item in the inventory system",
  "fields": [
    {"name": "id", "type": "string", "doc": "Unique identifier for the item"},
    {"name": "sku", "type": "string", "doc": "Stock keeping unit"},
    {"name": "name", "type": "string", "doc": "Item name"},
    {"name": "description", "type": ["null", "string"], "default": null, "doc": "Item description"},
    {"name": "category_id", "type": "string", "doc": "Category identifier"},
    {
      "name": "metadata",
      "type": {
        "type": "map",
        "values": "string"
      },
      "default": {},
      "doc": "Additional item metadata"
    },
    {"name": "created_at", "type": "long", "logicalType": "timestamp-millis", "doc": "Creation timestamp"},
    {"name": "updated_at", "type": "long", "logicalType": "timestamp-millis", "doc": "Last update timestamp"}
  ]
}
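
As a quick sanity check, the schema can be compiled into a goavro codec and used to round-trip a record through Avro binary encoding. A minimal sketch; the file path and field values are placeholders, and note that goavro represents unions as single-entry maps and timestamp-millis values as time.Time:

// roundtrip_example.go (illustrative)
package main

import (
    "fmt"
    "os"
    "time"

    "github.com/linkedin/goavro/v2"
)

func main() {
    raw, err := os.ReadFile("schemas/stock/models/v1_stock_item.avsc")
    if err != nil {
        panic(err)
    }

    codec, err := goavro.NewCodec(string(raw))
    if err != nil {
        panic(err)
    }

    // Records are map[string]interface{}; the nullable description union is
    // encoded as map{"string": value} (or nil for null).
    item := map[string]interface{}{
        "id":          "item-123",
        "sku":         "SKU-001",
        "name":        "Widget",
        "description": map[string]interface{}{"string": "A sample widget"},
        "category_id": "cat-1",
        "metadata":    map[string]interface{}{"color": "blue"},
        "created_at":  time.Now().UTC(),
        "updated_at":  time.Now().UTC(),
    }

    bin, err := codec.BinaryFromNative(nil, item)
    if err != nil {
        panic(err)
    }

    decoded, _, err := codec.NativeFromBinary(bin)
    if err != nil {
        panic(err)
    }
    fmt.Printf("round-tripped %d bytes: %v\n", len(bin), decoded)
}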

// schemas/stock/events/v1_stock_updated.avsc
{
  "type": "record",
  "name": "StockUpdatedEvent",
  "namespace": "com.inventory.stock.events",
  "doc": "Event emitted when stock levels are updated",
  "fields": [
    {"name": "event_id", "type": "string", "doc": "Unique event identifier"},
    {"name": "event_type", "type": "string", "doc": "Event type identifier"},
    {"name": "event_time", "type": "long", "logicalType": "timestamp-millis", "doc": "Event timestamp"},
    {
      "name": "item",
      "type": "com.inventory.stock.StockItem",
      "doc": "The stock item that was updated"
    },
    {
      "name": "stock_level",
      "type": {
        "type": "record",
        "name": "StockLevel",
        "fields": [
          {"name": "quantity", "type": "int", "doc": "Current quantity"},
          {"name": "reserved", "type": "int", "doc": "Reserved quantity"},
          {"name": "available", "type": "int", "doc": "Available quantity (quantity - reserved)"},
          {"name": "location_id", "type": "string", "doc": "Location identifier"}
        ]
      }
    },
    {
      "name": "change",
      "type": {
        "type": "record",
        "name": "StockChange",
        "fields": [
          {"name": "previous_quantity", "type": "int"},
          {"name": "new_quantity", "type": "int"},
          {"name": "change_type", "type": "string"},
          {"name": "reason", "type": ["null", "string"], "default": null}
        ]
      }
    },
    {
      "name": "metadata",
      "type": {
        "type": "map",
        "values": "string"
      },
      "default": {},
      "doc": "Additional event metadata"
    }
  ]
}

7.4.3 Schema Evolution Rules

We follow these schema evolution rules to maintain compatibility; the sketch after this list shows how to verify a candidate schema against the registry before registering it:

  1. Backward Compatibility

    • New fields must have default values
    • Fields can’t be removed (can be deprecated)
    • Field types can’t be changed
  2. Forward Compatibility

    • Readers must ignore unknown fields
    • New fields must be optional or have defaults
    • Enums can only be extended
  3. Version Management

    • Major version changes for breaking changes
    • Minor version changes for backward-compatible additions
    • Patch version for documentation updates
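
Redpanda's Schema Registry exposes a Confluent-compatible REST API, so a candidate schema can be checked against the latest registered version before anything is registered. A minimal sketch, assuming the registry is reachable at a placeholder URL and exposes the standard POST /compatibility/subjects/{subject}/versions/latest endpoint:

// compat_check.go (illustrative)
package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

// checkCompatibility asks the registry whether avroSchema is compatible with
// the latest version registered under subject.
func checkCompatibility(registryURL, subject, avroSchema string) (bool, error) {
    body, err := json.Marshal(map[string]string{"schema": avroSchema})
    if err != nil {
        return false, err
    }

    url := fmt.Sprintf("%s/compatibility/subjects/%s/versions/latest", registryURL, subject)
    req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
    if err != nil {
        return false, err
    }
    req.Header.Set("Content-Type", "application/vnd.schemaregistry.v1+json")

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return false, err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return false, fmt.Errorf("registry returned status %d", resp.StatusCode)
    }

    var result struct {
        IsCompatible bool `json:"is_compatible"`
    }
    if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
        return false, err
    }
    return result.IsCompatible, nil
}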

7.4.4 Schema Registry Integration

// schema/registry.go
package schema

import (
    "fmt"
    "sync"

    "github.com/linkedin/goavro/v2"
    "github.com/redpanda-data/console/backend/pkg/schema"
    "go.uber.org/zap"
)

type SchemaRegistry struct {
    client *schema.Client
    mu     sync.RWMutex // guards the codec cache against concurrent access
    codecs map[string]*goavro.Codec
    logger *zap.Logger
}

func NewSchemaRegistry(config SchemaRegistryConfig) (*SchemaRegistry, error) {
    client, err := schema.NewClient(schema.Config{
        URLs:     config.URLs,
        Username: config.Username,
        Password: config.Password,
    })
    if err != nil {
        return nil, fmt.Errorf("create schema registry client: %w", err)
    }

    return &SchemaRegistry{
        client:  client,
        codecs:  make(map[string]*goavro.Codec),
        logger:  config.Logger,
    }, nil
}

func (r *SchemaRegistry) GetSchema(subject string, version int) (*goavro.Codec, error) {
    cacheKey := fmt.Sprintf("%s-v%d", subject, version)

    // Check the cache first; a read lock allows concurrent lookups
    r.mu.RLock()
    codec, ok := r.codecs[cacheKey]
    r.mu.RUnlock()
    if ok {
        return codec, nil
    }

    // Fetch the schema from the registry
    s, err := r.client.GetSchemaByVersion(subject, version)
    if err != nil {
        return nil, fmt.Errorf("get schema: %w", err)
    }

    // Compile the schema into a codec
    codec, err = goavro.NewCodec(s.Schema)
    if err != nil {
        return nil, fmt.Errorf("create codec: %w", err)
    }

    // Cache the codec for subsequent calls
    r.mu.Lock()
    r.codecs[cacheKey] = codec
    r.mu.Unlock()
    return codec, nil
}

func (r *SchemaRegistry) RegisterSchema(subject, avroSchema string) (int, error) {
    // Validate the schema locally before hitting the registry. Note the
    // parameter is not named "schema" so it cannot shadow the package.
    if _, err := goavro.NewCodec(avroSchema); err != nil {
        return 0, fmt.Errorf("invalid schema: %w", err)
    }

    // Register with the registry's compatibility check
    id, err := r.client.RegisterSchema(subject, avroSchema, schema.AVRO)
    if err != nil {
        return 0, fmt.Errorf("register schema: %w", err)
    }

    return id, nil
}

7.4.5 Event Serialization/Deserialization

// events/serialization.go
package events

import (
    "encoding/binary"
    "fmt"

    "github.com/linkedin/goavro/v2"

    // Project-local registry wrapper from section 7.4.4; adjust the import
    // path to your module name.
    schema "example.com/inventory/schema"
)

// Wire format: [magic byte | 4-byte schema version | Avro binary payload].
// Note: the Confluent wire format carries a global schema ID in this slot;
// this sketch uses the subject's version number for simplicity.
const (
    schemaIDLength = 4
    magicByte      = byte(0)
)

// NativeConverter is implemented by events that can convert themselves into
// goavro's native Go representation (maps, slices, primitives).
type NativeConverter interface {
    ToNative() (map[string]interface{}, error)
}

type Serializer struct {
    registry *schema.SchemaRegistry
    subject  string
    version  int
    codec    *goavro.Codec
}

func NewSerializer(registry *schema.SchemaRegistry, subject string, version int) (*Serializer, error) {
    codec, err := registry.GetSchema(subject, version)
    if err != nil {
        return nil, fmt.Errorf("get schema: %w", err)
    }

    return &Serializer{
        registry: registry,
        subject:  subject,
        version:  version,
        codec:    codec,
    }, nil
}

func (s *Serializer) Serialize(event NativeConverter) ([]byte, error) {
    // Convert the event to goavro's native Go representation
    native, err := event.ToNative()
    if err != nil {
        return nil, fmt.Errorf("convert to native: %w", err)
    }

    // Encode the native value with the cached Avro codec
    data, err := s.codec.BinaryFromNative(nil, native)
    if err != nil {
        return nil, fmt.Errorf("serialize: %w", err)
    }

    // Encode the schema version as a big-endian uint32
    schemaID := make([]byte, schemaIDLength)
    binary.BigEndian.PutUint32(schemaID, uint32(s.version))

    // Frame the payload: magic byte, schema version, Avro data
    result := make([]byte, 1+schemaIDLength+len(data))
    result[0] = magicByte
    copy(result[1:], schemaID)
    copy(result[1+schemaIDLength:], data)

    return result, nil
}

func (s *Serializer) Deserialize(data []byte) (interface{}, error) {
    if len(data) < 1+schemaIDLength {
        return nil, fmt.Errorf("data too short")
    }

    // Verify the magic byte before trusting the framing
    if data[0] != magicByte {
        return nil, fmt.Errorf("invalid magic byte")
    }

    // Extract the schema version from the frame header
    version := binary.BigEndian.Uint32(data[1 : 1+schemaIDLength])

    // Look up the codec for that version, which may differ from the
    // serializer's own version if the writer evolved the schema
    codec, err := s.registry.GetSchema(s.subject, int(version))
    if err != nil {
        return nil, fmt.Errorf("get schema: %w", err)
    }

    // Decode the Avro payload into goavro's native representation
    native, _, err := codec.NativeFromBinary(data[1+schemaIDLength:])
    if err != nil {
        return nil, fmt.Errorf("deserialize: %w", err)
    }

    return native, nil
}

7.4.6 Usage Example

// Example of publishing an event with schema validation. In production,
// create the serializer once and reuse it rather than building one per call.
func (p *StockEventPublisher) PublishStockUpdatedEvent(ctx context.Context, event StockUpdatedEvent) error {
    // Get serializer for stock updated events
    serializer, err := NewSerializer(p.registry, "inventory.stock.events.StockUpdated", 1)
    if err != nil {
        return fmt.Errorf("create serializer: %w", err)
    }

    // Serialize event
    data, err := serializer.Serialize(event)
    if err != nil {
        return fmt.Errorf("serialize event: %w", err)
    }

    // Create record
    record := &kgo.Record{
        Topic: "inventory.stock.events",
        Key:   []byte(event.Item.ID),
        Value: data,
        Headers: []kgo.RecordHeader{
            {Key: "event_type", Value: []byte(event.EventType)},
            {Key: "schema_version", Value: []byte("1")},
        },
    }

    // Publish event
    results := p.client.ProduceSync(ctx, record)
    if err := results.FirstErr(); err != nil {
        return fmt.Errorf("produce event: %w", err)
    }

    return nil
}

// Example of consuming an event with schema validation; as with publishing,
// the deserializer should be constructed once and reused.
func (s *StockEventSubscriber) handleRecord(ctx context.Context, record *kgo.Record) error {
    // Get deserializer for stock events
    deserializer, err := NewSerializer(s.registry, "inventory.stock.events.StockUpdated", 1)
    if err != nil {
        return fmt.Errorf("create deserializer: %w", err)
    }

    // Deserialize event
    native, err := deserializer.Deserialize(record.Value)
    if err != nil {
        return fmt.Errorf("deserialize event: %w", err)
    }

    // Convert to concrete type
    event, err := StockUpdatedEventFromNative(native)
    if err != nil {
        return fmt.Errorf("convert event: %w", err)
    }

    // Process event
    return s.handler.HandleStockEvent(ctx, event)
}

7.4.7 Schema Evolution Example

Here’s an example of evolving the StockItem schema while maintaining compatibility:

// schemas/stock/models/v2_stock_item.avsc
{
  "type": "record",
  "name": "StockItem",
  "namespace": "com.inventory.stock",
  "doc": "Represents a stock item in the inventory system",
  "fields": [
    { "name": "id", "type": "string", "doc": "Unique identifier for the item" },
    { "name": "sku", "type": "string", "doc": "Stock keeping unit" },
    { "name": "name", "type": "string", "doc": "Item name" },
    {
      "name": "description",
      "type": ["null", "string"],
      "default": null,
      "doc": "Item description"
    },
    { "name": "category_id", "type": "string", "doc": "Category identifier" },
    {
      "name": "metadata",
      "type": {
        "type": "map",
        "values": "string"
      },
      "default": {},
      "doc": "Additional item metadata"
    },
    {
      "name": "created_at",
      "type": { "type": "long", "logicalType": "timestamp-millis" },
      "doc": "Creation timestamp"
    },
    {
      "name": "updated_at",
      "type": { "type": "long", "logicalType": "timestamp-millis" },
      "doc": "Last update timestamp"
    },
    {
      "name": "tags",
      "type": {
        "type": "array",
        "items": "string"
      },
      "default": [],
      "doc": "Item tags for categorization (added in v2)"
    },
    {
      "name": "dimensions",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Dimensions",
          "fields": [
            { "name": "length", "type": "double" },
            { "name": "width", "type": "double" },
            { "name": "height", "type": "double" },
            { "name": "unit", "type": "string" }
          ]
        }
      ],
      "default": null,
      "doc": "Item dimensions"
    }
  ]
}

This schema evolution:

  1. Maintains backward compatibility
  2. Adds new optional fields with defaults
  3. Introduces a nested record structure
  4. Preserves all existing fields
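
To put the rules and the evolved schema together, here is a sketch of registering v2 through the SchemaRegistry wrapper from section 7.4.4. It assumes the schema package is importable and that SchemaRegistryConfig carries the fields used there (URLs, Username, Password, Logger); the module path, registry URL, subject name, and file path are all placeholders. With BACKWARD compatibility configured on the subject, the registry rejects the registration if v2 would break consumers that still read v1.

// register_v2.go (illustrative)
package main

import (
    "log"
    "os"

    schema "example.com/inventory/schema" // hypothetical module path
)

func main() {
    registry, err := schema.NewSchemaRegistry(schema.SchemaRegistryConfig{
        URLs: []string{"http://localhost:8081"},
    })
    if err != nil {
        log.Fatal(err)
    }

    raw, err := os.ReadFile("schemas/stock/models/v2_stock_item.avsc")
    if err != nil {
        log.Fatal(err)
    }

    // Fails here if the new schema is incompatible with the latest version.
    id, err := registry.RegisterSchema("inventory.stock.models.StockItem", string(raw))
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("registered StockItem v2 (schema id %d)", id)
}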

Conclusion

Building a modern inventory management system requires careful consideration of architecture, scalability, and reliability. Throughout this guide, we’ve explored how to build a robust, event-driven inventory system that can handle the demands of modern commerce.

Key Takeaways

  1. Event-Driven Architecture: By leveraging Redpanda as our event streaming platform, we’ve created a system where services can communicate asynchronously, enabling real-time processing and system decoupling. This approach allows our inventory system to scale independently and maintain high throughput even during peak loads.

  2. CQRS Pattern: Separating our read and write concerns through the Command Query Responsibility Segregation pattern has allowed us to optimize each path independently. Write operations maintain consistency through transactional boundaries, while read operations can be highly optimized with caching and specialized data models.

  3. Schema Evolution: Using Schema Registry with Redpanda ensures that our events maintain compatibility as our system evolves. This provides a robust foundation for long-term maintainability and prevents breaking changes from disrupting operations.

  4. Change Data Capture: Implementing CDC with Debezium allows us to capture database changes as events, creating a reliable audit trail and enabling real-time analytics without impacting our primary database performance.

  5. Resilience Patterns: Through circuit breakers, retry mechanisms, and dead letter queues, our system can gracefully handle failures and recover automatically from transient issues, ensuring high availability and data integrity.

  6. Observability: Comprehensive monitoring with Prometheus metrics, structured logging, and distributed tracing gives us visibility into system behavior, making it easier to diagnose issues and optimize performance.

Business Impact

The modern event-driven inventory management system delivers significant business value across multiple dimensions:

    
graph LR
  subgraph "Customer Experience Impact"
      direction TB
      CE1[Real-time Visibility]
      CE2[Accurate Delivery Promises]
      CE3[No Overselling]
      CE4[Omnichannel Experience]
      CE5[Personalized Service]
      
      CE1 & CE2 & CE3 & CE4 & CE5 --> CS[Enhanced Customer Satisfaction]
  end

  subgraph "Operational Excellence"
      direction TB
      OE1[Automated Workflows]
      OE2[Real-time Analytics]
      OE3[Reduced Manual Work]
      OE4[Predictive Insights]
      OE5[Process Optimization]

      OE1 & OE2 & OE3 & OE4 & OE5 --> OI[Operational Improvement]
  end

  subgraph "Financial Benefits"
      direction TB
      FB1[Reduced Inventory Costs]
      FB2[Better Cash Flow]
      FB3[Lower Operating Costs]
      FB4[Increased Sales]
      FB5[Higher Margins]

      FB1 & FB2 & FB3 & FB4 & FB5 --> FI[Financial Impact]
  end

  subgraph "Strategic Advantages"
      direction TB
      SA1[Market Agility]
      SA2[Competitive Edge]
      SA3[Innovation Enablement]
      SA4[Scalable Growth]
      SA5[Global Expansion]

      SA1 & SA2 & SA3 & SA4 & SA5 --> SI[Strategic Impact]
  end

  CS --> BV[Total Business Value]
  OI --> BV
  FI --> BV
  SI --> BV

  style BV fill:#f9f,stroke:#333,stroke-width:4px
  style CS fill:#bbf,stroke:#333,stroke-width:2px
  style OI fill:#bbf,stroke:#333,stroke-width:2px
  style FI fill:#bbf,stroke:#333,stroke-width:2px
  style SI fill:#bbf,stroke:#333,stroke-width:2px

  classDef impact fill:#e1e1e1,stroke:#333,stroke-width:1px
  class CE1,CE2,CE3,CE4,CE5,OE1,OE2,OE3,OE4,OE5,FB1,FB2,FB3,FB4,FB5,SA1,SA2,SA3,SA4,SA5 impact


  

1. Customer Experience Impact

2. Operational Excellence

3. Financial Benefits

4. Strategic Advantages

Future Directions

    
mindmap
  root((Future
  Evolution))
      AI/ML Integration
          Demand Forecasting
              Historical Analysis
              Seasonal Patterns
              Market Trends
          Anomaly Detection
              Fraud Prevention
              Quality Control
              Pattern Recognition
          Automated Reordering
              Safety Stock
              Lead Time
              Optimal Quantity
      Edge Computing
          In-Store Processing
              Local Inventory
              POS Integration
              Real-time Updates
          Warehouse Operations
              Pick/Pack Optimization
              Space Utilization
              Equipment Tracking
          Offline Capabilities
              Local Cache
              Sync Mechanism
              Conflict Resolution
      Blockchain Integration
          Supply Chain
              Product Tracking
              Origin Verification
              Chain of Custody
          Smart Contracts
              Automated Payments
              SLA Enforcement
              Compliance
          Sustainability
              Carbon Footprint
              Ethical Sourcing
              Waste Reduction
      IoT Enhancement
          RFID Systems
              Asset Tracking
              Automated Counting
              Loss Prevention
          Sensor Networks
              Environmental
              Movement Detection
              Usage Patterns
          Automated Systems
              Robotic Storage
              AGV Integration
              Smart Shelving
      Advanced Analytics
          Predictive Models
              Stock Optimization
              Resource Planning
              Risk Assessment
          Real-time Dashboards
              KPI Monitoring
              Alert System
              Decision Support
          Business Intelligence
              Custom Reports
              Data Mining
              Trend Analysis

  
    
graph TB
  subgraph "Current State"
      C1[Basic Automation]
      C2[Manual Decisions]
      C3[Reactive Approach]
      C4[Limited Integration]
  end

  subgraph "Near Future"
      N1[Smart Automation]
      N2[AI-Assisted Decisions]
      N3[Predictive Systems]
      N4[IoT Integration]
  end

  subgraph "Future Vision"
      F1[Autonomous Operations]
      F2[AI-Driven Strategy]
      F3[Prescriptive Analytics]
      F4[Full Integration]
  end

  C1 --> N1
  C2 --> N2
  C3 --> N3
  C4 --> N4

  N1 --> F1
  N2 --> F2
  N3 --> F3
  N4 --> F4

  style C1 fill:#f9f,stroke:#333,stroke-width:2px
  style C2 fill:#f9f,stroke:#333,stroke-width:2px
  style C3 fill:#f9f,stroke:#333,stroke-width:2px
  style C4 fill:#f9f,stroke:#333,stroke-width:2px

  style N1 fill:#bbf,stroke:#333,stroke-width:2px
  style N2 fill:#bbf,stroke:#333,stroke-width:2px
  style N3 fill:#bbf,stroke:#333,stroke-width:2px
  style N4 fill:#bbf,stroke:#333,stroke-width:2px

  style F1 fill:#bfb,stroke:#333,stroke-width:2px
  style F2 fill:#bfb,stroke:#333,stroke-width:2px
  style F3 fill:#bfb,stroke:#333,stroke-width:2px
  style F4 fill:#bfb,stroke:#333,stroke-width:2px


  

Because these enhancements build on the solid foundation of event-driven architecture and cloud-native principles we've established, they can be added incrementally, evolving the system into an increasingly sophisticated inventory management platform that adapts to changing business needs.

The modern inventory management system we’ve built is not just about tracking stock levels—it’s about creating a digital nervous system that can sense, respond, and adapt to the complex demands of today’s supply chains. With this architecture, businesses can achieve the real-time visibility, operational efficiency, and scalability needed to thrive in an increasingly competitive marketplace.
