AT Protocol Calendar Events Architecture

I asked Claude to evaluate how the community calendar lexicon and Smoke Signal features should work at “completion”, whatever that means.

Prompts:

using atproto https://atproto.com/

we are implementing calendar event and RSVP lexicons. They are stored on the user's PDS and will be indexed by our platform. An AT-URI is stored in the index.

Can you give me an architectural overview of how it should work?
Also think about importing/publishing events from other systems into the user's own PDS.

We already have a preview app in Rust with an existing atproto base implementation. We have import from PDS, listing the most recently updated events, listing an owner's events, displaying profiles, and adding, editing, and creating events. Login, crypto, and OAuth are also working.

I want a more detailed section on API, relay, firehose, hydration, and indexing
(lexicon attached)

Question: What is your feeling about this?
Follow-up: of what you've already thought through, what should the priorities and feature sets be?

Claude's answer is in the next post.


AT Protocol Calendar Events Architecture

Core Components

1. Personal Data Server (PDS) Layer

  • Event Storage: Calendar events are stored as records in user PDSs using the community.lexicon.calendar.event lexicon
  • RSVP Storage: RSVPs are stored as separate records linking to events via AT-URI references
  • Data Sovereignty: Users own their calendar data and control access permissions
  • Replication: Events replicate across the AT Protocol network according to user privacy settings

2. Indexing & Discovery Platform

  • AT-URI Collection: Your platform subscribes to firehose events and indexes calendar event AT-URIs
  • Search & Discovery: Provides APIs for finding events by location, time, category, etc.
  • Aggregation: Creates feeds and recommendations based on indexed events
  • Real-time Updates: Monitors the firehose for event updates, cancellations, RSVPs

3. Client Applications

  • Calendar Apps: Native apps that read/write events directly to user PDSs
  • Web Interfaces: Browser-based calendar applications
  • Mobile Apps: iOS/Android apps with offline sync capabilities
  • Specialized Clients: Event discovery apps, community organizers, etc.

Data Flow Architecture

Event Creation Flow

User Creates Event → PDS Stores Record → Firehose Broadcasts → Index Updates → Discovery Platform
  1. User Action: User creates event in client app
  2. PDS Write: Event record written to user’s PDS with community.lexicon.calendar.event schema (see the sketch after this list)
  3. AT-URI Generation: PDS generates unique AT-URI for the event
  4. Network Broadcast: Event propagates through AT Protocol firehose
  5. Platform Indexing: Your platform receives and indexes the AT-URI and metadata
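
A minimal sketch of step 2, the PDS write, assuming a plain reqwest client and an access token from the existing OAuth/session code; com.atproto.repo.createRecord and its repo/collection/record parameters are the standard AT Protocol endpoint, everything else here is illustrative:

use reqwest::Client;
use serde_json::{json, Value};

/// Write a calendar event record to the user's PDS via com.atproto.repo.createRecord.
/// The PDS responds with the new record's AT-URI and CID.
async fn create_event_record(
    pds_host: &str,     // e.g. "https://pds.example.com" (assumed)
    access_token: &str, // from the existing session/OAuth handling
    repo_did: &str,     // the user's DID
    event: Value,       // a community.lexicon.calendar.event record
) -> Result<Value, reqwest::Error> {
    let body = json!({
        "repo": repo_did,
        "collection": "community.lexicon.calendar.event",
        "record": event,
    });

    Client::new()
        .post(format!("{pds_host}/xrpc/com.atproto.repo.createRecord"))
        .bearer_auth(access_token)
        .json(&body)
        .send()
        .await?
        .error_for_status()?
        .json()
        .await
}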

RSVP Flow

User RSVPs → RSVP Record Created → References Event AT-URI → Index Updates
  1. RSVP Action: User indicates attendance in client
  2. RSVP Record: New record created in user’s PDS referencing event AT-URI
  3. Permission Check: System verifies user can RSVP to event
  4. Notification: Event creator optionally notified of RSVP

Event Discovery Flow

User Searches → Index Query → AT-URI Results → PDS Resolution → Event Details
  1. Search Query: User searches for events via your platform
  2. Index Lookup: Platform searches indexed AT-URIs and metadata
  3. Results Return: Relevant event AT-URIs returned
  4. Data Resolution: Client resolves AT-URIs to full records from PDSs (see the sketch after this list)
  5. Display: Events displayed with full details and RSVP options
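
A minimal sketch of step 4, resolving an AT-URI back to its record with the standard com.atproto.repo.getRecord endpoint; the AT-URI parsing is simplified, and the PDS host is passed in rather than resolved from the DID document:

use reqwest::Client;
use serde_json::Value;

/// Fetch the record behind an AT-URI (at://did/collection/rkey).
/// The response wraps it as { "uri": ..., "cid": ..., "value": ... }.
async fn get_event_record(pds_host: &str, at_uri: &str) -> Result<Value, Box<dyn std::error::Error>> {
    let mut parts = at_uri.trim_start_matches("at://").splitn(3, '/');
    let (repo, collection, rkey) = match (parts.next(), parts.next(), parts.next()) {
        (Some(r), Some(c), Some(k)) => (r, c, k),
        _ => return Err("malformed AT-URI".into()),
    };

    let record = Client::new()
        .get(format!("{pds_host}/xrpc/com.atproto.repo.getRecord"))
        .query(&[("repo", repo), ("collection", collection), ("rkey", rkey)])
        .send()
        .await?
        .error_for_status()?
        .json()
        .await?;

    Ok(record)
}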

Import/Export Integration

Importing from External Systems

Option 1: Direct Import to PDS

External System → Import Service → User PDS → AT-URI Generated → Platform Indexed
  • Import Service: Bridge service that authenticates with external calendars (Google, Outlook, etc.)
  • Data Transformation: Converts external formats to the AT Protocol lexicon (sketched below)
  • User Authorization: Requires user consent and PDS write permissions
  • Deduplication: Handles duplicate events across systems
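
A minimal sketch of the transformation step, with a simplified ExternalEvent struct standing in for whatever the external calendar API returns; the output fields match the event record shown later in this document:

use chrono::{DateTime, Utc};
use serde_json::{json, Value};

/// Stand-in for an event fetched from an external calendar API.
struct ExternalEvent {
    title: String,
    description: Option<String>,
    start: DateTime<Utc>,
    end: Option<DateTime<Utc>>,
    source_id: String, // the external system's own ID
}

/// Convert an external event into a community.lexicon.calendar.event record.
/// Deduplication against `source_id` lives in the import service's own tables;
/// the lexicon record itself does not carry the external ID.
fn to_lexicon_record(ev: &ExternalEvent) -> Value {
    json!({
        "$type": "community.lexicon.calendar.event",
        "name": ev.title,
        "description": ev.description,
        "createdAt": Utc::now().to_rfc3339(),
        "startsAt": ev.start.to_rfc3339(),
        "endsAt": ev.end.map(|e| e.to_rfc3339())
    })
}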

Option 2: Federated Sync

External System → Sync Bridge → Mirror PDS → Cross-Reference → Primary PDS
  • Sync Bridge: Maintains real-time sync with external systems
  • Mirror Records: Creates read-only mirrors of external events
  • User Control: Users can promote external events to their primary PDS
  • Two-way Sync: Changes in AT Protocol can optionally sync back

Publishing to External Systems

Export Service Architecture

User PDS → Export Service → External APIs → Confirmation → Status Update
  • Export Authorization: Users grant permission to publish to external platforms
  • Format Translation: Converts AT Protocol events to external formats (iCal, Google Calendar API, etc.; see the iCal sketch below)
  • Sync Management: Tracks which events are published where
  • Update Propagation: Handles updates and cancellations across systems
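
A minimal sketch of the format-translation step for iCalendar output, built by hand rather than assuming a particular iCal crate; the struct is a simplified stand-in for the lexicon record:

use chrono::{DateTime, Utc};

/// Simplified stand-in for the full lexicon event record.
struct ExportableEvent {
    name: String,
    description: Option<String>,
    starts_at: DateTime<Utc>,
    ends_at: Option<DateTime<Utc>>,
}

/// Render a single event as a minimal iCalendar VEVENT block.
fn to_ics(at_uri: &str, ev: &ExportableEvent) -> String {
    let fmt = "%Y%m%dT%H%M%SZ"; // iCalendar UTC timestamp format
    let mut out = String::from("BEGIN:VEVENT\r\n");
    out.push_str(&format!("UID:{at_uri}\r\n")); // the AT-URI doubles as a stable UID
    out.push_str(&format!("SUMMARY:{}\r\n", ev.name));
    if let Some(desc) = &ev.description {
        out.push_str(&format!("DESCRIPTION:{desc}\r\n"));
    }
    out.push_str(&format!("DTSTART:{}\r\n", ev.starts_at.format(fmt)));
    if let Some(end) = &ev.ends_at {
        out.push_str(&format!("DTEND:{}\r\n", end.format(fmt)));
    }
    out.push_str("END:VEVENT\r\n");
    out
}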

Detailed Relay, Firehose, Hydration & Indexing Architecture

Relay Infrastructure

Relay Selection Strategy

Your calendar platform needs to consume data from multiple relays to ensure comprehensive event coverage:

// Rust implementation structure
pub struct RelayManager {
    primary_relays: Vec<RelayConnection>,
    fallback_relays: Vec<RelayConnection>,
    health_checker: RelayHealthChecker,
    load_balancer: RelayLoadBalancer,
}

impl RelayManager {
    pub fn get_optimal_relay(&self, operation: OperationType) -> &RelayConnection {
        // Prioritize relays based on:
        // 1. Latency for real-time operations
        // 2. Completeness for historical sync
        // 3. Geographic proximity for location-based events
    }
}

Multi-Relay Consumption

  • Primary Relays: Subscribe to major relays (e.g., the main Bluesky relay at bsky.network and other large instances)
  • Community Relays: Monitor smaller community-specific relays for niche events
  • Geographic Relays: Regional relays for location-specific event discovery
  • Fallback Strategy: Automatic failover when primary relays are unavailable

Firehose Processing Pipeline

Stream Processing Architecture

pub struct FirehoseProcessor {
    stream_manager: StreamManager,
    filters: FilterChain,
    buffer: CircularBuffer<Event>,
    processors: Vec<Box<dyn EventProcessor>>,
    error_handler: ErrorRecoveryHandler,
}

pub struct StreamManager {
    connections: HashMap<RelayId, WebSocketConnection>,
    cursor_manager: CursorManager,
    backfill_service: BackfillService,
}

Event Filtering Pipeline

  1. Schema Filter: Only process community.lexicon.calendar.event and related records
  2. Temporal Filter: Focus on events within relevant time windows (e.g., past 1 year, future 2 years)
  3. Quality Filter: Skip events with invalid data or suspicious patterns
  4. Deduplication Filter: Handle duplicate events across relays

pub enum FilterResult {
    Process(ProcessedEvent),
    Skip(SkipReason),
    Defer(DeferReason), // For rate limiting or temporary issues
}

pub struct EventFilter {
    // Schema, temporal, quality, and deduplication rules live here
}

impl EventFilter {
    pub fn apply(&self, raw_event: &RawFirehoseEvent) -> FilterResult {
        // Apply the filters in order; the first rule that rejects wins
    }
}

Cursor Management & Recovery

pub struct CursorManager {
    // Persisted cursor positions, keyed by relay ID (backing store not shown)
}

impl CursorManager {
    pub fn save_cursor(&mut self, relay_id: &str, cursor: &str) -> Result<()> { /* persist position */ }
    pub fn get_last_cursor(&self, relay_id: &str) -> Option<String> { /* load last saved position */ }
    pub fn handle_gap(&mut self, relay_id: &str, gap_start: &str, gap_end: &str) -> BackfillTask { /* schedule a backfill */ }
}
  • Persistent Cursors: Store cursor positions in a database for recovery (see the reconnection sketch below)
  • Gap Detection: Identify and handle sequence gaps in the firehose
  • Backfill Strategy: Automatically request missing data when gaps are detected
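
A minimal sketch of resuming a firehose subscription from a persisted cursor, assuming tokio-tungstenite for the WebSocket; com.atproto.sync.subscribeRepos and its cursor query parameter are the standard relay interface, while the relay host and frame handling are placeholders:

use futures_util::StreamExt;
use tokio_tungstenite::connect_async;

/// Reconnect to a relay firehose, resuming from the last saved cursor if one exists.
async fn subscribe_with_cursor(
    relay_host: &str, // e.g. "bsky.network" (assumed)
    last_cursor: Option<&str>,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut url = format!("wss://{relay_host}/xrpc/com.atproto.sync.subscribeRepos");
    if let Some(cursor) = last_cursor {
        url.push_str(&format!("?cursor={cursor}"));
    }

    let (mut ws, _response) = connect_async(url).await?;

    while let Some(frame) = ws.next().await {
        let frame = frame?;
        // Each binary frame is a DAG-CBOR event stream message; decoding it,
        // filtering for community.lexicon.calendar.* records, and persisting
        // the new cursor via the CursorManager happen here.
        let _ = frame;
    }

    Ok(())
}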

Hydration Strategy

Multi-Stage Hydration

Your system needs different levels of data completeness for different use cases:

pub enum HydrationLevel {
    Minimal,    // Just AT-URI and basic metadata
    Standard,   // Full event record
    Complete,   // Event + creator profile + RSVPs
    Social,     // Complete + social signals + recommendations
}

pub struct HydrationService {
    pds_client: PDSClient,
    cache: HydrationCache,
    batch_processor: BatchHydrator,
}

Batch Hydration for Performance

pub struct BatchHydrator {
    pds_client: PDSClient, // used for per-PDS batched record fetches
}

impl BatchHydrator {
    pub async fn hydrate_batch(&self, at_uris: Vec<String>) -> Vec<HydratedEvent> {
        // Group by PDS to minimize round trips
        let grouped = self.group_by_pds(at_uris);

        // Parallel hydration across PDSs
        let futures: Vec<_> = grouped.into_iter()
            .map(|(pds, uris)| self.hydrate_from_pds(pds, uris))
            .collect();

        // Combine results and handle failures
        self.combine_results(futures).await
    }
}

Smart Caching Strategy

  • Hot Cache: Recently accessed events kept in memory (sketched below)
  • Warm Cache: Upcoming events pre-hydrated
  • Cold Storage: Historical events hydrated on-demand
  • Invalidation: Real-time cache updates from firehose
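
A minimal sketch of the hot-cache tier, hand-rolled on a HashMap with timestamps rather than assuming a specific cache crate; a miss here would fall through to the warm tier or to on-demand hydration:

use std::collections::HashMap;
use std::time::{Duration, Instant};

/// In-memory hot cache for recently accessed hydrated events, keyed by AT-URI.
struct HotCache<V> {
    entries: HashMap<String, (V, Instant)>,
    ttl: Duration,
}

impl<V: Clone> HotCache<V> {
    fn new(ttl: Duration) -> Self {
        Self { entries: HashMap::new(), ttl }
    }

    fn get(&self, at_uri: &str) -> Option<V> {
        self.entries.get(at_uri).and_then(|(value, inserted)| {
            (inserted.elapsed() < self.ttl).then(|| value.clone())
        })
    }

    fn insert(&mut self, at_uri: String, value: V) {
        self.entries.insert(at_uri, (value, Instant::now()));
    }

    /// Called when the firehose reports an update or delete for this AT-URI.
    fn invalidate(&mut self, at_uri: &str) {
        self.entries.remove(at_uri);
    }
}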

Indexing Architecture

Multi-Index Strategy

Your calendar platform needs several specialized indexes:

pub struct IndexManager {
    temporal_index: TemporalIndex,      // Time-based queries
    spatial_index: SpatialIndex,        // Location-based queries  
    text_index: TextIndex,              // Full-text search
    social_index: SocialIndex,          // Social graph queries
    category_index: CategoryIndex,      // Event type/category
}

Temporal Indexing

pub struct TemporalIndex {
    // Multi-level time indexes for different query patterns
    by_start_time: BTreeMap<DateTime<Utc>, Vec<EventId>>,
    by_end_time: BTreeMap<DateTime<Utc>, Vec<EventId>>,
    by_creation_time: BTreeMap<DateTime<Utc>, Vec<EventId>>,
    recurring_events: HashMap<RecurrencePattern, Vec<EventId>>,
}

impl TemporalIndex {
    pub fn query_range(&self, start: DateTime<Utc>, end: DateTime<Utc>) -> Vec<EventId> {
        // Efficient range queries for calendar views
        self.by_start_time
            .range(start..=end)
            .flat_map(|(_, ids)| ids.iter().cloned())
            .collect()
    }

    pub fn query_upcoming(&self, from: DateTime<Utc>, limit: usize) -> Vec<EventId> {
        // Optimized for "what's coming up" queries
        self.by_start_time
            .range(from..)
            .flat_map(|(_, ids)| ids.iter().cloned())
            .take(limit)
            .collect()
    }
}

Spatial Indexing for Location-Based Discovery

pub struct SpatialIndex {
    // R-tree or similar spatial data structure
    rtree: RTree<EventLocationNode>,
    geocoder: GeocodingService,
}

pub struct EventLocationNode {
    event_id: EventId,
    bounds: Rectangle,
    location_type: LocationType, // Address, Geo, Foursquare, etc.
}

impl SpatialIndex {
    pub fn query_nearby(&self, point: Point, radius_km: f64) -> Vec<EventId> {
        // Efficient geographic proximity queries
    }
    
    pub fn query_bbox(&self, bounds: Rectangle) -> Vec<EventId> {
        // Bounding box queries for map views
    }
}
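
For reference, a brute-force fallback for the nearby query using the haversine formula; it is what the R-tree replaces at scale, with u64 standing in for EventId and (lat, lon) pairs in degrees standing in for stored event locations:

/// Great-circle distance in kilometres between two (lat, lon) points in degrees.
fn haversine_km(a: (f64, f64), b: (f64, f64)) -> f64 {
    const EARTH_RADIUS_KM: f64 = 6371.0;
    let (lat1, lon1) = (a.0.to_radians(), a.1.to_radians());
    let (lat2, lon2) = (b.0.to_radians(), b.1.to_radians());
    let h = ((lat2 - lat1) / 2.0).sin().powi(2)
        + lat1.cos() * lat2.cos() * ((lon2 - lon1) / 2.0).sin().powi(2);
    2.0 * EARTH_RADIUS_KM * h.sqrt().asin()
}

/// Linear scan over indexed event locations; correct but O(n), hence the R-tree.
fn nearby_events(events: &[(u64, (f64, f64))], center: (f64, f64), radius_km: f64) -> Vec<u64> {
    events
        .iter()
        .filter(|(_, loc)| haversine_km(*loc, center) <= radius_km)
        .map(|(id, _)| *id)
        .collect()
}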

Real-Time Index Updates

pub struct IndexUpdateProcessor {
    hydration_service: HydrationService,
    temporal_index: TemporalIndex,
    spatial_index: SpatialIndex,
    text_index: TextIndex,
    social_index: SocialIndex,
}

impl IndexUpdateProcessor {
    pub async fn process_firehose_event(&mut self, event: &FirehoseEvent) -> Result<()> {
        match event.operation {
            Operation::Create => self.add_to_indexes(event).await,
            Operation::Update => self.update_indexes(event).await,
            Operation::Delete => self.remove_from_indexes(event).await,
        }
    }

    async fn add_to_indexes(&mut self, event: &FirehoseEvent) -> Result<()> {
        let hydrated = self.hydration_service.hydrate_minimal(event).await?;

        // Parallel updates across the specialized indexes
        tokio::join!(
            self.temporal_index.add(hydrated.clone()),
            self.spatial_index.add(hydrated.clone()),
            self.text_index.add(hydrated.clone()),
            self.social_index.add(hydrated),
        );

        Ok(())
    }
}

Data Consistency & Recovery

Event Sourcing for Audit Trail

pub trait EventStore {
    async fn append_event(&self, event: IndexEvent) -> Result<SequenceNumber>;
    async fn replay_from(&self, sequence: SequenceNumber) -> impl Stream<Item = IndexEvent>;
}

pub enum IndexEvent {
    EventCreated { at_uri: String, data: EventData },
    EventUpdated { at_uri: String, changes: EventChanges },
    EventDeleted { at_uri: String },
    HydrationCompleted { at_uri: String, level: HydrationLevel },
}

Consistency Checks & Repair

pub trait ConsistencyChecker {
    async fn verify_index_integrity(&self) -> ConsistencyReport;
    async fn repair_inconsistencies(&self, report: ConsistencyReport) -> RepairResult;
}

pub struct ConsistencyReport {
    missing_from_index: Vec<String>,    // AT-URIs in firehose but not indexed
    orphaned_indexes: Vec<String>,      // Indexed events that no longer exist
    stale_hydrations: Vec<String>,      // Events needing re-hydration
}

Performance Optimization

Streaming vs Batch Processing

pub enum ProcessingMode {
    Streaming {
        buffer_size: usize,
        flush_interval: Duration,
    },
    Batch {
        batch_size: usize,
        batch_timeout: Duration,
    },
}

pub struct ProcessingConfig {
    mode: ProcessingMode,
    parallelism: usize,
    backpressure_threshold: usize,
}

Memory Management

  • Bounded Queues: Prevent memory exhaustion during traffic spikes (sketched below)
  • Streaming Processing: Process events without loading entire datasets
  • Selective Caching: Cache based on access patterns and event proximity
  • Garbage Collection: Remove expired events and stale cache entries
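
A minimal sketch of the bounded-queue idea using tokio's bounded mpsc channel: when the index writer falls behind, send().await suspends the firehose reader instead of letting the buffer grow without limit (AT-URI strings stand in for the filtered events):

use tokio::sync::mpsc;

async fn run_pipeline() {
    // The channel capacity is the backpressure threshold.
    let (tx, mut rx) = mpsc::channel::<String>(10_000);

    // Producer: firehose reader pushing filtered events.
    let producer = tokio::spawn(async move {
        for i in 0..100_000u32 {
            if tx.send(format!("at://did:example/record/{i}")).await.is_err() {
                break; // consumer shut down
            }
        }
    });

    // Consumer: index writer draining the queue.
    let consumer = tokio::spawn(async move {
        while let Some(at_uri) = rx.recv().await {
            // index update happens here
            let _ = at_uri;
        }
    });

    let _ = tokio::join!(producer, consumer);
}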

Monitoring & Observability

Key Metrics to Track

pub struct IndexingMetrics {
    pub firehose_events_per_second: Counter,
    pub hydration_success_rate: Histogram,
    pub index_update_latency: Histogram,
    pub cache_hit_rate: Gauge,
    pub backfill_queue_size: Gauge,
    pub cursor_lag_seconds: Gauge,
}

Health Checks

  • Firehose Connectivity: Monitor relay connections and cursor progress (see the lag check below)
  • Hydration Health: Track PDS response times and error rates
  • Index Freshness: Ensure indexes are up-to-date with firehose
  • Query Performance: Monitor search response times
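
A minimal sketch of the cursor-lag part of the firehose health check: each processed frame carries a timestamp, so the gap between wall-clock time and the last seen frame time approximates how far behind the indexer is (the threshold is illustrative):

use chrono::{DateTime, Utc};

/// Seconds between now and the timestamp of the last processed firehose frame.
/// A steadily growing value means the indexer is falling behind or disconnected.
fn cursor_lag_seconds(last_frame_time: DateTime<Utc>) -> i64 {
    (Utc::now() - last_frame_time).num_seconds()
}

fn firehose_is_healthy(last_frame_time: DateTime<Utc>, max_lag_seconds: i64) -> bool {
    cursor_lag_seconds(last_frame_time) <= max_lag_seconds
}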

XRPC API Design

Core XRPC Endpoints

Your calendar platform should expose a comprehensive XRPC API following AT Protocol conventions:

// XRPC API Structure
pub struct CalendarXRPC {
    query_handlers: HashMap<String, Box<dyn QueryHandler>>,
    procedure_handlers: HashMap<String, Box<dyn ProcedureHandler>>,
    auth_service: AuthService,
    rate_limiter: RateLimiter,
}

// Base XRPC method naming convention
const API_BASE: &str = "community.calendar";

Query Methods (com.atproto.repo.getRecord style)

// Event Discovery & Search Queries
pub mod queries {
    // Get events by temporal range
    pub const LIST_EVENTS: &str = "community.calendar.listEvents";
    // Parameters: start?, end?, limit?, cursor?
    
    // Search events by text
    pub const SEARCH_EVENTS: &str = "community.calendar.searchEvents"; 
    // Parameters: q, limit?, cursor?, filters?
    
    // Get events near location
    pub const GET_NEARBY_EVENTS: &str = "community.calendar.getNearbyEvents";
    // Parameters: lat, lon, radius?, limit?, cursor?
    
    // Get single event by AT-URI
    pub const GET_EVENT: &str = "community.calendar.getEvent";
    // Parameters: uri, hydration_level?
    
    // Get events by creator
    pub const GET_CREATOR_EVENTS: &str = "community.calendar.getCreatorEvents";
    // Parameters: did, limit?, cursor?, include_past?
    
    // Get RSVPs for an event
    pub const GET_EVENT_RSVPS: &str = "community.calendar.getEventRSVPs";
    // Parameters: event_uri, limit?, cursor?
    
    // Get user's RSVPs
    pub const GET_USER_RSVPS: &str = "community.calendar.getUserRSVPs";
    // Parameters: did, status?, limit?, cursor?
    
    // Get recommended events for user
    pub const GET_RECOMMENDATIONS: &str = "community.calendar.getRecommendations";
    // Parameters: limit?, cursor?, location_hint?
}

Procedure Methods (com.atproto.repo.createRecord style)

pub mod procedures {
    // Event Management
    pub const CREATE_EVENT: &str = "community.calendar.createEvent";
    pub const UPDATE_EVENT: &str = "community.calendar.updateEvent"; 
    pub const DELETE_EVENT: &str = "community.calendar.deleteEvent";
    
    // RSVP Management
    pub const CREATE_RSVP: &str = "community.calendar.createRSVP";
    pub const UPDATE_RSVP: &str = "community.calendar.updateRSVP";
    pub const DELETE_RSVP: &str = "community.calendar.deleteRSVP";
    
    // Bulk Operations
    pub const BULK_IMPORT_EVENTS: &str = "community.calendar.bulkImportEvents";
    pub const BULK_EXPORT_EVENTS: &str = "community.calendar.bulkExportEvents";
    
    // External Integration
    pub const SYNC_EXTERNAL_CALENDAR: &str = "community.calendar.syncExternalCalendar";
}

XRPC Request/Response Schemas

// Query Response Structures
#[derive(Serialize, Deserialize)]
pub struct ListEventsResponse {
    pub events: Vec<EventView>,
    pub cursor: Option<String>,
}

#[derive(Serialize, Deserialize)]
pub struct EventView {
    pub uri: String,
    pub cid: String,
    pub value: CalendarEvent,
    pub creator: ProfileViewBasic,
    pub rsvp_count: Option<RsvpCounts>,
    pub user_rsvp: Option<UserRsvpStatus>,
    pub indexed_at: DateTime<Utc>,
}

#[derive(Serialize, Deserialize)]
pub struct RsvpCounts {
    pub attending: u32,
    pub maybe: u32,
    pub declined: u32,
    pub total: u32,
}

// Query Parameters
#[derive(Serialize, Deserialize)]
pub struct ListEventsParams {
    pub start: Option<DateTime<Utc>>,
    pub end: Option<DateTime<Utc>>,
    pub limit: Option<u32>, // max 100, default 25
    pub cursor: Option<String>,
    pub creator: Option<String>, // DID
    pub mode: Option<EventMode>, // virtual, inperson, hybrid
    pub status: Option<EventStatus>,
}

#[derive(Serialize, Deserialize)]
pub struct SearchEventsParams {
    pub q: String,
    pub limit: Option<u32>,
    pub cursor: Option<String>,
    pub filters: Option<SearchFilters>,
}

#[derive(Serialize, Deserialize)]
pub struct SearchFilters {
    pub location: Option<LocationFilter>,
    pub time_range: Option<TimeRange>,
    pub event_type: Option<Vec<String>>,
    pub mode: Option<EventMode>,
}

// Procedure Input/Output
#[derive(Serialize, Deserialize)]
pub struct CreateEventInput {
    pub repo: String, // User's DID
    pub record: CalendarEvent,
    pub validate: Option<bool>,
}

#[derive(Serialize, Deserialize)]
pub struct CreateEventOutput {
    pub uri: String,
    pub cid: String,
    pub validation_status: Option<ValidationStatus>,
}

XRPC Handler Implementation

#[async_trait]
pub trait QueryHandler: Send + Sync {
    async fn handle(&self, params: Value, auth: Option<AuthContext>) -> Result<Value, XrpcError>;
}

pub struct ListEventsHandler {
    index_service: Arc<IndexService>,
    hydration_service: Arc<HydrationService>,
}

#[async_trait]
impl QueryHandler for ListEventsHandler {
    async fn handle(&self, params: Value, auth: Option<AuthContext>) -> Result<Value, XrpcError> {
        let params: ListEventsParams = serde_json::from_value(params)
            .map_err(|e| XrpcError::InvalidRequest(e.to_string()))?;
        
        // Validate parameters
        self.validate_params(&params)?;
        
        // Query indexes
        let event_ids = self.index_service.query_events(&params).await?;
        
        // Hydrate events in batch
        let events = self.hydration_service
            .hydrate_events_for_user(event_ids, auth.as_ref())
            .await?;
        
        let cursor = self.generate_cursor(&params, &events);

        Ok(serde_json::to_value(ListEventsResponse { events, cursor })?)
    }
}

pub struct CreateEventHandler {
    pds_client: Arc<PDSClient>,
    validation_service: Arc<ValidationService>,
}

#[async_trait]
impl ProcedureHandler for CreateEventHandler {
    async fn handle(&self, input: Value, auth: AuthContext) -> Result<Value, XrpcError> {
        let input: CreateEventInput = serde_json::from_value(input)
            .map_err(|e| XrpcError::InvalidRequest(e.to_string()))?;
        
        // Verify user can write to repo
        self.verify_repo_access(&input.repo, &auth).await?;
        
        // Validate event data
        if input.validate.unwrap_or(true) {
            self.validation_service.validate_event(&input.record).await?;
        }
        
        // Create record in user's PDS
        let result = self.pds_client
            .create_record(&input.repo, "community.lexicon.calendar.event", &input.record)
            .await?;
        
        Ok(serde_json::to_value(CreateEventOutput {
            uri: result.uri,
            cid: result.cid,
            validation_status: Some(ValidationStatus::Valid),
        })?)
    }
}

Authentication & Authorization

pub struct AuthService {
    jwt_validator: JwtValidator,
    session_store: Arc<SessionStore>,
    rate_limiter: Arc<RateLimiter>,
}

pub struct AuthContext {
    pub did: String,
    pub access_token: String,
    pub scopes: Vec<String>,
    pub session_id: String,
}

impl AuthService {
    pub async fn authenticate_request(&self, headers: &HeaderMap) -> Result<Option<AuthContext>, AuthError> {
        if let Some(auth_header) = headers.get("authorization") {
            let token = self.extract_bearer_token(auth_header)?;
            let claims = self.jwt_validator.validate_token(&token).await?;
            
            Ok(Some(AuthContext {
                did: claims.sub,
                access_token: token,
                scopes: claims.scope.unwrap_or_default().split(' ').map(String::from).collect(),
                session_id: claims.sid.unwrap_or_default(),
            }))
        } else {
            Ok(None) // Anonymous access
        }
    }
    
    pub async fn check_rate_limit(&self, auth: &Option<AuthContext>) -> Result<(), RateLimitError> {
        let key = match auth {
            Some(ctx) => format!("user:{}", ctx.did),
            None => "anonymous".to_string(),
        };
        
        self.rate_limiter.check_limit(&key).await
    }
}

Rate Limiting & Quotas

pub struct RateLimiter {
    redis: Arc<RedisPool>,
    limits: RateLimitConfig,
}

pub struct RateLimitConfig {
    pub anonymous: RateLimit,
    pub authenticated: RateLimit,
    pub premium: RateLimit,
}

pub struct RateLimit {
    pub requests_per_minute: u32,
    pub requests_per_hour: u32,
    pub burst_allowance: u32,
}

impl RateLimiter {
    pub async fn check_limit(&self, key: &str) -> Result<(), RateLimitError> {
        // Sliding window rate limit implementation
        // Use Redis sorted sets for efficient sliding window
    }
}
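
A minimal in-memory sliding-window sketch of the same idea, standing in for the Redis sorted-set version (which would use ZADD, ZREMRANGEBYSCORE, and ZCARD on a per-key set):

use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};

pub struct SlidingWindowLimiter {
    window: Duration,
    max_requests: usize,
    hits: HashMap<String, VecDeque<Instant>>, // per-key request timestamps
}

impl SlidingWindowLimiter {
    pub fn new(window: Duration, max_requests: usize) -> Self {
        Self { window, max_requests, hits: HashMap::new() }
    }

    /// Returns true if the request is allowed under the current window.
    pub fn check_limit(&mut self, key: &str) -> bool {
        let now = Instant::now();
        let entry = self.hits.entry(key.to_string()).or_default();

        // Drop timestamps that have slid out of the window.
        while entry.front().is_some_and(|t| now.duration_since(*t) > self.window) {
            entry.pop_front();
        }

        if entry.len() < self.max_requests {
            entry.push_back(now);
            true
        } else {
            false
        }
    }
}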

XRPC Server Implementation

pub struct XrpcServer {
    handlers: HashMap<String, Box<dyn RequestHandler>>,
    auth_service: Arc<AuthService>,
    cors_config: CorsConfig,
}

impl XrpcServer {
    pub fn new() -> Self {
        let mut server = Self {
            handlers: HashMap::new(),
            auth_service: Arc::new(AuthService::new()),
            cors_config: CorsConfig::permissive(),
        };
        
        // Register query handlers
        server.register_query("community.calendar.listEvents", Box::new(ListEventsHandler::new()));
        server.register_query("community.calendar.searchEvents", Box::new(SearchEventsHandler::new()));
        server.register_query("community.calendar.getNearbyEvents", Box::new(NearbyEventsHandler::new()));
        server.register_query("community.calendar.getEvent", Box::new(GetEventHandler::new()));
        
        // Register procedure handlers  
        server.register_procedure("community.calendar.createEvent", Box::new(CreateEventHandler::new()));
        server.register_procedure("community.calendar.updateEvent", Box::new(UpdateEventHandler::new()));
        server.register_procedure("community.calendar.createRSVP", Box::new(CreateRsvpHandler::new()));
        
        server
    }
    
    pub async fn handle_request(&self, req: Request<Body>) -> Result<Response<Body>, Infallible> {
        // Extract method from path: /xrpc/community.calendar.listEvents
        let method = self.extract_xrpc_method(&req);
        
        // Authenticate request
        let auth = match self.auth_service.authenticate_request(req.headers()).await {
            Ok(auth) => auth,
            Err(e) => return Ok(self.error_response(XrpcError::Unauthorized(e.to_string()))),
        };
        
        // Check rate limits
        if let Err(e) = self.auth_service.check_rate_limit(&auth).await {
            return Ok(self.error_response(XrpcError::RateLimited(e.to_string())));
        }
        
        // Route to appropriate handler
        match self.handlers.get(&method) {
            Some(handler) => {
                let params = self.extract_params(&req).await;
                match handler.handle(params, auth).await {
                    Ok(response) => Ok(self.success_response(response)),
                    Err(e) => Ok(self.error_response(e)),
                }
            }
            None => Ok(self.error_response(XrpcError::MethodNotFound(method))),
        }
    }
}

Error Handling

#[derive(Debug, Serialize)]
pub enum XrpcError {
    InvalidRequest(String),
    Unauthorized(String),
    Forbidden(String),
    NotFound(String),
    RateLimited(String),
    ValidationFailed(Vec<ValidationError>),
    InternalError(String),
    MethodNotFound(String),
}

#[derive(Debug, Serialize)]
pub struct XrpcErrorResponse {
    pub error: String,
    pub message: String,
    pub details: Option<Value>,
}

impl XrpcError {
    pub fn to_http_status(&self) -> StatusCode {
        match self {
            XrpcError::InvalidRequest(_) => StatusCode::BAD_REQUEST,
            XrpcError::Unauthorized(_) => StatusCode::UNAUTHORIZED,
            XrpcError::Forbidden(_) => StatusCode::FORBIDDEN,
            XrpcError::NotFound(_) => StatusCode::NOT_FOUND,
            XrpcError::RateLimited(_) => StatusCode::TOO_MANY_REQUESTS,
            XrpcError::ValidationFailed(_) => StatusCode::BAD_REQUEST,
            XrpcError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
            XrpcError::MethodNotFound(_) => StatusCode::NOT_FOUND,
        }
    }
}

API Documentation & OpenAPI Schema

pub fn generate_openapi_spec() -> OpenApi {
    OpenApiBuilder::new()
        .info(Info::new("Calendar Events API", "1.0.0"))
        .servers(vec![
            Server::new("https://calendar.example.com/xrpc"),
        ])
        .paths(Paths::from_iter([
            ("/xrpc/community.calendar.listEvents", PathItem::new()
                .get(Operation::new()
                    .tag("events")
                    .summary("List calendar events")
                    .parameters(vec![
                        Parameter::new("start").schema(Schema::string().format("date-time")),
                        Parameter::new("end").schema(Schema::string().format("date-time")),
                        Parameter::new("limit").schema(Schema::integer().maximum(100.0)),
                        Parameter::new("cursor").schema(Schema::string()),
                    ])
                    .responses(Responses::from_iter([
                        ("200", Response::new("Success").json_schema(ListEventsResponse::schema())),
                        ("400", Response::new("Bad Request")),
                        ("401", Response::new("Unauthorized")),
                    ]))
                )
            ),
            // ... more endpoints
        ]))
        .build()
}

XRPC Integration with Existing Rust App

Since you already have authentication and basic PDS operations working, you can integrate XRPC as follows:

// In your main application
pub struct CalendarApp {
    xrpc_server: XrpcServer,
    existing_handlers: ExistingHandlers, // Your current preview app handlers
    shared_services: SharedServices,
}

impl CalendarApp {
    pub async fn run(&self) -> Result<()> {
        let app = Router::new()
            // Your existing preview app routes
            .route("/events", get(self.existing_handlers.list_events))
            .route("/events/:id", get(self.existing_handlers.get_event))
            .route("/profile/:did", get(self.existing_handlers.get_profile))
            
            // XRPC endpoints
            .route("/xrpc/:method", 
                get(|req| self.xrpc_server.handle_request(req))
                .post(|req| self.xrpc_server.handle_request(req))
            )
            
            // Serve OpenAPI documentation
            .route("/api/docs", get(|| async { generate_openapi_spec() }))
            
            .layer(CorsLayer::new())
            .layer(AuthLayer::new(self.shared_services.auth.clone()));
            
        // Start server
        axum::Server::bind(&"0.0.0.0:3000".parse()?)
            .serve(app.into_make_service())
            .await?;
            
        Ok(())
    }
}

This XRPC API design provides a comprehensive, AT Protocol-compliant interface for your calendar platform while integrating seamlessly with your existing Rust implementation.

Data Schemas

Event Record Structure

{
  "$type": "community.lexicon.calendar.event",
  "name": "Community Meetup",
  "description": "Monthly developer gathering",
  "createdAt": "2025-05-31T10:00:00Z",
  "startsAt": "2025-06-15T18:00:00Z",
  "endsAt": "2025-06-15T21:00:00Z",
  "mode": "community.lexicon.calendar.event#inperson",
  "status": "community.lexicon.calendar.event#scheduled",
  "locations": [
    {
      "$type": "community.lexicon.location.address",
      "streetAddress": "123 Main St",
      "city": "San Francisco",
      "region": "CA"
    }
  ]
}
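
For reference, a serde mapping of that record as it might look in the Rust app; the optionality and types are assumptions read off the JSON above, and the location union is left untyped:

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CalendarEventRecord {
    #[serde(rename = "$type")]
    pub record_type: String, // "community.lexicon.calendar.event"
    pub name: String,
    pub description: Option<String>,
    pub created_at: DateTime<Utc>,
    pub starts_at: DateTime<Utc>,
    pub ends_at: Option<DateTime<Utc>>,
    pub mode: Option<String>,   // e.g. "community.lexicon.calendar.event#inperson"
    pub status: Option<String>, // e.g. "community.lexicon.calendar.event#scheduled"
    #[serde(default)]
    pub locations: Vec<serde_json::Value>,
}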

RSVP Record Structure (Proposed)

{
  "$type": "community.lexicon.calendar.rsvp",
  "event": "at://did:plc:user123/community.lexicon.calendar.event/abc123",
  "status": "attending", // attending, maybe, declined
  "createdAt": "2025-05-31T12:00:00Z",
  "note": "Looking forward to it!"
}

Privacy & Permissions

Event Visibility Levels

  • Public: Indexed and discoverable by anyone
  • Followers Only: Visible to user’s followers
  • Private: Only accessible via direct AT-URI sharing
  • Unlisted: Not indexed but accessible if AT-URI is known

RSVP Privacy

  • Public RSVPs: Attendee list visible to all
  • Private RSVPs: Only event creator sees attendees
  • Anonymous RSVPs: Attendance counts without identity

Platform Indexing Strategy

Firehose Subscription

// Pseudocode for firehose processing
subscribe_to_firehose({
  filters: ['community.lexicon.calendar.event'],
  handler: (event) => {
    if (event.type === 'create' || event.type === 'update') {
      index_event(event.uri, extract_metadata(event.record));
    } else if (event.type === 'delete') {
      remove_from_index(event.uri);
    }
  }
});

Index Structure

  • Spatial Index: For location-based queries
  • Temporal Index: For time-range searches
  • Text Index: For name/description search
  • Graph Index: For social recommendations

Benefits of This Architecture

For Users

  • Data Ownership: Users control their calendar data
  • Interoperability: Events work across different apps and platforms
  • Privacy Control: Granular control over event visibility
  • Portability: Easy migration between services

For Developers

  • Decentralized: No single point of failure
  • Composable: Mix and match different calendar services
  • Innovation: Easy to build new features on top of the protocol
  • Standards-Based: Uses established AT Protocol patterns

For Event Organizers

  • Reach: Events discoverable across the entire AT Protocol network
  • Engagement: Rich RSVP and interaction capabilities
  • Analytics: Aggregated insights while preserving privacy
  • Integration: Works with existing event management tools

Implementation Phases

Phase 1: Core Infrastructure

  1. Implement basic event lexicon on PDSs
  2. Build indexing service for event discovery
  3. Create reference client application

Phase 2: Social Features

  1. Add RSVP lexicon and functionality
  2. Implement event sharing and recommendations
  3. Build notification systems

Phase 3: Integration

  1. External calendar import/export
  2. Third-party platform publishing
  3. Advanced discovery and recommendation algorithms

Phase 4: Advanced Features

  1. Recurring events support
  2. Event series and collections
  3. Ticketing and payment integration
  4. Live event features (streaming, real-time updates)

This architecture leverages AT Protocol’s strengths of decentralization, user control, and interoperability while providing the rich functionality needed for a comprehensive calendar and events platform.