AT Protocol Calendar Events Architecture
Core Components
1. Personal Data Server (PDS) Layer
- Event Storage: Calendar events are stored as records in user PDSs using the community.lexicon.calendar.event lexicon
- RSVP Storage: RSVPs are stored as separate records linking to events via AT-URI references
- Data Sovereignty: Users own their calendar data and control access permissions
- Replication: Events replicate across the AT Protocol network according to user privacy settings
2. Indexing & Discovery Platform
- AT-URI Collection: Your platform subscribes to firehose events and indexes calendar event AT-URIs
- Search & Discovery: Provides APIs for finding events by location, time, category, etc.
- Aggregation: Creates feeds and recommendations based on indexed events
- Real-time Updates: Monitors the firehose for event updates, cancellations, RSVPs
3. Client Applications
- Calendar Apps: Native apps that read/write events directly to user PDSs
- Web Interfaces: Browser-based calendar applications
- Mobile Apps: iOS/Android apps with offline sync capabilities
- Specialized Clients: Event discovery apps, community organizers, etc.
Data Flow Architecture
Event Creation Flow
User Creates Event → PDS Stores Record → Firehose Broadcasts → Index Updates → Discovery Platform
- User Action: User creates event in client app
- PDS Write: Event record written to user’s PDS with community.lexicon.calendar.event schema
- AT-URI Generation: PDS generates unique AT-URI for the event
- Network Broadcast: Event propagates through AT Protocol firehose
- Platform Indexing: Your platform receives and indexes the AT-URI and metadata
RSVP Flow
User RSVPs → RSVP Record Created → References Event AT-URI → Index Updates
- RSVP Action: User indicates attendance in client
- RSVP Record: New record created in user’s PDS referencing event AT-URI
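- Permission Check: The indexing platform verifies the user may RSVP to the event; the RSVP record lives in the user’s own PDS, so this check cannot happen at write time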
- Notification: Event creator optionally notified of RSVP
Event Discovery Flow
User Searches → Index Query → AT-URI Results → PDS Resolution → Event Details
- Search Query: User searches for events via your platform
- Index Lookup: Platform searches indexed AT-URIs and metadata
- Results Return: Relevant event AT-URIs returned
- Data Resolution: Client resolves AT-URIs to full records from PDSs
- Display: Events displayed with full details and RSVP options
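The Data Resolution step can be sketched as follows. This is a minimal illustration, not the platform's actual resolver: it splits a record-level AT-URI into its parts and builds the query string for the standard com.atproto.repo.getRecord method. The type and function names (AtUri, parse_at_uri, get_record_query) are hypothetical, and locating the PDS host for the repo is assumed to happen elsewhere.

```rust
/// Components of a record-level AT-URI: at://<authority>/<collection>/<rkey>.
#[derive(Debug, PartialEq, Eq)]
pub struct AtUri {
    pub authority: String,  // DID or handle of the repo owner
    pub collection: String, // NSID of the record type
    pub rkey: String,       // record key within the collection
}

/// Parses a record-level AT-URI. Returns None unless the input has exactly
/// the shape `at://authority/collection/rkey` with non-empty parts.
pub fn parse_at_uri(uri: &str) -> Option<AtUri> {
    let rest = uri.strip_prefix("at://")?;
    let mut parts = rest.split('/');
    let authority = parts.next()?;
    let collection = parts.next()?;
    let rkey = parts.next()?;
    if parts.next().is_some()
        || authority.is_empty()
        || collection.is_empty()
        || rkey.is_empty()
    {
        return None;
    }
    Some(AtUri {
        authority: authority.to_string(),
        collection: collection.to_string(),
        rkey: rkey.to_string(),
    })
}

/// Builds the path and query for com.atproto.repo.getRecord.
/// No percent-encoding is applied, which is only safe for the characters
/// allowed in DIDs, handles, NSIDs, and record keys.
pub fn get_record_query(uri: &AtUri) -> String {
    format!(
        "/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}",
        uri.authority, uri.collection, uri.rkey
    )
}
```

The resulting path would be appended to the base URL of the PDS that hosts the repo.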
Import/Export Integration
Importing from External Systems
Option 1: Direct Import to PDS
External System → Import Service → User PDS → AT-URI Generated → Platform Indexed
- Import Service: Bridge service that authenticates with external calendars (Google, Outlook, etc.)
- Data Transformation: Converts external formats to AT Protocol lexicon
- User Authorization: Requires user consent and PDS write permissions
- Deduplication: Handles duplicate events across systems
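One way the Deduplication step might work is to key each imported event on its source system plus a stable external identifier. The sketch below assumes such an identifier exists (for example an iCalendar UID); ImportedEvent and ImportDeduper are hypothetical names, and a real import service would persist the seen set rather than hold it in memory.

```rust
use std::collections::HashSet;

/// An event pulled from an external calendar (field names are illustrative).
#[derive(Debug, Clone)]
pub struct ImportedEvent {
    pub source: String,       // e.g. "google" or "outlook"
    pub external_uid: String, // stable ID from the source system
    pub name: String,
}

/// Remembers which (source, external_uid) pairs were already imported.
#[derive(Default)]
pub struct ImportDeduper {
    seen: HashSet<(String, String)>,
}

impl ImportDeduper {
    /// Returns true the first time a (source, external_uid) pair is seen.
    pub fn is_new(&mut self, event: &ImportedEvent) -> bool {
        self.seen
            .insert((event.source.clone(), event.external_uid.clone()))
    }

    /// Keeps only the first occurrence of each external event.
    pub fn dedupe(&mut self, events: Vec<ImportedEvent>) -> Vec<ImportedEvent> {
        events.into_iter().filter(|e| self.is_new(e)).collect()
    }
}
```

Keying on the pair rather than on the UID alone keeps the same UID from two different systems from colliding.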
Option 2: Federated Sync
External System → Sync Bridge → Mirror PDS → Cross-Reference → Primary PDS
- Sync Bridge: Maintains real-time sync with external systems
- Mirror Records: Creates read-only mirrors of external events
- User Control: Users can promote external events to their primary PDS
- Two-way Sync: Changes in AT Protocol can optionally sync back
Publishing to External Systems
Export Service Architecture
User PDS → Export Service → External APIs → Confirmation → Status Update
- Export Authorization: Users grant permission to publish to external platforms
- Format Translation: Converts AT Protocol events to external formats (iCal, Google Calendar API, etc.)
- Sync Management: Tracks which events are published where
- Update Propagation: Handles updates and cancellations across systems
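As an illustration of the Format Translation step, the sketch below renders the name, start, and end of an event as an iCalendar VEVENT block. It is a simplified example under stated assumptions: timestamps are UTC in the exact form used by the event record example (such as 2025-06-15T18:00:00Z), long lines are not folded, and the function names are hypothetical. A complete export would also wrap the result in a VCALENDAR object.

```rust
/// Converts a UTC timestamp such as "2025-06-15T18:00:00Z" into the
/// iCalendar basic form "20250615T180000Z". This is a shape check only,
/// not full date validation.
pub fn to_ical_utc(ts: &str) -> Option<String> {
    if ts.len() != 20 || !ts.is_ascii() || !ts.ends_with('Z') {
        return None;
    }
    let compact: String = ts.chars().filter(|c| *c != '-' && *c != ':').collect();
    (compact.len() == 16).then_some(compact)
}

/// Escapes the characters that are special in iCalendar TEXT values.
fn escape_text(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '\\' => out.push_str("\\\\"),
            ';' => out.push_str("\\;"),
            ',' => out.push_str("\\,"),
            '\n' => out.push_str("\\n"),
            _ => out.push(c),
        }
    }
    out
}

/// Renders one VEVENT with CRLF line endings. Returns None if any
/// timestamp does not have the expected shape.
pub fn to_vevent(
    uid: &str,
    name: &str,
    starts_at: &str,
    ends_at: &str,
    stamp: &str,
) -> Option<String> {
    let lines = [
        "BEGIN:VEVENT".to_string(),
        format!("UID:{}", escape_text(uid)),
        format!("DTSTAMP:{}", to_ical_utc(stamp)?),
        format!("DTSTART:{}", to_ical_utc(starts_at)?),
        format!("DTEND:{}", to_ical_utc(ends_at)?),
        format!("SUMMARY:{}", escape_text(name)),
        "END:VEVENT".to_string(),
    ];
    Some(lines.iter().map(|l| format!("{l}\r\n")).collect())
}
```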
Detailed Relay, Firehose, Hydration & Indexing Architecture
Relay Infrastructure
Relay Selection Strategy
Your calendar platform needs to consume data from multiple relays to ensure comprehensive event coverage:
// Rust implementation structure
pub struct RelayManager {
primary_relays: Vec<RelayConnection>,
fallback_relays: Vec<RelayConnection>,
health_checker: RelayHealthChecker,
load_balancer: RelayLoadBalancer,
}
impl RelayManager {
    pub fn get_optimal_relay(&self, operation: OperationType) -> &RelayConnection {
        // Prioritize relays based on:
        // 1. Latency for real-time operations
        // 2. Completeness for historical sync
        // 3. Geographic proximity for location-based events
        todo!("selection policy is left open in this outline")
    }
}
Multi-Relay Consumption
- Primary Relays: Subscribe to major relays (e.g. the Bluesky-operated relay at bsky.network, plus other large relays)
- Community Relays: Monitor smaller community-specific relays for niche events
- Geographic Relays: Regional relays for location-specific event discovery
- Fallback Strategy: Automatic failover when primary relays are unavailable
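The fallback strategy could look roughly like the sketch below: prefer the fastest healthy primary relay and fall back only when every primary is down. The Relay struct, its fields, and the latency-only ranking are assumptions for illustration; the RelayManager outline above also lists completeness and geography as criteria.

```rust
/// Simplified view of a relay as seen by the health checker.
#[derive(Debug, Clone, PartialEq)]
pub struct Relay {
    pub host: String,
    pub healthy: bool,
    pub latency_ms: u32,
}

/// Lowest-latency healthy relay in a list, if any.
fn best(relays: &[Relay]) -> Option<&Relay> {
    relays
        .iter()
        .filter(|r| r.healthy)
        .min_by_key(|r| r.latency_ms)
}

/// Picks a healthy primary relay when one exists, otherwise a healthy
/// fallback relay. Returns None when nothing is reachable.
pub fn select_relay<'a>(primary: &'a [Relay], fallback: &'a [Relay]) -> Option<&'a Relay> {
    best(primary).or_else(|| best(fallback))
}
```

Note that a fast fallback never displaces a slower but healthy primary, which keeps traffic on the preferred relays until they actually fail.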
Firehose Processing Pipeline
Stream Processing Architecture
pub struct FirehoseProcessor {
stream_manager: StreamManager,
filters: FilterChain,
buffer: CircularBuffer<Event>,
processors: Vec<Box<dyn EventProcessor>>,
error_handler: ErrorRecoveryHandler,
}
pub struct StreamManager {
connections: HashMap<RelayId, WebSocketConnection>,
cursor_manager: CursorManager,
backfill_service: BackfillService,
}
Event Filtering Pipeline
- Schema Filter: Only process community.lexicon.calendar.event and related records
- Temporal Filter: Focus on events within relevant time windows (e.g., past 1 year, future 2 years)
- Quality Filter: Skip events with invalid data or suspicious patterns
- Deduplication Filter: Handle duplicate events across relays
pub enum FilterResult {
Process(ProcessedEvent),
Skip(SkipReason),
Defer(DeferReason), // For rate limiting or temporary issues
}
pub struct EventFilter;
impl EventFilter {
    pub fn apply(&self, raw_event: &RawFirehoseEvent) -> FilterResult {
        // Apply the schema, temporal, quality, and deduplication filters in order
        todo!()
    }
}
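A minimal sketch of the first two filters (schema and temporal) is shown below, assuming timestamps are Unix seconds and using the example window of one year back and two years ahead. FilterOutcome and filter_event are simplified stand-ins for FilterResult and EventFilter::apply, not their actual definitions.

```rust
pub const EVENT_COLLECTION: &str = "community.lexicon.calendar.event";
pub const YEAR_SECS: i64 = 365 * 24 * 60 * 60;

#[derive(Debug, PartialEq)]
pub enum SkipReason {
    WrongCollection,
    OutsideTimeWindow,
}

#[derive(Debug, PartialEq)]
pub enum FilterOutcome {
    Process,
    Skip(SkipReason),
}

/// Applies the schema filter, then the temporal filter.
/// `starts_at` and `now` are Unix timestamps in seconds.
pub fn filter_event(collection: &str, starts_at: i64, now: i64) -> FilterOutcome {
    if collection != EVENT_COLLECTION {
        return FilterOutcome::Skip(SkipReason::WrongCollection);
    }
    let earliest = now - YEAR_SECS;
    let latest = now + 2 * YEAR_SECS;
    if starts_at < earliest || starts_at > latest {
        FilterOutcome::Skip(SkipReason::OutsideTimeWindow)
    } else {
        FilterOutcome::Process
    }
}
```

Running the cheap collection check first means most unrelated firehose traffic is dropped before any record parsing happens.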
Cursor Management & Recovery
pub struct CursorManager {
    // Storage handle omitted in this outline
}
impl CursorManager {
    pub fn save_cursor(&mut self, relay_id: &str, cursor: &str) -> Result<()> { todo!() }
    pub fn get_last_cursor(&self, relay_id: &str) -> Option<String> { todo!() }
    pub fn handle_gap(&mut self, relay_id: &str, gap_start: &str, gap_end: &str) -> BackfillTask { todo!() }
}
- Persistent Cursors: Store cursor positions in database for recovery
- Gap Detection: Identify and handle sequence gaps in the firehose
- Backfill Strategy: Automatically request missing data when gaps are detected
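Persistent cursors can be sketched as follows: keep the highest sequence number seen per relay and pass it back as the cursor parameter of com.atproto.sync.subscribeRepos on reconnect. The example is an in-memory stand-in with hypothetical names (CursorStore, subscribe_url); a real deployment would write the cursor to a database as described above.

```rust
use std::collections::HashMap;

/// In-memory stand-in for a persistent cursor table, keyed by relay host.
#[derive(Default)]
pub struct CursorStore {
    cursors: HashMap<String, u64>,
}

impl CursorStore {
    /// Records progress for a relay. The stored cursor never moves backwards.
    pub fn save(&mut self, relay_host: &str, seq: u64) {
        let entry = self.cursors.entry(relay_host.to_string()).or_insert(seq);
        if seq > *entry {
            *entry = seq;
        }
    }

    /// Last sequence number recorded for a relay, if any.
    pub fn last(&self, relay_host: &str) -> Option<u64> {
        self.cursors.get(relay_host).copied()
    }

    /// WebSocket URL for (re)subscribing. Includes the cursor when one is
    /// known so the relay can replay what was missed.
    pub fn subscribe_url(&self, relay_host: &str) -> String {
        let base = format!("wss://{relay_host}/xrpc/com.atproto.sync.subscribeRepos");
        match self.last(relay_host) {
            Some(seq) => format!("{base}?cursor={seq}"),
            None => base,
        }
    }
}
```

Saving the cursor only after an event has been fully processed means a crash leads to reprocessing rather than data loss, so downstream index updates should be idempotent.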
Hydration Strategy
Multi-Stage Hydration
Your system needs different levels of data completeness for different use cases:
pub enum HydrationLevel {
Minimal, // Just AT-URI and basic metadata
Standard, // Full event record
Complete, // Event + creator profile + RSVPs
Social, // Complete + social signals + recommendations
}
pub struct HydrationService {
pds_client: PDSClient,
cache: HydrationCache,
batch_processor: BatchHydrator,
}
Batch Hydration for Performance
pub struct BatchHydrator;
impl BatchHydrator {
    pub async fn hydrate_batch(&self, at_uris: Vec<String>) -> Vec<HydratedEvent> {
        // Group by PDS to minimize round trips
        let grouped = self.group_by_pds(at_uris);
        // Build one hydration future per PDS so they can run concurrently
        let futures: Vec<_> = grouped.into_iter()
            .map(|(pds, uris)| self.hydrate_from_pds(pds, uris))
            .collect();
        // Await the futures, combine results, and handle failures
        self.combine_results(futures).await
    }
}
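The grouping step can be illustrated with a small helper. This sketch groups AT-URIs by their repo authority (the DID), which is the first half of grouping by PDS; mapping each DID to its PDS host needs DID resolution and is assumed to happen in a later step. The function name group_by_repo is hypothetical.

```rust
use std::collections::BTreeMap;

/// Groups AT-URIs by repo authority so each repo is fetched in one pass.
/// URIs that do not start with `at://` or have an empty authority are dropped.
pub fn group_by_repo(at_uris: &[&str]) -> BTreeMap<String, Vec<String>> {
    let mut groups: BTreeMap<String, Vec<String>> = BTreeMap::new();
    for uri in at_uris {
        if let Some(rest) = uri.strip_prefix("at://") {
            let authority = rest.split('/').next().unwrap_or("");
            if !authority.is_empty() {
                groups
                    .entry(authority.to_string())
                    .or_default()
                    .push((*uri).to_string());
            }
        }
    }
    groups
}
```

A BTreeMap is used here only to make iteration order deterministic; a HashMap works equally well when order does not matter.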
Smart Caching Strategy
- Hot Cache: Recently accessed events kept in memory
- Warm Cache: Upcoming events pre-hydrated
- Cold Storage: Historical events hydrated on-demand
- Invalidation: Real-time cache updates from firehose
Indexing Architecture
Multi-Index Strategy
Your calendar platform needs several specialized indexes:
pub struct IndexManager {
temporal_index: TemporalIndex, // Time-based queries
spatial_index: SpatialIndex, // Location-based queries
text_index: TextIndex, // Full-text search
social_index: SocialIndex, // Social graph queries
category_index: CategoryIndex, // Event type/category
}
Temporal Indexing
pub struct TemporalIndex {
// Multi-level time indexes for different query patterns
by_start_time: BTreeMap<DateTime<Utc>, Vec<EventId>>,
by_end_time: BTreeMap<DateTime<Utc>, Vec<EventId>>,
by_creation_time: BTreeMap<DateTime<Utc>, Vec<EventId>>,
recurring_events: HashMap<RecurrencePattern, Vec<EventId>>,
}
impl TemporalIndex {
    pub fn query_range(&self, start: DateTime<Utc>, end: DateTime<Utc>) -> Vec<EventId> {
        // Efficient range queries for calendar views
        todo!()
    }
    pub fn query_upcoming(&self, from: DateTime<Utc>, limit: usize) -> Vec<EventId> {
        // Optimized for "what's coming up" queries
        todo!()
    }
}
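The two queries map naturally onto BTreeMap range scans. The sketch below is a simplified stand-in for the by_start_time index, assuming Unix-second timestamps instead of DateTime<Utc> and AT-URI strings instead of EventId; StartTimeIndex is a hypothetical name.

```rust
use std::collections::BTreeMap;

/// Start-time index: timestamp (Unix seconds) to the events starting then.
#[derive(Default)]
pub struct StartTimeIndex {
    by_start: BTreeMap<i64, Vec<String>>,
}

impl StartTimeIndex {
    pub fn insert(&mut self, starts_at: i64, event_uri: &str) {
        self.by_start
            .entry(starts_at)
            .or_default()
            .push(event_uri.to_string());
    }

    /// Events starting in the half-open interval [start, end), in
    /// chronological order. An empty or inverted interval yields nothing.
    pub fn query_range(&self, start: i64, end: i64) -> Vec<&str> {
        if start >= end {
            return Vec::new(); // BTreeMap::range panics on inverted bounds
        }
        self.by_start
            .range(start..end)
            .flat_map(|(_, uris)| uris.iter().map(String::as_str))
            .collect()
    }

    /// The next `limit` events starting at or after `from`.
    pub fn query_upcoming(&self, from: i64, limit: usize) -> Vec<&str> {
        self.by_start
            .range(from..)
            .flat_map(|(_, uris)| uris.iter().map(String::as_str))
            .take(limit)
            .collect()
    }
}
```

Both queries cost O(log n) to locate the first key and then only touch the entries they return.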
Spatial Indexing for Location-Based Discovery
pub struct SpatialIndex {
// R-tree or similar spatial data structure
rtree: RTree<EventLocationNode>,
geocoder: GeocodingService,
}
pub struct EventLocationNode {
event_id: EventId,
bounds: Rectangle,
location_type: LocationType, // Address, Geo, Foursquare, etc.
}
impl SpatialIndex {
    pub fn query_nearby(&self, point: Point, radius_km: f64) -> Vec<EventId> {
        // Efficient geographic proximity queries
        todo!()
    }
    pub fn query_bbox(&self, bounds: Rectangle) -> Vec<EventId> {
        // Bounding box queries for map views
        todo!()
    }
}
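A proximity query typically uses the R-tree to find candidates in a bounding box and then filters them by true distance. The distance check can be sketched with the haversine formula, as below. This assumes a spherical Earth with a mean radius of 6371 km and coordinates in decimal degrees; the function names are illustrative.

```rust
const EARTH_RADIUS_KM: f64 = 6371.0;

/// Great-circle distance in kilometres between two points given in
/// decimal degrees, using the haversine formula.
pub fn haversine_km(lat1: f64, lon1: f64, lat2: f64, lon2: f64) -> f64 {
    let phi1 = lat1.to_radians();
    let phi2 = lat2.to_radians();
    let d_phi = (lat2 - lat1).to_radians();
    let d_lambda = (lon2 - lon1).to_radians();
    let a = (d_phi / 2.0).sin().powi(2)
        + phi1.cos() * phi2.cos() * (d_lambda / 2.0).sin().powi(2);
    // Clamp guards against rounding pushing the value just above 1.0.
    2.0 * EARTH_RADIUS_KM * a.sqrt().min(1.0).asin()
}

/// True when `point` lies within `radius_km` of `center`.
/// Both are (latitude, longitude) pairs in decimal degrees.
pub fn within_radius(center: (f64, f64), point: (f64, f64), radius_km: f64) -> bool {
    haversine_km(center.0, center.1, point.0, point.1) <= radius_km
}
```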
Real-Time Index Updates
pub struct IndexUpdateProcessor {
    hydration_service: HydrationService,
    temporal_index: TemporalIndex,
    spatial_index: SpatialIndex,
    text_index: TextIndex,
    social_index: SocialIndex,
}
impl IndexUpdateProcessor {
    pub async fn process_firehose_event(&mut self, event: &FirehoseEvent) -> Result<()> {
        match event.operation {
            Operation::Create => self.add_to_indexes(event).await,
            Operation::Update => self.update_indexes(event).await,
            Operation::Delete => self.remove_from_indexes(event).await,
        }
    }
    async fn add_to_indexes(&mut self, event: &FirehoseEvent) -> Result<()> {
        let hydrated = self.hydration_service.hydrate_minimal(event).await?;
        // Concurrent index updates; each future borrows a different field
        tokio::join!(
            self.temporal_index.add(hydrated.clone()),
            self.spatial_index.add(hydrated.clone()),
            self.text_index.add(hydrated.clone()),
            self.social_index.add(hydrated.clone()),
        );
        Ok(())
    }
}
Data Consistency & Recovery
Event Sourcing for Audit Trail
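// Written as a trait because only the signatures are specified here
// (async fn and `impl Trait` return types in traits need Rust 1.75+)
pub trait EventStore {
    async fn append_event(&self, event: IndexEvent) -> Result<SequenceNumber>;
    async fn replay_from(&self, sequence: SequenceNumber) -> impl Stream<Item = IndexEvent>;
}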
pub enum IndexEvent {
EventCreated { at_uri: String, data: EventData },
EventUpdated { at_uri: String, changes: EventChanges },
EventDeleted { at_uri: String },
HydrationCompleted { at_uri: String, level: HydrationLevel },
}
Consistency Checks & Repair
// Signatures only, so this is expressed as a trait (Rust 1.75+ for async fn in traits)
pub trait ConsistencyChecker {
    async fn verify_index_integrity(&self) -> ConsistencyReport;
    async fn repair_inconsistencies(&self, report: ConsistencyReport) -> RepairResult;
}
pub struct ConsistencyReport {
missing_from_index: Vec<String>, // AT-URIs in firehose but not indexed
orphaned_indexes: Vec<String>, // Indexed events that no longer exist
stale_hydrations: Vec<String>, // Events needing re-hydration
}
Performance Optimization
Streaming vs Batch Processing
pub enum ProcessingMode {
Streaming {
buffer_size: usize,
flush_interval: Duration,
},
Batch {
batch_size: usize,
batch_timeout: Duration,
},
}
pub struct ProcessingConfig {
mode: ProcessingMode,
parallelism: usize,
backpressure_threshold: usize,
}
Memory Management
- Bounded Queues: Prevent memory exhaustion during traffic spikes
- Streaming Processing: Process events without loading entire datasets
- Selective Caching: Cache based on access patterns and event proximity
- Garbage Collection: Remove expired events and stale cache entries
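Bounded queues with backpressure can be demonstrated with the standard library alone. The sketch below uses std::sync::mpsc::sync_channel, whose try_send fails immediately when the buffer is full; an async pipeline would more likely use a bounded async channel with the same idea. The Enqueue enum and helper names are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Outcome of a non-blocking enqueue attempt.
#[derive(Debug, PartialEq)]
pub enum Enqueue {
    Accepted,
    Backpressure, // queue is full, caller should slow down or defer
    Closed,       // consumer has gone away
}

/// Creates a queue that holds at most `capacity` pending items.
pub fn bounded_queue<T>(capacity: usize) -> (SyncSender<T>, Receiver<T>) {
    sync_channel(capacity)
}

/// Tries to enqueue without blocking and reports why it failed, if it did.
pub fn try_enqueue<T>(tx: &SyncSender<T>, item: T) -> Enqueue {
    match tx.try_send(item) {
        Ok(()) => Enqueue::Accepted,
        Err(TrySendError::Full(_)) => Enqueue::Backpressure,
        Err(TrySendError::Disconnected(_)) => Enqueue::Closed,
    }
}
```

A Backpressure result corresponds to crossing backpressure_threshold in ProcessingConfig: the producer can pause reading from the firehose instead of buffering without limit.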
Monitoring & Observability
Key Metrics to Track
pub struct IndexingMetrics {
pub firehose_events_per_second: Counter,
pub hydration_success_rate: Histogram,
pub index_update_latency: Histogram,
pub cache_hit_rate: Gauge,
pub backfill_queue_size: Gauge,
pub cursor_lag_seconds: Gauge,
}
Health Checks
- Firehose Connectivity: Monitor relay connections and cursor progress
- Hydration Health: Track PDS response times and error rates
- Index Freshness: Ensure indexes are up-to-date with firehose
- Query Performance: Monitor search response times
XRPC API Design
Core XRPC Endpoints
Your calendar platform should expose a comprehensive XRPC API following AT Protocol conventions:
// XRPC API Structure
pub struct CalendarXRPC {
query_handlers: HashMap<String, Box<dyn QueryHandler>>,
procedure_handlers: HashMap<String, Box<dyn ProcedureHandler>>,
auth_service: AuthService,
rate_limiter: RateLimiter,
}
// Base XRPC method naming convention
const API_BASE: &str = "community.calendar";
Query Methods (com.atproto.repo.getRecord style)
// Event Discovery & Search Queries
pub mod queries {
// Get events by temporal range
pub const LIST_EVENTS: &str = "community.calendar.listEvents";
// Parameters: start?, end?, limit?, cursor?
// Search events by text
pub const SEARCH_EVENTS: &str = "community.calendar.searchEvents";
// Parameters: q, limit?, cursor?, filters?
// Get events near location
pub const GET_NEARBY_EVENTS: &str = "community.calendar.getNearbyEvents";
// Parameters: lat, lon, radius?, limit?, cursor?
// Get single event by AT-URI
pub const GET_EVENT: &str = "community.calendar.getEvent";
// Parameters: uri, hydration_level?
// Get events by creator
pub const GET_CREATOR_EVENTS: &str = "community.calendar.getCreatorEvents";
// Parameters: did, limit?, cursor?, include_past?
// Get RSVPs for an event
pub const GET_EVENT_RSVPS: &str = "community.calendar.getEventRSVPs";
// Parameters: event_uri, limit?, cursor?
// Get user's RSVPs
pub const GET_USER_RSVPS: &str = "community.calendar.getUserRSVPs";
// Parameters: did, status?, limit?, cursor?
// Get recommended events for user
pub const GET_RECOMMENDATIONS: &str = "community.calendar.getRecommendations";
// Parameters: limit?, cursor?, location_hint?
}
Procedure Methods (com.atproto.repo.createRecord style)
pub mod procedures {
// Event Management
pub const CREATE_EVENT: &str = "community.calendar.createEvent";
pub const UPDATE_EVENT: &str = "community.calendar.updateEvent";
pub const DELETE_EVENT: &str = "community.calendar.deleteEvent";
// RSVP Management
pub const CREATE_RSVP: &str = "community.calendar.createRSVP";
pub const UPDATE_RSVP: &str = "community.calendar.updateRSVP";
pub const DELETE_RSVP: &str = "community.calendar.deleteRSVP";
// Bulk Operations
pub const BULK_IMPORT_EVENTS: &str = "community.calendar.bulkImportEvents";
pub const BULK_EXPORT_EVENTS: &str = "community.calendar.bulkExportEvents";
// External Integration
pub const SYNC_EXTERNAL_CALENDAR: &str = "community.calendar.syncExternalCalendar";
}
XRPC Request/Response Schemas
// Query Response Structures
#[derive(Serialize, Deserialize)]
pub struct ListEventsResponse {
pub events: Vec<EventView>,
pub cursor: Option<String>,
}
#[derive(Serialize, Deserialize)]
pub struct EventView {
pub uri: String,
pub cid: String,
pub value: CalendarEvent,
pub creator: ProfileViewBasic,
pub rsvp_count: Option<RsvpCounts>,
pub user_rsvp: Option<UserRsvpStatus>,
pub indexed_at: DateTime<Utc>,
}
#[derive(Serialize, Deserialize)]
pub struct RsvpCounts {
pub attending: u32,
pub maybe: u32,
pub declined: u32,
pub total: u32,
}
// Query Parameters
#[derive(Serialize, Deserialize)]
pub struct ListEventsParams {
pub start: Option<DateTime<Utc>>,
pub end: Option<DateTime<Utc>>,
pub limit: Option<u32>, // max 100, default 25
pub cursor: Option<String>,
pub creator: Option<String>, // DID
pub mode: Option<EventMode>, // virtual, inperson, hybrid
pub status: Option<EventStatus>,
}
#[derive(Serialize, Deserialize)]
pub struct SearchEventsParams {
pub q: String,
pub limit: Option<u32>,
pub cursor: Option<String>,
pub filters: Option<SearchFilters>,
}
#[derive(Serialize, Deserialize)]
pub struct SearchFilters {
pub location: Option<LocationFilter>,
pub time_range: Option<TimeRange>,
pub event_type: Option<Vec<String>>,
pub mode: Option<EventMode>,
}
// Procedure Input/Output
#[derive(Serialize, Deserialize)]
pub struct CreateEventInput {
pub repo: String, // User's DID
pub record: CalendarEvent,
pub validate: Option<bool>,
}
#[derive(Serialize, Deserialize)]
pub struct CreateEventOutput {
pub uri: String,
pub cid: String,
pub validation_status: Option<ValidationStatus>,
}
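The limit comment in ListEventsParams (max 100, default 25) implies a small normalization step before querying. One possible policy is sketched below: apply the default when the parameter is absent, clamp oversized values, and reject zero. Clamping rather than rejecting large values is an assumption, and effective_limit is a hypothetical helper.

```rust
pub const DEFAULT_LIMIT: u32 = 25;
pub const MAX_LIMIT: u32 = 100;

/// Normalizes the optional `limit` query parameter.
/// Missing means the default, values above the maximum are clamped,
/// and zero is rejected as an invalid request.
pub fn effective_limit(requested: Option<u32>) -> Result<u32, String> {
    match requested {
        None => Ok(DEFAULT_LIMIT),
        Some(0) => Err("limit must be at least 1".to_string()),
        Some(n) => Ok(n.min(MAX_LIMIT)),
    }
}
```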
XRPC Handler Implementation
#[async_trait]
pub trait QueryHandler: Send + Sync {
async fn handle(&self, params: Value, auth: Option<AuthContext>) -> Result<Value, XrpcError>;
}
pub struct ListEventsHandler {
index_service: Arc<IndexService>,
hydration_service: Arc<HydrationService>,
}
#[async_trait]
impl QueryHandler for ListEventsHandler {
async fn handle(&self, params: Value, auth: Option<AuthContext>) -> Result<Value, XrpcError> {
let params: ListEventsParams = serde_json::from_value(params)
.map_err(|e| XrpcError::InvalidRequest(e.to_string()))?;
// Validate parameters
self.validate_params(&params)?;
// Query indexes
let event_ids = self.index_service.query_events(&params).await?;
// Hydrate events in batch
let events = self.hydration_service
.hydrate_events_for_user(event_ids, auth.as_ref())
.await?;
// Build the cursor before `events` is moved into the response
let cursor = self.generate_cursor(&params, &events);
Ok(serde_json::to_value(ListEventsResponse { events, cursor })?)
}
}
pub struct CreateEventHandler {
pds_client: Arc<PDSClient>,
validation_service: Arc<ValidationService>,
}
#[async_trait]
impl ProcedureHandler for CreateEventHandler {
async fn handle(&self, input: Value, auth: AuthContext) -> Result<Value, XrpcError> {
let input: CreateEventInput = serde_json::from_value(input)?;
// Verify user can write to repo
self.verify_repo_access(&input.repo, &auth).await?;
// Validate event data
if input.validate.unwrap_or(true) {
self.validation_service.validate_event(&input.record).await?;
}
// Create record in user's PDS
let result = self.pds_client
.create_record(&input.repo, "community.lexicon.calendar.event", &input.record)
.await?;
Ok(serde_json::to_value(CreateEventOutput {
uri: result.uri,
cid: result.cid,
validation_status: Some(ValidationStatus::Valid),
})?)
}
}
Authentication & Authorization
pub struct AuthService {
jwt_validator: JwtValidator,
session_store: Arc<SessionStore>,
rate_limiter: Arc<RateLimiter>,
}
pub struct AuthContext {
pub did: String,
pub access_token: String,
pub scopes: Vec<String>,
pub session_id: String,
}
impl AuthService {
pub async fn authenticate_request(&self, headers: &HeaderMap) -> Result<Option<AuthContext>, AuthError> {
if let Some(auth_header) = headers.get("authorization") {
let token = self.extract_bearer_token(auth_header)?;
let claims = self.jwt_validator.validate_token(&token).await?;
Ok(Some(AuthContext {
did: claims.sub,
access_token: token,
// split_whitespace yields no entries for a missing or empty scope claim
scopes: claims.scope.unwrap_or_default().split_whitespace().map(String::from).collect(),
session_id: claims.sid.unwrap_or_default(),
}))
} else {
Ok(None) // Anonymous access
}
}
pub async fn check_rate_limit(&self, auth: &Option<AuthContext>) -> Result<(), RateLimitError> {
let key = match auth {
Some(ctx) => format!("user:{}", ctx.did),
None => "anonymous".to_string(),
};
self.rate_limiter.check_limit(&key).await
}
}
Rate Limiting & Quotas
pub struct RateLimiter {
redis: Arc<RedisPool>,
limits: RateLimitConfig,
}
pub struct RateLimitConfig {
pub anonymous: RateLimit,
pub authenticated: RateLimit,
pub premium: RateLimit,
}
pub struct RateLimit {
pub requests_per_minute: u32,
pub requests_per_hour: u32,
pub burst_allowance: u32,
}
impl RateLimiter {
    pub async fn check_limit(&self, key: &str) -> Result<(), RateLimitError> {
        // Sliding window rate limit implementation
        // Use Redis sorted sets for efficient sliding window
        todo!()
    }
}
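The sliding-window logic itself is independent of Redis. The sketch below keeps per-key timestamps in memory with a VecDeque, which mirrors what a sorted set would store; it is an illustration of the algorithm, not the Redis-backed implementation described above. SlidingWindowLimiter is a hypothetical name and time is passed in as Unix seconds to keep the example deterministic.

```rust
use std::collections::{HashMap, VecDeque};

/// In-memory sliding-window limiter: at most `max_requests` per key
/// within any span of `window_secs` seconds.
pub struct SlidingWindowLimiter {
    max_requests: usize,
    window_secs: u64,
    hits: HashMap<String, VecDeque<u64>>,
}

impl SlidingWindowLimiter {
    pub fn new(max_requests: usize, window_secs: u64) -> Self {
        Self {
            max_requests,
            window_secs,
            hits: HashMap::new(),
        }
    }

    /// Returns true and records the request if `key` is under its limit
    /// at time `now` (Unix seconds). Returns false otherwise.
    pub fn check(&mut self, key: &str, now: u64) -> bool {
        let hits = self.hits.entry(key.to_string()).or_default();
        // Evict timestamps that have slid out of the window.
        while let Some(&oldest) = hits.front() {
            if now.saturating_sub(oldest) >= self.window_secs {
                hits.pop_front();
            } else {
                break;
            }
        }
        if hits.len() < self.max_requests {
            hits.push_back(now);
            true
        } else {
            false
        }
    }
}
```

With Redis, the eviction loop corresponds to removing members by score range and the length check to counting the remaining members of the sorted set.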
XRPC Server Implementation
pub struct XrpcServer {
handlers: HashMap<String, Box<dyn RequestHandler>>,
auth_service: Arc<AuthService>,
cors_config: CorsConfig,
}
impl XrpcServer {
pub fn new() -> Self {
let mut server = Self {
handlers: HashMap::new(),
auth_service: Arc::new(AuthService::new()),
cors_config: CorsConfig::permissive(),
};
// Register query handlers
server.register_query("community.calendar.listEvents", Box::new(ListEventsHandler::new()));
server.register_query("community.calendar.searchEvents", Box::new(SearchEventsHandler::new()));
server.register_query("community.calendar.getNearbyEvents", Box::new(NearbyEventsHandler::new()));
server.register_query("community.calendar.getEvent", Box::new(GetEventHandler::new()));
// Register procedure handlers
server.register_procedure("community.calendar.createEvent", Box::new(CreateEventHandler::new()));
server.register_procedure("community.calendar.updateEvent", Box::new(UpdateEventHandler::new()));
server.register_procedure("community.calendar.createRSVP", Box::new(CreateRsvpHandler::new()));
server
}
pub async fn handle_request(&self, req: Request<Body>) -> Result<Response<Body>, Infallible> {
// Extract method from path: /xrpc/community.calendar.listEvents
let method = self.extract_xrpc_method(&req);
// Authenticate request
let auth = match self.auth_service.authenticate_request(req.headers()).await {
Ok(auth) => auth,
Err(e) => return Ok(self.error_response(XrpcError::Unauthorized(e.to_string()))),
};
// Check rate limits
if let Err(e) = self.auth_service.check_rate_limit(&auth).await {
return Ok(self.error_response(XrpcError::RateLimited(e.to_string())));
}
// Route to appropriate handler
match self.handlers.get(&method) {
Some(handler) => {
let params = self.extract_params(&req).await;
match handler.handle(params, auth).await {
Ok(response) => Ok(self.success_response(response)),
Err(e) => Ok(self.error_response(e)),
}
}
None => Ok(self.error_response(XrpcError::MethodNotFound(method))),
}
}
}
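The extract_xrpc_method helper used in handle_request is not defined in this outline. A minimal sketch of what it might do is shown below: strip the /xrpc/ prefix and apply a loose NSID shape check. The check is a simplification of the real NSID syntax rules, and the function takes a bare path (no query string), which is an assumption.

```rust
/// Extracts the method NSID from a path such as
/// `/xrpc/community.calendar.listEvents`.
/// Returns None when the prefix is missing or the name is not NSID-shaped.
/// The shape check is deliberately loose: at least three dot-separated,
/// non-empty segments made of ASCII letters, digits, or hyphens.
pub fn extract_xrpc_method(path: &str) -> Option<&str> {
    let nsid = path.strip_prefix("/xrpc/")?;
    let segments: Vec<&str> = nsid.split('.').collect();
    let well_formed = segments.len() >= 3
        && segments.iter().all(|s| {
            !s.is_empty() && s.chars().all(|c| c.is_ascii_alphanumeric() || c == '-')
        });
    if well_formed {
        Some(nsid)
    } else {
        None
    }
}
```

Returning an Option lets the server answer malformed paths with an InvalidRequest error before any handler lookup takes place.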
Error Handling
#[derive(Debug, Serialize)]
pub enum XrpcError {
InvalidRequest(String),
Unauthorized(String),
Forbidden(String),
NotFound(String),
RateLimited(String),
ValidationFailed(Vec<ValidationError>),
InternalError(String),
MethodNotFound(String),
}
#[derive(Debug, Serialize)]
pub struct XrpcErrorResponse {
pub error: String,
pub message: String,
pub details: Option<Value>,
}
impl XrpcError {
pub fn to_http_status(&self) -> StatusCode {
match self {
XrpcError::InvalidRequest(_) => StatusCode::BAD_REQUEST,
XrpcError::Unauthorized(_) => StatusCode::UNAUTHORIZED,
XrpcError::Forbidden(_) => StatusCode::FORBIDDEN,
XrpcError::NotFound(_) => StatusCode::NOT_FOUND,
XrpcError::RateLimited(_) => StatusCode::TOO_MANY_REQUESTS,
XrpcError::ValidationFailed(_) => StatusCode::BAD_REQUEST,
XrpcError::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
XrpcError::MethodNotFound(_) => StatusCode::NOT_FOUND,
}
}
}
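To show how an error might be turned into the error and message fields of XrpcErrorResponse, here is a std-only sketch over a trimmed-down copy of the enum. Using the variant name as the error string is an assumption, and the hand-written JSON escaping stands in for what serde_json would do in the real server.

```rust
/// Trimmed-down copy of the error enum, for illustration only.
#[derive(Debug)]
pub enum XrpcError {
    InvalidRequest(String),
    RateLimited(String),
    MethodNotFound(String),
}

impl XrpcError {
    /// Short machine-readable name (assumed to match the variant name).
    pub fn name(&self) -> &'static str {
        match self {
            XrpcError::InvalidRequest(_) => "InvalidRequest",
            XrpcError::RateLimited(_) => "RateLimited",
            XrpcError::MethodNotFound(_) => "MethodNotFound",
        }
    }

    /// Human-readable detail carried by the variant.
    pub fn message(&self) -> &str {
        match self {
            XrpcError::InvalidRequest(m)
            | XrpcError::RateLimited(m)
            | XrpcError::MethodNotFound(m) => m,
        }
    }

    /// JSON body of the form {"error":"...","message":"..."}.
    pub fn to_json_body(&self) -> String {
        format!(
            "{{\"error\":\"{}\",\"message\":\"{}\"}}",
            self.name(),
            escape_json(self.message())
        )
    }
}

/// Minimal JSON string escaping for quotes, backslashes, and control characters.
fn escape_json(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            c if c.is_control() => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}
```

Note that deriving Serialize directly on the enum would produce an externally tagged shape such as {"InvalidRequest":"..."}, so an explicit conversion like this (or a custom Serialize impl) is needed to get the flat error and message fields.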
API Documentation & OpenAPI Schema
pub fn generate_openapi_spec() -> OpenApi {
OpenApiBuilder::new()
.info(Info::new("Calendar Events API", "1.0.0"))
.servers(vec![
// Base URL only: the paths below already carry the /xrpc prefix
Server::new("https://calendar.example.com"),
])
.paths(Paths::from_iter([
("/xrpc/community.calendar.listEvents", PathItem::new()
.get(Operation::new()
.tag("events")
.summary("List calendar events")
.parameters(vec![
Parameter::new("start").schema(Schema::string().format("date-time")),
Parameter::new("end").schema(Schema::string().format("date-time")),
Parameter::new("limit").schema(Schema::integer().maximum(100.0)),
Parameter::new("cursor").schema(Schema::string()),
])
.responses(Responses::from_iter([
("200", Response::new("Success").json_schema(ListEventsResponse::schema())),
("400", Response::new("Bad Request")),
("401", Response::new("Unauthorized")),
]))
)
),
// ... more endpoints
]))
.build()
}
XRPC Integration with Existing Rust App
Since you already have authentication and basic PDS operations working, you can integrate XRPC as follows:
// In your main application
pub struct CalendarApp {
xrpc_server: XrpcServer,
existing_handlers: ExistingHandlers, // Your current preview app handlers
shared_services: SharedServices,
}
impl CalendarApp {
pub async fn run(&self) -> Result<()> {
let app = Router::new()
// Your existing preview app routes
.route("/events", get(self.existing_handlers.list_events))
.route("/events/:id", get(self.existing_handlers.get_event))
.route("/profile/:did", get(self.existing_handlers.get_profile))
// XRPC endpoints
.route("/xrpc/:method",
get(|req| self.xrpc_server.handle_request(req))
.post(|req| self.xrpc_server.handle_request(req))
)
// Serve OpenAPI documentation
.route("/api/docs", get(|| async { generate_openapi_spec() }))
.layer(CorsLayer::new())
.layer(AuthLayer::new(self.shared_services.auth.clone()));
// Start server
axum::Server::bind(&"0.0.0.0:3000".parse()?)
.serve(app.into_make_service())
.await?;
Ok(())
}
}
This XRPC API design follows AT Protocol conventions (NSID method names, separate query and procedure methods, cursor-based pagination) and mounts alongside the routes of your existing Rust application.
Data Schemas
Event Record Structure
{
"$type": "community.lexicon.calendar.event",
"name": "Community Meetup",
"description": "Monthly developer gathering",
"createdAt": "2025-05-31T10:00:00Z",
"startsAt": "2025-06-15T18:00:00Z",
"endsAt": "2025-06-15T21:00:00Z",
"mode": "community.lexicon.calendar.event#inperson",
"status": "community.lexicon.calendar.event#scheduled",
"locations": [
{
"$type": "community.lexicon.location.address",
"streetAddress": "123 Main St",
"city": "San Francisco",
"region": "CA"
}
]
}
RSVP Record Structure (Proposed)
{
"$type": "community.lexicon.calendar.rsvp",
"event": "at://did:plc:user123/community.lexicon.calendar.event/abc123",
"status": "attending",
"createdAt": "2025-05-31T12:00:00Z",
"note": "Looking forward to it!"
}
The status field is one of attending, maybe, or declined.
Privacy & Permissions
Event Visibility Levels
- Public: Indexed and discoverable by anyone
- Followers Only: Visible to user’s followers
- Private: Only accessible via direct AT-URI sharing
- Unlisted: Not indexed but accessible if AT-URI is known
RSVP Privacy
- Public RSVPs: Attendee list visible to all
- Private RSVPs: Only event creator sees attendees
- Anonymous RSVPs: Attendance counts without identity
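Anonymous RSVPs reduce to publishing counts only. A minimal sketch of that aggregation is shown below, assuming the proposed status strings attending, maybe, and declined; unknown values are ignored. RsvpCounts mirrors the response struct of the same name, and count_rsvps is a hypothetical helper.

```rust
/// Aggregate RSVP counts with no attendee identities attached.
#[derive(Debug, Default, PartialEq)]
pub struct RsvpCounts {
    pub attending: u32,
    pub maybe: u32,
    pub declined: u32,
    pub total: u32,
}

/// Tallies RSVP status strings. Unrecognized statuses are skipped and
/// do not contribute to `total`.
pub fn count_rsvps<'a>(statuses: impl IntoIterator<Item = &'a str>) -> RsvpCounts {
    let mut counts = RsvpCounts::default();
    for status in statuses {
        match status {
            "attending" => counts.attending += 1,
            "maybe" => counts.maybe += 1,
            "declined" => counts.declined += 1,
            _ => continue,
        }
        counts.total += 1;
    }
    counts
}
```

In practice the input should contain at most one status per user, for example the most recent RSVP record per DID, so that repeat RSVPs are not double counted.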
Platform Indexing Strategy
Firehose Subscription
// Pseudocode for firehose processing
subscribe_to_firehose({
filters: ['community.lexicon.calendar.event'],
handler: (event) => {
if (event.type === 'create' || event.type === 'update') {
index_event(event.uri, extract_metadata(event.record));
} else if (event.type === 'delete') {
remove_from_index(event.uri);
}
}
});
Index Structure
- Spatial Index: For location-based queries
- Temporal Index: For time-range searches
- Text Index: For name/description search
- Graph Index: For social recommendations
Benefits of This Architecture
For Users
- Data Ownership: Users control their calendar data
- Interoperability: Events work across different apps and platforms
- Privacy Control: Granular control over event visibility
- Portability: Easy migration between services
For Developers
- Decentralized: No single point of failure
- Composable: Mix and match different calendar services
- Innovation: Easy to build new features on top of the protocol
- Standards-Based: Uses established AT Protocol patterns
For Event Organizers
- Reach: Events discoverable across the entire AT Protocol network
- Engagement: Rich RSVP and interaction capabilities
- Analytics: Aggregated insights while preserving privacy
- Integration: Works with existing event management tools
Implementation Phases
Phase 1: Core Infrastructure
- Implement basic event lexicon on PDSs
- Build indexing service for event discovery
- Create reference client application
Phase 2: Social Features
- Add RSVP lexicon and functionality
- Implement event sharing and recommendations
- Build notification systems
Phase 3: Integration
- External calendar import/export
- Third-party platform publishing
- Advanced discovery and recommendation algorithms
Phase 4: Advanced Features
- Recurring events support
- Event series and collections
- Ticketing and payment integration
- Live event features (streaming, real-time updates)
This architecture leverages AT Protocol’s strengths of decentralization, user control, and interoperability while providing the rich functionality needed for a comprehensive calendar and events platform.