AI Workflows - AI Implementation Guide
High Priority AI Workflows - Technical Implementation
1. Drug Demand Forecasting System
Data Flow Architecture
EasyManage APIs → Data Pipeline → Feature Store → ML Pipeline → Prediction Service → Dashboard
Data Sources & Preprocessing
# Data extraction from EasyManage APIs
import requests
import pandas as pd
from datetime import datetime, timedelta
class EasyManageDataExtractor:
    """Thin HTTP client for the EasyManage ``emdbrest`` paged "ViewAllPaged" endpoints.

    Args:
        base_url: Scheme/host/port of the EasyManage server. A trailing slash
            is stripped so URLs are not built with ``//``.
        timeout: Seconds passed to ``requests.get`` so a stalled server cannot
            hang the caller indefinitely.
    """

    def __init__(self, base_url="http://127.0.0.1:9080", timeout=30):
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout

    def _get_paged(self, resource, page, size):
        """GET one page of ``resource`` and return the decoded JSON body.

        Raises:
            requests.HTTPError: for 4xx/5xx responses, instead of returning the
                error body as if it were data.
        """
        url = f"{self.base_url}/emdbrest/{resource}/ViewAllPaged"
        response = requests.get(
            url, params={"page": page, "size": size}, timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def get_drug_sales_data(self, page=0, size=1000):
        """Extract drug sales data with pagination"""
        return self._get_paged("drug_sales", page, size)

    def get_drug_inventory_data(self, page=0, size=1000):
        """Extract drug inventory data with pagination"""
        return self._get_paged("drug_inventory", page, size)

    def get_prescriptions_data(self, page=0, size=1000):
        """Extract prescriptions data with pagination"""
        return self._get_paged("prescriptions", page, size)
# Feature engineering for demand forecasting
class DemandForecastingFeatures:
    """Feature-engineering helpers for per-drug demand forecasting.

    Args:
        holidays: Iterable of ``'YYYY-MM-DD'`` strings flagged as holidays by
            ``create_temporal_features``. Defaults to ``DEFAULT_HOLIDAYS``.
    """

    # Example dates only (all in 2024); pass ``holidays=`` or plug in a real
    # holiday calendar for data from other years.
    DEFAULT_HOLIDAYS = ('2024-01-01', '2024-07-04', '2024-12-25')

    def __init__(self, holidays=None):
        self.holidays = list(self.DEFAULT_HOLIDAYS if holidays is None else holidays)
        self.feature_columns = [
            'drug_id', 'date', 'quantity_sold', 'day_of_week',
            'month', 'quarter', 'is_holiday', 'stock_level',
            'price_level', 'prescription_count'
        ]

    def create_temporal_features(self, df):
        """Add calendar columns derived from ``saleDate``.

        Mutates ``df`` in place (adds ``date``, ``day_of_week``, ``month``,
        ``quarter``, ``is_holiday``) and returns it.
        """
        df['date'] = pd.to_datetime(df['saleDate'])
        df['day_of_week'] = df['date'].dt.dayofweek
        df['month'] = df['date'].dt.month
        df['quarter'] = df['date'].dt.quarter
        df['is_holiday'] = self._is_holiday(df['date'])
        return df

    def create_lag_features(self, df, lag_days=(1, 7, 30)):
        """Add ``quantity_lag_<n>`` columns: ``quantity`` shifted n rows within each drug.

        Lags are row-based, so they equal day lags only when each drug has
        exactly one row per calendar day. Mutates ``df`` in place and returns it.

        Args:
            df: Frame with ``drugId`` and ``quantity`` columns (and ideally ``date``).
            lag_days: Row offsets to create; a tuple default avoids a shared
                mutable default argument.
        """
        # Shift in chronological order so lags don't depend on input row order.
        # Assignment aligns on the index, so df keeps its own row order; that
        # alignment needs a unique index, otherwise fall back to row order.
        if 'date' in df.columns and df.index.is_unique:
            ordered = df.sort_values('date', kind='stable')
        else:
            ordered = df
        quantities = ordered.groupby('drugId')['quantity']
        for lag in lag_days:
            df[f'quantity_lag_{lag}'] = quantities.shift(lag)
        return df

    def _is_holiday(self, dates):
        """Return a boolean Series: whether each date's ``YYYY-MM-DD`` is in ``self.holidays``."""
        return dates.dt.strftime('%Y-%m-%d').isin(self.holidays)
ML Model Implementation
# Demand forecasting model using Prophet
from prophet import Prophet
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error
class DrugDemandForecaster:
    """Per-drug Prophet demand models, stored in ``self.models`` keyed by drug id."""

    def __init__(self):
        self.models = {}
        # NOTE(review): not populated by any method in this class.
        self.feature_importance = {}

    def prepare_prophet_data(self, df, drug_id):
        """Aggregate one drug's sales into the ``ds``/``y`` frame Prophet expects.

        Args:
            df: Sales rows with ``drugId``, ``date`` and ``quantity`` columns
                (``date`` is what DemandForecastingFeatures.create_temporal_features
                adds, or the caller sets it).
            drug_id: Drug to select.

        Returns:
            DataFrame with ``ds`` (date) and ``y`` (summed quantity) per date.
        """
        drug_data = df[df['drugId'] == drug_id].copy()
        drug_data = drug_data.groupby('date')['quantity'].sum().reset_index()
        drug_data.columns = ['ds', 'y']  # Prophet requires 'ds' and 'y' columns
        return drug_data

    def train_prophet_model(self, df, drug_id):
        """Fit a Prophet model for ``drug_id``, store it in ``self.models`` and return it.

        ``df`` must satisfy ``prepare_prophet_data``'s column requirements.
        """
        prophet_data = self.prepare_prophet_data(df, drug_id)

        model = Prophet(
            yearly_seasonality=True,
            weekly_seasonality=True,
            daily_seasonality=False,
            seasonality_mode='multiplicative'
        )

        # Add custom seasonality if needed
        model.add_seasonality(name='monthly', period=30.5, fourier_order=5)

        model.fit(prophet_data)
        self.models[drug_id] = model
        return model

    def predict_demand(self, drug_id, periods=30, include_history=True):
        """Forecast demand for the next ``periods`` periods after the training data.

        Args:
            drug_id: Drug whose trained model to use.
            periods: Number of future periods to add.
            include_history: When True (default, as before), the returned frame
                also contains the in-sample fitted rows ahead of the future ones.

        Returns:
            DataFrame with ``ds``, ``yhat``, ``yhat_lower``, ``yhat_upper``.

        Raises:
            ValueError: if no model has been trained for ``drug_id``.
        """
        if drug_id not in self.models:
            raise ValueError(f"Model not trained for drug {drug_id}")

        model = self.models[drug_id]
        future = model.make_future_dataframe(
            periods=periods, include_history=include_history
        )
        forecast = model.predict(future)

        return forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]

    def evaluate_model(self, df, drug_id, test_periods=30):
        """Hold out the last ``test_periods`` days, fit on the rest, and score the forecast.

        Side effect: ``self.models[drug_id]`` is replaced by the model fitted
        on the training split.

        Args:
            df: Raw sales rows with ``drugId``, ``saleDate`` and ``quantity``.
            drug_id: Drug to evaluate.
            test_periods: Length of the hold-out window in days.

        Returns:
            dict with ``mae``, ``rmse`` and ``mape`` (percent; ``None`` when
            every actual in the window is zero).

        Raises:
            ValueError: if the drug has no rows, or either split is empty.
        """
        drug_data = df[df['drugId'] == drug_id].copy()
        if drug_data.empty:
            raise ValueError(f"No sales data for drug {drug_id}")
        # Whole-day buckets so the hold-out window is measured in calendar days.
        drug_data['date'] = pd.to_datetime(drug_data['saleDate']).dt.normalize()

        split_date = drug_data['date'].max() - timedelta(days=test_periods)
        train_data = drug_data[drug_data['date'] <= split_date]
        test_data = drug_data[drug_data['date'] > split_date]
        if train_data.empty or test_data.empty:
            raise ValueError(
                f"Not enough history for drug {drug_id} to hold out {test_periods} days"
            )

        # Pass the un-aggregated rows: they keep the drugId/date/quantity columns
        # prepare_prophet_data needs (an aggregated frame would lose drugId).
        model = self.train_prophet_model(train_data, drug_id)

        actual = test_data.groupby('date')['quantity'].sum()
        # Predict exactly the dates that have sales instead of assuming the
        # forecast horizon lines up row-for-row with the test set.
        forecast = model.predict(pd.DataFrame({'ds': actual.index}))
        predicted = forecast.set_index('ds')['yhat'].reindex(actual.index)
        return self._forecast_metrics(actual, predicted)

    @staticmethod
    def _forecast_metrics(actual, predicted):
        """Return MAE, RMSE and MAPE for position-aligned ``actual``/``predicted`` values.

        MAPE skips zero actuals (division by zero) and is ``None`` if none remain.
        """
        y_true = np.asarray(actual, dtype=float)
        y_pred = np.asarray(predicted, dtype=float)
        errors = y_true - y_pred
        nonzero = y_true != 0
        mape = (
            float(np.mean(np.abs(errors[nonzero] / y_true[nonzero])) * 100)
            if nonzero.any() else None
        )
        return {
            'mae': float(np.mean(np.abs(errors))),
            'rmse': float(np.sqrt(np.mean(errors ** 2))),
            'mape': mape,
        }
API Service Implementation
# FastAPI service for demand forecasting
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict
import uvicorn
# ASGI application object; every route in this section registers on it.
app = FastAPI(
    title="EasyManage Demand Forecasting API",
)
class ForecastRequest(BaseModel):
    """JSON body accepted by ``POST /forecast/demand``."""

    # Drug whose demand is forecast.
    drug_id: int
    # Number of future periods handed to DrugDemandForecaster.predict_demand.
    periods: int = 30
    # NOTE(review): not read by forecast_drug_demand as written — confirm
    # whether it should drive the forecast interval width.
    confidence_level: float = 0.95
class ForecastResponse(BaseModel):
    """Payload returned by ``POST /forecast/demand``."""

    drug_id: int
    # One dict per forecast row: date, predicted_quantity, lower_bound, upper_bound.
    predictions: list[dict]
    # Hold-out metrics from DrugDemandForecaster.evaluate_model.
    model_metrics: dict
    # ISO-8601 timestamp of when the response was built.
    last_updated: str
def _build_demand_forecast(request):
    """Fetch sales history, evaluate, fit and forecast demand for one drug.

    This is blocking work (HTTP extraction plus Prophet fitting), so the async
    route runs it in a worker thread.

    Raises:
        HTTPException: 404 when the sales payload holds no rows for the drug.
    """
    extractor = EasyManageDataExtractor()
    # NOTE(review): only the first page of sales (the extractor's default page
    # size) is fetched, so long histories are truncated — paginate once the
    # response shape is confirmed.
    # NOTE(review): assumes the payload is a JSON list of sale records that
    # pd.DataFrame can ingest — confirm against the EasyManage API.
    sales_df = pd.DataFrame(extractor.get_drug_sales_data())
    if 'drugId' not in sales_df.columns or not (sales_df['drugId'] == request.drug_id).any():
        raise HTTPException(
            status_code=404, detail=f"No sales data for drug {request.drug_id}"
        )
    # prepare_prophet_data groups on a 'date' column that the raw payload does
    # not carry; bucket to whole days so the model sees a daily series.
    sales_df['date'] = pd.to_datetime(sales_df['saleDate']).dt.normalize()

    forecaster = DrugDemandForecaster()
    # evaluate_model refits the stored model on its training split, so run it
    # before the full-history fit that produces the returned forecast.
    metrics = forecaster.evaluate_model(sales_df, request.drug_id)
    forecaster.train_prophet_model(sales_df, request.drug_id)
    predictions = forecaster.predict_demand(request.drug_id, request.periods)
    # predict_demand also returns in-sample fitted rows; keep only the future horizon.
    future = predictions.tail(request.periods)

    # NOTE(review): request.confidence_level is not applied — the bounds use the
    # Prophet model's own interval width (Prophet's documented default is 0.80).
    forecast_data = [
        {
            "date": row.ds.strftime('%Y-%m-%d'),
            "predicted_quantity": round(float(row.yhat), 2),
            "lower_bound": round(float(row.yhat_lower), 2),
            "upper_bound": round(float(row.yhat_upper), 2),
        }
        for row in future.itertuples(index=False)
    ]

    return ForecastResponse(
        drug_id=request.drug_id,
        predictions=forecast_data,
        model_metrics=metrics,
        last_updated=datetime.now().isoformat()
    )


@app.post("/forecast/demand", response_model=ForecastResponse)
async def forecast_drug_demand(request: ForecastRequest):
    """Forecast daily demand for ``request.drug_id`` over ``request.periods`` future days.

    Raises:
        HTTPException: 422 for a non-positive ``periods``, 404 when the drug has
            no sales, 500 for any other failure.
    """
    import asyncio  # local import keeps this block self-contained

    if request.periods < 1:
        raise HTTPException(status_code=422, detail="periods must be a positive integer")
    try:
        # Extraction and model fitting block; keep them off the event loop.
        return await asyncio.to_thread(_build_demand_forecast, request)
    except HTTPException:
        # Deliberate HTTP errors (e.g. 404) must not be rewrapped as 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/drugs/active")
async def get_active_drugs():
    """Get list of active drugs for forecasting.

    "Active" here means every distinct ``drugId`` appearing in the fetched sales
    payload. NOTE(review): only the first page of sales is fetched, so drugs
    sold only on later pages are missed.

    Raises:
        HTTPException: 500 on any extraction or parsing failure.
    """
    import asyncio  # local import keeps this block self-contained

    try:
        extractor = EasyManageDataExtractor()
        # requests is blocking; run it in a worker thread, not on the event loop.
        payload = await asyncio.to_thread(extractor.get_drug_sales_data)
        # The extractor returns decoded JSON, not a DataFrame.
        # NOTE(review): assumes a list of sale records — confirm the API shape.
        drugs_data = pd.DataFrame(payload)

        if 'drugId' in drugs_data.columns:
            active_drugs = drugs_data['drugId'].dropna().unique().tolist()
        else:
            # Empty payload: no columns at all, hence no drugs.
            active_drugs = []

        return {
            "active_drugs": active_drugs,
            "total_count": len(active_drugs)
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
    # Serve the API directly when this module is executed as a script.
    _serve_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **_serve_options)
2. Medication Adherence Prediction System
Data Processing Pipeline
# Medication adherence feature engineering
class AdherenceFeatureEngineer:
    """Derive per-prescription adherence features from prescriptions and drug sales."""

    def __init__(self):
        self.feature_columns = [
            'patient_id', 'drug_id', 'days_since_prescription',
            'refill_frequency', 'adherence_score', 'risk_factors'
        ]

    def calculate_adherence_score(self, prescriptions_df, sales_df):
        """Score adherence for each prescription from the sales made within its window.

        A sale counts toward a prescription when it matches ``pid``/``drugId``
        and its day falls between ``startDate`` and ``endDate`` (inclusive; open
        ended when ``endDate`` is missing). Prescriptions with no such sale are
        skipped.

        Args:
            prescriptions_df: Rows with ``patientId``, ``drugId``, ``startDate``,
                ``endDate`` and optionally ``id``, ``refills``, ``interval``.
            sales_df: Rows with ``pid``, ``drugId`` and ``saleDate``.

        Returns:
            DataFrame with ``patient_id``, ``drug_id``, ``prescription_id``,
            ``adherence_score``, ``total_refills``, ``days_since_prescription``
            and ``prescription_duration`` (days, or None when open ended).
        """
        # Parse once, not per prescription; day granularity so a sale later on
        # the end date still counts.
        sale_days = pd.to_datetime(sales_df['saleDate']).dt.normalize()
        adherence_data = []

        for _, prescription in prescriptions_df.iterrows():
            patient_id = prescription['patientId']
            drug_id = prescription['drugId']
            start_date = pd.to_datetime(prescription['startDate'])
            raw_end = prescription['endDate']
            # NaN is truthy, so a plain `if raw_end` would turn NaN into NaT.
            end_date = None
            if pd.notna(raw_end) and raw_end != '':
                end_date = pd.to_datetime(raw_end)

            # Get sales data for this prescription (same patient/drug, inside its window)
            in_window = (
                (sales_df['pid'] == patient_id)
                & (sales_df['drugId'] == drug_id)
                & (sale_days >= start_date.normalize())
            )
            if end_date is not None:
                in_window &= sale_days <= end_date.normalize()
            patient_sales = sales_df[in_window].copy()

            if patient_sales.empty:
                continue

            adherence_score = self._calculate_adherence_metrics(
                prescription, patient_sales, start_date, end_date
            )

            adherence_data.append({
                'patient_id': patient_id,
                'drug_id': drug_id,
                # Lets downstream code join prescription attributes back on.
                'prescription_id': prescription.get('id'),
                'adherence_score': adherence_score,
                'total_refills': len(patient_sales),
                'days_since_prescription': (datetime.now() - start_date).days,
                'prescription_duration': (end_date - start_date).days if end_date is not None else None
            })

        return pd.DataFrame(adherence_data)

    @staticmethod
    def _number_or_default(value, default):
        """Return ``value``, or ``default`` when it is missing (None/NaN)."""
        return default if value is None or pd.isna(value) else value

    def _calculate_adherence_metrics(self, prescription, sales, start_date, end_date):
        """Score 0-100: mean of refill-count adherence and refill-timing adherence.

        ``start_date``/``end_date`` are accepted for interface compatibility; the
        caller has already restricted ``sales`` to the prescription window.
        Fewer than two sales give no interval to judge, and score 0.0.
        """
        if len(sales) == 0:
            return 0.0

        # Missing values fall back to defaults; a NaN here would otherwise make
        # min()/max() return arbitrary results (NaN refills scored as 100%).
        expected_refills = self._number_or_default(prescription.get('refills'), 0)
        expected_interval = self._number_or_default(prescription.get('interval'), 30)
        actual_refills = len(sales)

        sales_dates = pd.to_datetime(sales['saleDate']).sort_values()
        intervals = sales_dates.diff().dt.days.dropna()
        if intervals.empty:
            return 0.0

        avg_interval = intervals.mean()
        refill_adherence = min(100, (actual_refills / max(expected_refills, 1)) * 100)
        # NOTE(review): subtracts a day difference from a percentage scale —
        # each day off-schedule costs one point; confirm this is intended.
        timing_adherence = max(0, 100 - abs(avg_interval - expected_interval))

        return (refill_adherence + timing_adherence) / 2
Adherence Prediction Model
# Machine learning model for adherence prediction
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import joblib
class AdherencePredictor:
    """Random-forest classifier flagging prescriptions at risk of non-adherence.

    Label: 1 (adherent) when ``adherence_score >= ADHERENCE_THRESHOLD``, else 0.
    """

    # Threshold on the 0-100 adherence_score used to build the binary label.
    ADHERENCE_THRESHOLD = 80

    def __init__(self):
        self.model = RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            random_state=42
        )
        self.scaler = StandardScaler()
        self.feature_names = []

    def prepare_features(self, adherence_df, patient_data, prescriptions_df):
        """Build the numeric feature matrix and record its column order in ``feature_names``.

        ``adherence_score`` is deliberately excluded: the training label is
        derived from it, so including it would leak the target.
        NOTE(review): ``total_refills`` also feeds the adherence score, so some
        leakage may remain — review feature choice.

        Args:
            adherence_df: Rows with ``patient_id``, ``prescription_id``,
                ``total_refills`` and ``days_since_prescription``.
            patient_data: Rows with ``id``, ``age``, ``sex``, ``financial``,
                ``race``, ``ethnicity``.
            prescriptions_df: Rows with ``id``, ``dosage``, ``quantity``,
                ``route``, ``form``.

        Returns:
            DataFrame of the selected feature columns, missing values filled with 0.
        """
        # Rename the right-hand ids so the two merges don't collide on 'id'.
        patients = patient_data[['id', 'age', 'sex', 'financial', 'race', 'ethnicity']].rename(
            columns={'id': 'patient_id'}
        )
        prescriptions = prescriptions_df[['id', 'dosage', 'quantity', 'route', 'form']].rename(
            columns={'id': 'prescription_id'}
        )
        features_df = (
            adherence_df
            .merge(patients, on='patient_id', how='left')
            .merge(prescriptions, on='prescription_id', how='left')
        )

        features_df['sex_encoded'] = features_df['sex'].map({'M': 1, 'F': 0})
        # Accept the flag as '1'/'0' strings or as numbers (a string-only map
        # silently turned integer flags into NaN -> 0). NOTE(review): assumes
        # the flag is 0/1 — other values end up as 0 after fillna.
        features_df['financial_encoded'] = pd.to_numeric(
            features_df['financial'], errors='coerce'
        ).map({1: 1, 0: 0})

        feature_columns = [
            'total_refills', 'days_since_prescription',
            'age', 'sex_encoded', 'financial_encoded',
            'dosage', 'quantity'
        ]

        self.feature_names = feature_columns
        return features_df[feature_columns].fillna(0)

    def train_model(self, features_df, adherence_df):
        """Fit scaler and classifier on an 80/20 split and report accuracies.

        ``features_df`` and ``adherence_df`` must be row-aligned by position.

        Returns:
            dict with ``train_accuracy``, ``test_accuracy`` and per-feature
            ``feature_importance``.
        """
        target = (adherence_df['adherence_score'] >= self.ADHERENCE_THRESHOLD).astype(int)

        X_train, X_test, y_train, y_test = train_test_split(
            features_df, target, test_size=0.2, random_state=42
        )

        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        self.model.fit(X_train_scaled, y_train)

        train_score = self.model.score(X_train_scaled, y_train)
        test_score = self.model.score(X_test_scaled, y_test)

        # Name importances after the columns actually trained on, so this also
        # works when prepare_features was not used to build features_df.
        if hasattr(features_df, 'columns'):
            self.feature_names = list(features_df.columns)

        return {
            'train_accuracy': train_score,
            'test_accuracy': test_score,
            'feature_importance': dict(zip(self.feature_names, self.model.feature_importances_))
        }

    def predict_adherence_risk(self, patient_features):
        """Classify one patient's feature vector (ordered like ``self.feature_names``).

        Returns:
            dict with ``adherence_risk`` ('High' when predicted non-adherent),
            ``risk_probability`` (always P(non-adherent)) and ``confidence``
            (probability of the predicted class).
        """
        features_scaled = self.scaler.transform([patient_features])

        prediction = self.model.predict(features_scaled)[0]
        probability = self.model.predict_proba(features_scaled)[0]
        # predict_proba columns follow model.classes_, which holds a single
        # label if training saw only one class — look class 0 up by label.
        class_probs = dict(zip(self.model.classes_, probability))

        return {
            'adherence_risk': 'High' if prediction == 0 else 'Low',
            'risk_probability': float(class_probs.get(0, 0.0)),
            'confidence': float(max(probability))
        }

    def save_model(self, filepath):
        """Persist model, scaler and feature names to ``filepath`` with joblib."""
        model_data = {
            'model': self.model,
            'scaler': self.scaler,
            'feature_names': self.feature_names
        }
        joblib.dump(model_data, filepath)

    def load_model(self, filepath):
        """Restore state written by ``save_model``.

        joblib loading unpickles the file, so only load trusted files.
        """
        model_data = joblib.load(filepath)
        self.model = model_data['model']
        self.scaler = model_data['scaler']
        self.feature_names = model_data['feature_names']
3. Revenue Prediction System
Revenue Analytics Engine
# Revenue prediction and analysis
class RevenuePredictor:
    """Daily revenue trend analysis and Prophet-based revenue forecasting."""

    def __init__(self):
        self.models = {}
        # An empty DataFrame (not a dict) so the `.empty` guards below raise
        # their intended errors before analyze_revenue_trends has run.
        self.revenue_trends = pd.DataFrame()

    def analyze_revenue_trends(self, billing_df, payments_df, drug_sales_df):
        """Combine billing and drug-sales fees into a daily revenue series with trends.

        Args:
            billing_df: Rows with ``date`` and ``fee``.
            payments_df: NOTE(review): accepted but not used — confirm whether
                payments should be included (billing plus payments may double count).
            drug_sales_df: Rows with ``saleDate`` and ``fee``.

        Returns:
            DataFrame with ``date``, ``fee``, ``ma_7``, ``ma_30`` and ``trend``
            (slope of a linear fit over the last 30 rows); also stored in
            ``self.revenue_trends``. Rolling windows count rows, not calendar days.
        """
        # Convert each source separately (formats may differ) and bucket to
        # whole days so time-stamped sales aggregate per calendar day.
        billing_revenue = (
            billing_df.assign(date=pd.to_datetime(billing_df['date']).dt.normalize())
            .groupby('date')['fee'].sum().reset_index()
        )
        billing_revenue['source'] = 'billing'

        drug_revenue = (
            drug_sales_df.assign(date=pd.to_datetime(drug_sales_df['saleDate']).dt.normalize())
            .groupby('date')['fee'].sum().reset_index()
        )
        drug_revenue['source'] = 'drug_sales'

        total_revenue = pd.concat([billing_revenue, drug_revenue], ignore_index=True)

        daily_revenue = (
            total_revenue.groupby('date')['fee'].sum().reset_index()
            .sort_values('date', ignore_index=True)
        )

        daily_revenue['ma_7'] = daily_revenue['fee'].rolling(window=7).mean()
        daily_revenue['ma_30'] = daily_revenue['fee'].rolling(window=30).mean()
        daily_revenue['trend'] = daily_revenue['fee'].rolling(window=30).apply(
            lambda x: np.polyfit(np.arange(len(x)), x, 1)[0], raw=True
        )

        self.revenue_trends = daily_revenue
        return daily_revenue

    def predict_revenue(self, periods=30, confidence_level=0.95):
        """Forecast daily revenue ``periods`` days ahead.

        Args:
            periods: Number of future days.
            confidence_level: Width of the uncertainty interval in (0, 1); passed
                to Prophet as ``interval_width``, which produces the bounds.

        Returns:
            DataFrame with ``ds``, ``yhat``, ``yhat_lower``, ``yhat_upper``
            (history plus future rows).

        Raises:
            ValueError: if trends have not been analysed, or confidence_level
                is outside (0, 1).
        """
        if self.revenue_trends.empty:
            raise ValueError("Revenue trends not analyzed. Run analyze_revenue_trends first.")
        if not 0 < confidence_level < 1:
            raise ValueError("confidence_level must be between 0 and 1")

        prophet_data = self.revenue_trends[['date', 'fee']].rename(
            columns={'date': 'ds', 'fee': 'y'}
        )

        # Prophet's output has no 'yhat_std' column; let it compute bounds at
        # the requested width instead.
        model = Prophet(
            yearly_seasonality=True,
            weekly_seasonality=True,
            daily_seasonality=False,
            seasonality_mode='additive',
            interval_width=confidence_level
        )

        # Custom seasonality for business cycles
        model.add_seasonality(name='monthly', period=30.5, fourier_order=5)
        model.add_seasonality(name='quarterly', period=91.25, fourier_order=3)

        model.fit(prophet_data)

        future = model.make_future_dataframe(periods=periods)
        forecast = model.predict(future)

        return forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]

    def calculate_revenue_metrics(self):
        """Summarise the analysed series; returns ``{}`` before any analysis.

        ``trend_direction`` is 'Unknown' while the 30-row trend window is not
        yet filled (trend is NaN).
        """
        if self.revenue_trends.empty:
            return {}

        current_revenue = self.revenue_trends['fee'].iloc[-1]
        avg_revenue = self.revenue_trends['fee'].mean()
        revenue_growth = self.revenue_trends['trend'].iloc[-1]

        monthly_revenue = self.revenue_trends.groupby(
            self.revenue_trends['date'].dt.month
        )['fee'].mean()

        peak_month = monthly_revenue.idxmax()
        low_month = monthly_revenue.idxmin()

        if pd.isna(revenue_growth):
            trend_direction = 'Unknown'
        else:
            trend_direction = 'Increasing' if revenue_growth > 0 else 'Decreasing'

        return {
            'current_revenue': current_revenue,
            'average_revenue': avg_revenue,
            'revenue_growth_rate': revenue_growth,
            'peak_month': peak_month,
            'low_month': low_month,
            'revenue_volatility': self.revenue_trends['fee'].std(),
            'trend_direction': trend_direction
        }
Deployment & Monitoring
Docker Configuration
# Dockerfile for AI services
FROM python:3.9-slim
WORKDIR /app
# Install the gcc/g++ build toolchain; --no-install-recommends keeps the image lean
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*
# Copy requirements first so the dependency layer is cached across code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Unprivileged runtime user; it owns /app so the service can write files there
RUN useradd --create-home --uid 10001 appuser && chown appuser:appuser /app
# Copy application code
COPY --chown=appuser:appuser . .
# Drop root privileges for the running service
USER appuser
# Expose port
EXPOSE 8000
# Run the application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
requirements.txt
# Pinned runtime dependencies, installed by the Dockerfile via `pip install -r requirements.txt`.
# Web API layer
fastapi==0.104.1
uvicorn==0.24.0
# Data handling and numerics
pandas==2.1.3
numpy==1.24.3
# ML models. Models saved with joblib (AdherencePredictor.save_model) should be
# reloaded with the same scikit-learn version they were trained with.
scikit-learn==1.3.2
prophet==1.1.4
# HTTP client for the EasyManage REST APIs
requests==2.31.0
joblib==1.3.2
pydantic==2.5.0
# NOTE(review): FastAPI needs this only for form/file uploads; no route shown
# here uses them — confirm before removing.
python-multipart==0.0.6
Monitoring & Logging
# Monitoring and logging setup
import logging
from datetime import datetime
import json
class AIServiceMonitor:
    """In-process prediction logger and per-model latency/confidence tracker."""

    def __init__(self):
        self.logger = self._setup_logging()
        # model_name -> list of {'timestamp', 'execution_time', 'prediction_confidence'}
        self.metrics = {}

    def _setup_logging(self):
        """Configure root logging and return this module's logger.

        ``logging.basicConfig`` does nothing if the root logger already has
        handlers, so creating several monitors is harmless.
        """
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        return logging.getLogger(__name__)

    def log_prediction(self, model_name, input_data, prediction, execution_time):
        """Log one prediction as JSON and record its latency and confidence.

        Args:
            model_name: Key under which metrics are accumulated.
            input_data: Model input, logged as-is.
            prediction: Model output; its ``'confidence'`` entry (if it is a
                dict) is recorded, otherwise 0.
            execution_time: Duration in seconds (logged in milliseconds).
        """
        now = datetime.now()  # one timestamp for both the log line and the metrics
        log_entry = {
            'timestamp': now.isoformat(),
            'model_name': model_name,
            'input_data': input_data,
            'prediction': prediction,
            'execution_time_ms': execution_time * 1000,
            'model_version': '1.0.0'
        }

        # default=str: numpy scalars, timestamps etc. in inputs/outputs are
        # stringified for the log rather than making logging raise in the caller.
        self.logger.info("Prediction made: %s", json.dumps(log_entry, default=str))

        confidence = prediction.get('confidence', 0) if isinstance(prediction, dict) else 0
        self.metrics.setdefault(model_name, []).append({
            'timestamp': now,
            'execution_time': execution_time,
            'prediction_confidence': confidence
        })

    def get_model_performance(self, model_name):
        """Return count, mean latency, mean confidence and last-prediction time, or ``{}``."""
        model_metrics = self.metrics.get(model_name)
        if not model_metrics:
            return {}

        execution_times = [m['execution_time'] for m in model_metrics]
        confidences = [m['prediction_confidence'] for m in model_metrics]

        return {
            'total_predictions': len(model_metrics),
            'avg_execution_time': np.mean(execution_times),
            'avg_confidence': np.mean(confidences),
            'last_prediction': model_metrics[-1]['timestamp'].isoformat()
        }
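This implementation guide provides the technical foundation for deploying the three highest-priority AI workflows in the EasyManage system. Each workflow includes data processing and a machine learning model; the demand forecasting workflow is additionally exposed through a FastAPI service, and all three share the Docker deployment and monitoring setup described above.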