AI Workflows - AI Implementation Guide

High Priority AI Workflows - Technical Implementation

1. Drug Demand Forecasting System

Data Flow Architecture

EasyManage APIs → Data Pipeline → Feature Store → ML Pipeline → Prediction Service → Dashboard
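
The sketch below shows how these stages might be wired together end to end. It is illustrative only: the classes it references (EasyManageDataExtractor, DemandForecastingFeatures, DrugDemandForecaster) are defined in the sections that follow, and pandas is assumed to be imported as pd.

# Illustrative wiring of the pipeline stages (the classes are defined below)
def run_forecasting_pipeline(drug_id, periods=30):
    extractor = EasyManageDataExtractor()                    # EasyManage APIs
    raw_df = pd.DataFrame(extractor.get_drug_sales_data())   # Data Pipeline
    features = DemandForecastingFeatures().create_temporal_features(raw_df)  # Feature Store
    forecaster = DrugDemandForecaster()                      # ML Pipeline
    forecaster.train_prophet_model(features, drug_id)
    return forecaster.predict_demand(drug_id, periods)       # Prediction Service → Dashboard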

Data Sources & Preprocessing

# Data extraction from EasyManage APIs
import requests
import pandas as pd
from datetime import datetime, timedelta

class EasyManageDataExtractor:
    def __init__(self, base_url="http://127.0.0.1:9080"):
        self.base_url = base_url

    def get_drug_sales_data(self, page=0, size=1000):
        """Extract drug sales data with pagination."""
        url = f"{self.base_url}/emdbrest/drug_sales/ViewAllPaged"
        params = {"page": page, "size": size}
        response = requests.get(url, params=params)
        response.raise_for_status()  # fail fast on HTTP errors
        return response.json()

    def get_drug_inventory_data(self, page=0, size=1000):
        """Extract drug inventory data with pagination."""
        url = f"{self.base_url}/emdbrest/drug_inventory/ViewAllPaged"
        params = {"page": page, "size": size}
        response = requests.get(url, params=params)
        response.raise_for_status()
        return response.json()

    def get_prescriptions_data(self, page=0, size=1000):
        """Extract prescriptions data with pagination."""
        url = f"{self.base_url}/emdbrest/prescriptions/ViewAllPaged"
        params = {"page": page, "size": size}
        response = requests.get(url, params=params)
        response.raise_for_status()
        return response.json()
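
The methods above fetch one page at a time. Below is a minimal sketch for draining all pages into a single DataFrame; it assumes the paged endpoints return a plain list of records and an empty list once the page index runs past the data, so adjust it if your deployment wraps results differently (e.g. in a 'content' field).

# Sketch: drain all pages into one DataFrame (assumes list-of-records responses)
def fetch_all_drug_sales(extractor, size=1000):
    all_rows, page = [], 0
    while True:
        batch = extractor.get_drug_sales_data(page=page, size=size)
        if not batch:  # assumed: an empty list signals the last page
            break
        all_rows.extend(batch)
        page += 1
    return pd.DataFrame(all_rows)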

# Feature engineering for demand forecasting
class DemandForecastingFeatures:
    def __init__(self):
        self.feature_columns = [
            'drug_id', 'date', 'quantity_sold', 'day_of_week',
            'month', 'quarter', 'is_holiday', 'stock_level',
            'price_level', 'prescription_count'
        ]

    def create_temporal_features(self, df):
        """Create time-based features."""
        df['date'] = pd.to_datetime(df['saleDate'])
        df['day_of_week'] = df['date'].dt.dayofweek
        df['month'] = df['date'].dt.month
        df['quarter'] = df['date'].dt.quarter
        df['is_holiday'] = self._is_holiday(df['date'])
        return df

    def create_lag_features(self, df, lag_days=(1, 7, 30)):
        """Create lagged features for time series."""
        for lag in lag_days:
            df[f'quantity_lag_{lag}'] = df.groupby('drugId')['quantity'].shift(lag)
        return df

    def _is_holiday(self, dates):
        """Simple holiday detection (can be enhanced with a holiday calendar)."""
        # Example list of major US holidays; replace with a full calendar in production
        holidays = ['2024-01-01', '2024-07-04', '2024-12-25']
        return dates.dt.strftime('%Y-%m-%d').isin(holidays)
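
Assuming the fetch_all_drug_sales helper sketched earlier, assembling the full feature frame might look like this:

# Example: build the demand-forecasting feature frame from raw sales records
engineer = DemandForecastingFeatures()
sales_df = fetch_all_drug_sales(EasyManageDataExtractor())
sales_df = engineer.create_temporal_features(sales_df)  # adds date/day/month/quarter/holiday
sales_df = engineer.create_lag_features(sales_df)       # adds 1/7/30-day lags per drug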

ML Model Implementation

# Demand forecasting model using Prophet
from prophet import Prophet
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error

class DrugDemandForecaster:
    def __init__(self):
        self.models = {}
        self.feature_importance = {}

    def prepare_prophet_data(self, df, drug_id):
        """Prepare data for the Prophet model."""
        drug_data = df[df['drugId'] == drug_id].copy()
        drug_data = drug_data.groupby('date')['quantity'].sum().reset_index()
        drug_data.columns = ['ds', 'y']  # Prophet requires 'ds' and 'y' columns
        return drug_data

    def train_prophet_model(self, df, drug_id):
        """Train a Prophet model for a specific drug."""
        prophet_data = self.prepare_prophet_data(df, drug_id)

        model = Prophet(
            yearly_seasonality=True,
            weekly_seasonality=True,
            daily_seasonality=False,
            seasonality_mode='multiplicative'
        )

        # Add custom seasonality if needed
        model.add_seasonality(name='monthly', period=30.5, fourier_order=5)

        model.fit(prophet_data)
        self.models[drug_id] = model
        return model

    def predict_demand(self, drug_id, periods=30):
        """Predict demand for the next N periods."""
        if drug_id not in self.models:
            raise ValueError(f"Model not trained for drug {drug_id}")

        model = self.models[drug_id]
        future = model.make_future_dataframe(periods=periods)
        forecast = model.predict(future)

        return forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]

    def evaluate_model(self, df, drug_id, test_periods=30):
        """Evaluate model performance on a holdout window."""
        # Filter to the drug first so 'drugId' is still available for training
        drug_data = df[df['drugId'] == drug_id].copy()
        drug_data['date'] = pd.to_datetime(drug_data['saleDate'])

        split_date = drug_data['date'].max() - timedelta(days=test_periods)
        train_data = drug_data[drug_data['date'] <= split_date]
        test_data = (drug_data[drug_data['date'] > split_date]
                     .groupby('date')['quantity'].sum().reset_index())

        # Train model on training data only
        self.train_prophet_model(train_data, drug_id)

        # Make predictions and align the forecast tail with the test window
        predictions = self.predict_demand(drug_id, test_periods)
        y_pred = predictions['yhat'].iloc[-len(test_data):].to_numpy()
        y_true = test_data['quantity'].to_numpy()

        # Calculate metrics
        mae = mean_absolute_error(y_true, y_pred)
        rmse = np.sqrt(mean_squared_error(y_true, y_pred))

        return {
            'mae': mae,
            'rmse': rmse,
            'mape': np.mean(np.abs((y_true - y_pred) / y_true)) * 100
        }
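
A short usage sketch, assuming sales_df is the feature frame built above and drug_id 101 is a placeholder identifier:

# Example: train, forecast 30 days ahead, and check holdout error
forecaster = DrugDemandForecaster()
forecaster.train_prophet_model(sales_df, drug_id=101)
forecast = forecaster.predict_demand(drug_id=101, periods=30)
print(forecast.tail())
print(forecaster.evaluate_model(sales_df, drug_id=101, test_periods=30))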

API Service Implementation

# FastAPI service for demand forecasting
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Dict
from datetime import datetime
import pandas as pd
import uvicorn

app = FastAPI(title="EasyManage Demand Forecasting API")

class ForecastRequest(BaseModel):
    drug_id: int
    periods: int = 30
    confidence_level: float = 0.95

class ForecastResponse(BaseModel):
    drug_id: int
    predictions: List[Dict]
    model_metrics: Dict
    last_updated: str

@app.post("/forecast/demand", response_model=ForecastResponse)
async def forecast_drug_demand(request: ForecastRequest):
    try:
        # Initialize forecaster
        forecaster = DrugDemandForecaster()

        # Get data from EasyManage and normalize it into a DataFrame
        extractor = EasyManageDataExtractor()
        sales_df = pd.DataFrame(extractor.get_drug_sales_data())
        sales_df['date'] = pd.to_datetime(sales_df['saleDate'])

        # Evaluate first (evaluate_model retrains on a train split),
        # then train on the full history and make predictions
        metrics = forecaster.evaluate_model(sales_df, request.drug_id)
        forecaster.train_prophet_model(sales_df, request.drug_id)
        predictions = forecaster.predict_demand(request.drug_id, request.periods)

        # Format response
        forecast_data = []
        for _, row in predictions.iterrows():
            forecast_data.append({
                "date": row['ds'].strftime('%Y-%m-%d'),
                "predicted_quantity": round(row['yhat'], 2),
                "lower_bound": round(row['yhat_lower'], 2),
                "upper_bound": round(row['yhat_upper'], 2)
            })

        return ForecastResponse(
            drug_id=request.drug_id,
            predictions=forecast_data,
            model_metrics=metrics,
            last_updated=datetime.now().isoformat()
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/drugs/active")
async def get_active_drugs():
    """Get the list of active drugs available for forecasting."""
    try:
        extractor = EasyManageDataExtractor()
        drugs_df = pd.DataFrame(extractor.get_drug_sales_data())

        # Get unique active drugs
        active_drugs = drugs_df['drugId'].unique().tolist()

        return {
            "active_drugs": active_drugs,
            "total_count": len(active_drugs)
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
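
Once the service is running, a client can request a forecast over HTTP. A minimal sketch, assuming the service is reachable on localhost:8000 and drug 101 exists:

# Example client call against the forecasting endpoint
import requests

resp = requests.post(
    "http://localhost:8000/forecast/demand",
    json={"drug_id": 101, "periods": 30, "confidence_level": 0.95},
)
resp.raise_for_status()
print(resp.json()["predictions"][:3])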

2. Medication Adherence Prediction System

Data Processing Pipeline

# Medication adherence feature engineering
class AdherenceFeatureEngineer:
    def __init__(self):
        self.feature_columns = [
            'patient_id', 'drug_id', 'days_since_prescription',
            'refill_frequency', 'adherence_score', 'risk_factors'
        ]

    def calculate_adherence_score(self, prescriptions_df, sales_df):
        """Calculate a medication adherence score for each patient-drug combination."""
        adherence_data = []

        for _, prescription in prescriptions_df.iterrows():
            patient_id = prescription['patientId']
            drug_id = prescription['drugId']
            start_date = pd.to_datetime(prescription['startDate'])
            end_date = pd.to_datetime(prescription['endDate']) if prescription['endDate'] else None

            # Get sales data for this prescription
            patient_sales = sales_df[
                (sales_df['pid'] == patient_id) &
                (sales_df['drugId'] == drug_id)
            ].copy()

            if len(patient_sales) == 0:
                continue

            # Calculate adherence metrics
            adherence_score = self._calculate_adherence_metrics(
                prescription, patient_sales, start_date, end_date
            )

            adherence_data.append({
                'patient_id': patient_id,
                'drug_id': drug_id,
                'prescription_id': prescription['id'],  # kept so downstream merges can join on it
                'adherence_score': adherence_score,
                'total_refills': len(patient_sales),
                'days_since_prescription': (datetime.now() - start_date).days,
                'prescription_duration': (end_date - start_date).days if end_date else None
            })

        return pd.DataFrame(adherence_data)

    def _calculate_adherence_metrics(self, prescription, sales, start_date, end_date):
        """Calculate an adherence score based on refill patterns."""
        if len(sales) == 0:
            return 0.0

        # Expected refills based on prescription duration and quantity
        expected_refills = prescription.get('refills', 0)
        actual_refills = len(sales)

        # Calculate timing adherence from the gaps between consecutive refills
        sales_dates = pd.to_datetime(sales['saleDate']).sort_values()
        intervals = []

        for i in range(1, len(sales_dates)):
            interval = (sales_dates.iloc[i] - sales_dates.iloc[i - 1]).days
            intervals.append(interval)

        if intervals:
            avg_interval = np.mean(intervals)
            expected_interval = prescription.get('interval', 30)  # Default: 30 days

            # Calculate adherence score (0-100)
            refill_adherence = min(100, (actual_refills / max(expected_refills, 1)) * 100)
            timing_adherence = max(0, 100 - abs(avg_interval - expected_interval))

            adherence_score = (refill_adherence + timing_adherence) / 2
        else:
            adherence_score = 0.0

        return adherence_score
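
A usage sketch, assuming prescriptions_df and sales_df are DataFrames built from the extractor output:

# Example: compute adherence scores from prescription and sales extracts
engineer = AdherenceFeatureEngineer()
adherence_df = engineer.calculate_adherence_score(prescriptions_df, sales_df)
print(adherence_df[['patient_id', 'drug_id', 'adherence_score']].head())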

Adherence Prediction Model

# Machine learning model for adherence prediction
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import joblib

class AdherencePredictor:
    def __init__(self):
        self.model = RandomForestClassifier(
            n_estimators=100,
            max_depth=10,
            random_state=42
        )
        self.scaler = StandardScaler()
        self.feature_names = []

    def prepare_features(self, adherence_df, patient_data, prescriptions_df):
        """Prepare features for adherence prediction."""
        # Merge patient demographics
        features_df = adherence_df.merge(
            patient_data[['id', 'age', 'sex', 'financial', 'race', 'ethnicity']],
            left_on='patient_id',
            right_on='id',
            how='left'
        )

        # Add prescription-specific features (requires the 'prescription_id'
        # column emitted by AdherenceFeatureEngineer above)
        features_df = features_df.merge(
            prescriptions_df[['id', 'dosage', 'quantity', 'route', 'form']],
            left_on='prescription_id',
            right_on='id',
            how='left'
        )

        # Create categorical encodings
        features_df['sex_encoded'] = features_df['sex'].map({'M': 1, 'F': 0})
        features_df['financial_encoded'] = features_df['financial'].map({'1': 1, '0': 0})

        # Select final features. Note: 'adherence_score' is deliberately excluded
        # because the training target is derived from it (target leakage).
        feature_columns = [
            'total_refills', 'days_since_prescription',
            'age', 'sex_encoded', 'financial_encoded',
            'dosage', 'quantity'
        ]

        self.feature_names = feature_columns
        return features_df[feature_columns].fillna(0)

    def train_model(self, features_df, adherence_df):
        """Train the adherence prediction model."""
        # Create binary target (adherent vs. non-adherent)
        target = (adherence_df['adherence_score'] >= 80).astype(int)

        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            features_df, target, test_size=0.2, random_state=42
        )

        # Scale features
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        # Train model
        self.model.fit(X_train_scaled, y_train)

        # Evaluate model
        train_score = self.model.score(X_train_scaled, y_train)
        test_score = self.model.score(X_test_scaled, y_test)

        return {
            'train_accuracy': train_score,
            'test_accuracy': test_score,
            'feature_importance': dict(zip(self.feature_names, self.model.feature_importances_))
        }

    def predict_adherence_risk(self, patient_features):
        """Predict adherence risk for a single patient."""
        # Scale features
        features_scaled = self.scaler.transform([patient_features])

        # Make prediction
        prediction = self.model.predict(features_scaled)[0]
        probability = self.model.predict_proba(features_scaled)[0]

        return {
            'adherence_risk': 'High' if prediction == 0 else 'Low',
            'risk_probability': float(probability[0] if prediction == 0 else probability[1]),
            'confidence': float(max(probability))  # plain float so the result is JSON-serializable
        }

    def save_model(self, filepath):
        """Save the trained model."""
        model_data = {
            'model': self.model,
            'scaler': self.scaler,
            'feature_names': self.feature_names
        }
        joblib.dump(model_data, filepath)

    def load_model(self, filepath):
        """Load a trained model."""
        model_data = joblib.load(filepath)
        self.model = model_data['model']
        self.scaler = model_data['scaler']
        self.feature_names = model_data['feature_names']
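
An end-to-end usage sketch; patient_df is assumed to be the patient extract, and the model file path is a placeholder:

# Example: build features, train, persist, and score one patient
predictor = AdherencePredictor()
features_df = predictor.prepare_features(adherence_df, patient_df, prescriptions_df)
print(predictor.train_model(features_df, adherence_df))
predictor.save_model("adherence_model.joblib")

# Later, reload the model and score a single patient's feature vector
predictor.load_model("adherence_model.joblib")
print(predictor.predict_adherence_risk(features_df.iloc[0].tolist()))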

3. Revenue Prediction System

Revenue Analytics Engine

# Revenue prediction and analysis
from prophet import Prophet
import pandas as pd
import numpy as np

class RevenuePredictor:
    def __init__(self):
        self.models = {}
        self.revenue_trends = pd.DataFrame()  # DataFrame so .empty checks work

    def analyze_revenue_trends(self, billing_df, payments_df, drug_sales_df):
        """Analyze historical revenue trends."""
        # Aggregate revenue by date (payments_df is accepted for future use;
        # only billing and drug-sales revenue are aggregated here)
        revenue_data = []

        # Billing revenue
        billing_revenue = billing_df.groupby('date')['fee'].sum().reset_index()
        billing_revenue['source'] = 'billing'
        revenue_data.append(billing_revenue)

        # Drug sales revenue (the groupby yields two columns: saleDate and fee)
        drug_revenue = drug_sales_df.groupby('saleDate')['fee'].sum().reset_index()
        drug_revenue.columns = ['date', 'fee']
        drug_revenue['source'] = 'drug_sales'
        revenue_data.append(drug_revenue)

        # Combine all revenue sources
        total_revenue = pd.concat(revenue_data, ignore_index=True)
        total_revenue['date'] = pd.to_datetime(total_revenue['date'])

        # Daily revenue aggregation
        daily_revenue = total_revenue.groupby('date')['fee'].sum().reset_index()
        daily_revenue = daily_revenue.sort_values('date')

        # Calculate moving averages and trends
        daily_revenue['ma_7'] = daily_revenue['fee'].rolling(window=7).mean()
        daily_revenue['ma_30'] = daily_revenue['fee'].rolling(window=30).mean()
        daily_revenue['trend'] = daily_revenue['fee'].rolling(window=30).apply(
            lambda x: np.polyfit(range(len(x)), x, 1)[0]
        )

        self.revenue_trends = daily_revenue
        return daily_revenue

    def predict_revenue(self, periods=30, confidence_level=0.95):
        """Predict future revenue using time series analysis."""
        if self.revenue_trends.empty:
            raise ValueError("Revenue trends not analyzed. Run analyze_revenue_trends first.")

        # Use Prophet for revenue forecasting
        prophet_data = self.revenue_trends[['date', 'fee']].copy()
        prophet_data.columns = ['ds', 'y']

        model = Prophet(
            yearly_seasonality=True,
            weekly_seasonality=True,
            daily_seasonality=False,
            seasonality_mode='additive',
            interval_width=confidence_level  # Prophet produces yhat_lower/yhat_upper at this level
        )

        # Add custom seasonality for business cycles
        model.add_seasonality(name='monthly', period=30.5, fourier_order=5)
        model.add_seasonality(name='quarterly', period=91.25, fourier_order=3)

        model.fit(prophet_data)

        # Make predictions; the forecast already includes the confidence interval
        future = model.make_future_dataframe(periods=periods)
        forecast = model.predict(future)

        return forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]

    def calculate_revenue_metrics(self):
        """Calculate key revenue metrics."""
        if self.revenue_trends.empty:
            return {}

        current_revenue = self.revenue_trends['fee'].iloc[-1]
        avg_revenue = self.revenue_trends['fee'].mean()
        revenue_growth = self.revenue_trends['trend'].iloc[-1]

        # Calculate seasonality
        monthly_revenue = self.revenue_trends.groupby(
            self.revenue_trends['date'].dt.month
        )['fee'].mean()

        peak_month = monthly_revenue.idxmax()
        low_month = monthly_revenue.idxmin()

        return {
            'current_revenue': current_revenue,
            'average_revenue': avg_revenue,
            'revenue_growth_rate': revenue_growth,
            'peak_month': peak_month,
            'low_month': low_month,
            'revenue_volatility': self.revenue_trends['fee'].std(),
            'trend_direction': 'Increasing' if revenue_growth > 0 else 'Decreasing'
        }
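
A usage sketch, assuming billing_df, payments_df, and drug_sales_df are DataFrames built from the corresponding EasyManage endpoints:

# Example: analyze trends, then forecast 30 days of revenue
revenue = RevenuePredictor()
revenue.analyze_revenue_trends(billing_df, payments_df, drug_sales_df)
forecast = revenue.predict_revenue(periods=30, confidence_level=0.95)
print(revenue.calculate_revenue_metrics())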

Deployment & Monitoring

Docker Configuration

# Dockerfile for AI services
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Expose port
EXPOSE 8000

# Run the application
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

Requirements.txt

fastapi==0.104.1
uvicorn==0.24.0
pandas==2.1.3
numpy==1.24.3
scikit-learn==1.3.2
prophet==1.1.4
requests==2.31.0
joblib==1.3.2
pydantic==2.5.0
python-multipart==0.0.6

Monitoring & Logging

# Monitoring and logging setup
import logging
import json
from datetime import datetime

import numpy as np

class AIServiceMonitor:
    def __init__(self):
        self.logger = self._setup_logging()
        self.metrics = {}

    def _setup_logging(self):
        """Set up structured logging."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        return logging.getLogger(__name__)

    def log_prediction(self, model_name, input_data, prediction, execution_time):
        """Log prediction details for monitoring."""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'model_name': model_name,
            'input_data': input_data,
            'prediction': prediction,
            'execution_time_ms': execution_time * 1000,
            'model_version': '1.0.0'
        }

        # default=str guards against non-JSON-serializable values (e.g. numpy types)
        self.logger.info(f"Prediction made: {json.dumps(log_entry, default=str)}")

        # Store metrics
        if model_name not in self.metrics:
            self.metrics[model_name] = []

        self.metrics[model_name].append({
            'timestamp': datetime.now(),
            'execution_time': execution_time,
            'prediction_confidence': prediction.get('confidence', 0)
        })

    def get_model_performance(self, model_name):
        """Get performance metrics for a specific model."""
        if model_name not in self.metrics:
            return {}

        model_metrics = self.metrics[model_name]

        if not model_metrics:
            return {}

        execution_times = [m['execution_time'] for m in model_metrics]
        confidences = [m['prediction_confidence'] for m in model_metrics]

        return {
            'total_predictions': len(model_metrics),
            'avg_execution_time': np.mean(execution_times),
            'avg_confidence': np.mean(confidences),
            'last_prediction': model_metrics[-1]['timestamp'].isoformat()
        }
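
A sketch of wrapping a prediction call with the monitor; the predictor and feature frame from the adherence section are assumed to be in scope:

# Example: time a prediction and record it with the monitor
import time

monitor = AIServiceMonitor()
start = time.perf_counter()
result = predictor.predict_adherence_risk(features_df.iloc[0].tolist())
monitor.log_prediction("adherence_predictor", {"row": 0}, result,
                       time.perf_counter() - start)
print(monitor.get_model_performance("adherence_predictor"))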

This implementation guide provides the technical foundation for deploying the three highest-priority AI workflows in the EasyManage system. Each workflow includes data processing, machine learning models, API services, and monitoring capabilities.