danixfs/danix/db/models.py

302 lines
12 KiB
Python

from django.db import models
from django.db.models import Q
import uuid
from danixfs import Danix
from datetime import datetime
from settings import SNAPSHOT_LIMIT
from utils import is_unique_database_tuple, get_message, check_equal_sentence, check_not_equal_sentence
from django.core.exceptions import ValidationError
def get_queryset_filtered(model, sub_attribute):
if model == Environment:
return model.objects.filter(filesystem_name__startswith=sub_attribute)
return model.objects.filter(snapshot__startswith=sub_attribute)
class Environment(models.Model):
filesystem_name = models.UUIDField()
status = models.BooleanField(default=True)
name = models.TextField()
created = models.DateField(default=datetime.now())
template = models.TextField(default="")
@staticmethod
def navigate(filesystem_name):
try:
environment = get_queryset_filtered(Environment, filesystem_name)
environments_count = environment.count()
environment_first = environment.first()
if check_equal_sentence(environments_count, 0):
get_message(message=settings.ENV_NOT_FOUND, is_finishprogram=True, finish_status_code=1)
expression = is_unique_database_tuple(environment) and environment_first.status
if expression:
resp = Danix.navigate(environment_first.filesystem_name)
if not check_not_equal_sentence(resp, 0):
get_message(
message=settings.ENV_GENERIC_ERROR,
is_finishprogram=True,
finish_status_code=1
)
elif expression ^ (not environment.status):
get_message(
message=settings.ENV_STOPPED_ERROR,
is_finishprogram=True,
finish_status_code=1
)
else:
get_message(
message=settings.ENV_PATTERN_ERROR,
is_finishprogram=True,
finish_status_code=1
)
except ValidationError:
get_message(
message=settings.ENV_PATTERN_ERROR,
is_finishprogram=True,
finish_status_code=1
)
@staticmethod
def rm_environment(filesystem_name):
try:
environment = Environment.objects.filter(filesystem_name=filesystem_name)
environment_count = environment.count()
environment = environment.first()
if check_equal_sentence(environment_count, 0):
get_message(
message=settings.ENV_NOT_FOUND,
is_finishprogram=True,
finish_status_code=1
)
Danix.rm(filesystem_name)
environment.delete()
get_message(
message=f"{settings.ENV_REMOVED} - {filesystem_name}",
is_finishprogram=False,
finish_status_code=-1
)
except Exception:
get_message(
message=settings.ENV_NOT_FOUND,
is_finishprogram=True,
finish_status_code=1
)
@staticmethod
def start_environment(filesystem_name):
environment = Environment.objects.filter(filesystem_name=filesystem_name)
environment_counter = environment.count()
environment = environment.first()
if check_equal_sentence(environment_counter, 0):
get_message(
message=settings.ENV_NOT_FOUND,
is_finishprogram=True,
finish_status_code=1
)
environment.status = True
environment.save()
get_message(
message=settings.ENV_STARTED,
is_finishprogram=True,
finish_status_code=1
)
@staticmethod
def set_active(filesystem_name):
environment = Environment.objects.filter(filesystem_name=filesystem_name).first()
environment.status = True
environment.save()
@staticmethod
def stop_environment(filesystem_name):
environment = Environment.objects.filter(filesystem_name=filesystem_name)
environment_counter = environment.count()
environment = environment.first()
if check_equal_sentence(environment_counter, 0):
get_message(message=settings.ENV_NOT_FOUND, is_finishprogram=True, finish_status_code=1)
environment.status = False
environment.save()
get_message(message=settings.ENV_STOPPED, is_finishprogram=True, finish_status_code=1)
@staticmethod
def list_environments():
environments = Environment.objects.all()
environment_counter = environments.count()
print("===============================================================================================================================================")
print("| ENVIRONMENT NAME | TEMPLATE | CREATED | SUBSYSTEM NAME | IMAGE | STATUS | SIZE |")
print("|=============================================================================================================================================|")
if environment_counter > 0:
for environment in environments:
name = str(environment.name)
status_icon = "🟢 Running" if environment.status else "🔴 Stopped"
template = str(environment.template)
size = Danix.get_size(environment.filesystem_name, None)
print(f"| {name[0:11]}{(11-len(name)) * '.'} {template}{(6-len(template)) * ' '} {environment.created} {environment.filesystem_name} Alpine {status_icon} {size}B |")
print("===============================================================================================================================================")
class Meta:
db_table = "environment"
class Snapshot(models.Model):
snapshot_name = models.UUIDField()
environment_id = models.ForeignKey(Environment, null=True, on_delete=models.SET_NULL)
created = models.DateField(default=datetime.now())
last = models.BooleanField(default=True)
@staticmethod
def rm_snapshot(snapshot_name):
try:
snapshot = Snapshot.objects.filter(snapshot_name=snapshot_name)
snapshot_counter = snapshot.count()
if snapshot_counter > 0:
resp = Danix.remove_snapshot(snapshot_name)
if check_equal_sentence(resp, 0):
Snapshot.objects.get(snapshot_name=snapshot_name).delete()
get_message(message=f"Snapshot removed successfully - {snapshot_name}", is_finishprogram=False, finish_status_code=-1)
get_message(message="Error: Snapshot can not remove", is_finishprogram=True, finish_status_code=1)
get_message(message="Snapshot does not exist!", is_finishprogram=True, finish_status_code=1)
except ValidationError:
get_message(message="Snapshot does not exist!", is_finishprogram=True, finish_status_code=1)
@staticmethod
def back_snapshot(snapshot_name):
try:
snapshot = Snapshot.objects.filter(snapshot_name=snapshot_name)
if snapshot.count() > 0:
filesystem_name = Environment.objects.filter(id=snapshot.first().environment_id.id).first().filesystem_name
resp = Danix.back_snapshot(filesystem_name, snapshot_name)
if check_equal_sentence(resp, 0):
get_message(message="Snapshot roll back successfully!", is_finishprogram=True, finish_status_code=0)
get_message(message="Error: Snapshot can not back", is_finishprogram=True, finish_status_code=1)
get_message(message="Snapshot does not exist!", is_finishprogram=True, finish_status_code=1)
except ValidationError:
get_message(message="Snapshot does not exist!", is_finishprogram=True, finish_status_code=1)
@staticmethod
def create(subsystem_name):
try:
environment = Environment.objects.get(filesystem_name=subsystem_name)
environment_id = environment.id
snapshots = Snapshot.objects.filter(environment_id=environment_id)
if snapshots.count() >= int(SNAPSHOT_LIMIT):
get_message(message="Snapshot limit exceeded! Please remove 1 snapshot to continue!", is_finishprogram=True, finish_status_code=1)
else:
for snapshot in snapshots:
snapshot.last = False
snapshot.save()
snapshot_name = uuid.uuid4()
print('Wait a minute! Taking snapshot')
snapshot = Snapshot.objects.create(snapshot_name=snapshot_name,environment_id=environment).save()
resp = Danix.make_snapshot(subsystem_name, snapshot_name)
if check_equal_sentence(resp, 0):
print("Snapshot created successfully")
print(f"Snapshot name {snapshot_name}\n")
print(f"======================================")
print(f"Environment size: {Danix.get_size(subsystem_name, None)}B")
print(f"Snapshot size: {Danix.get_size(subsystem_name, snapshot_name)}B")
print(f"======================================")
exit(0)
else:
Snapshot.objects.get(snapshot_name=snapshot_name,environment_id=environment).delete()
print("Snapshot create error!")
exit(1)
except Exception:
get_message(message="Snapshot create error: Environment does not exist!", is_finishprogram=True, finish_status_code=1)
@staticmethod
def list_snapshots():
snapshots = Snapshot.objects.all()
print("===========================================================================================================================================")
print("| SNAPSHOT NAME | ENVIRONMENT NAME | CREATED | LAST SNAPSHOT | SIZE |")
print("|==========================================================================================================================================|")
if snapshots.count() > 0:
for snapshot in snapshots:
name = str(snapshot.snapshot_name)
lastsnapshot_icon = "🟢 Yes" if snapshot.last else "🟠 No "
if snapshot.environment_id:
environment_name = Environment.objects.filter(id=snapshot.environment_id.id).first().filesystem_name
size = Danix.get_size(environment_name, name)
else:
environment_name = f'Environment Removed 🔴{14*" "}'
size = "---"
print(f"| {name} {environment_name} {snapshot.created} {lastsnapshot_icon} {size}B |")
print("===========================================================================================================================================")
class Meta:
db_table = "snapshot"
# Create your models here.