Flask-SQLAlchemy
https://github.com/pallets/flask-sqlalchemy
https://courses.prettyprinted.com/courses/1016334/lectures/21156810
https://towardsdatascience.com/use-flask-and-sqlalchemy-not-flask-sqlalchemy-5a64fafe22a4
Why Flask-SQLAlchemy
- Code Representation of database
- Database Agnostic
- Write code in Python
- Integrates with Flask
- Automatically syncs
Concepts
- ORM
- Model
- Objects
- Query API
- Connection String
Example
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime, timedelta
from faker import Faker
import random
# Shared Faker instance used by the add_* helpers below to generate sample data.
fake = Faker()

# Flask application configured to use a SQLite database named db.sqlite3.
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///db.sqlite3'
# Disable Flask-SQLAlchemy's modification-tracking signals; nothing in this
# script subscribes to them.
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# Bind the extension to the app at construction time.
db = SQLAlchemy(app)
# Alternative (not used here): create `db = SQLAlchemy()` without an app and
# attach it later with db.init_app(app).
class Customer(db.Model):
    """A customer with contact details and the orders they have placed."""

    id = db.Column(db.Integer, primary_key=True)
    first_name = db.Column(db.String(50), nullable=False)
    last_name = db.Column(db.String(50), nullable=False)
    address = db.Column(db.String(500), nullable=False)
    city = db.Column(db.String(50), nullable=False)
    postcode = db.Column(db.String(50), nullable=False)
    # Emails must be unique across customers.
    email = db.Column(db.String(50), nullable=False, unique=True)
    # One-to-many link to Order; the backref also exposes `Order.customer`.
    orders = db.relationship('Order', backref='customer')
# Association table implementing the many-to-many relationship between
# orders and products.
# Both columns are flagged primary_key=True, which makes (order_id, product_id)
# a composite primary key: each order/product pair may appear at most once.
order_product = db.Table('order_product',
    db.Column('order_id', db.Integer, db.ForeignKey('order.id'), primary_key=True),
    db.Column('product_id', db.Integer, db.ForeignKey('product.id'), primary_key=True)
)
class Order(db.Model):
    """An order placed by a customer, with optional shipping/delivery dates."""

    id = db.Column(db.Integer, primary_key=True)
    # Defaults to the current time (naive UTC, via datetime.utcnow) when no
    # value is supplied.
    order_date = db.Column(db.DateTime, nullable=False, default=datetime.utcnow)
    # Nullable: NULL means the order has not been shipped / delivered yet.
    shipped_date = db.Column(db.DateTime)
    delivered_date = db.Column(db.DateTime)
    # Nullable: NULL means no coupon was applied.
    coupon_code = db.Column(db.String(50))
    # Every order must belong to a customer.
    customer_id = db.Column(db.Integer, db.ForeignKey('customer.id'), nullable=False)
    # Many-to-many link to Product through the order_product association table.
    products = db.relationship('Product', secondary=order_product)
class Product(db.Model):
    """A purchasable product identified by a unique name."""

    id = db.Column(db.Integer, primary_key=True)
    # Product names must be unique.
    name = db.Column(db.String(50), nullable=False, unique=True)
    # Stored as an integer; the unit is not specified in this file.
    price = db.Column(db.Integer, nullable=False)
def add_customers():
    """Insert 100 customers with randomly generated details.

    All customers are added to the session and committed in one transaction.
    """
    for _ in range(100):
        customer = Customer(
            first_name=fake.first_name(),
            last_name=fake.last_name(),
            address=fake.street_address(),
            city=fake.city(),
            postcode=fake.postcode(),
            # Customer.email has a UNIQUE constraint; plain fake.email() can
            # repeat a value and make the commit fail, so draw through
            # Faker's unique proxy instead.
            email=fake.unique.email(),
        )
        db.session.add(customer)
    db.session.commit()
def add_orders():
    """Insert 1000 random orders, each assigned to a randomly chosen customer.

    Roughly 90% of orders receive a shipped date; about half of the shipped
    ones also receive a delivered date. Most orders carry no coupon code.
    All orders are committed in one transaction.
    """
    all_customers = Customer.query.all()
    coupon_options = [None, '50OFF', 'FREESHIPPING', 'BUYONEGETONE']
    coupon_weights = [80, 5, 5, 5]

    for _ in range(1000):
        # Pick the buyer and the moment the order was placed.
        buyer = random.choice(all_customers)
        placed_at = fake.date_time_this_year()

        # Either not shipped yet (None) or shipped some time after placement.
        shipped_candidate = fake.date_time_between(start_date=placed_at)
        shipped_at = random.choices([None, shipped_candidate], [10, 90])[0]

        # Only a shipped order can be delivered, and even then only sometimes.
        if shipped_at is None:
            delivered_at = None
        else:
            delivered_candidate = fake.date_time_between(start_date=shipped_at)
            delivered_at = random.choices([None, delivered_candidate], [50, 50])[0]

        # No coupon most of the time, otherwise one of three codes.
        coupon = random.choices(coupon_options, coupon_weights)[0]

        db.session.add(
            Order(
                customer_id=buyer.id,
                order_date=placed_at,
                shipped_date=shipped_at,
                delivered_date=delivered_at,
                coupon_code=coupon,
            )
        )

    db.session.commit()
def add_products():
    """Insert 10 products with unique colour names and random prices (10-100).

    All products are committed in one transaction.
    """
    for _ in range(10):
        product = Product(
            # Product.name has a UNIQUE constraint and Faker's colour-name
            # pool is small, so plain fake.color_name() can easily repeat a
            # value across 10 draws. The unique proxy guarantees distinct
            # names.
            name=fake.unique.color_name(),
            price=random.randint(10, 100),
        )
        db.session.add(product)
    db.session.commit()
def add_order_products():
    """Attach between one and three distinct random products to every order.

    The new order/product links are committed in one transaction.
    """
    all_orders = Order.query.all()
    catalog = Product.query.all()

    for current in all_orders:
        # How many products this order contains.
        basket_size = random.randint(1, 3)
        # random.sample never repeats an item within a single basket.
        current.products.extend(random.sample(catalog, basket_size))

    db.session.commit()
def create_random_data():
    """Create all tables and fill them with random sample data.

    The work runs inside an application context because Flask-SQLAlchemy 3.x
    requires one for ``db.create_all()`` and for session access. Pushing a
    context is harmless when the caller already has one active.
    """
    with app.app_context():
        db.create_all()
        # Order matters: orders reference customers, and the order/product
        # links need both orders and products to exist.
        add_customers()
        add_orders()
        add_products()
        add_order_products()
def get_orders_by(customer_id=1):
    """Print the order date of every order placed by the given customer.

    Args:
        customer_id: Primary key of the customer whose orders are listed.
    """
    print('Get Orders by Customer')
    matching = Order.query.filter(Order.customer_id == customer_id)
    for placed in matching.all():
        print(placed.order_date)
def get_pending_orders():
    """Print the order dates of all unshipped orders, newest first."""
    print('Pending Orders')
    # An order counts as pending while its shipped_date is still NULL.
    not_shipped = Order.query.filter(Order.shipped_date.is_(None))
    newest_first = not_shipped.order_by(Order.order_date.desc())
    for pending in newest_first.all():
        print(pending.order_date)
def how_many_customers():
    """Print the total number of customers in the database."""
    print('How many customers?')
    total = Customer.query.count()
    print(total)
def orders_with_code():
    """Print the coupon code of every order using a coupon other than FREESHIPPING."""
    print('Orders with coupon code')
    # Multiple criteria passed to a single filter() call are ANDed together.
    discounted = Order.query.filter(
        Order.coupon_code.isnot(None),
        Order.coupon_code != 'FREESHIPPING',
    )
    for discounted_order in discounted.all():
        print(discounted_order.coupon_code)
def revenue_in_last_x_days(x_days=30):
    """Print the summed price of all products ordered in the last ``x_days`` days.

    Args:
        x_days: Size of the look-back window in days, measured from now.
    """
    print('Revenue past x days')
    cutoff = datetime.now() - timedelta(days=x_days)
    revenue = (
        db.session.query(db.func.sum(Product.price))
        .join(order_product)
        .join(Order)
        .filter(Order.order_date > cutoff)
        .scalar()
    )
    # SUM over zero matching rows yields NULL (None); report that as zero
    # revenue rather than printing "None".
    print(revenue or 0)
def average_fulfillment_time():
    """Print the average time between placing and shipping an order.

    Only shipped orders are considered. The average is computed in the
    database as a number of seconds (SQLite ``strftime('%s', ...)``) and
    formatted in Python as a ``timedelta``. SQLite's
    ``time(x, 'unixepoch')`` would keep only the time-of-day part and
    silently drop whole days from any average longer than 24 hours.
    Prints ``None`` when there are no shipped orders.
    """
    print('Average fulfillment time')
    shipped_epoch = db.func.strftime('%s', Order.shipped_date)
    ordered_epoch = db.func.strftime('%s', Order.order_date)
    avg_seconds = (
        db.session.query(db.func.avg(shipped_epoch - ordered_epoch))
        .filter(Order.shipped_date.isnot(None))
        .scalar()
    )
    if avg_seconds is None:
        # AVG over zero rows is NULL: nothing has been shipped yet.
        print(None)
        return
    # Round to whole seconds so the output reads like "12 days, 3:04:05".
    print(timedelta(seconds=round(float(avg_seconds))))
def get_customers_who_have_purchased_x_dollars(amount=500):
    """Print the first name of each customer whose total purchases exceed ``amount``.

    Args:
        amount: Threshold that the summed price of a customer's ordered
            products must exceed.
    """
    print('All customers who have purchased x dollars')
    total_spent = db.func.sum(Product.price)
    big_spenders = (
        db.session.query(Customer)
        .join(Order)
        .join(order_product)
        .join(Product)
        .group_by(Customer)
        .having(total_spent > amount)
    )
    for buyer in big_spenders.all():
        print(buyer.first_name)