In today’s fast-paced digital commerce landscape, traditional inventory management approaches are breaking under the strain of omnichannel retail, rapid fulfillment demands, and complex supply chains. This comprehensive guide explores building a modern, event-driven inventory management system (IMS) that can scale to meet these challenges while maintaining data consistency and real-time capabilities.
timeline
title Evolution of Inventory Systems
1960s : Manual Systems : Paper Ledgers
: Physical counting
: Basic record keeping
: Weekly reconciliation
1980s : Early Digital : Mainframe Systems
: Basic databases
: Batch processing
: Monthly reporting
1990s : Client-Server : Distributed Systems
: Local databases
: Daily updates
: Barcode scanning
2000s : Enterprise Systems : ERP Integration
: Centralized databases
: Real-time updates
: RFID tracking
2010s : Cloud Solutions : SaaS & PaaS
: Mobile integration
: API-first approach
: Multi-channel
: ML predictions
2020s : Event-Driven : Microservices
: Real-time analytics
: Omnichannel
: CQRS patterns
: Edge computing
: AI-driven
Modern inventory management faces several critical challenges that require sophisticated solutions. These challenges have evolved with the increasing complexity of global supply chains, omnichannel retail, and customer expectations for real-time accuracy.
Maintaining accurate inventory data is more challenging, and more critical, than ever. Businesses must track inventory across multiple channels, locations, and systems while ensuring data consistency and real-time updates.
mindmap
root((Data
Accuracy))
Real-time Updates
Multi-channel Sales
Returns Processing
Stock Adjustments
Warehouse Movements
Multi-source Sync
Store Systems
E-commerce Platform
Warehouse Management
Supplier Integration
Error Prevention
Validation Rules
Automated Checks
Reconciliation
Audit Trails
Key Implications:
Solution Requirements:
As businesses grow, their inventory management systems must handle increasing volumes of data and transactions while maintaining performance and reliability.
mindmap
root((Scale
Challenges))
High Volume
Millions of SKUs
Multiple Locations
Concurrent Users
Peak Processing
Geographic Distribution
Multiple Regions
Data Replication
Latency Management
Legal Compliance
System Performance
Response Time
Processing Speed
Resource Usage
Cost Efficiency
Key Implications:
Solution Requirements:
Modern inventory systems must integrate with a diverse ecosystem of applications, partners, and services, each with its own protocols, data formats, and requirements.
mindmap
root((Integration
Complexity))
Legacy Systems
ERP Integration
Custom Protocols
Data Migration
System Constraints
Partner Networks
Supplier Systems
Logistics Partners
Payment Providers
Analytics Tools
New Channels
Mobile Apps
Social Commerce
Marketplaces
Voice Commerce
Key Implications:
Solution Requirements:
Modern inventory management must accommodate complex business rules, workflows, and requirements while maintaining flexibility and adaptability.
mindmap
root((Business
Complexity))
Product Management
Multiple SKUs
Variants Handling
Bundle Products
Digital Products
Location Management
Warehouses
Retail Stores
Dark Stores
Partner Locations
Inventory Rules
Safety Stock
Reorder Points
Allocation Rules
Reservation Logic
Key Implications:
Solution Requirements:
Inventory management systems must meet stringent security requirements and comply with various regulations while protecting sensitive business data.
mindmap
root((Compliance &
Security))
Regulations
Data Protection
Financial Rules
Industry Standards
Local Laws
Auditing
Transaction Logs
Change History
Access Records
System Events
Security
Access Control
Data Encryption
Threat Detection
Incident Response
Key Implications:
Solution Requirements:
These challenges are interconnected and their impact extends across the entire organization:
Operational Impact
Business Impact
Technical Impact
Addressing these challenges calls for a modern, event-driven architecture. The class diagram below sketches the core domain model and the operations such a system needs to support:
classDiagram
class InventorySystem {
+addStock(item: Item, quantity: number)
+removeStock(item: Item, quantity: number)
+adjustStock(item: Item, adjustment: number)
+moveStock(item: Item, from: Location, to: Location)
+reserveStock(item: Item, quantity: number)
+confirmReservation(reservation: Reservation)
+cancelReservation(reservation: Reservation)
+getStockLevel(item: Item)
+getStockHistory(item: Item)
+getForecast(item: Item)
}
class StockLevel {
+number available
+number reserved
+number inTransit
+number backorder
+number reorderPoint
+number safetyStock
}
class Item {
+string id
+string sku
+string name
+string description
+Category category
}
class Location {
+string id
+string name
+string type
+Address address
}
class Reservation {
+string id
+Item item
+number quantity
+string status
+Date expiresAt
}
InventorySystem ..> StockLevel : returns
InventorySystem ..> Reservation : manages
InventorySystem --> Item : manages
InventorySystem --> Location : manages
graph TB
subgraph "Performance Requirements"
P1[Response Time] --> P2[< 100ms for 99th percentile]
P1 --> P3[< 1s for batch operations]
P4[Throughput] --> P5[10K transactions/second]
P4 --> P6[1M events/day]
end
subgraph "Scalability Requirements"
S1[Horizontal Scaling] --> S2[Auto-scaling support]
S1 --> S3[Multi-region deployment]
S4[Data Growth] --> S5[1TB/month]
S4 --> S6[3 years retention]
end
subgraph "Reliability Requirements"
R1[Availability] --> R2[99.99% uptime]
R1 --> R3[24/7 operation]
R4[Fault Tolerance] --> R5[No single point of failure]
R4 --> R6[Automatic failover]
end
subgraph "Security Requirements"
SE1[Authentication] --> SE2[OAuth 2.0/OIDC]
SE1 --> SE3[Role-based access]
SE4[Data Protection] --> SE5[Encryption at rest]
SE4 --> SE6[TLS in transit]
end
Response Time
Throughput
Resource Utilization
Horizontal Scaling
Data Growth
Load Handling
Availability
Fault Tolerance
Data Consistency
Authentication & Authorization
Data Protection
Compliance
Monitoring
Deployment
Documentation
The component architecture integrates several key aspects including service mesh, security, and observability:
Service Mesh Integration
Security Controls
Observability Stack
graph TB
subgraph "Frontend Layer"
UI[Web UI]
Mobile[Mobile Apps]
B2B[B2B Portal]
end
subgraph "API Gateway Layer"
Gateway[API Gateway]
Auth[Authentication]
RateLimit[Rate Limiter]
end
subgraph "Service Mesh Layer"
IC[Istio Control]
Proxy1[Envoy Proxy 1]
Proxy2[Envoy Proxy 2]
Proxy3[Envoy Proxy 3]
end
subgraph "Command Services"
Stock[Stock Service]
Location[Location Service]
Reserve[Reservation Service]
Order[Order Service]
end
subgraph "Event Processing"
RP[Redpanda Cluster]
SR[Schema Registry]
DLQ[Dead Letter Queue]
CDC[Change Data Capture]
end
subgraph "Query Services"
Analytics[Analytics Service]
Report[Reporting Service]
Search[Search Service]
Cache[Redis Cache]
end
subgraph "Storage Layer"
PG[(PostgreSQL)]
ES[(Elasticsearch)]
Metrics[(Prometheus)]
end
UI --> Gateway
Mobile --> Gateway
B2B --> Gateway
Gateway --> Auth
Gateway --> RateLimit
Auth --> IC
IC --> Proxy1
IC --> Proxy2
IC --> Proxy3
Proxy1 --> Stock
Proxy2 --> Location
Proxy3 --> Reserve
Stock --> RP
Location --> RP
Reserve --> RP
Order --> RP
RP --> Analytics
RP --> Report
RP --> Search
RP --> DLQ
Stock --> PG
Location --> PG
Reserve --> PG
Order --> PG
PG --> CDC
CDC --> RP
Analytics --> Cache
Report --> ES
Search --> ES
Stock --> Metrics
Location --> Metrics
Reserve --> Metrics
Order --> Metrics
sequenceDiagram
participant C as Client
participant G as API Gateway
participant M as Service Mesh
participant S as Stock Service
participant R as Redpanda
participant P as PostgreSQL
participant Q as Query Service
participant Ca as Cache
participant O as Observability
C->>G: Request stock update
G->>M: Route request
M->>S: Forward with mTLS
rect rgb(240, 240, 240)
Note over S,P: Transaction Boundary
S->>P: Begin transaction
S->>P: Update stock
S->>R: Publish event
S->>P: Commit transaction
end
R-->>Q: Consume event
Q->>Ca: Update cache
Q-->>C: Send update notification
M->>O: Send telemetry
S->>O: Send metrics
P->>O: Send database metrics
graph TB
subgraph "Kubernetes Cluster"
subgraph "Ingress Layer"
Ingress[Ingress Controller]
Cert[Cert Manager]
end
subgraph "Application Layer"
API[API Gateway]
Stock[Stock Service]
Query[Query Service]
end
subgraph "Data Layer"
RP[Redpanda StatefulSet]
PG[PostgreSQL StatefulSet]
Redis[Redis StatefulSet]
end
subgraph "Monitoring"
Prom[Prometheus]
Graf[Grafana]
Alert[AlertManager]
end
end
subgraph "External Services"
DNS[DNS]
CDN[CDN]
Backup[Backup Storage]
end
DNS --> Ingress
CDN --> Ingress
Ingress --> API
API --> Stock
API --> Query
Stock --> RP
Stock --> PG
Query --> Redis
PG --> Backup
RP --> Backup
For this implementation, we’ll use Minikube as our local Kubernetes cluster. Minikube provides a lightweight way to run Kubernetes locally for development and testing.
# Start Minikube with sufficient resources for our stack
minikube start --cpus 4 --memory 8192 --disk-size 20g
# Enable necessary addons
minikube addons enable metrics-server
minikube addons enable ingress
# Verify the cluster is running
kubectl cluster-info
Expected output:
Kubernetes control plane is running at https://192.168.49.2:8443
CoreDNS is running at https://192.168.49.2:8443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
For development purposes, we’ll configure Minikube to use the local Docker daemon:
# Configure shell to use Minikube's Docker daemon
eval $(minikube docker-env)
First, let’s create a dedicated namespace for Redpanda:
kubectl create namespace redpanda
Add the Redpanda Helm repository and update it:
helm repo add redpanda https://charts.redpanda.com
helm repo update
Create a file named redpanda-values.yaml with a minimal configuration for local development:
# redpanda-values.yaml
statefulset:
initContainers:
setDataDirOwnership:
enabled: true
replicas: 1
resources:
cpu:
cores: 1
memory:
container:
max: 1.5Gi
redpanda:
reserveMemory: 1Gi
storage:
persistentVolume:
enabled: true
size: 4Gi
storageClass: "standard" # Use Minikube's default storage class
auth:
sasl:
enabled: false
tls:
enabled: false
console:
enabled: true
auth:
enabled: false
Install Redpanda using Helm:
helm install redpanda redpanda/redpanda \
--namespace redpanda \
--values redpanda-values.yaml
Verify the installation:
# Check if pods are running
kubectl get pods -n redpanda
# Expected output:
NAME READY STATUS RESTARTS AGE
redpanda-0 1/1 Running 0 2m
redpanda-console-7b9c8b6f7-x9j2v 1/1 Running 0 2m
Set up port forwarding for local access:
# For Kafka API
kubectl port-forward -n redpanda redpanda-0 9092:9092 &
# For Schema Registry
kubectl port-forward -n redpanda redpanda-0 8081:8081 &
# For Redpanda Console
kubectl port-forward -n redpanda svc/redpanda-console 8080:8080 &
Test connectivity using rpk:
# Create a test topic
rpk topic create test-topic \
--brokers localhost:9092
# List topics
rpk topic list \
--brokers localhost:9092
# Expected output:
TOPIC PARTITIONS REPLICAS
test-topic 1 1
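To verify the full produce and consume path, not just topic creation, you can push a record through the cluster and read it back. This is a quick smoke test; the payload is arbitrary.
# Produce a test record (rpk reads the value from stdin)
echo '{"hello":"inventory"}' | rpk topic produce test-topic \
  --brokers localhost:9092
# Consume it back and exit after one record
rpk topic consume test-topic \
  --brokers localhost:9092 \
  --num 1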
For production deployments, you should use a more comprehensive configuration:
# redpanda-prod-values.yaml
statefulset:
initContainers:
setDataDirOwnership:
enabled: true
replicas: 3
resources:
cpu:
cores: 2
memory:
container:
max: 8Gi
redpanda:
reserveMemory: 6Gi
storage:
persistentVolume:
enabled: true
size: 100Gi
storageClass: "ceph-filesystem" # Use appropriate storage class for your cloud provider
auth:
sasl:
enabled: true
users:
- name: admin
password: ${REDPANDA_ADMIN_PASSWORD}
tls:
enabled: true
monitoring:
enabled: true
serviceMonitor:
enabled: true
console:
enabled: true
auth:
enabled: true
# postgres-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: postgres-config
namespace: database
data:
postgresql.conf: |
listen_addresses = '*'
max_connections = 100
shared_buffers = 256MB
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
wal_sender_timeout = 60s
max_worker_processes = 10
track_commit_timestamp = on
pg_hba.conf: |
local all all trust
host all all 127.0.0.1/32 trust
host all all ::1/128 trust
host all all 0.0.0.0/0 md5
host replication all 0.0.0.0/0 md5
---
# postgres-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: postgres
namespace: database
spec:
serviceName: postgres
replicas: 1
selector:
matchLabels:
app: postgres
template:
metadata:
labels:
app: postgres
spec:
containers:
- name: postgres
image: postgres:15
ports:
- containerPort: 5432
name: postgres
env:
- name: POSTGRES_USER
valueFrom:
secretKeyRef:
name: postgres-secrets
key: username
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: postgres-secrets
key: password
- name: POSTGRES_DB
value: inventory
- name: PGDATA
value: /var/lib/postgresql/data/pgdata
volumeMounts:
- name: postgres-data
mountPath: /var/lib/postgresql/data
- name: postgres-config
mountPath: /etc/postgresql/postgresql.conf
subPath: postgresql.conf
- name: postgres-config
mountPath: /etc/postgresql/pg_hba.conf
subPath: pg_hba.conf
resources:
requests:
memory: 2Gi
cpu: 1000m
limits:
memory: 4Gi
cpu: 2000m
volumeClaimTemplates:
- metadata:
name: postgres-data
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 100Gi
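The StatefulSet above reads its credentials from a postgres-secrets Secret that is not defined elsewhere in this guide. A minimal way to create it, with placeholder values you should replace:
# Create the namespace and the credentials referenced by the StatefulSet
kubectl create namespace database
kubectl create secret generic postgres-secrets \
  --namespace database \
  --from-literal=username=inventory_user \
  --from-literal=password='change-me'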
# debezium-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: debezium-config
namespace: debezium
data:
application.properties: |
bootstrap.servers=redpanda.redpanda.svc.cluster.local:9092
group.id=inventory-connect-cluster
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://redpanda.redpanda.svc.cluster.local:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://redpanda.redpanda.svc.cluster.local:8081
rest.port=8083
rest.advertised.host.name=debezium
plugin.path=/kafka/connect
---
# debezium-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: debezium
namespace: debezium
spec:
replicas: 2
selector:
matchLabels:
app: debezium
template:
metadata:
labels:
app: debezium
spec:
containers:
- name: debezium
image: debezium/connect:2.3
ports:
- containerPort: 8083
env:
- name: BOOTSTRAP_SERVERS
value: redpanda.redpanda.svc.cluster.local:9092
- name: GROUP_ID
value: inventory-connect-cluster
- name: CONFIG_STORAGE_TOPIC
value: connect-configs
- name: OFFSET_STORAGE_TOPIC
value: connect-offsets
- name: STATUS_STORAGE_TOPIC
value: connect-status
volumeMounts:
- name: debezium-config
mountPath: /kafka/config/connect-distributed.properties
subPath: application.properties
resources:
requests:
memory: 1Gi
cpu: 500m
limits:
memory: 2Gi
cpu: 1000m
volumes:
- name: debezium-config
configMap:
name: debezium-config
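With the Connect worker deployed, the PostgreSQL connector still has to be registered through the Connect REST API. The sketch below assumes a Service named debezium exposing port 8083 in the debezium namespace (or use kubectl port-forward), the Postgres credentials created earlier, and that the stock tables are the ones to capture; adjust names to your environment.
# Register the Debezium PostgreSQL connector
curl -X POST http://debezium.debezium.svc.cluster.local:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "inventory-postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "plugin.name": "pgoutput",
      "database.hostname": "postgres.database.svc.cluster.local",
      "database.port": "5432",
      "database.user": "inventory_user",
      "database.password": "change-me",
      "database.dbname": "inventory",
      "topic.prefix": "inventory.cdc",
      "table.include.list": "public.stock_levels,public.stock_movements",
      "snapshot.mode": "initial"
    }
  }'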
The service mesh provides a dedicated infrastructure layer for handling service-to-service communication in our distributed inventory system. We use Istio as our service mesh implementation to manage, secure, and observe microservices interactions.
graph TB
subgraph "Control Plane"
IC[Istio Control]
style IC fill:#f9f,stroke:#333
subgraph "Management"
Config[Configuration API]
Discovery[Service Discovery]
Security[Security Management]
end
end
subgraph "Data Plane"
subgraph "Service Pod A"
App1[Application]
Proxy1[Envoy Proxy]
end
subgraph "Service Pod B"
App2[Application]
Proxy2[Envoy Proxy]
end
end
IC --> Proxy1
IC --> Proxy2
Proxy1 <--> Proxy2
App1 --> Proxy1
App2 --> Proxy2
Traffic Management
Security
Observability
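As a concrete example of the security aspect, the mesh can enforce mutual TLS for every workload in a namespace with a single resource. A minimal sketch, assuming the services run in an inventory namespace:
# peer-authentication.yaml
apiVersion: security.istio.io/v1beta1
kind: PeerAuthentication
metadata:
  name: default
  namespace: inventory
spec:
  mtls:
    mode: STRICT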
Our security architecture implements defense in depth, ensuring protection at every layer of the system:
graph TB
subgraph "External Security"
WAF[Web Application Firewall]
DDOS[DDoS Protection]
API[API Gateway]
end
subgraph "Identity & Access"
Auth[Authentication]
RBAC[Role-Based Access]
OAuth[OAuth2/OIDC]
end
subgraph "Data Security"
Encrypt[Encryption]
Mask[Data Masking]
Keys[Key Management]
end
subgraph "Network Security"
MTLS[Mutual TLS]
SEG[Network Segmentation]
FW[Network Policies]
end
WAF --> API
API --> Auth
Auth --> OAuth
OAuth --> RBAC
RBAC --> Encrypt
Encrypt --> MTLS
OAuth 2.0/OIDC Implementation
Data Protection
Network Security
Compliance & Audit
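A sketch of the OAuth 2.0/OIDC piece at the mesh edge: Istio can validate JWTs issued by the identity provider and reject unauthenticated requests to the stock service. The issuer and JWKS URL below are placeholders for your provider, and the namespace is assumed to be inventory.
# jwt-auth.yaml
apiVersion: security.istio.io/v1beta1
kind: RequestAuthentication
metadata:
  name: stock-service-jwt
  namespace: inventory
spec:
  selector:
    matchLabels:
      app: stock-service
  jwtRules:
    - issuer: "https://auth.example.com/"
      jwksUri: "https://auth.example.com/.well-known/jwks.json"
---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: stock-service-require-jwt
  namespace: inventory
spec:
  selector:
    matchLabels:
      app: stock-service
  action: ALLOW
  rules:
    - from:
        - source:
            requestPrincipals: ["*"]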
Our observability implementation provides comprehensive visibility across all system components:
graph TB
subgraph "Data Collection"
Metrics[Prometheus Metrics]
Logs[Loki Logs]
Traces[Tempo Traces]
end
subgraph "Processing"
Rules[Alert Rules]
Aggregation[Log Aggregation]
Analysis[Trace Analysis]
end
subgraph "Visualization"
Grafana[Grafana Dashboards]
Alerts[Alert Manager]
Reports[Custom Reports]
end
Metrics --> Rules
Logs --> Aggregation
Traces --> Analysis
Rules --> Grafana
Aggregation --> Grafana
Analysis --> Grafana
Rules --> Alerts
Infrastructure Monitoring
Application Monitoring
Business Monitoring
Alert Management
# alertmanager-config.yaml
route:
receiver: "team-inventory"
group_by: ["alertname", "cluster", "service"]
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
routes:
- match:
severity: critical
receiver: "team-inventory-critical"
repeat_interval: 1h
receivers:
- name: "team-inventory"
slack_configs:
- channel: "#inventory-alerts"
send_resolved: true
- name: "team-inventory-critical"
pagerduty_configs:
- service_key: "<key>"
send_resolved: true
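The routing above assumes alerts are already being raised. A sketch of a Prometheus alerting rule for the stock service, using the stock_operations_total counter defined later in this guide; the threshold and the status label value are illustrative:
# stock-service-alerts.yaml (prometheus-operator PrometheusRule)
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: stock-service-alerts
spec:
  groups:
    - name: stock-service
      rules:
        - alert: StockOperationErrorRateHigh
          expr: |
            sum(rate(stock_operations_total{status="error"}[5m]))
              / sum(rate(stock_operations_total[5m])) > 0.05
          for: 10m
          labels:
            severity: critical
            service: stock-service
          annotations:
            summary: "Stock service error rate above 5% for 10 minutes"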
graph TB
subgraph "Stock Service"
API[HTTP API Layer]
Logic[Business Logic]
Events[Event Handler]
Cache[Cache Layer]
DB[Database Layer]
end
subgraph "External Systems"
RP[Redpanda]
PG[PostgreSQL]
Redis[Redis Cache]
Prom[Prometheus]
end
API --> Logic
Logic --> Events
Logic --> Cache
Logic --> DB
Events --> RP
DB --> PG
Cache --> Redis
Logic --> Prom
// config.go
package main
type Config struct {
DBConfig PostgresConfig
RedpandaConfig RedpandaConfig
CacheConfig CacheConfig
MetricsConfig MetricsConfig
ServerConfig ServerConfig
}
type PostgresConfig struct {
Host string
Port int
Database string
User string
Password string
MaxConns int
MinConns int
MaxIdleTime string
}
type RedpandaConfig struct {
Brokers []string
SchemaRegistry string
ConsumerGroup string
CommandTopic string
EventTopic string
FailureTopic string
}
type CacheConfig struct {
Address string
Password string
DB int
TTL string
}
type MetricsConfig struct {
Enabled bool
Port int
Path string
}
type ServerConfig struct {
Port int
ReadTimeout string
WriteTimeout string
ShutdownTimeout string
}
// models.go
package main
import (
"time"
"github.com/google/uuid"
)
type StockItem struct {
ID uuid.UUID `json:"id"`
SKU string `json:"sku"`
Name string `json:"name"`
Description string `json:"description"`
CategoryID uuid.UUID `json:"categoryId"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
type StockLevel struct {
ID uuid.UUID `json:"id"`
ItemID uuid.UUID `json:"itemId"`
LocationID uuid.UUID `json:"locationId"`
Quantity int `json:"quantity"`
Reserved int `json:"reserved"`
ReorderPoint int `json:"reorderPoint"`
SafetyStock int `json:"safetyStock"`
Version int `json:"version"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
type StockMovement struct {
ID uuid.UUID `json:"id"`
ItemID uuid.UUID `json:"itemId"`
FromLocationID *uuid.UUID `json:"fromLocationId"`
ToLocationID *uuid.UUID `json:"toLocationId"`
Quantity int `json:"quantity"`
Type string `json:"type"`
Status string `json:"status"`
ReferenceID *uuid.UUID `json:"referenceId"`
CreatedAt time.Time `json:"createdAt"`
CompletedAt *time.Time `json:"completedAt"`
}
sequenceDiagram
participant C as Client
participant H as HTTP Handler
participant S as Service
participant E as Event Handler
participant R as Redpanda
participant D as Database
C->>H: POST /api/stock
H->>S: Process Command
S->>D: Begin Transaction
S->>D: Update Stock
S->>E: Create Event
E->>R: Publish Event
S->>D: Commit Transaction
H-->>C: Response
R-->>E: Consume Event
E->>S: Process Event
S->>D: Update State
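A sketch of how the write side of this flow could look in the stock service. It assumes pgx for PostgreSQL access, the StockEvent and StockEventPublisher types shown later in this article, and a StockService that holds a connection pool and a publisher (these field names are assumptions). It follows the diagram's ordering, publishing before the commit, rather than a full transactional-outbox implementation.
// command_handler.go (sketch; StockService fields and pgx usage are assumptions)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/google/uuid"
	"github.com/jackc/pgx/v5/pgxpool"
)

type StockService struct {
	db        *pgxpool.Pool
	publisher *StockEventPublisher
}

// applyStockUpdate updates a stock level inside a database transaction and
// publishes the corresponding event before committing, mirroring the sequence above.
func (s *StockService) applyStockUpdate(ctx context.Context, itemID, locationID string, delta int) error {
	tx, err := s.db.Begin(ctx) // Begin transaction
	if err != nil {
		return fmt.Errorf("begin tx: %w", err)
	}
	defer tx.Rollback(ctx) // no-op once the transaction has been committed

	// Update stock, bumping the version column for optimistic concurrency
	tag, err := tx.Exec(ctx,
		`UPDATE stock_levels
		    SET quantity = quantity + $1, version = version + 1, updated_at = now()
		  WHERE item_id = $2 AND location_id = $3`,
		delta, itemID, locationID)
	if err != nil {
		return fmt.Errorf("update stock: %w", err)
	}
	if tag.RowsAffected() == 0 {
		return fmt.Errorf("stock level not found for item %s", itemID)
	}

	// Publish the domain event; a failure here rolls the database change back
	event := StockEvent{
		ID:        uuid.New().String(),
		Type:      "STOCK_UPDATED",
		ItemID:    itemID,
		Quantity:  delta,
		Location:  locationID,
		Timestamp: time.Now(),
	}
	if err := s.publisher.PublishStockEvent(ctx, event); err != nil {
		return fmt.Errorf("publish event: %w", err)
	}

	return tx.Commit(ctx) // Commit transaction
}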
// redpanda.go
package main
type RedpandaConfig struct {
Brokers []string
SchemaRegistry string
ClientID string
GroupID string
Topics Topics
}
type Topics struct {
StockCommands string
StockEvents string
StockFailures string
}
func NewRedpandaClient(cfg RedpandaConfig) (*kgo.Client, error) {
client, err := kgo.NewClient(
kgo.SeedBrokers(cfg.Brokers...),
kgo.ClientID(cfg.ClientID),
kgo.ConsumerGroup(cfg.GroupID),
kgo.ConsumeTopics(cfg.Topics.StockCommands),
kgo.DisableAutoCommit(),
kgo.RequiredAcks(kgo.AllISRAcks()),
kgo.RetryBackoffFn(func(attempt int) time.Duration {
return time.Millisecond * time.Duration(math.Min(float64(attempt*200), 1000))
}),
)
if err != nil {
return nil, fmt.Errorf("create redpanda client: %w", err)
}
// Test connection
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := client.Ping(ctx); err != nil {
return nil, fmt.Errorf("ping redpanda: %w", err)
}
return client, nil
}
// Example configuration
var config = RedpandaConfig{
Brokers: []string{"localhost:9092"},
SchemaRegistry: "http://localhost:8081",
ClientID: "stock-service",
GroupID: "stock-service-group",
Topics: Topics{
StockCommands: "inventory.stock.commands",
StockEvents: "inventory.stock.events",
StockFailures: "inventory.stock.failures",
},
}
// publisher.go
package main
type StockEventPublisher struct {
client *kgo.Client
schema *sr.Client
logger *zap.Logger
metrics *Metrics
}
type StockEvent struct {
ID string `json:"id"`
Type string `json:"type"`
ItemID string `json:"itemId"`
Quantity int `json:"quantity"`
Location string `json:"location"`
Timestamp time.Time `json:"timestamp"`
Metadata map[string]string `json:"metadata,omitempty"`
}
func (p *StockEventPublisher) PublishStockEvent(ctx context.Context, event StockEvent) error {
start := time.Now()
defer func() {
p.metrics.eventLatency.WithLabelValues("publish").Observe(
time.Since(start).Seconds(),
)
}()
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("marshal event: %w", err)
}
record := &kgo.Record{
Topic: "inventory.stock.events",
Key: []byte(event.ItemID),
Value: data,
Headers: []kgo.RecordHeader{
{Key: "event_type", Value: []byte(event.Type)},
{Key: "timestamp", Value: []byte(event.Timestamp.Format(time.RFC3339))},
},
}
results := p.client.ProduceSync(ctx, record)
if err := results.FirstErr(); err != nil {
p.logger.Error("failed to publish event",
zap.Error(err),
zap.String("event_id", event.ID),
zap.String("event_type", event.Type))
return fmt.Errorf("produce event: %w", err)
}
p.logger.Info("event published successfully",
zap.String("event_id", event.ID),
zap.String("event_type", event.Type),
zap.String("item_id", event.ItemID),
zap.Int("quantity", event.Quantity))
return nil
}
// Example usage:
func ExamplePublishEvent() {
event := StockEvent{
ID: uuid.New().String(),
Type: "STOCK_UPDATED",
ItemID: "item-123",
Quantity: 100,
Location: "warehouse-1",
Timestamp: time.Now(),
Metadata: map[string]string{
"user": "system",
"reason": "restock",
},
}
if err := publisher.PublishStockEvent(context.Background(), event); err != nil {
log.Printf("Failed to publish event: %v", err)
return
}
// Expected log output:
// {"level":"info","ts":"2025-03-17T11:23:45.678Z","caller":"publisher/publisher.go:89",
// "msg":"event published successfully","event_id":"550e8400-e29b-41d4-a716-446655440000",
// "event_type":"STOCK_UPDATED","item_id":"item-123","quantity":100}
}
// subscriber.go
package main
type StockEventSubscriber struct {
client *kgo.Client
handler StockEventHandler
logger *zap.Logger
metrics *Metrics
shutdownCh chan struct{}
}
type StockEventHandler interface {
HandleStockEvent(context.Context, StockEvent) error
}
func (s *StockEventSubscriber) Start(ctx context.Context) error {
s.logger.Info("starting stock event subscriber",
zap.Strings("topics", []string{"inventory.stock.events"}))
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-s.shutdownCh:
return nil
default:
fetches := s.client.PollFetches(ctx)
if errs := fetches.Errors(); len(errs) > 0 {
s.handleErrors(errs)
continue
}
s.processFetches(ctx, fetches)
}
}
}
func (s *StockEventSubscriber) processFetches(ctx context.Context, fetches kgo.Fetches) {
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
p.EachRecord(func(record *kgo.Record) {
start := time.Now()
// Record consume latency once this record has been fully processed
defer func() {
	s.metrics.eventLatency.WithLabelValues("consume").Observe(
		time.Since(start).Seconds(),
	)
}()
var event StockEvent
if err := json.Unmarshal(record.Value, &event); err != nil {
s.logger.Error("failed to unmarshal event",
zap.Error(err),
zap.String("topic", record.Topic),
zap.Int32("partition", record.Partition),
zap.Int64("offset", record.Offset))
return
}
if err := s.handler.HandleStockEvent(ctx, event); err != nil {
s.logger.Error("failed to handle event",
zap.Error(err),
zap.String("event_id", event.ID),
zap.String("event_type", event.Type))
return
}
s.logger.Info("event processed successfully",
zap.String("event_id", event.ID),
zap.String("event_type", event.Type),
zap.String("item_id", event.ItemID),
zap.Duration("processing_time", time.Since(start)))
})
})
if err := s.client.CommitUncommittedOffsets(ctx); err != nil {
s.logger.Error("failed to commit offsets", zap.Error(err))
}
}
// Example handler implementation:
type StockUpdateHandler struct {
repository *StockRepository
logger *zap.Logger
}
func (h *StockUpdateHandler) HandleStockEvent(ctx context.Context, event StockEvent) error {
h.logger.Info("handling stock event",
zap.String("event_id", event.ID),
zap.String("event_type", event.Type))
switch event.Type {
case "STOCK_UPDATED":
return h.handleStockUpdate(ctx, event)
case "STOCK_RESERVED":
return h.handleStockReservation(ctx, event)
default:
return fmt.Errorf("unknown event type: %s", event.Type)
}
}
// Example usage and logs:
func ExampleSubscriber() {
handler := &StockUpdateHandler{
repository: repo,
logger: logger,
}
subscriber := &StockEventSubscriber{
client: redpandaClient,
handler: handler,
logger: logger,
}
if err := subscriber.Start(context.Background()); err != nil {
log.Fatalf("Failed to start subscriber: %v", err)
}
// Expected log output:
// {"level":"info","ts":"2025-03-17T11:23:45.678Z","caller":"subscriber/subscriber.go:112",
// "msg":"starting stock event subscriber","topics":["inventory.stock.events"]}
//
// {"level":"info","ts":"2025-03-17T11:23:46.789Z","caller":"subscriber/subscriber.go:156",
// "msg":"event processed successfully","event_id":"550e8400-e29b-41d4-a716-446655440000",
// "event_type":"STOCK_UPDATED","item_id":"item-123","processing_time":"0.123s"}
}
Here’s what you might see in your logs and metrics when the system is running:
# Connection Logs
[2025-03-17T11:20:00Z] INFO Connecting to Redpanda brokers: [localhost:9092]
[2025-03-17T11:20:00Z] INFO Connected to Redpanda cluster successfully
[2025-03-17T11:20:00Z] INFO Schema Registry connected at http://localhost:8081
# Publisher Logs
[2025-03-17T11:23:45Z] INFO Publishing stock update event
[2025-03-17T11:23:45Z] DEBUG Event payload: {"id":"550e8400-e29b-41d4-a716-446655440000","type":"STOCK_UPDATED",...}
[2025-03-17T11:23:45Z] INFO Event published successfully (topic=inventory.stock.events, partition=0, offset=1234)
# Subscriber Logs
[2025-03-17T11:23:45Z] INFO Starting stock event subscriber
[2025-03-17T11:23:45Z] INFO Subscribed to topics: [inventory.stock.events]
[2025-03-17T11:23:46Z] INFO Received event: STOCK_UPDATED (id=550e8400-e29b-41d4-a716-446655440000)
[2025-03-17T11:23:46Z] DEBUG Processing event for item-123: quantity=100, location=warehouse-1
[2025-03-17T11:23:46Z] INFO Event processed successfully (processing_time=123ms)
# Error Logs
[2025-03-17T11:25:00Z] ERROR Failed to connect to broker: connection refused
[2025-03-17T11:25:01Z] INFO Retrying connection (attempt 1/3)
[2025-03-17T11:25:10Z] ERROR Failed to process event: validation error (event_id=550e8400-e29b-41d4-a716-446655440000)
# Metrics (Prometheus format)
# HELP redpanda_events_total Total number of events processed
# TYPE redpanda_events_total counter
redpanda_events_total{type="published"} 1234
redpanda_events_total{type="consumed"} 1200
# HELP redpanda_event_processing_duration_seconds Event processing duration
# TYPE redpanda_event_processing_duration_seconds histogram
redpanda_event_processing_duration_seconds_bucket{le="0.1"} 980
redpanda_event_processing_duration_seconds_bucket{le="0.5"} 1150
redpanda_event_processing_duration_seconds_bucket{le="1.0"} 1200
# HELP redpanda_connection_status Current connection status
# TYPE redpanda_connection_status gauge
redpanda_connection_status{broker="localhost:9092"} 1
mindmap
root((Testing Strategy))
Unit Tests
Repository Tests
Service Tests
Handler Tests
Event Tests
Integration Tests
API Tests
Event Flow Tests
Database Tests
Performance Tests
Load Tests
Stress Tests
Scalability Tests
Monitoring
Metrics
Logging
Tracing
// stock_test.go
package main
func TestStockService(t *testing.T) {
// Set up test environment
ctx := context.Background()
cfg := testConfig()
// Create test dependencies
db := setupTestDB(t)
redpanda := setupTestRedpanda(t)
cache := setupTestCache(t)
// Create service
svc := NewStockService(cfg, db, redpanda, cache)
// Run tests
t.Run("GetStock", func(t *testing.T) {
// Test cases
tests := []struct {
name string
itemID uuid.UUID
want *StockLevel
wantErr bool
}{
{
name: "existing item",
itemID: testItemID,
want: testStock,
wantErr: false,
},
{
name: "non-existing item",
itemID: uuid.New(),
want: nil,
wantErr: true,
},
}
// Run test cases
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := svc.GetStock(ctx, tt.itemID)
if (err != nil) != tt.wantErr {
t.Errorf("GetStock() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetStock() = %v, want %v", got, tt.want)
}
})
}
})
t.Run("UpdateStock", func(t *testing.T) {
// Test cases for stock updates
})
t.Run("EventHandling", func(t *testing.T) {
// Test cases for event handling
})
}
# stock-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: stock-service
spec:
replicas: 3
selector:
matchLabels:
app: stock-service
template:
metadata:
labels:
app: stock-service
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "8080"
prometheus.io/path: "/metrics"
spec:
containers:
- name: stock-service
image: inventory/stock-service:latest
ports:
- containerPort: 8080
env:
- name: DB_HOST
valueFrom:
configMapKeyRef:
name: inventory-config
key: db_host
- name: REDPANDA_BROKERS
valueFrom:
configMapKeyRef:
name: inventory-config
key: redpanda_brokers
- name: REDIS_ADDR
valueFrom:
configMapKeyRef:
name: inventory-config
key: redis_addr
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
volumeMounts:
- name: config
mountPath: /app/config
readOnly: true
volumes:
- name: config
configMap:
name: stock-service-config
---
apiVersion: v1
kind: Service
metadata:
name: stock-service
spec:
selector:
app: stock-service
ports:
- port: 80
targetPort: 8080
type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: stock-service
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: stock-service
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
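Applying the manifests and verifying the rollout and autoscaler (the file name matches the manifest above):
# Deploy the stock service and watch the rollout
kubectl apply -f stock-service.yaml
kubectl rollout status deployment/stock-service
# Confirm the HPA has picked up its targets
kubectl get hpa stock-service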
// metrics.go
package main
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
type Metrics struct {
stockOperations *prometheus.CounterVec
stockLevels *prometheus.GaugeVec
operationDuration *prometheus.HistogramVec
cacheHitRate *prometheus.GaugeVec
eventLatency *prometheus.HistogramVec
}
func NewMetrics(reg prometheus.Registerer) *Metrics {
m := &Metrics{
stockOperations: promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "stock_operations_total",
Help: "Total number of stock operations",
},
[]string{"operation", "status"},
),
stockLevels: promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "stock_levels",
Help: "Current stock levels by item",
},
[]string{"item_id", "location_id"},
),
operationDuration: promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "stock_operation_duration_seconds",
Help: "Duration of stock operations",
Buckets: prometheus.DefBuckets,
},
[]string{"operation"},
),
cacheHitRate: promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cache_hit_rate",
Help: "Cache hit rate for stock operations",
},
[]string{"operation"},
),
eventLatency: promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "event_processing_duration_seconds",
Help: "Duration of event processing",
Buckets: prometheus.DefBuckets,
},
[]string{"event_type"},
),
}
reg.MustRegister(
m.stockOperations,
m.stockLevels,
m.operationDuration,
m.cacheHitRate,
m.eventLatency,
)
return m
}
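The registry still needs to be exposed over HTTP so Prometheus (and the scrape annotations on the Deployment) can reach it. A minimal sketch using promhttp together with the MetricsConfig defined earlier:
// metrics_server.go
package main

import (
	"fmt"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// StartMetricsServer serves the metrics registry on the configured port
// and path (for example :8080/metrics, matching the pod annotations).
func StartMetricsServer(cfg MetricsConfig, reg *prometheus.Registry) error {
	if !cfg.Enabled {
		return nil
	}
	mux := http.NewServeMux()
	mux.Handle(cfg.Path, promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	return http.ListenAndServe(fmt.Sprintf(":%d", cfg.Port), mux)
}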
// logger.go
package main
import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
func NewLogger(env string) (*zap.Logger, error) {
var config zap.Config
if env == "production" {
config = zap.NewProductionConfig()
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
} else {
config = zap.NewDevelopmentConfig()
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
}
config.OutputPaths = []string{"stdout"}
config.ErrorOutputPaths = []string{"stderr"}
return config.Build(
zap.AddCaller(),
zap.AddStacktrace(zapcore.ErrorLevel),
)
}
// Example usage in service
func (s *StockService) logOperation(ctx context.Context, operation string, fields ...zap.Field) {
defaultFields := []zap.Field{
zap.String("operation", operation),
zap.String("service", "stock-service"),
}
// Attach the OpenTelemetry trace ID when one is present on the context
if sc := trace.SpanContextFromContext(ctx); sc.HasTraceID() {
defaultFields = append(defaultFields, zap.String("traceId", sc.TraceID().String()))
}
s.logger.Info("Operation executed",
append(defaultFields, fields...)...)
}
// tracing.go
package main
import (
"context"
"fmt"
"log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)
func initTracer(serviceName string) (func(), error) {
exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(
jaeger.WithEndpoint("http://jaeger-collector:14268/api/traces"),
))
if err != nil {
return nil, fmt.Errorf("create jaeger exporter: %w", err)
}
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(serviceName),
)),
)
otel.SetTracerProvider(tp)
return func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Printf("Error shutting down tracer provider: %v", err)
}
}, nil
}
// Example usage in service
func (s *StockService) UpdateStock(ctx context.Context, req UpdateStockRequest) error {
ctx, span := s.tracer.Start(ctx, "UpdateStock")
defer span.End()
span.SetAttributes(
attribute.String("item.id", req.ItemID.String()),
attribute.Int("quantity", req.Quantity),
)
// Business logic here
if err := s.doUpdate(ctx, req); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
return nil
}
flowchart TB
subgraph "Cache Strategy"
direction TB
A[Request] --> B{Cache Hit?}
B -->|Yes| C[Return Cached]
B -->|No| D[Query DB]
D --> E[Update Cache]
E --> F[Return Fresh]
end
subgraph "Cache Invalidation"
direction TB
G[Write Operation] --> H[Update DB]
H --> I[Publish Event]
I --> J[Invalidate Cache]
end
subgraph "Cache Warming"
direction TB
K[Background Job] --> L[Get Popular Items]
L --> M[Prefetch Data]
M --> N[Update Cache]
end
// cache.go
package main
type CacheManager struct {
redis *redis.Client
metrics *Metrics
logger *zap.Logger
}
func (c *CacheManager) Get(ctx context.Context, key string, value interface{}) error {
start := time.Now()
defer func() {
c.metrics.operationDuration.WithLabelValues("cache_get").Observe(
time.Since(start).Seconds(),
)
}()
data, err := c.redis.Get(ctx, key).Bytes()
if err != nil {
if err == redis.Nil {
c.metrics.cacheHitRate.WithLabelValues("get").Set(0)
return ErrCacheMiss
}
return fmt.Errorf("get from redis: %w", err)
}
if err := json.Unmarshal(data, value); err != nil {
return fmt.Errorf("unmarshal cached value: %w", err)
}
c.metrics.cacheHitRate.WithLabelValues("get").Set(1)
return nil
}
func (c *CacheManager) Set(ctx context.Context, key string, value interface{}, ttl time.Duration) error {
data, err := json.Marshal(value)
if err != nil {
return fmt.Errorf("marshal value: %w", err)
}
if err := c.redis.Set(ctx, key, data, ttl).Err(); err != nil {
return fmt.Errorf("set in redis: %w", err)
}
return nil
}
func (c *CacheManager) Invalidate(ctx context.Context, pattern string) error {
iter := c.redis.Scan(ctx, 0, pattern, 0).Iterator()
var keys []string
for iter.Next(ctx) {
keys = append(keys, iter.Val())
}
if err := iter.Err(); err != nil {
return fmt.Errorf("scan keys: %w", err)
}
if len(keys) > 0 {
if err := c.redis.Del(ctx, keys...).Err(); err != nil {
return fmt.Errorf("delete keys: %w", err)
}
}
return nil
}
// Cache warming
func (c *CacheManager) WarmCache(ctx context.Context, items []string) error {
for _, itemID := range items {
if err := c.warmItem(ctx, itemID); err != nil {
c.logger.Error("warm cache failed",
zap.String("itemId", itemID),
zap.Error(err))
continue
}
}
return nil
}
func (c *CacheManager) warmItem(ctx context.Context, itemID string) error {
// Implementation for warming up a single item
return nil
}
-- Partitioning for stock_movements: the parent table must be declared
-- as range-partitioned (columns mirror the existing stock_movements table)
CREATE TABLE stock_movements_partitioned (
    LIKE stock_movements INCLUDING DEFAULTS INCLUDING CONSTRAINTS
) PARTITION BY RANGE (created_at);
-- Create monthly partitions
CREATE TABLE stock_movements_y2025m01 PARTITION OF stock_movements_partitioned
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
-- Create indexes on partitions
CREATE INDEX idx_movements_y2025m01_item
ON stock_movements_y2025m01 (item_id);
-- Materialized view for common queries
CREATE MATERIALIZED VIEW mv_stock_summary AS
SELECT
i.id as item_id,
i.sku,
i.name,
l.id as location_id,
l.name as location_name,
sl.quantity,
sl.reserved,
sl.quantity - sl.reserved as available,
sl.reorder_point,
sl.safety_stock
FROM
items i
JOIN stock_levels sl ON i.id = sl.item_id
JOIN locations l ON sl.location_id = l.id
WITH DATA;
-- Refresh materialized view
CREATE OR REPLACE FUNCTION refresh_stock_summary()
RETURNS trigger AS $$
BEGIN
REFRESH MATERIALIZED VIEW CONCURRENTLY mv_stock_summary;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trigger_refresh_stock_summary
AFTER INSERT OR UPDATE OR DELETE ON stock_levels
FOR EACH STATEMENT
EXECUTE FUNCTION refresh_stock_summary();
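One caveat with the trigger above: REFRESH MATERIALIZED VIEW CONCURRENTLY requires a unique index on the materialized view, so create one after defining the view:
-- Required for REFRESH ... CONCURRENTLY
CREATE UNIQUE INDEX idx_mv_stock_summary_item_location
    ON mv_stock_summary (item_id, location_id);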
// loadtest/main.go
package main
import (
"os"
"time"
vegeta "github.com/tsenart/vegeta/lib"
)
func main() {
// Configure test
rate := vegeta.Rate{Freq: 100, Per: time.Second}
duration := 5 * time.Minute
targeter := vegeta.NewStaticTargeter(vegeta.Target{
Method: "GET",
URL: "http://stock-service/api/stock/item-1",
})
// Run test
attacker := vegeta.NewAttacker()
var metrics vegeta.Metrics
for res := range attacker.Attack(targeter, rate, duration, "Stock Service Test") {
metrics.Add(res)
}
metrics.Close()
// Report results
reporter := vegeta.NewTextReporter(&metrics)
reporter.Report(os.Stdout)
}
// circuitbreaker.go
package main
import (
"errors"
"sync"
"time"
)
// State tracks whether the breaker is letting calls through.
type State int
const (
StateClosed State = iota
StateOpen
)
// ErrCircuitOpen is returned when a call is rejected because the circuit is open.
var ErrCircuitOpen = errors.New("circuit breaker is open")
type CircuitBreaker struct {
mu sync.RWMutex
failures int
lastFailure time.Time
state State
threshold int
timeout time.Duration
}
func (cb *CircuitBreaker) Execute(operation func() error) error {
if !cb.canExecute() {
return ErrCircuitOpen
}
err := operation()
cb.updateState(err)
return err
}
func (cb *CircuitBreaker) canExecute() bool {
cb.mu.RLock()
defer cb.mu.RUnlock()
if cb.state == StateClosed {
return true
}
if cb.state == StateOpen &&
time.Since(cb.lastFailure) >= cb.timeout {
return true
}
return false
}
func (cb *CircuitBreaker) updateState(err error) {
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
cb.failures++
cb.lastFailure = time.Now()
if cb.failures >= cb.threshold {
cb.state = StateOpen
}
} else {
cb.failures = 0
cb.state = StateClosed
}
}
// retry.go
package main
func retry(ctx context.Context, operation func() error, opts RetryOptions) error {
var lastErr error
for attempt := 0; attempt < opts.MaxAttempts; attempt++ {
if err := operation(); err != nil {
lastErr = err
if !opts.ShouldRetry(err) {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(opts.BackoffFunc(attempt)):
continue
}
}
return nil
}
return fmt.Errorf("max retries exceeded: %w", lastErr)
}
type RetryOptions struct {
MaxAttempts int
BackoffFunc func(attempt int) time.Duration
ShouldRetry func(err error) bool
}
func ExponentialBackoff(base time.Duration) func(int) time.Duration {
return func(attempt int) time.Duration {
return base * time.Duration(math.Pow(2, float64(attempt)))
}
}
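A short usage fragment that wires the retry helper and exponential backoff around an event publish (the publisher and event come from earlier sections); context cancellation is treated as non-retryable:
// Retry the publish up to 5 times with exponential backoff
err := retry(ctx, func() error {
	return publisher.PublishStockEvent(ctx, event)
}, RetryOptions{
	MaxAttempts: 5,
	BackoffFunc: ExponentialBackoff(100 * time.Millisecond),
	ShouldRetry: func(err error) bool {
		return !errors.Is(err, context.Canceled)
	},
})
if err != nil {
	log.Printf("publish failed after retries: %v", err)
}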
// dlq.go
package main
type DeadLetterQueue struct {
redpanda *kgo.Client
logger *zap.Logger
}
func (dlq *DeadLetterQueue) HandleFailedMessage(ctx context.Context, msg FailedMessage) error {
record := &kgo.Record{
Topic: "inventory.failures",
Key: []byte(msg.ID),
Value: msg.Data,
Headers: []kgo.RecordHeader{
{Key: "error", Value: []byte(msg.Error.Error())},
{Key: "original_topic", Value: []byte(msg.OriginalTopic)},
{Key: "retry_count", Value: []byte(strconv.Itoa(msg.RetryCount))},
{Key: "failed_at", Value: []byte(time.Now().Format(time.RFC3339))},
},
}
results := dlq.redpanda.ProduceSync(ctx, record)
return results.FirstErr()
}
func (dlq *DeadLetterQueue) ProcessFailures(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := dlq.processFailedMessages(ctx); err != nil {
dlq.logger.Error("process failures",
zap.Error(err))
time.Sleep(time.Second * 5)
}
}
}
}
func (dlq *DeadLetterQueue) processFailedMessages(ctx context.Context) error {
// Implementation for processing failed messages
return nil
}
The Schema Registry provides a centralized schema management system that ensures data consistency and compatibility across our event-driven architecture. We use Apache Avro for schema definition due to its compact serialization and strong schema evolution capabilities.
Our schemas are organized by domain and version:
schemas/
├── stock/
│ ├── commands/
│ │ ├── v1_stock_update.avsc
│ │ └── v1_stock_reserve.avsc
│ ├── events/
│ │ ├── v1_stock_updated.avsc
│ │ └── v1_stock_reserved.avsc
│ └── models/
│ ├── v1_stock_item.avsc
│ └── v1_stock_level.avsc
└── cdc/
└── v1_stock_change.avsc
// schemas/stock/models/v1_stock_item.avsc
{
"type": "record",
"name": "StockItem",
"namespace": "com.inventory.stock",
"doc": "Represents a stock item in the inventory system",
"fields": [
{"name": "id", "type": "string", "doc": "Unique identifier for the item"},
{"name": "sku", "type": "string", "doc": "Stock keeping unit"},
{"name": "name", "type": "string", "doc": "Item name"},
{"name": "description", "type": ["null", "string"], "default": null, "doc": "Item description"},
{"name": "category_id", "type": "string", "doc": "Category identifier"},
{
"name": "metadata",
"type": {
"type": "map",
"values": "string"
},
"default": {},
"doc": "Additional item metadata"
},
{"name": "created_at", "type": "long", "logicalType": "timestamp-millis", "doc": "Creation timestamp"},
{"name": "updated_at", "type": "long", "logicalType": "timestamp-millis", "doc": "Last update timestamp"}
]
}
// schemas/stock/events/v1_stock_updated.avsc
{
"type": "record",
"name": "StockUpdatedEvent",
"namespace": "com.inventory.stock.events",
"doc": "Event emitted when stock levels are updated",
"fields": [
{"name": "event_id", "type": "string", "doc": "Unique event identifier"},
{"name": "event_type", "type": "string", "doc": "Event type identifier"},
{"name": "event_time", "type": "long", "logicalType": "timestamp-millis", "doc": "Event timestamp"},
{
"name": "item",
"type": "com.inventory.stock.StockItem",
"doc": "The stock item that was updated"
},
{
"name": "stock_level",
"type": {
"type": "record",
"name": "StockLevel",
"fields": [
{"name": "quantity", "type": "int", "doc": "Current quantity"},
{"name": "reserved", "type": "int", "doc": "Reserved quantity"},
{"name": "available", "type": "int", "doc": "Available quantity (quantity - reserved)"},
{"name": "location_id", "type": "string", "doc": "Location identifier"}
]
}
},
{
"name": "change",
"type": {
"type": "record",
"name": "StockChange",
"fields": [
{"name": "previous_quantity", "type": "int"},
{"name": "new_quantity", "type": "int"},
{"name": "change_type", "type": "string"},
{"name": "reason", "type": ["null", "string"], "default": null}
]
}
},
{
"name": "metadata",
"type": {
"type": "map",
"values": "string"
},
"default": {},
"doc": "Additional event metadata"
}
]
}
We follow these schema evolution rules to maintain compatibility:
Backward Compatibility
Forward Compatibility
Version Management
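These rules can be enforced by the registry itself. Redpanda's Schema Registry implements the Confluent-compatible REST API, so compatibility can be set per subject; the subject name below assumes the common topic-name-value convention for the events topic.
# Require new versions to be backward compatible with the latest schema
curl -X PUT http://localhost:8081/config/inventory.stock.events-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'

# Check the current setting for the subject
curl http://localhost:8081/config/inventory.stock.events-value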
// schema/registry.go
package schema
import (
"github.com/linkedin/goavro/v2"
"github.com/redpanda-data/console/backend/pkg/schema"
)
type SchemaRegistry struct {
client *schema.Client
codecs map[string]*goavro.Codec
logger *zap.Logger
}
func NewSchemaRegistry(config SchemaRegistryConfig) (*SchemaRegistry, error) {
client, err := schema.NewClient(schema.Config{
URLs: config.URLs,
Username: config.Username,
Password: config.Password,
})
if err != nil {
return nil, fmt.Errorf("create schema registry client: %w", err)
}
return &SchemaRegistry{
client: client,
codecs: make(map[string]*goavro.Codec),
logger: config.Logger,
}, nil
}
func (r *SchemaRegistry) GetSchema(subject string, version int) (*goavro.Codec, error) {
// Check cache first
cacheKey := fmt.Sprintf("%s-v%d", subject, version)
if codec, ok := r.codecs[cacheKey]; ok {
return codec, nil
}
// Fetch from registry
schema, err := r.client.GetSchemaByVersion(subject, version)
if err != nil {
return nil, fmt.Errorf("get schema: %w", err)
}
// Create codec
codec, err := goavro.NewCodec(schema.Schema)
if err != nil {
return nil, fmt.Errorf("create codec: %w", err)
}
// Cache codec
r.codecs[cacheKey] = codec
return codec, nil
}
func (r *SchemaRegistry) RegisterSchema(subject, avroSchema string) (int, error) {
// Validate the schema locally before registering it
if _, err := goavro.NewCodec(avroSchema); err != nil {
return 0, fmt.Errorf("invalid schema: %w", err)
}
// Register with the registry (compatibility is checked server-side)
id, err := r.client.RegisterSchema(subject, avroSchema, schema.AVRO)
if err != nil {
return 0, fmt.Errorf("register schema: %w", err)
}
return id, nil
}
// events/serialization.go
package events
import (
"encoding/binary"
"github.com/linkedin/goavro/v2"
)
const (
schemaIDLength = 4
magicByte = byte(0)
)
type Serializer struct {
registry *schema.SchemaRegistry
subject string
version int
codec *goavro.Codec
}
func NewSerializer(registry *schema.SchemaRegistry, subject string, version int) (*Serializer, error) {
codec, err := registry.GetSchema(subject, version)
if err != nil {
return nil, fmt.Errorf("get schema: %w", err)
}
return &Serializer{
registry: registry,
subject: subject,
version: version,
codec: codec,
}, nil
}
// NativeConverter is implemented by events that can render themselves as the
// generic map form goavro expects.
type NativeConverter interface {
ToNative() (map[string]interface{}, error)
}
func (s *Serializer) Serialize(event NativeConverter) ([]byte, error) {
// Convert event to the native Go representation expected by goavro
native, err := event.ToNative()
if err != nil {
return nil, fmt.Errorf("convert to native: %w", err)
}
// Serialize data
data, err := s.codec.BinaryFromNative(nil, native)
if err != nil {
return nil, fmt.Errorf("serialize: %w", err)
}
// Add schema ID
schemaID := make([]byte, schemaIDLength)
binary.BigEndian.PutUint32(schemaID, uint32(s.version))
// Combine magic byte, schema ID, and data
result := make([]byte, 1+schemaIDLength+len(data))
result[0] = magicByte
copy(result[1:], schemaID)
copy(result[1+schemaIDLength:], data)
return result, nil
}
func (s *Serializer) Deserialize(data []byte) (interface{}, error) {
if len(data) < 1+schemaIDLength {
return nil, fmt.Errorf("data too short")
}
// Verify magic byte
if data[0] != magicByte {
return nil, fmt.Errorf("invalid magic byte")
}
// Extract schema ID
schemaID := binary.BigEndian.Uint32(data[1:1+schemaIDLength])
// Get codec for schema ID
codec, err := s.registry.GetSchema(s.subject, int(schemaID))
if err != nil {
return nil, fmt.Errorf("get schema: %w", err)
}
// Deserialize data
native, _, err := codec.NativeFromBinary(data[1+schemaIDLength:])
if err != nil {
return nil, fmt.Errorf("deserialize: %w", err)
}
return native, nil
}
// Example of publishing an event with schema validation
func (p *StockEventPublisher) PublishStockUpdatedEvent(ctx context.Context, event StockUpdatedEvent) error {
// Get serializer for stock updated events
serializer, err := NewSerializer(p.registry, "inventory.stock.events.StockUpdated", 1)
if err != nil {
return fmt.Errorf("create serializer: %w", err)
}
// Serialize event
data, err := serializer.Serialize(event)
if err != nil {
return fmt.Errorf("serialize event: %w", err)
}
// Create record
record := &kgo.Record{
Topic: "inventory.stock.events",
Key: []byte(event.Item.ID),
Value: data,
Headers: []kgo.RecordHeader{
{Key: "event_type", Value: []byte(event.EventType)},
{Key: "schema_version", Value: []byte("1")},
},
}
// Publish event
results := p.client.ProduceSync(ctx, record)
if err := results.FirstErr(); err != nil {
return fmt.Errorf("produce event: %w", err)
}
return nil
}
// Example of consuming an event with schema validation
func (s *StockEventSubscriber) handleRecord(ctx context.Context, record *kgo.Record) error {
// Get deserializer for stock events
deserializer, err := NewSerializer(s.registry, "inventory.stock.events.StockUpdated", 1)
if err != nil {
return fmt.Errorf("create deserializer: %w", err)
}
// Deserialize event
native, err := deserializer.Deserialize(record.Value)
if err != nil {
return fmt.Errorf("deserialize event: %w", err)
}
// Convert to concrete type
event, err := StockUpdatedEventFromNative(native)
if err != nil {
return fmt.Errorf("convert event: %w", err)
}
// Process event
return s.handler.HandleStockEvent(ctx, event)
}
Here’s an example of evolving the StockItem schema while maintaining compatibility:
// schemas/stock/models/v2_stock_item.avsc
{
"type": "record",
"name": "StockItem",
"namespace": "com.inventory.stock",
"doc": "Represents a stock item in the inventory system",
"fields": [
{ "name": "id", "type": "string", "doc": "Unique identifier for the item" },
{ "name": "sku", "type": "string", "doc": "Stock keeping unit" },
{ "name": "name", "type": "string", "doc": "Item name" },
{
"name": "description",
"type": ["null", "string"],
"default": null,
"doc": "Item description"
},
{ "name": "category_id", "type": "string", "doc": "Category identifier" },
{
"name": "metadata",
"type": {
"type": "map",
"values": "string"
},
"default": {},
"doc": "Additional item metadata"
},
{
"name": "created_at",
"type": "long",
"logicalType": "timestamp-millis",
"doc": "Creation timestamp"
},
{
"name": "updated_at",
"type": "long",
"logicalType": "timestamp-millis",
"doc": "Last update timestamp"
},
// New fields added in v2
{
"name": "tags",
"type": {
"type": "array",
"items": "string"
},
"default": [],
"doc": "Item tags for categorization"
},
{
"name": "dimensions",
"type": [
"null",
{
"type": "record",
"name": "Dimensions",
"fields": [
{ "name": "length", "type": "double" },
{ "name": "width", "type": "double" },
{ "name": "height", "type": "double" },
{ "name": "unit", "type": "string" }
]
}
],
"default": null,
"doc": "Item dimensions"
}
]
}
This schema evolution remains compatible in both directions: the new tags and dimensions fields declare defaults, so v2 readers can still decode records written with the v1 schema, while v1 readers simply ignore the new fields when reading v2 data.
Building a modern inventory management system requires careful consideration of architecture, scalability, and reliability. Throughout this guide, we’ve explored how to build a robust, event-driven inventory system that can handle the demands of modern commerce.
Event-Driven Architecture: By leveraging Redpanda as our event streaming platform, we’ve created a system where services can communicate asynchronously, enabling real-time processing and system decoupling. This approach allows our inventory system to scale independently and maintain high throughput even during peak loads.
CQRS Pattern: Separating our read and write concerns through the Command Query Responsibility Segregation pattern has allowed us to optimize each path independently. Write operations maintain consistency through transactional boundaries, while read operations can be highly optimized with caching and specialized data models.
Schema Evolution: Using Schema Registry with Redpanda ensures that our events maintain compatibility as our system evolves. This provides a robust foundation for long-term maintainability and prevents breaking changes from disrupting operations.
Change Data Capture: Implementing CDC with Debezium allows us to capture database changes as events, creating a reliable audit trail and enabling real-time analytics without impacting our primary database performance.
Resilience Patterns: Through circuit breakers, retry mechanisms, and dead letter queues, our system can gracefully handle failures and recover automatically from transient issues, ensuring high availability and data integrity.
Observability: Comprehensive monitoring with Prometheus metrics, structured logging, and distributed tracing gives us visibility into system behavior, making it easier to diagnose issues and optimize performance.
The modern event-driven inventory management system delivers significant business value across multiple dimensions:
graph LR
subgraph "Customer Experience Impact"
direction TB
CE1[Real-time Visibility]
CE2[Accurate Delivery Promises]
CE3[No Overselling]
CE4[Omnichannel Experience]
CE5[Personalized Service]
CE1 & CE2 & CE3 & CE4 & CE5 --> CS[Enhanced Customer Satisfaction]
end
subgraph "Operational Excellence"
direction TB
OE1[Automated Workflows]
OE2[Real-time Analytics]
OE3[Reduced Manual Work]
OE4[Predictive Insights]
OE5[Process Optimization]
OE1 & OE2 & OE3 & OE4 & OE5 --> OI[Operational Improvement]
end
subgraph "Financial Benefits"
direction TB
FB1[Reduced Inventory Costs]
FB2[Better Cash Flow]
FB3[Lower Operating Costs]
FB4[Increased Sales]
FB5[Higher Margins]
FB1 & FB2 & FB3 & FB4 & FB5 --> FI[Financial Impact]
end
subgraph "Strategic Advantages"
direction TB
SA1[Market Agility]
SA2[Competitive Edge]
SA3[Innovation Enablement]
SA4[Scalable Growth]
SA5[Global Expansion]
SA1 & SA2 & SA3 & SA4 & SA5 --> SI[Strategic Impact]
end
CS --> BV[Total Business Value]
OI --> BV
FI --> BV
SI --> BV
style BV fill:#f9f,stroke:#333,stroke-width:4px
style CS fill:#bbf,stroke:#333,stroke-width:2px
style OI fill:#bbf,stroke:#333,stroke-width:2px
style FI fill:#bbf,stroke:#333,stroke-width:2px
style SI fill:#bbf,stroke:#333,stroke-width:2px
classDef impact fill:#e1e1e1,stroke:#333,stroke-width:1px
class CE1,CE2,CE3,CE4,CE5,OE1,OE2,OE3,OE4,OE5,FB1,FB2,FB3,FB4,FB5,SA1,SA2,SA3,SA4,SA5 impact
mindmap
root((Future
Evolution))
AI/ML Integration
Demand Forecasting
Historical Analysis
Seasonal Patterns
Market Trends
Anomaly Detection
Fraud Prevention
Quality Control
Pattern Recognition
Automated Reordering
Safety Stock
Lead Time
Optimal Quantity
Edge Computing
In-Store Processing
Local Inventory
POS Integration
Real-time Updates
Warehouse Operations
Pick/Pack Optimization
Space Utilization
Equipment Tracking
Offline Capabilities
Local Cache
Sync Mechanism
Conflict Resolution
Blockchain Integration
Supply Chain
Product Tracking
Origin Verification
Chain of Custody
Smart Contracts
Automated Payments
SLA Enforcement
Compliance
Sustainability
Carbon Footprint
Ethical Sourcing
Waste Reduction
IoT Enhancement
RFID Systems
Asset Tracking
Automated Counting
Loss Prevention
Sensor Networks
Environmental
Movement Detection
Usage Patterns
Automated Systems
Robotic Storage
AGV Integration
Smart Shelving
Advanced Analytics
Predictive Models
Stock Optimization
Resource Planning
Risk Assessment
Real-time Dashboards
KPI Monitoring
Alert System
Decision Support
Business Intelligence
Custom Reports
Data Mining
Trend Analysis
graph TB
subgraph "Current State"
C1[Basic Automation]
C2[Manual Decisions]
C3[Reactive Approach]
C4[Limited Integration]
end
subgraph "Near Future"
N1[Smart Automation]
N2[AI-Assisted Decisions]
N3[Predictive Systems]
N4[IoT Integration]
end
subgraph "Future Vision"
F1[Autonomous Operations]
F2[AI-Driven Strategy]
F3[Prescriptive Analytics]
F4[Full Integration]
end
C1 --> N1
C2 --> N2
C3 --> N3
C4 --> N4
N1 --> F1
N2 --> F2
N3 --> F3
N4 --> F4
style C1 fill:#f9f,stroke:#333,stroke-width:2px
style C2 fill:#f9f,stroke:#333,stroke-width:2px
style C3 fill:#f9f,stroke:#333,stroke-width:2px
style C4 fill:#f9f,stroke:#333,stroke-width:2px
style N1 fill:#bbf,stroke:#333,stroke-width:2px
style N2 fill:#bbf,stroke:#333,stroke-width:2px
style N3 fill:#bbf,stroke:#333,stroke-width:2px
style N4 fill:#bbf,stroke:#333,stroke-width:2px
style F1 fill:#bfb,stroke:#333,stroke-width:2px
style F2 fill:#bfb,stroke:#333,stroke-width:2px
style F3 fill:#bfb,stroke:#333,stroke-width:2px
style F4 fill:#bfb,stroke:#333,stroke-width:2px
Because they build on the solid foundation of event-driven architecture and cloud-native principles established throughout this guide, these enhancements can be added incrementally, creating an increasingly sophisticated inventory management system that adapts to changing business needs.
The modern inventory management system we’ve built is not just about tracking stock levels—it’s about creating a digital nervous system that can sense, respond, and adapt to the complex demands of today’s supply chains. With this architecture, businesses can achieve the real-time visibility, operational efficiency, and scalability needed to thrive in an increasingly competitive marketplace.