In today’s fast-paced digital commerce landscape, traditional inventory management approaches are breaking under the strain of omnichannel retail, rapid fulfillment demands, and complex supply chains. This comprehensive guide explores building a modern, event-driven inventory management system (IMS) that can scale to meet these challenges while maintaining data consistency and real-time capabilities.
timeline title Evolution of Inventory Systems 1960(s) : Manual Systems : Paper Ledgers : Physical counting : Basic record keeping : Weekly reconciliation 1980(s) : Early Digital : Mainframe Systems : Basic databases : Batch processing : Monthly reporting 1990(s) : Client-Server : Distributed Systems : Local databases : Daily updates : Barcode scanning 2000(s) : Enterprise Systems : ERP Integration : Centralized databases : Real-time updates : RFID tracking 2010(s) : Cloud Solutions : SaaS & PaaS : Mobile integration : API-first approach : Multi-channel : ML predictions 2020(s) : Event-Driven : Microservices : Real-time analytics : Omnichannel : CQRS patterns : Edge computing : AI-driven
Modern inventory management faces several critical challenges that require sophisticated solutions. These challenges have evolved with the increasing complexity of global supply chains, omnichannel retail, and customer expectations for real-time accuracy.
Maintaining accurate inventory data is more challenging, and more critical, than ever. Businesses must track inventory across multiple channels, locations, and systems while ensuring data consistency and real-time updates.
mindmap root((Data Accuracy)) Real-time Updates Multi-channel Sales Returns Processing Stock Adjustments Warehouse Movements Multi-source Sync Store Systems E-commerce Platform Warehouse Management Supplier Integration Error Prevention Validation Rules Automated Checks Reconciliation Audit Trails
Key Implications:
Solution Requirements:
As businesses grow, their inventory management systems must handle increasing volumes of data and transactions while maintaining performance and reliability.
mindmap root((Scale Challenges)) High Volume Millions of SKUs Multiple Locations Concurrent Users Peak Processing Geographic Distribution Multiple Regions Data Replication Latency Management Legal Compliance System Performance Response Time Processing Speed Resource Usage Cost Efficiency
Key Implications:
Solution Requirements:
Modern inventory systems must integrate with a diverse ecosystem of applications, partners, and services, each with its own protocols, data formats, and requirements.
mindmap root((Integration Complexity)) Legacy Systems ERP Integration Custom Protocols Data Migration System Constraints Partner Networks Supplier Systems Logistics Partners Payment Providers Analytics Tools New Channels Mobile Apps Social Commerce Marketplaces Voice Commerce
Key Implications:
Solution Requirements:
Modern inventory management must accommodate complex business rules, workflows, and requirements while maintaining flexibility and adaptability.
mindmap root((Business Complexity)) Product Management Multiple SKUs Variants Handling Bundle Products Digital Products Location Management Warehouses Retail Stores Dark Stores Partner Locations Inventory Rules Safety Stock Reorder Points Allocation Rules Reservation Logic
Key Implications:
Solution Requirements:
Inventory management systems must meet stringent security requirements and comply with various regulations while protecting sensitive business data.
mindmap root((Compliance & Security)) Regulations Data Protection Financial Rules Industry Standards Local Laws Auditing Transaction Logs Change History Access Records System Events Security Access Control Data Encryption Threat Detection Incident Response
Key Implications:
Solution Requirements:
These challenges are interconnected and their impact extends across the entire organization:
Operational Impact
Business Impact
Technical Impact
The solution to these challenges requires a modern, event-driven architecture that can:
classDiagram class InventorySystem { +addStock(item: Item, quantity: number) +removeStock(item: Item, quantity: number) +adjustStock(item: Item, adjustment: number) +moveStock(item: Item, from: Location, to: Location) +reserveStock(item: Item, quantity: number) +confirmReservation(reservation: Reservation) +cancelReservation(reservation: Reservation) +getStockLevel(item: Item) +getStockHistory(item: Item) +getForecast(item: Item) } class StockLevel { +number available +number reserved +number inTransit +number backorder +number reorderPoint +number safetyStock } class Item { +string id +string sku +string name +string description +Category category } class Location { +string id +string name +string type +Address address } class Reservation { +string id +Item item +number quantity +string status +Date expiresAt } InventorySystem ..> StockLevel : returns InventorySystem ..> Reservation : manages InventorySystem --> Item : manages InventorySystem --> Location : manages
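The operations in this class diagram translate directly into a service contract. A minimal Go sketch of that contract is shown below; the signatures are illustrative rather than a prescribed API, and reporting and forecasting operations are omitted.
// inventory_service.go (illustrative sketch)
package main

import (
    "context"

    "github.com/google/uuid"
)

// InventoryService mirrors the operations from the class diagram above.
// Signatures are simplified; pagination and error detail are omitted.
type InventoryService interface {
    AddStock(ctx context.Context, itemID, locationID uuid.UUID, qty int) error
    RemoveStock(ctx context.Context, itemID, locationID uuid.UUID, qty int) error
    MoveStock(ctx context.Context, itemID, from, to uuid.UUID, qty int) error
    ReserveStock(ctx context.Context, itemID uuid.UUID, qty int) (uuid.UUID, error)
    ConfirmReservation(ctx context.Context, reservationID uuid.UUID) error
    CancelReservation(ctx context.Context, reservationID uuid.UUID) error
    GetStockLevel(ctx context.Context, itemID, locationID uuid.UUID) (int, error)
}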
graph TB subgraph "Performance Requirements" P1[Response Time] --> P2[< 100ms for 99th percentile] P1 --> P3[< 1s for batch operations] P4[Throughput] --> P5[10K transactions/second] P4 --> P6[1M events/day] end subgraph "Scalability Requirements" S1[Horizontal Scaling] --> S2[Auto-scaling support] S1 --> S3[Multi-region deployment] S4[Data Growth] --> S5[1TB/month] S4 --> S6[3 years retention] end subgraph "Reliability Requirements" R1[Availability] --> R2[99.99% uptime] R1 --> R3[24/7 operation] R4[Fault Tolerance] --> R5[No single point of failure] R4 --> R6[Automatic failover] end subgraph "Security Requirements" SE1[Authentication] --> SE2[OAuth 2.0/OIDC] SE1 --> SE3[Role-based access] SE4[Data Protection] --> SE5[Encryption at rest] SE4 --> SE6[TLS in transit] end
Response Time
Throughput
Resource Utilization
Horizontal Scaling
Data Growth
Load Handling
Availability
Fault Tolerance
Data Consistency
Authentication & Authorization
Data Protection
Compliance
Monitoring
Deployment
Documentation
The component architecture integrates several key aspects including service mesh, security, and observability:
Service Mesh Integration
Security Controls
Observability Stack
graph TB subgraph "Frontend Layer" UI[Web UI] Mobile[Mobile Apps] B2B[B2B Portal] end subgraph "API Gateway Layer" Gateway[API Gateway] Auth[Authentication] RateLimit[Rate Limiter] end subgraph "Service Mesh Layer" IC[Istio Control] Proxy1[Envoy Proxy 1] Proxy2[Envoy Proxy 2] Proxy3[Envoy Proxy 3] end subgraph "Command Services" Stock[Stock Service] Location[Location Service] Reserve[Reservation Service] Order[Order Service] end subgraph "Event Processing" RP[Redpanda Cluster] SR[Schema Registry] DLQ[Dead Letter Queue] CDC[Change Data Capture] end subgraph "Query Services" Analytics[Analytics Service] Report[Reporting Service] Search[Search Service] Cache[Redis Cache] end subgraph "Storage Layer" PG[(PostgreSQL)] ES[(Elasticsearch)] Metrics[(Prometheus)] end UI --> Gateway Mobile --> Gateway B2B --> Gateway Gateway --> Auth Gateway --> RateLimit Auth --> IC IC --> Proxy1 IC --> Proxy2 IC --> Proxy3 Proxy1 --> Stock Proxy2 --> Location Proxy3 --> Reserve Stock --> RP Location --> RP Reserve --> RP Order --> RP RP --> Analytics RP --> Report RP --> Search RP --> DLQ Stock --> PG Location --> PG Reserve --> PG Order --> PG PG --> CDC CDC --> RP Analytics --> Cache Report --> ES Search --> ES Stock --> Metrics Location --> Metrics Reserve --> Metrics Order --> Metrics
sequenceDiagram participant C as Client participant G as API Gateway participant M as Service Mesh participant S as Stock Service participant R as Redpanda participant P as PostgreSQL participant Q as Query Service participant Ca as Cache participant O as Observability C->>G: Request stock update G->>M: Route request M->>S: Forward with mTLS rect rgb(240, 240, 240) Note over S,P: Transaction Boundary S->>P: Begin transaction S->>P: Update stock S->>R: Publish event S->>P: Commit transaction end R-->>Q: Consume event Q->>Ca: Update cache Q-->>C: Send update notification M->>O: Send telemetry S->>O: Send metrics P->>O: Send database metrics
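To make the transaction boundary in the sequence diagram concrete, here is a sketch of the command path: the stock row is updated and the event published before the transaction commits. It assumes pgx for PostgreSQL access and the StockEventPublisher shown later in this guide; the command type and SQL are illustrative.
// command_handler.go (illustrative sketch)
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/google/uuid"
    "github.com/jackc/pgx/v5/pgxpool"
)

// UpdateStockCommand is a hypothetical command shape for this sketch.
type UpdateStockCommand struct {
    ItemID     string
    LocationID string
    Delta      int
}

// handleUpdateStock follows the sequence diagram: begin a transaction,
// update the stock row, publish the event, then commit.
func handleUpdateStock(ctx context.Context, pool *pgxpool.Pool, pub *StockEventPublisher, cmd UpdateStockCommand) error {
    tx, err := pool.Begin(ctx)
    if err != nil {
        return fmt.Errorf("begin transaction: %w", err)
    }
    defer tx.Rollback(ctx) // no-op once the transaction has been committed

    // Update the stock level inside the transaction boundary
    if _, err := tx.Exec(ctx,
        `UPDATE stock_levels
         SET quantity = quantity + $1, updated_at = now()
         WHERE item_id = $2 AND location_id = $3`,
        cmd.Delta, cmd.ItemID, cmd.LocationID); err != nil {
        return fmt.Errorf("update stock: %w", err)
    }

    // Publish the domain event before committing, as in the diagram
    event := StockEvent{
        ID:        uuid.New().String(),
        Type:      "STOCK_UPDATED",
        ItemID:    cmd.ItemID,
        Quantity:  cmd.Delta,
        Location:  cmd.LocationID,
        Timestamp: time.Now(),
    }
    if err := pub.PublishStockEvent(ctx, event); err != nil {
        return fmt.Errorf("publish event: %w", err)
    }

    return tx.Commit(ctx)
}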
graph TB subgraph "Kubernetes Cluster" subgraph "Ingress Layer" Ingress[Ingress Controller] Cert[Cert Manager] end subgraph "Application Layer" API[API Gateway] Stock[Stock Service] Query[Query Service] end subgraph "Data Layer" RP[Redpanda StatefulSet] PG[PostgreSQL StatefulSet] Redis[Redis StatefulSet] end subgraph "Monitoring" Prom[Prometheus] Graf[Grafana] Alert[AlertManager] end end subgraph "External Services" DNS[DNS] CDN[CDN] Backup[Backup Storage] end DNS --> Ingress CDN --> Ingress Ingress --> API API --> Stock API --> Query Stock --> RP Stock --> PG Query --> Redis PG --> Backup RP --> Backup
For this implementation, we’ll use Minikube as our local Kubernetes cluster. Minikube provides a lightweight way to run Kubernetes locally for development and testing.
# Start Minikube with sufficient resources for our stack
minikube start --cpus 4 --memory 8192 --disk-size 20g
# Enable necessary addons
minikube addons enable metrics-server
minikube addons enable ingress
# Verify the cluster is running
kubectl cluster-info
Expected output:
Kubernetes control plane is running at https://192.168.49.2:8443
CoreDNS is running at https://192.168.49.2:8443/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
For development purposes, we’ll configure Minikube to use the local Docker daemon:
# Configure shell to use Minikube's Docker daemon
eval $(minikube docker-env)
First, let’s create a dedicated namespace for Redpanda:
kubectl create namespace redpanda
Add the Redpanda Helm repository and update it:
helm repo add redpanda https://charts.redpanda.com
helm repo update
Create a file named redpanda-values.yaml with a minimal configuration for local development:
# redpanda-values.yaml
statefulset:
initContainers:
setDataDirOwnership:
enabled: true
replicas: 1
resources:
cpu:
cores: 1
memory:
container:
max: 1.5Gi
redpanda:
reserveMemory: 1Gi
storage:
persistentVolume:
enabled: true
size: 4Gi
storageClass: "standard" # Use Minikube's default storage class
auth:
sasl:
enabled: false
tls:
enabled: false
console:
enabled: true
auth:
enabled: false
Install Redpanda using Helm:
helm install redpanda redpanda/redpanda \
--namespace redpanda \
--values redpanda-values.yaml
Verify the installation:
# Check if pods are running
kubectl get pods -n redpanda
# Expected output:
NAME READY STATUS RESTARTS AGE
redpanda-0 1/1 Running 0 2m
redpanda-console-7b9c8b6f7-x9j2v 1/1 Running 0 2m
Set up port forwarding for local access:
# For Kafka API
kubectl port-forward -n redpanda redpanda-0 9092:9092 &
# For Schema Registry
kubectl port-forward -n redpanda redpanda-0 8081:8081 &
# For Redpanda Console
kubectl port-forward -n redpanda svc/redpanda-console 8080:8080 &
Test connectivity using rpk:
# Create a test topic
rpk topic create test-topic \
--brokers localhost:9092
# List topics
rpk topic list \
--brokers localhost:9092
# Expected output:
TOPIC PARTITIONS REPLICAS
test-topic 1 1
For production deployments, you should use a more comprehensive configuration:
# redpanda-prod-values.yaml
statefulset:
initContainers:
setDataDirOwnership:
enabled: true
replicas: 3
resources:
cpu:
cores: 2
memory:
container:
max: 8Gi
redpanda:
reserveMemory: 6Gi
storage:
persistentVolume:
enabled: true
size: 100Gi
storageClass: "ceph-filesystem" # Use appropriate storage class for your cloud provider
auth:
sasl:
enabled: true
users:
- name: admin
password: ${REDPANDA_ADMIN_PASSWORD}
tls:
enabled: true
monitoring:
enabled: true
serviceMonitor:
enabled: true
console:
enabled: true
auth:
enabled: true
# postgres-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: postgres-config
namespace: database
data:
postgresql.conf: |
listen_addresses = '*'
max_connections = 100
shared_buffers = 256MB
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
wal_sender_timeout = 60s
max_worker_processes = 10
track_commit_timestamp = on
pg_hba.conf: |
local all all trust
host all all 127.0.0.1/32 trust
host all all ::1/128 trust
host all all 0.0.0.0/0 md5
host replication all 0.0.0.0/0 md5
---
# postgres-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: postgres
namespace: database
spec:
serviceName: postgres
replicas: 1
selector:
matchLabels:
app: postgres
template:
metadata:
labels:
app: postgres
spec:
containers:
- name: postgres
image: postgres:15
ports:
- containerPort: 5432
name: postgres
env:
- name: POSTGRES_USER
valueFrom:
secretKeyRef:
name: postgres-secrets
key: username
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: postgres-secrets
key: password
- name: POSTGRES_DB
value: inventory
- name: PGDATA
value: /var/lib/postgresql/data/pgdata
volumeMounts:
- name: postgres-data
mountPath: /var/lib/postgresql/data
- name: postgres-config
mountPath: /etc/postgresql/postgresql.conf
subPath: postgresql.conf
- name: postgres-config
mountPath: /etc/postgresql/pg_hba.conf
subPath: pg_hba.conf
resources:
requests:
memory: 2Gi
cpu: 1000m
limits:
memory: 4Gi
cpu: 2000m
volumeClaimTemplates:
- metadata:
name: postgres-data
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 100Gi
# debezium-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: debezium-config
namespace: debezium
data:
application.properties: |
bootstrap.servers=redpanda.redpanda.svc.cluster.local:9092
group.id=inventory-connect-cluster
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://redpanda.redpanda.svc.cluster.local:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://redpanda.redpanda.svc.cluster.local:8081
rest.port=8083
rest.advertised.host.name=debezium
plugin.path=/kafka/connect
---
# debezium-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: debezium
namespace: debezium
spec:
replicas: 2
selector:
matchLabels:
app: debezium
template:
metadata:
labels:
app: debezium
spec:
containers:
- name: debezium
image: debezium/connect:2.3
ports:
- containerPort: 8083
env:
- name: BOOTSTRAP_SERVERS
value: redpanda.redpanda.svc.cluster.local:9092
- name: GROUP_ID
value: inventory-connect-cluster
- name: CONFIG_STORAGE_TOPIC
value: connect-configs
- name: OFFSET_STORAGE_TOPIC
value: connect-offsets
- name: STATUS_STORAGE_TOPIC
value: connect-status
volumeMounts:
- name: debezium-config
mountPath: /kafka/config/connect-distributed.properties
subPath: application.properties
resources:
requests:
memory: 1Gi
cpu: 500m
limits:
memory: 2Gi
cpu: 1000m
volumes:
- name: debezium-config
configMap:
name: debezium-config
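With the Connect cluster deployed, Debezium still needs a source connector definition before it starts streaming changes from PostgreSQL. One way to register it is through the standard Kafka Connect REST API; the sketch below does this from Go, and the connector name, credentials, and table list are assumptions for this walkthrough.
// register_connector.go (illustrative sketch)
package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

// registerPostgresConnector posts a Debezium PostgreSQL connector definition
// to the Kafka Connect REST API exposed by the debezium deployment.
func registerPostgresConnector() error {
    connector := map[string]interface{}{
        "name": "inventory-postgres-connector", // assumed connector name
        "config": map[string]string{
            "connector.class":    "io.debezium.connector.postgresql.PostgresConnector",
            "plugin.name":        "pgoutput",
            "database.hostname":  "postgres.database.svc.cluster.local",
            "database.port":      "5432",
            "database.user":      "postgres",
            "database.password":  "postgres", // read from a secret in practice
            "database.dbname":    "inventory",
            "topic.prefix":       "inventory",
            "table.include.list": "public.stock_levels,public.stock_movements",
        },
    }

    body, err := json.Marshal(connector)
    if err != nil {
        return fmt.Errorf("marshal connector config: %w", err)
    }

    resp, err := http.Post(
        "http://debezium.debezium.svc.cluster.local:8083/connectors",
        "application/json",
        bytes.NewReader(body),
    )
    if err != nil {
        return fmt.Errorf("register connector: %w", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusCreated {
        return fmt.Errorf("unexpected status: %s", resp.Status)
    }
    return nil
}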
The service mesh provides a dedicated infrastructure layer for handling service-to-service communication in our distributed inventory system. We use Istio as our service mesh implementation to manage, secure, and observe microservices interactions.
graph TB subgraph "Control Plane" IC[Istio Control] style IC fill:#f9f,stroke:#333 subgraph "Management" Config[Configuration API] Discovery[Service Discovery] Security[Security Management] end end subgraph "Data Plane" subgraph "Service Pod A" App1[Application] Proxy1[Envoy Proxy] end subgraph "Service Pod B" App2[Application] Proxy2[Envoy Proxy] end end IC --> Proxy1 IC --> Proxy2 Proxy1 <--> Proxy2 App1 --> Proxy1 App2 --> Proxy2
Traffic Management
Security
Observability
Our security architecture implements defense in depth, ensuring protection at every layer of the system:
graph TB subgraph "External Security" WAF[Web Application Firewall] DDOS[DDoS Protection] API[API Gateway] end subgraph "Identity & Access" Auth[Authentication] RBAC[Role-Based Access] OAuth[OAuth2/OIDC] end subgraph "Data Security" Encrypt[Encryption] Mask[Data Masking] Keys[Key Management] end subgraph "Network Security" MTLS[Mutual TLS] SEG[Network Segmentation] FW[Network Policies] end WAF --> API API --> Auth Auth --> OAuth OAuth --> RBAC RBAC --> Encrypt Encrypt --> MTLS
OAuth 2.0/OIDC Implementation
Data Protection
Network Security
Compliance & Audit
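As a concrete illustration of the OAuth 2.0/OIDC layer above, the following sketch validates bearer tokens at the service edge with the go-oidc library. The issuer URL, client ID, and middleware wiring are assumptions, not part of a prescribed implementation.
// auth_middleware.go (illustrative sketch)
package main

import (
    "context"
    "net/http"
    "strings"

    "github.com/coreos/go-oidc/v3/oidc"
)

// newAuthMiddleware returns HTTP middleware that rejects requests without a
// valid OIDC token. The issuer and client ID are illustrative values.
func newAuthMiddleware(ctx context.Context) (func(http.Handler) http.Handler, error) {
    provider, err := oidc.NewProvider(ctx, "https://auth.example.com/realms/inventory")
    if err != nil {
        return nil, err
    }
    verifier := provider.Verifier(&oidc.Config{ClientID: "inventory-api"})

    return func(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            raw := strings.TrimPrefix(r.Header.Get("Authorization"), "Bearer ")
            if raw == "" {
                http.Error(w, "missing token", http.StatusUnauthorized)
                return
            }
            if _, err := verifier.Verify(r.Context(), raw); err != nil {
                http.Error(w, "invalid token", http.StatusUnauthorized)
                return
            }
            next.ServeHTTP(w, r)
        })
    }, nil
}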
Our observability implementation provides comprehensive visibility across all system components:
graph TB subgraph "Data Collection" Metrics[Prometheus Metrics] Logs[Loki Logs] Traces[Tempo Traces] end subgraph "Processing" Rules[Alert Rules] Aggregation[Log Aggregation] Analysis[Trace Analysis] end subgraph "Visualization" Grafana[Grafana Dashboards] Alerts[Alert Manager] Reports[Custom Reports] end Metrics --> Rules Logs --> Aggregation Traces --> Analysis Rules --> Grafana Aggregation --> Grafana Analysis --> Grafana Rules --> Alerts
Infrastructure Monitoring
Application Monitoring
Business Monitoring
Alert Management
# alertmanager-config.yaml
route:
receiver: "team-inventory"
group_by: ["alertname", "cluster", "service"]
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
routes:
- match:
severity: critical
receiver: "team-inventory-critical"
repeat_interval: 1h
receivers:
- name: "team-inventory"
slack_configs:
- channel: "#inventory-alerts"
send_resolved: true
- name: "team-inventory-critical"
pagerduty_configs:
- service_key: "<key>"
send_resolved: true
graph TB subgraph "Stock Service" API[HTTP API Layer] Logic[Business Logic] Events[Event Handler] Cache[Cache Layer] DB[Database Layer] end subgraph "External Systems" RP[Redpanda] PG[PostgreSQL] Redis[Redis Cache] Prom[Prometheus] end API --> Logic Logic --> Events Logic --> Cache Logic --> DB Events --> RP DB --> PG Cache --> Redis Logic --> Prom
// config.go
package main
type Config struct {
DBConfig PostgresConfig
RedpandaConfig RedpandaConfig
CacheConfig CacheConfig
MetricsConfig MetricsConfig
ServerConfig ServerConfig
}
type PostgresConfig struct {
Host string
Port int
Database string
User string
Password string
MaxConns int
MinConns int
MaxIdleTime string
}
type RedpandaConfig struct {
Brokers []string
SchemaRegistry string
ConsumerGroup string
CommandTopic string
EventTopic string
FailureTopic string
}
type CacheConfig struct {
Address string
Password string
DB int
TTL string
}
type MetricsConfig struct {
Enabled bool
Port int
Path string
}
type ServerConfig struct {
Port int
ReadTimeout string
WriteTimeout string
ShutdownTimeout string
}
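In a containerized deployment these settings usually arrive as environment variables. A minimal sketch of populating Config from the environment follows; the variable names and defaults are illustrative.
// config_loader.go (illustrative sketch)
package main

import (
    "os"
    "strconv"
)

// getenv returns an environment variable or a fallback value.
func getenv(key, fallback string) string {
    if v := os.Getenv(key); v != "" {
        return v
    }
    return fallback
}

// loadConfig populates Config from environment variables.
// The variable names are illustrative, not a fixed contract.
func loadConfig() Config {
    dbPort, _ := strconv.Atoi(getenv("DB_PORT", "5432"))
    return Config{
        DBConfig: PostgresConfig{
            Host:     getenv("DB_HOST", "localhost"),
            Port:     dbPort,
            Database: getenv("DB_NAME", "inventory"),
            User:     getenv("DB_USER", "postgres"),
            Password: os.Getenv("DB_PASSWORD"),
            MaxConns: 10,
        },
        RedpandaConfig: RedpandaConfig{
            Brokers:        []string{getenv("REDPANDA_BROKERS", "localhost:9092")},
            SchemaRegistry: getenv("SCHEMA_REGISTRY_URL", "http://localhost:8081"),
            ConsumerGroup:  getenv("CONSUMER_GROUP", "stock-service"),
        },
        CacheConfig: CacheConfig{
            Address: getenv("REDIS_ADDR", "localhost:6379"),
            TTL:     getenv("CACHE_TTL", "5m"),
        },
        ServerConfig: ServerConfig{
            Port: 8080,
        },
    }
}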
// models.go
package main
import (
"time"
"github.com/google/uuid"
)
type StockItem struct {
ID uuid.UUID `json:"id"`
SKU string `json:"sku"`
Name string `json:"name"`
Description string `json:"description"`
CategoryID uuid.UUID `json:"categoryId"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
type StockLevel struct {
ID uuid.UUID `json:"id"`
ItemID uuid.UUID `json:"itemId"`
LocationID uuid.UUID `json:"locationId"`
Quantity int `json:"quantity"`
Reserved int `json:"reserved"`
ReorderPoint int `json:"reorderPoint"`
SafetyStock int `json:"safetyStock"`
Version int `json:"version"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
type StockMovement struct {
ID uuid.UUID `json:"id"`
ItemID uuid.UUID `json:"itemId"`
FromLocationID *uuid.UUID `json:"fromLocationId"`
ToLocationID *uuid.UUID `json:"toLocationId"`
Quantity int `json:"quantity"`
Type string `json:"type"`
Status string `json:"status"`
ReferenceID *uuid.UUID `json:"referenceId"`
CreatedAt time.Time `json:"createdAt"`
CompletedAt *time.Time `json:"completedAt"`
}
sequenceDiagram participant C as Client participant H as HTTP Handler participant S as Service participant E as Event Handler participant R as Redpanda participant D as Database C->>H: POST /api/stock H->>S: Process Command S->>D: Begin Transaction S->>D: Update Stock S->>E: Create Event E->>R: Publish Event S->>D: Commit Transaction H-->>C: Response R-->>E: Consume Event E->>S: Process Event S->>D: Update State
// redpanda.go
package main

import (
    "context"
    "fmt"
    "math"
    "time"

    "github.com/twmb/franz-go/pkg/kgo"
)
type RedpandaConfig struct {
Brokers []string
SchemaRegistry string
ClientID string
GroupID string
Topics Topics
}
type Topics struct {
StockCommands string
StockEvents string
StockFailures string
}
func NewRedpandaClient(cfg RedpandaConfig) (*kgo.Client, error) {
client, err := kgo.NewClient(
kgo.SeedBrokers(cfg.Brokers...),
kgo.ClientID(cfg.ClientID),
kgo.ConsumerGroup(cfg.GroupID),
kgo.ConsumeTopics(cfg.Topics.StockCommands),
kgo.DisableAutoCommit(),
kgo.RequiredAcks(kgo.AllISRAcks()),
kgo.RetryBackoffFn(func(attempt int) time.Duration {
return time.Millisecond * time.Duration(math.Min(float64(attempt*200), 1000))
}),
)
if err != nil {
return nil, fmt.Errorf("create redpanda client: %w", err)
}
// Test connection
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := client.Ping(ctx); err != nil {
return nil, fmt.Errorf("ping redpanda: %w", err)
}
return client, nil
}
// Example configuration
var config = RedpandaConfig{
Brokers: []string{"localhost:9092"},
SchemaRegistry: "http://localhost:8081",
ClientID: "stock-service",
GroupID: "stock-service-group",
Topics: Topics{
StockCommands: "inventory.stock.commands",
StockEvents: "inventory.stock.events",
StockFailures: "inventory.stock.failures",
},
}
// publisher.go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/google/uuid"
    "github.com/twmb/franz-go/pkg/kgo"
    "github.com/twmb/franz-go/pkg/sr"
    "go.uber.org/zap"
)
type StockEventPublisher struct {
client *kgo.Client
schema *sr.Client
logger *zap.Logger
metrics *Metrics
}
type StockEvent struct {
ID string `json:"id"`
Type string `json:"type"`
ItemID string `json:"itemId"`
Quantity int `json:"quantity"`
Location string `json:"location"`
Timestamp time.Time `json:"timestamp"`
Metadata map[string]string `json:"metadata,omitempty"`
}
func (p *StockEventPublisher) PublishStockEvent(ctx context.Context, event StockEvent) error {
start := time.Now()
defer func() {
p.metrics.eventLatency.WithLabelValues("publish").Observe(
time.Since(start).Seconds(),
)
}()
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("marshal event: %w", err)
}
record := &kgo.Record{
Topic: "inventory.stock.events",
Key: []byte(event.ItemID),
Value: data,
Headers: []kgo.RecordHeader{
{Key: "event_type", Value: []byte(event.Type)},
{Key: "timestamp", Value: []byte(event.Timestamp.Format(time.RFC3339))},
},
}
results := p.client.ProduceSync(ctx, record)
if err := results.FirstErr(); err != nil {
p.logger.Error("failed to publish event",
zap.Error(err),
zap.String("event_id", event.ID),
zap.String("event_type", event.Type))
return fmt.Errorf("produce event: %w", err)
}
p.logger.Info("event published successfully",
zap.String("event_id", event.ID),
zap.String("event_type", event.Type),
zap.String("item_id", event.ItemID),
zap.Int("quantity", event.Quantity))
return nil
}
// Example usage:
func ExamplePublishEvent() {
event := StockEvent{
ID: uuid.New().String(),
Type: "STOCK_UPDATED",
ItemID: "item-123",
Quantity: 100,
Location: "warehouse-1",
Timestamp: time.Now(),
Metadata: map[string]string{
"user": "system",
"reason": "restock",
},
}
if err := publisher.PublishStockEvent(context.Background(), event); err != nil {
log.Printf("Failed to publish event: %v", err)
return
}
// Expected log output:
// {"level":"info","ts":"2025-03-17T11:23:45.678Z","caller":"publisher/publisher.go:89",
// "msg":"event published successfully","event_id":"550e8400-e29b-41d4-a716-446655440000",
// "event_type":"STOCK_UPDATED","item_id":"item-123","quantity":100}
}
// subscriber.go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/twmb/franz-go/pkg/kgo"
    "go.uber.org/zap"
)
type StockEventSubscriber struct {
client *kgo.Client
handler StockEventHandler
logger *zap.Logger
metrics *Metrics
shutdownCh chan struct{}
}
type StockEventHandler interface {
HandleStockEvent(context.Context, StockEvent) error
}
func (s *StockEventSubscriber) Start(ctx context.Context) error {
s.logger.Info("starting stock event subscriber",
zap.Strings("topics", []string{"inventory.stock.events"}))
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-s.shutdownCh:
return nil
default:
fetches := s.client.PollFetches(ctx)
if errs := fetches.Errors(); len(errs) > 0 {
s.handleErrors(errs)
continue
}
s.processFetches(ctx, fetches)
}
}
}
func (s *StockEventSubscriber) processFetches(ctx context.Context, fetches kgo.Fetches) {
fetches.EachPartition(func(p kgo.FetchTopicPartition) {
p.EachRecord(func(record *kgo.Record) {
    start := time.Now()
    // Record consumption latency once the record has been processed
    defer func() {
        s.metrics.eventLatency.WithLabelValues("consume").Observe(
            time.Since(start).Seconds(),
        )
    }()
var event StockEvent
if err := json.Unmarshal(record.Value, &event); err != nil {
s.logger.Error("failed to unmarshal event",
zap.Error(err),
zap.String("topic", record.Topic),
zap.Int32("partition", record.Partition),
zap.Int64("offset", record.Offset))
return
}
if err := s.handler.HandleStockEvent(ctx, event); err != nil {
s.logger.Error("failed to handle event",
zap.Error(err),
zap.String("event_id", event.ID),
zap.String("event_type", event.Type))
return
}
s.logger.Info("event processed successfully",
zap.String("event_id", event.ID),
zap.String("event_type", event.Type),
zap.String("item_id", event.ItemID),
zap.Duration("processing_time", time.Since(start)))
})
})
if err := s.client.CommitUncommittedOffsets(ctx); err != nil {
s.logger.Error("failed to commit offsets", zap.Error(err))
}
}
// Example handler implementation:
type StockUpdateHandler struct {
repository *StockRepository
logger *zap.Logger
}
func (h *StockUpdateHandler) HandleStockEvent(ctx context.Context, event StockEvent) error {
h.logger.Info("handling stock event",
zap.String("event_id", event.ID),
zap.String("event_type", event.Type))
switch event.Type {
case "STOCK_UPDATED":
return h.handleStockUpdate(ctx, event)
case "STOCK_RESERVED":
return h.handleStockReservation(ctx, event)
default:
return fmt.Errorf("unknown event type: %s", event.Type)
}
}
// Example usage and logs:
func ExampleSubscriber() {
handler := &StockUpdateHandler{
repository: repo,
logger: logger,
}
subscriber := &StockEventSubscriber{
client: redpandaClient,
handler: handler,
logger: logger,
}
if err := subscriber.Start(context.Background()); err != nil {
log.Fatalf("Failed to start subscriber: %v", err)
}
// Expected log output:
// {"level":"info","ts":"2025-03-17T11:23:45.678Z","caller":"subscriber/subscriber.go:112",
// "msg":"starting stock event subscriber","topics":["inventory.stock.events"]}
//
// {"level":"info","ts":"2025-03-17T11:23:46.789Z","caller":"subscriber/subscriber.go:156",
// "msg":"event processed successfully","event_id":"550e8400-e29b-41d4-a716-446655440000",
// "event_type":"STOCK_UPDATED","item_id":"item-123","processing_time":"0.123s"}
}
Here’s what you might see in your logs and metrics when the system is running:
# Connection Logs
[2025-03-17T11:20:00Z] INFO Connecting to Redpanda brokers: [localhost:9092]
[2025-03-17T11:20:00Z] INFO Connected to Redpanda cluster successfully
[2025-03-17T11:20:00Z] INFO Schema Registry connected at http://localhost:8081
# Publisher Logs
[2025-03-17T11:23:45Z] INFO Publishing stock update event
[2025-03-17T11:23:45Z] DEBUG Event payload: {"id":"550e8400-e29b-41d4-a716-446655440000","type":"STOCK_UPDATED",...}
[2025-03-17T11:23:45Z] INFO Event published successfully (topic=inventory.stock.events, partition=0, offset=1234)
# Subscriber Logs
[2025-03-17T11:23:45Z] INFO Starting stock event subscriber
[2025-03-17T11:23:45Z] INFO Subscribed to topics: [inventory.stock.events]
[2025-03-17T11:23:46Z] INFO Received event: STOCK_UPDATED (id=550e8400-e29b-41d4-a716-446655440000)
[2025-03-17T11:23:46Z] DEBUG Processing event for item-123: quantity=100, location=warehouse-1
[2025-03-17T11:23:46Z] INFO Event processed successfully (processing_time=123ms)
# Error Logs
[2025-03-17T11:25:00Z] ERROR Failed to connect to broker: connection refused
[2025-03-17T11:25:01Z] INFO Retrying connection (attempt 1/3)
[2025-03-17T11:25:10Z] ERROR Failed to process event: validation error (event_id=550e8400-e29b-41d4-a716-446655440000)
# Metrics (Prometheus format)
# HELP redpanda_events_total Total number of events processed
# TYPE redpanda_events_total counter
redpanda_events_total{type="published"} 1234
redpanda_events_total{type="consumed"} 1200
# HELP redpanda_event_processing_duration_seconds Event processing duration
# TYPE redpanda_event_processing_duration_seconds histogram
redpanda_event_processing_duration_seconds_bucket{le="0.1"} 980
redpanda_event_processing_duration_seconds_bucket{le="0.5"} 1150
redpanda_event_processing_duration_seconds_bucket{le="1.0"} 1200
# HELP redpanda_connection_status Current connection status
# TYPE redpanda_connection_status gauge
redpanda_connection_status{broker="localhost:9092"} 1
mindmap root((Testing Strategy)) Unit Tests Repository Tests Service Tests Handler Tests Event Tests Integration Tests API Tests Event Flow Tests Database Tests Performance Tests Load Tests Stress Tests Scalability Tests Monitoring Metrics Logging Tracing
// stock_test.go
package main
func TestStockService(t *testing.T) {
// Set up test environment
ctx := context.Background()
cfg := testConfig()
// Create test dependencies
db := setupTestDB(t)
redpanda := setupTestRedpanda(t)
cache := setupTestCache(t)
// Create service
svc := NewStockService(cfg, db, redpanda, cache)
// Run tests
t.Run("GetStock", func(t *testing.T) {
// Test cases
tests := []struct {
name string
itemID uuid.UUID
want *StockLevel
wantErr bool
}{
{
name: "existing item",
itemID: testItemID,
want: testStock,
wantErr: false,
},
{
name: "non-existing item",
itemID: uuid.New(),
want: nil,
wantErr: true,
},
}
// Run test cases
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := svc.GetStock(ctx, tt.itemID)
if (err != nil) != tt.wantErr {
t.Errorf("GetStock() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("GetStock() = %v, want %v", got, tt.want)
}
})
}
})
t.Run("UpdateStock", func(t *testing.T) {
// Test cases for stock updates
})
t.Run("EventHandling", func(t *testing.T) {
// Test cases for event handling
})
}
# stock-service.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: stock-service
spec:
replicas: 3
selector:
matchLabels:
app: stock-service
template:
metadata:
labels:
app: stock-service
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "8080"
prometheus.io/path: "/metrics"
spec:
containers:
- name: stock-service
image: inventory/stock-service:latest
ports:
- containerPort: 8080
env:
- name: DB_HOST
valueFrom:
configMapKeyRef:
name: inventory-config
key: db_host
- name: REDPANDA_BROKERS
valueFrom:
configMapKeyRef:
name: inventory-config
key: redpanda_brokers
- name: REDIS_ADDR
valueFrom:
configMapKeyRef:
name: inventory-config
key: redis_addr
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
volumeMounts:
- name: config
mountPath: /app/config
readOnly: true
volumes:
- name: config
configMap:
name: stock-service-config
---
apiVersion: v1
kind: Service
metadata:
name: stock-service
spec:
selector:
app: stock-service
ports:
- port: 80
targetPort: 8080
type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: stock-service
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: stock-service
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
// metrics.go
package main
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
type Metrics struct {
stockOperations *prometheus.CounterVec
stockLevels *prometheus.GaugeVec
operationDuration *prometheus.HistogramVec
cacheHitRate *prometheus.GaugeVec
eventLatency *prometheus.HistogramVec
}
func NewMetrics(reg prometheus.Registerer) *Metrics {
    // promauto.With(reg) registers each collector with the provided
    // registerer as it is created, so no separate MustRegister call is needed.
    factory := promauto.With(reg)
    return &Metrics{
        stockOperations: factory.NewCounterVec(
            prometheus.CounterOpts{
                Name: "stock_operations_total",
                Help: "Total number of stock operations",
            },
            []string{"operation", "status"},
        ),
        stockLevels: factory.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: "stock_levels",
                Help: "Current stock levels by item",
            },
            []string{"item_id", "location_id"},
        ),
        operationDuration: factory.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    "stock_operation_duration_seconds",
                Help:    "Duration of stock operations",
                Buckets: prometheus.DefBuckets,
            },
            []string{"operation"},
        ),
        cacheHitRate: factory.NewGaugeVec(
            prometheus.GaugeOpts{
                Name: "cache_hit_rate",
                Help: "Cache hit rate for stock operations",
            },
            []string{"operation"},
        ),
        eventLatency: factory.NewHistogramVec(
            prometheus.HistogramOpts{
                Name:    "event_processing_duration_seconds",
                Help:    "Duration of event processing",
                Buckets: prometheus.DefBuckets,
            },
            []string{"event_type"},
        ),
    }
}
// logger.go
package main
import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
func NewLogger(env string) (*zap.Logger, error) {
var config zap.Config
if env == "production" {
config = zap.NewProductionConfig()
config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
} else {
config = zap.NewDevelopmentConfig()
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
}
config.OutputPaths = []string{"stdout"}
config.ErrorOutputPaths = []string{"stderr"}
return config.Build(
zap.AddCaller(),
zap.AddStacktrace(zapcore.ErrorLevel),
)
}
// Example usage in service
func (s *StockService) logOperation(ctx context.Context, operation string, fields ...zap.Field) {
defaultFields := []zap.Field{
zap.String("operation", operation),
zap.String("service", "stock-service"),
}
// Attach the OpenTelemetry trace ID when one is present on the context
if sc := trace.SpanContextFromContext(ctx); sc.HasTraceID() {
    defaultFields = append(defaultFields, zap.String("traceId", sc.TraceID().String()))
}
s.logger.Info("Operation executed",
append(defaultFields, fields...)...)
}
// tracing.go
package main
import (
    "context"
    "fmt"
    "log"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/codes"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)
func initTracer(serviceName string) (func(), error) {
    // Export spans to the Jaeger collector endpoint
    exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(
        jaeger.WithEndpoint("http://jaeger-collector:14268/api/traces"),
    ))
    if err != nil {
        return nil, fmt.Errorf("create jaeger exporter: %w", err)
    }
    // The tracer provider comes from the OpenTelemetry SDK package
    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exporter),
        sdktrace.WithResource(resource.NewWithAttributes(
            semconv.SchemaURL,
            semconv.ServiceNameKey.String(serviceName),
        )),
    )
    otel.SetTracerProvider(tp)
    return func() {
        if err := tp.Shutdown(context.Background()); err != nil {
            log.Printf("Error shutting down tracer provider: %v", err)
        }
    }, nil
}
// Example usage in service
func (s *StockService) UpdateStock(ctx context.Context, req UpdateStockRequest) error {
ctx, span := s.tracer.Start(ctx, "UpdateStock")
defer span.End()
span.SetAttributes(
attribute.String("item.id", req.ItemID.String()),
attribute.Int("quantity", req.Quantity),
)
// Business logic here
if err := s.doUpdate(ctx, req); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return err
}
return nil
}
flowchart TB subgraph "Cache Strategy" direction TB A[Request] --> B{Cache Hit?} B -->|Yes| C[Return Cached] B -->|No| D[Query DB] D --> E[Update Cache] E --> F[Return Fresh] end subgraph "Cache Invalidation" direction TB G[Write Operation] --> H[Update DB] H --> I[Publish Event] I --> J[Invalidate Cache] end subgraph "Cache Warming" direction TB K[Background Job] --> L[Get Popular Items] L --> M[Prefetch Data] M --> N[Update Cache] end
// cache.go
package main

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
    "go.uber.org/zap"
)

// ErrCacheMiss is returned when a key is not present in the cache.
var ErrCacheMiss = errors.New("cache miss")

type CacheManager struct {
    redis   *redis.Client
    metrics *Metrics
    logger  *zap.Logger
}
func (c *CacheManager) Get(ctx context.Context, key string, value interface{}) error {
start := time.Now()
defer func() {
c.metrics.operationDuration.WithLabelValues("cache_get").Observe(
time.Since(start).Seconds(),
)
}()
data, err := c.redis.Get(ctx, key).Bytes()
if err != nil {
if err == redis.Nil {
c.metrics.cacheHitRate.WithLabelValues("get").Set(0)
return ErrCacheMiss
}
return fmt.Errorf("get from redis: %w", err)
}
if err := json.Unmarshal(data, value); err != nil {
return fmt.Errorf("unmarshal cached value: %w", err)
}
c.metrics.cacheHitRate.WithLabelValues("get").Set(1)
return nil
}
func (c *CacheManager) Set(ctx context.Context, key string, value interface{}, ttl time.Duration) error {
data, err := json.Marshal(value)
if err != nil {
return fmt.Errorf("marshal value: %w", err)
}
if err := c.redis.Set(ctx, key, data, ttl).Err(); err != nil {
return fmt.Errorf("set in redis: %w", err)
}
return nil
}
func (c *CacheManager) Invalidate(ctx context.Context, pattern string) error {
iter := c.redis.Scan(ctx, 0, pattern, 0).Iterator()
var keys []string
for iter.Next(ctx) {
keys = append(keys, iter.Val())
}
if err := iter.Err(); err != nil {
return fmt.Errorf("scan keys: %w", err)
}
if len(keys) > 0 {
if err := c.redis.Del(ctx, keys...).Err(); err != nil {
return fmt.Errorf("delete keys: %w", err)
}
}
return nil
}
// Cache warming
func (c *CacheManager) WarmCache(ctx context.Context, items []string) error {
for _, itemID := range items {
if err := c.warmItem(ctx, itemID); err != nil {
c.logger.Error("warm cache failed",
zap.String("itemId", itemID),
zap.Error(err))
continue
}
}
return nil
}
func (c *CacheManager) warmItem(ctx context.Context, itemID string) error {
// Implementation for warming up a single item
return nil
}
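With these primitives in place, the read path from the cache-strategy flowchart reduces to a small cache-aside lookup. The sketch below assumes a database loader function supplied by the caller; it is illustrative rather than part of the service code above.
// GetStockLevelCached implements the cache-aside read path from the flowchart:
// try the cache, fall back to the database, then repopulate the cache.
// loadFromDB stands in for the real repository call and is an assumption here.
func GetStockLevelCached(
    ctx context.Context,
    cache *CacheManager,
    loadFromDB func(context.Context, string) (*StockLevel, error),
    itemID string,
) (*StockLevel, error) {
    key := "stock:" + itemID

    var level StockLevel
    if err := cache.Get(ctx, key, &level); err == nil {
        return &level, nil // cache hit
    } else if err != ErrCacheMiss {
        return nil, err // a real cache failure, not just a miss
    }

    // Cache miss: load from the database
    fresh, err := loadFromDB(ctx, itemID)
    if err != nil {
        return nil, fmt.Errorf("load stock level: %w", err)
    }

    // Best-effort repopulation with a short TTL
    _ = cache.Set(ctx, key, fresh, 5*time.Minute)
    return fresh, nil
}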
-- Partitioning for stock_movements
CREATE TABLE stock_movements_partition OF stock_movements
PARTITION BY RANGE (created_at);
-- Create monthly partitions
CREATE TABLE stock_movements_y2025m01 PARTITION OF stock_movements_partition
FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
-- Create indexes on partitions
CREATE INDEX idx_movements_y2025m01_item
ON stock_movements_y2025m01 (item_id);
-- Materialized view for common queries
CREATE MATERIALIZED VIEW mv_stock_summary AS
SELECT
i.id as item_id,
i.sku,
i.name,
l.id as location_id,
l.name as location_name,
sl.quantity,
sl.reserved,
sl.quantity - sl.reserved as available,
sl.reorder_point,
sl.safety_stock
FROM
items i
JOIN stock_levels sl ON i.id = sl.item_id
JOIN locations l ON sl.location_id = l.id
WITH DATA;
-- Refresh materialized view
CREATE OR REPLACE FUNCTION refresh_stock_summary()
RETURNS trigger AS $$
BEGIN
REFRESH MATERIALIZED VIEW CONCURRENTLY mv_stock_summary;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trigger_refresh_stock_summary
AFTER INSERT OR UPDATE OR DELETE ON stock_levels
FOR EACH STATEMENT
EXECUTE FUNCTION refresh_stock_summary();
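On the query side, mv_stock_summary gives read models a single denormalized source. A sketch of a query-service lookup against the view, assuming pgx and a simplified result struct, might look like this:
// stock_summary_query.go (illustrative sketch)
package main

import (
    "context"

    "github.com/jackc/pgx/v5/pgxpool"
)

// StockSummary mirrors the columns of mv_stock_summary (simplified).
type StockSummary struct {
    ItemID       string
    SKU          string
    Name         string
    LocationName string
    Quantity     int
    Reserved     int
    Available    int
}

// getStockSummary reads the denormalized view maintained by the trigger above.
func getStockSummary(ctx context.Context, pool *pgxpool.Pool, sku string) ([]StockSummary, error) {
    rows, err := pool.Query(ctx,
        `SELECT item_id::text, sku, name, location_name, quantity, reserved, available
         FROM mv_stock_summary
         WHERE sku = $1`, sku)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var result []StockSummary
    for rows.Next() {
        var s StockSummary
        if err := rows.Scan(&s.ItemID, &s.SKU, &s.Name, &s.LocationName,
            &s.Quantity, &s.Reserved, &s.Available); err != nil {
            return nil, err
        }
        result = append(result, s)
    }
    return result, rows.Err()
}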
// loadtest/main.go
package main
import (
    "os"
    "time"

    vegeta "github.com/tsenart/vegeta/lib"
)
func main() {
// Configure test
rate := vegeta.Rate{Freq: 100, Per: time.Second}
duration := 5 * time.Minute
targeter := vegeta.NewStaticTargeter(vegeta.Target{
Method: "GET",
URL: "http://stock-service/api/stock/item-1",
})
// Run test
attacker := vegeta.NewAttacker()
var metrics vegeta.Metrics
for res := range attacker.Attack(targeter, rate, duration, "Stock Service Test") {
metrics.Add(res)
}
metrics.Close()
// Report results
reporter := vegeta.NewTextReporter(&metrics)
reporter.Report(os.Stdout)
}
// circuitbreaker.go
package main

import (
    "errors"
    "log"
    "sync"
    "time"
)

// State represents the current circuit breaker state.
type State int

const (
    StateClosed State = iota
    StateOpen
)

// ErrCircuitOpen is returned when the circuit is open and calls are rejected.
var ErrCircuitOpen = errors.New("circuit breaker is open")

type CircuitBreaker struct {
    mu          sync.RWMutex
    failures    int
    lastFailure time.Time
    state       State
    threshold   int
    timeout     time.Duration
}
func (cb *CircuitBreaker) Execute(operation func() error) error {
if !cb.canExecute() {
return ErrCircuitOpen
}
err := operation()
cb.updateState(err)
return err
}
func (cb *CircuitBreaker) canExecute() bool {
cb.mu.RLock()
defer cb.mu.RUnlock()
if cb.state == StateClosed {
return true
}
if cb.state == StateOpen &&
time.Since(cb.lastFailure) >= cb.timeout {
return true
}
return false
}
func (cb *CircuitBreaker) updateState(err error) {
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
cb.failures++
cb.lastFailure = time.Now()
if cb.failures >= cb.threshold {
cb.state = StateOpen
}
} else {
cb.failures = 0
cb.state = StateClosed
}
}
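A short usage sketch follows, wrapping a downstream call with the breaker; the threshold, timeout, and placeholder call are illustrative.
// Example usage: guard a flaky downstream call with the circuit breaker.
func ExampleCircuitBreaker() {
    cb := &CircuitBreaker{
        threshold: 5,                // open after 5 consecutive failures
        timeout:   30 * time.Second, // allow a trial call after 30s
    }

    err := cb.Execute(func() error {
        // Call the downstream dependency (database, HTTP API, ...)
        return callDownstream()
    })
    if err == ErrCircuitOpen {
        // Fail fast: serve a cached or degraded response instead
        log.Println("circuit open, skipping downstream call")
        return
    }
    if err != nil {
        log.Printf("downstream call failed: %v", err)
    }
}

// callDownstream is a placeholder for the real dependency call.
func callDownstream() error { return nil }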
// retry.go
package main

import (
    "context"
    "errors"
    "fmt"
    "math"
    "time"
)
func retry(ctx context.Context, operation func() error, opts RetryOptions) error {
var lastErr error
for attempt := 0; attempt < opts.MaxAttempts; attempt++ {
if err := operation(); err != nil {
lastErr = err
if !opts.ShouldRetry(err) {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(opts.BackoffFunc(attempt)):
continue
}
}
return nil
}
return fmt.Errorf("max retries exceeded: %w", lastErr)
}
type RetryOptions struct {
MaxAttempts int
BackoffFunc func(attempt int) time.Duration
ShouldRetry func(err error) bool
}
func ExponentialBackoff(base time.Duration) func(int) time.Duration {
return func(attempt int) time.Duration {
return base * time.Duration(math.Pow(2, float64(attempt)))
}
}
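A usage sketch combining the retry helper with exponential backoff is shown below; treating context cancellation as non-retryable is one possible policy, and the placeholder operation is illustrative.
// Example usage: retry a transient operation up to five times with
// exponential backoff starting at 100ms.
func ExampleRetry(ctx context.Context) error {
    opts := RetryOptions{
        MaxAttempts: 5,
        BackoffFunc: ExponentialBackoff(100 * time.Millisecond),
        ShouldRetry: func(err error) bool {
            // Do not retry on cancellation; everything else is assumed transient
            return !errors.Is(err, context.Canceled)
        },
    }

    return retry(ctx, func() error {
        // Replace with the real operation, e.g. publishing an event
        return publishHeartbeat(ctx)
    }, opts)
}

// publishHeartbeat is a placeholder for a real transient operation.
func publishHeartbeat(ctx context.Context) error { return nil }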
// dlq.go
package main

import (
    "context"
    "strconv"
    "time"

    "github.com/twmb/franz-go/pkg/kgo"
    "go.uber.org/zap"
)
type DeadLetterQueue struct {
redpanda *kgo.Client
logger *zap.Logger
}
func (dlq *DeadLetterQueue) HandleFailedMessage(ctx context.Context, msg FailedMessage) error {
record := &kgo.Record{
Topic: "inventory.failures",
Key: []byte(msg.ID),
Value: msg.Data,
Headers: []kgo.RecordHeader{
{Key: "error", Value: []byte(msg.Error.Error())},
{Key: "original_topic", Value: []byte(msg.OriginalTopic)},
{Key: "retry_count", Value: []byte(strconv.Itoa(msg.RetryCount))},
{Key: "failed_at", Value: []byte(time.Now().Format(time.RFC3339))},
},
}
results := dlq.redpanda.ProduceSync(ctx, record)
return results.FirstErr()
}
func (dlq *DeadLetterQueue) ProcessFailures(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
if err := dlq.processFailedMessages(ctx); err != nil {
dlq.logger.Error("process failures",
zap.Error(err))
time.Sleep(time.Second * 5)
}
}
}
}
func (dlq *DeadLetterQueue) processFailedMessages(ctx context.Context) error {
// Implementation for processing failed messages
return nil
}
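The processFailedMessages stub would typically drain the failure topic and re-publish messages whose retry budget is not yet exhausted. A hedged sketch of handling a single failed record, assuming a maximum of three retries, is shown below.
// reprocessRecord sketches how a single failed record could be retried:
// read the original topic and retry count from the headers, then either
// re-publish or leave it parked for manual inspection.
func (dlq *DeadLetterQueue) reprocessRecord(ctx context.Context, record *kgo.Record) error {
    var originalTopic string
    retryCount := 0
    for _, h := range record.Headers {
        switch h.Key {
        case "original_topic":
            originalTopic = string(h.Value)
        case "retry_count":
            retryCount, _ = strconv.Atoi(string(h.Value))
        }
    }

    // Retry budget exhausted: keep the message parked for manual review
    if retryCount >= 3 || originalTopic == "" {
        dlq.logger.Warn("message left in DLQ",
            zap.String("key", string(record.Key)),
            zap.Int("retry_count", retryCount))
        return nil
    }

    // Re-publish to the original topic with an incremented retry count
    retryRecord := &kgo.Record{
        Topic: originalTopic,
        Key:   record.Key,
        Value: record.Value,
        Headers: []kgo.RecordHeader{
            {Key: "retry_count", Value: []byte(strconv.Itoa(retryCount + 1))},
        },
    }
    return dlq.redpanda.ProduceSync(ctx, retryRecord).FirstErr()
}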
The Schema Registry provides a centralized schema management system that ensures data consistency and compatibility across our event-driven architecture. We use Apache Avro for schema definition due to its compact serialization and strong schema evolution capabilities.
Our schemas are organized by domain and version:
schemas/
├── stock/
│ ├── commands/
│ │ ├── v1_stock_update.avsc
│ │ └── v1_stock_reserve.avsc
│ ├── events/
│ │ ├── v1_stock_updated.avsc
│ │ └── v1_stock_reserved.avsc
│ └── models/
│ ├── v1_stock_item.avsc
│ └── v1_stock_level.avsc
└── cdc/
└── v1_stock_change.avsc
// schemas/stock/models/v1_stock_item.avsc
{
"type": "record",
"name": "StockItem",
"namespace": "com.inventory.stock",
"doc": "Represents a stock item in the inventory system",
"fields": [
{"name": "id", "type": "string", "doc": "Unique identifier for the item"},
{"name": "sku", "type": "string", "doc": "Stock keeping unit"},
{"name": "name", "type": "string", "doc": "Item name"},
{"name": "description", "type": ["null", "string"], "default": null, "doc": "Item description"},
{"name": "category_id", "type": "string", "doc": "Category identifier"},
{
"name": "metadata",
"type": {
"type": "map",
"values": "string"
},
"default": {},
"doc": "Additional item metadata"
},
{"name": "created_at", "type": "long", "logicalType": "timestamp-millis", "doc": "Creation timestamp"},
{"name": "updated_at", "type": "long", "logicalType": "timestamp-millis", "doc": "Last update timestamp"}
]
}
// schemas/stock/events/v1_stock_updated.avsc
{
"type": "record",
"name": "StockUpdatedEvent",
"namespace": "com.inventory.stock.events",
"doc": "Event emitted when stock levels are updated",
"fields": [
{"name": "event_id", "type": "string", "doc": "Unique event identifier"},
{"name": "event_type", "type": "string", "doc": "Event type identifier"},
{"name": "event_time", "type": "long", "logicalType": "timestamp-millis", "doc": "Event timestamp"},
{
"name": "item",
"type": "com.inventory.stock.StockItem",
"doc": "The stock item that was updated"
},
{
"name": "stock_level",
"type": {
"type": "record",
"name": "StockLevel",
"fields": [
{"name": "quantity", "type": "int", "doc": "Current quantity"},
{"name": "reserved", "type": "int", "doc": "Reserved quantity"},
{"name": "available", "type": "int", "doc": "Available quantity (quantity - reserved)"},
{"name": "location_id", "type": "string", "doc": "Location identifier"}
]
}
},
{
"name": "change",
"type": {
"type": "record",
"name": "StockChange",
"fields": [
{"name": "previous_quantity", "type": "int"},
{"name": "new_quantity", "type": "int"},
{"name": "change_type", "type": "string"},
{"name": "reason", "type": ["null", "string"], "default": null}
]
}
},
{
"name": "metadata",
"type": {
"type": "map",
"values": "string"
},
"default": {},
"doc": "Additional event metadata"
}
]
}
We follow these schema evolution rules to maintain compatibility:
Backward Compatibility
Forward Compatibility
Version Management
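These rules can also be enforced by the registry itself. The sketch below sets a subject's compatibility mode through the standard Schema Registry REST API, which Redpanda's registry implements; the subject name and URL are assumptions.
// compatibility.go (illustrative sketch)
package main

import (
    "bytes"
    "fmt"
    "net/http"
)

// setCompatibility sets the compatibility mode (for example BACKWARD or FULL)
// for a subject via the Schema Registry REST API.
func setCompatibility(registryURL, subject, mode string) error {
    body := bytes.NewBufferString(fmt.Sprintf(`{"compatibility": %q}`, mode))

    req, err := http.NewRequest(http.MethodPut,
        fmt.Sprintf("%s/config/%s", registryURL, subject), body)
    if err != nil {
        return err
    }
    req.Header.Set("Content-Type", "application/vnd.schemaregistry.v1+json")

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("set compatibility: unexpected status %s", resp.Status)
    }
    return nil
}

// Example: require backward-compatible changes for stock event schemas.
// setCompatibility("http://localhost:8081", "inventory.stock.events.StockUpdated-value", "BACKWARD")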
// schema/registry.go
package schema
import (
    "fmt"

    "github.com/linkedin/goavro/v2"
    "github.com/redpanda-data/console/backend/pkg/schema"
    "go.uber.org/zap"
)
type SchemaRegistry struct {
client *schema.Client
codecs map[string]*goavro.Codec
logger *zap.Logger
}
func NewSchemaRegistry(config SchemaRegistryConfig) (*SchemaRegistry, error) {
client, err := schema.NewClient(schema.Config{
URLs: config.URLs,
Username: config.Username,
Password: config.Password,
})
if err != nil {
return nil, fmt.Errorf("create schema registry client: %w", err)
}
return &SchemaRegistry{
client: client,
codecs: make(map[string]*goavro.Codec),
logger: config.Logger,
}, nil
}
func (r *SchemaRegistry) GetSchema(subject string, version int) (*goavro.Codec, error) {
// Check cache first
cacheKey := fmt.Sprintf("%s-v%d", subject, version)
if codec, ok := r.codecs[cacheKey]; ok {
return codec, nil
}
// Fetch from registry
schema, err := r.client.GetSchemaByVersion(subject, version)
if err != nil {
return nil, fmt.Errorf("get schema: %w", err)
}
// Create codec
codec, err := goavro.NewCodec(schema.Schema)
if err != nil {
return nil, fmt.Errorf("create codec: %w", err)
}
// Cache codec
r.codecs[cacheKey] = codec
return codec, nil
}
func (r *SchemaRegistry) RegisterSchema(subject, avroSchema string) (int, error) {
    // Validate the schema locally before registering it
    if _, err := goavro.NewCodec(avroSchema); err != nil {
        return 0, fmt.Errorf("invalid schema: %w", err)
    }
    // Register with the registry's compatibility checks applied
    id, err := r.client.RegisterSchema(subject, avroSchema, schema.AVRO)
    if err != nil {
        return 0, fmt.Errorf("register schema: %w", err)
    }
    return id, nil
}
// events/serialization.go
package events
import (
    "encoding/binary"
    "fmt"

    "github.com/linkedin/goavro/v2"
)
const (
schemaIDLength = 4
magicByte = byte(0)
)
type Serializer struct {
registry *schema.SchemaRegistry
subject string
version int
codec *goavro.Codec
}
func NewSerializer(registry *schema.SchemaRegistry, subject string, version int) (*Serializer, error) {
codec, err := registry.GetSchema(subject, version)
if err != nil {
return nil, fmt.Errorf("get schema: %w", err)
}
return &Serializer{
registry: registry,
subject: subject,
version: version,
codec: codec,
}, nil
}
// NativeConverter is implemented by event types that can represent themselves
// as a goavro-compatible native Go value (typically map[string]interface{}).
type NativeConverter interface {
    ToNative() (interface{}, error)
}

func (s *Serializer) Serialize(event NativeConverter) ([]byte, error) {
    // Convert the event to a native Go value understood by goavro
    native, err := event.ToNative()
    if err != nil {
        return nil, fmt.Errorf("convert to native: %w", err)
    }
// Serialize data
data, err := s.codec.BinaryFromNative(nil, native)
if err != nil {
return nil, fmt.Errorf("serialize: %w", err)
}
// Add schema ID
schemaID := make([]byte, schemaIDLength)
binary.BigEndian.PutUint32(schemaID, uint32(s.version))
// Combine magic byte, schema ID, and data
result := make([]byte, 1+schemaIDLength+len(data))
result[0] = magicByte
copy(result[1:], schemaID)
copy(result[1+schemaIDLength:], data)
return result, nil
}
func (s *Serializer) Deserialize(data []byte) (interface{}, error) {
if len(data) < 1+schemaIDLength {
return nil, fmt.Errorf("data too short")
}
// Verify magic byte
if data[0] != magicByte {
return nil, fmt.Errorf("invalid magic byte")
}
// Extract schema ID
schemaID := binary.BigEndian.Uint32(data[1:1+schemaIDLength])
// Get codec for schema ID
codec, err := s.registry.GetSchema(s.subject, int(schemaID))
if err != nil {
return nil, fmt.Errorf("get schema: %w", err)
}
// Deserialize data
native, _, err := codec.NativeFromBinary(data[1+schemaIDLength:])
if err != nil {
return nil, fmt.Errorf("deserialize: %w", err)
}
return native, nil
}
// Example of publishing an event with schema validation
func (p *StockEventPublisher) PublishStockUpdatedEvent(ctx context.Context, event StockUpdatedEvent) error {
// Get serializer for stock updated events
serializer, err := NewSerializer(p.registry, "inventory.stock.events.StockUpdated", 1)
if err != nil {
return fmt.Errorf("create serializer: %w", err)
}
// Serialize event
data, err := serializer.Serialize(event)
if err != nil {
return fmt.Errorf("serialize event: %w", err)
}
// Create record
record := &kgo.Record{
Topic: "inventory.stock.events",
Key: []byte(event.Item.ID),
Value: data,
Headers: []kgo.RecordHeader{
{Key: "event_type", Value: []byte(event.EventType)},
{Key: "schema_version", Value: []byte("1")},
},
}
// Publish event
results := p.client.ProduceSync(ctx, record)
if err := results.FirstErr(); err != nil {
return fmt.Errorf("produce event: %w", err)
}
return nil
}
// Example of consuming an event with schema validation
func (s *StockEventSubscriber) handleRecord(ctx context.Context, record *kgo.Record) error {
// Get deserializer for stock events
deserializer, err := NewSerializer(s.registry, "inventory.stock.events.StockUpdated", 1)
if err != nil {
return fmt.Errorf("create deserializer: %w", err)
}
// Deserialize event
native, err := deserializer.Deserialize(record.Value)
if err != nil {
return fmt.Errorf("deserialize event: %w", err)
}
// Convert to concrete type
event, err := StockUpdatedEventFromNative(native)
if err != nil {
return fmt.Errorf("convert event: %w", err)
}
// Process event
return s.handler.HandleStockEvent(ctx, event)
}
Here’s an example of evolving the StockItem schema while maintaining compatibility:
// schemas/stock/models/v2_stock_item.avsc
{
"type": "record",
"name": "StockItem",
"namespace": "com.inventory.stock",
"doc": "Represents a stock item in the inventory system",
"fields": [
{ "name": "id", "type": "string", "doc": "Unique identifier for the item" },
{ "name": "sku", "type": "string", "doc": "Stock keeping unit" },
{ "name": "name", "type": "string", "doc": "Item name" },
{
"name": "description",
"type": ["null", "string"],
"default": null,
"doc": "Item description"
},
{ "name": "category_id", "type": "string", "doc": "Category identifier" },
{
"name": "metadata",
"type": {
"type": "map",
"values": "string"
},
"default": {},
"doc": "Additional item metadata"
},
{
"name": "created_at",
"type": "long",
"logicalType": "timestamp-millis",
"doc": "Creation timestamp"
},
{
"name": "updated_at",
"type": "long",
"logicalType": "timestamp-millis",
"doc": "Last update timestamp"
},
// New fields added in v2
{
"name": "tags",
"type": {
"type": "array",
"items": "string"
},
"default": [],
"doc": "Item tags for categorization"
},
{
"name": "dimensions",
"type": [
"null",
{
"type": "record",
"name": "Dimensions",
"fields": [
{ "name": "length", "type": "double" },
{ "name": "width", "type": "double" },
{ "name": "height", "type": "double" },
{ "name": "unit", "type": "string" }
]
}
],
"default": null,
"doc": "Item dimensions"
}
]
}
This schema evolution adds the new tags and dimensions fields with default values, so records written with the v1 schema can still be read with the v2 schema, and readers still on v1 can resolve v2 records by dropping the additional fields.
Building a modern inventory management system requires careful consideration of architecture, scalability, and reliability. Throughout this guide, we’ve explored how to build a robust, event-driven inventory system that can handle the demands of modern commerce.
Event-Driven Architecture: By leveraging Redpanda as our event streaming platform, we’ve created a system where services can communicate asynchronously, enabling real-time processing and system decoupling. This approach allows our inventory system to scale independently and maintain high throughput even during peak loads.
CQRS Pattern: Separating our read and write concerns through the Command Query Responsibility Segregation pattern has allowed us to optimize each path independently. Write operations maintain consistency through transactional boundaries, while read operations can be highly optimized with caching and specialized data models.
Schema Evolution: Using Schema Registry with Redpanda ensures that our events maintain compatibility as our system evolves. This provides a robust foundation for long-term maintainability and prevents breaking changes from disrupting operations.
Change Data Capture: Implementing CDC with Debezium allows us to capture database changes as events, creating a reliable audit trail and enabling real-time analytics without impacting our primary database performance.
Resilience Patterns: Through circuit breakers, retry mechanisms, and dead letter queues, our system can gracefully handle failures and recover automatically from transient issues, ensuring high availability and data integrity.
Observability: Comprehensive monitoring with Prometheus metrics, structured logging, and distributed tracing gives us visibility into system behavior, making it easier to diagnose issues and optimize performance.
The modern event-driven inventory management system delivers significant business value across multiple dimensions:
graph LR subgraph "Customer Experience Impact" direction TB CE1[Real-time Visibility] CE2[Accurate Delivery Promises] CE3[No Overselling] CE4[Omnichannel Experience] CE5[Personalized Service] CE1 & CE2 & CE3 & CE4 & CE5 --> CS[Enhanced Customer Satisfaction] end subgraph "Operational Excellence" direction TB OE1[Automated Workflows] OE2[Real-time Analytics] OE3[Reduced Manual Work] OE4[Predictive Insights] OE5[Process Optimization] OE1 & OE2 & OE3 & OE4 & OE5 --> OI[Operational Improvement] end subgraph "Financial Benefits" direction TB FB1[Reduced Inventory Costs] FB2[Better Cash Flow] FB3[Lower Operating Costs] FB4[Increased Sales] FB5[Higher Margins] FB1 & FB2 & FB3 & FB4 & FB5 --> FI[Financial Impact] end subgraph "Strategic Advantages" direction TB SA1[Market Agility] SA2[Competitive Edge] SA3[Innovation Enablement] SA4[Scalable Growth] SA5[Global Expansion] SA1 & SA2 & SA3 & SA4 & SA5 --> SI[Strategic Impact] end CS --> BV[Total Business Value] OI --> BV FI --> BV SI --> BV style BV fill:#f9f,stroke:#333,stroke-width:4px style CS fill:#bbf,stroke:#333,stroke-width:2px style OI fill:#bbf,stroke:#333,stroke-width:2px style FI fill:#bbf,stroke:#333,stroke-width:2px style SI fill:#bbf,stroke:#333,stroke-width:2px classDef impact fill:#e1e1e1,stroke:#333,stroke-width:1px class CE1,CE2,CE3,CE4,CE5,OE1,OE2,OE3,OE4,OE5,FB1,FB2,FB3,FB4,FB5,SA1,SA2,SA3,SA4,SA5 impact
mindmap root((Future Evolution)) AI/ML Integration Demand Forecasting Historical Analysis Seasonal Patterns Market Trends Anomaly Detection Fraud Prevention Quality Control Pattern Recognition Automated Reordering Safety Stock Lead Time Optimal Quantity Edge Computing In-Store Processing Local Inventory POS Integration Real-time Updates Warehouse Operations Pick/Pack Optimization Space Utilization Equipment Tracking Offline Capabilities Local Cache Sync Mechanism Conflict Resolution Blockchain Integration Supply Chain Product Tracking Origin Verification Chain of Custody Smart Contracts Automated Payments SLA Enforcement Compliance Sustainability Carbon Footprint Ethical Sourcing Waste Reduction IoT Enhancement RFID Systems Asset Tracking Automated Counting Loss Prevention Sensor Networks Environmental Movement Detection Usage Patterns Automated Systems Robotic Storage AGV Integration Smart Shelving Advanced Analytics Predictive Models Stock Optimization Resource Planning Risk Assessment Real-time Dashboards KPI Monitoring Alert System Decision Support Business Intelligence Custom Reports Data Mining Trend Analysis
graph TB subgraph "Current State" C1[Basic Automation] C2[Manual Decisions] C3[Reactive Approach] C4[Limited Integration] end subgraph "Near Future" N1[Smart Automation] N2[AI-Assisted Decisions] N3[Predictive Systems] N4[IoT Integration] end subgraph "Future Vision" F1[Autonomous Operations] F2[AI-Driven Strategy] F3[Prescriptive Analytics] F4[Full Integration] end C1 --> N1 C2 --> N2 C3 --> N3 C4 --> N4 N1 --> F1 N2 --> F2 N3 --> F3 N4 --> F4 style C1 fill:#f9f,stroke:#333,stroke-width:2px style C2 fill:#f9f,stroke:#333,stroke-width:2px style C3 fill:#f9f,stroke:#333,stroke-width:2px style C4 fill:#f9f,stroke:#333,stroke-width:2px style N1 fill:#bbf,stroke:#333,stroke-width:2px style N2 fill:#bbf,stroke:#333,stroke-width:2px style N3 fill:#bbf,stroke:#333,stroke-width:2px style N4 fill:#bbf,stroke:#333,stroke-width:2px style F1 fill:#bfb,stroke:#333,stroke-width:2px style F2 fill:#bfb,stroke:#333,stroke-width:2px style F3 fill:#bfb,stroke:#333,stroke-width:2px style F4 fill:#bfb,stroke:#333,stroke-width:2px
Because these enhancements build on the solid foundation of event-driven architecture and cloud-native principles we've established, they can be added incrementally to create an increasingly sophisticated inventory management system that adapts to changing business needs.
The modern inventory management system we’ve built is not just about tracking stock levels—it’s about creating a digital nervous system that can sense, respond, and adapt to the complex demands of today’s supply chains. With this architecture, businesses can achieve the real-time visibility, operational efficiency, and scalability needed to thrive in an increasingly competitive marketplace.