You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

445 lines
14 KiB

import calendar, datetime
from datetime import timedelta
from decimal import Decimal
from itertools import groupby
from django.shortcuts import render
from django.contrib.auth.models import User, Group
from django.core import exceptions
from django.db.models import Q
from caremyway.api.models import UserInfo, Client, Provider, WorkType, Manage, Price, Shift
from rest_framework import viewsets, permissions, status, serializers
from rest_framework.decorators import api_view
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.renderers import StaticHTMLRenderer
from caremyway.api.serializers import UserSerializer, UserInfoSerializer, ClientSerializer, ProviderSerializer, WorkTypeSerializer, EmployeeSerializer, EmployerSerializer, PriceSerializer, CShiftSerializer, PShiftSerializer, TimeSheetSerializer, RecordSheetSerializer
class UserViewSet(viewsets.ModelViewSet):
    # Exposes ``User`` rows addressed by username, limited to the row whose
    # username matches the requesting user.
    serializer_class = UserSerializer
    lookup_field = 'username'

    # Disallow POSTing from /users/ route. Only rest-auth can add users.
    http_method_names = ['get', 'head', 'put', 'options']

    def get_queryset(self):
        # The request's user object is handed to the ORM as the filter value.
        requester = self.request.user
        return User.objects.filter(username=requester)
class UserInfoViewSet(viewsets.ModelViewSet):
    # Exposes ``UserInfo`` rows, looked up through the owning user's
    # username and limited to rows belonging to the requesting user.
    serializer_class = UserInfoSerializer
    lookup_field = 'user__username'

    # Disallow DELETE
    http_method_names = ['get', 'post', 'head', 'put', 'options']

    def get_queryset(self):
        requester = self.request.user
        return UserInfo.objects.filter(user__username=requester)
class ClientViewSet(viewsets.ModelViewSet):
    # Exposes ``Client`` rows, looked up through the owning user's username
    # and limited to rows belonging to the requesting user.
    serializer_class = ClientSerializer
    lookup_field = 'user__username'

    # Disallow DELETE
    http_method_names = ['get', 'post', 'head', 'put', 'options']

    def get_queryset(self):
        requester = self.request.user
        return Client.objects.filter(user__username=requester)
class ProviderViewSet(viewsets.ModelViewSet):
    # Exposes ``Provider`` rows, looked up through the owning user's
    # username and limited to rows belonging to the requesting user.
    serializer_class = ProviderSerializer
    lookup_field = 'user__username'

    # Disallow DELETE
    http_method_names = ['get', 'post', 'head', 'put', 'options']

    def get_queryset(self):
        requester = self.request.user
        return Provider.objects.filter(user__username=requester)
class WorkTypeViewSet(viewsets.ModelViewSet):
    # Exposes ``WorkType`` rows (addressed by uuid) whose client is the
    # requesting user.
    serializer_class = WorkTypeSerializer
    lookup_field = 'uuid'

    def get_queryset(self):
        requester = self.request.user
        return WorkType.objects.filter(client__user__username=requester)

    def destroy(self, request, *args, **kwargs):
        # Soft delete: the row is flagged as deleted and saved, not removed.
        work_type = self.get_object()
        work_type.deleted = True
        work_type.save()
        return Response(status=status.HTTP_204_NO_CONTENT)
class EmployeeViewSet(viewsets.ModelViewSet):
    # Exposes ``Manage`` rows (addressed by uuid) in which the requesting
    # user is the client.
    serializer_class = EmployeeSerializer
    lookup_field = 'uuid'

    def get_queryset(self):
        requester = self.request.user
        return Manage.objects.filter(client__user__username=requester)

    def destroy(self, request, *args, **kwargs):
        # Soft delete: the row is flagged as deleted and saved, not removed.
        relationship = self.get_object()
        relationship.deleted = True
        relationship.save()
        return Response(status=status.HTTP_204_NO_CONTENT)
class EmployerViewSet(viewsets.ModelViewSet):
    # Exposes ``Manage`` rows (addressed by uuid) in which the requesting
    # user is the provider.
    serializer_class = EmployerSerializer
    lookup_field = 'uuid'

    # Disallow creation and deletions of relationships
    http_method_names = ['get', 'head', 'put', 'options']

    def get_queryset(self):
        requester = self.request.user
        return Manage.objects.filter(provider__user__username=requester)
class PriceViewSet(viewsets.ModelViewSet):
    # Exposes ``Price`` rows (addressed by uuid) whose management
    # relationship has the requesting user as client.
    serializer_class = PriceSerializer
    lookup_field = 'uuid'

    def get_queryset(self):
        requester = self.request.user
        return Price.objects.filter(
            management__client__user__username=requester)

    def destroy(self, request, *args, **kwargs):
        # Soft delete: the row is flagged as deleted and saved, not removed.
        price = self.get_object()
        price.deleted = True
        price.save()
        return Response(status=status.HTTP_204_NO_CONTENT)
class CShiftViewSet(viewsets.ModelViewSet):
    # Client-side view of ``Shift`` rows (addressed by uuid): non-deleted
    # shifts whose price belongs to a relationship in which the requesting
    # user is the client, newest ``set_start`` first.
    serializer_class = CShiftSerializer
    lookup_field = 'uuid'

    def get_queryset(self):
        own_shifts = Shift.objects.filter(deleted=False).filter(
            price__management__client__user__username=self.request.user)
        return own_shifts.order_by('-set_start')

    def destroy(self, request, *args, **kwargs):
        shift = self.get_object()
        if not shift.actual_start:
            # Soft delete: flag the shift as deleted and save it.
            shift.deleted = True
            shift.save()
            return Response(status=status.HTTP_204_NO_CONTENT)
        # A shift with a recorded actual start is refused.
        return Response("Shift already started.", status=status.HTTP_400_BAD_REQUEST)
class PShiftViewSet(viewsets.ModelViewSet):
    # Provider-side view of ``Shift`` rows (addressed by uuid): non-deleted
    # shifts whose price belongs to a relationship in which the requesting
    # user is the provider, newest ``set_start`` first.
    serializer_class = PShiftSerializer
    lookup_field = 'uuid'

    # Disallow creation and deletions of relationships
    http_method_names = ['get', 'head', 'put', 'options']

    def get_queryset(self):
        own_shifts = Shift.objects.filter(deleted=False).filter(
            price__management__provider__user__username=self.request.user)
        return own_shifts.order_by('-set_start')
def validate_param(value, field):
    """Validate one raw value against a DRF serializer field.

    A throwaway serializer with a single ``value`` field is declared per
    call so that any field instance can be used as the validator.

    Args:
        value: The raw input (for example a query-string value).
        field: A serializer field instance used to validate ``value``.

    Returns:
        The validated value as produced by ``field``.

    Raises:
        serializers.ValidationError: Carrying the field's errors when the
            value does not validate.
    """
    class ValidateParam(serializers.Serializer):
        value = field

    checker = ValidateParam(data={'value': value})
    if not checker.is_valid():
        raise serializers.ValidationError(checker.errors['value'])
    return checker.validated_data.get('value')
def get_paystart(payday):
    """Return the first date of the pay period that ends on ``payday``.

    A payday on the 15th maps to the 1st of that month and a payday on the
    last day of the month maps to the 16th; the result is that date moved
    back by four days.

    Args:
        payday: An already validated date object.

    Raises:
        serializers.ValidationError: If ``payday`` is neither the 15th nor
            the last day of its month.
    """
    month_length = calendar.monthrange(payday.year, payday.month)[1]
    # Maps the payday's day-of-month to the nominal first day of its period.
    nominal_first_day = {month_length: 16, 15: 1}.get(payday.day)
    if nominal_first_day is None:
        raise serializers.ValidationError("Date is not a valid payday.")
    nominal_start = datetime.date(payday.year, payday.month, nominal_first_day)
    return nominal_start - timedelta(days=4)
def get_payend(payday):
    """Return the last date of the pay period: four days before ``payday``."""
    cutoff_offset = timedelta(days=4)
    return payday - cutoff_offset
def gen_timesheets(user, payday, manage=None):
    """Build one timesheet dict per management relationship for a pay period.

    Selects non-deleted shifts that have an ``actual_end``, fall inside the
    period derived from ``payday``, and involve ``user`` as either client or
    provider, then groups them by management relationship.

    Args:
        user: Value matched against the client's or provider's username.
        payday: Payday date; the period bounds come from ``get_paystart``
            and ``get_payend``.
        manage: Optional management uuid; when truthy, only that
            relationship's shifts are kept.

    Returns:
        A list of dicts with keys ``management``, ``client``, ``provider``,
        ``payday``, ``paystart``, ``payend`` and ``shifts`` (a list of
        per-shift dicts with ``date``, ``start``, ``end``, ``hours``,
        ``rate``, ``amount`` and ``work``).
    """
    paystart = get_paystart(payday)
    payend = get_payend(payday)

    involves_user = (
        Q(price__management__client__user__username=user)
        | Q(price__management__provider__user__username=user)
    )
    # Ordering by management first is what makes groupby() below yield one
    # group per relationship.
    finished_shifts = (
        Shift.objects.filter(deleted=False)
        .exclude(actual_end__isnull=True)
        .filter(involves_user)
        .filter(set_date__gte=paystart)
        .filter(set_date__lte=payend)
        .order_by('price__management', 'actual_start')
    )
    if manage:
        finished_shifts = finished_shifts.filter(price__management__uuid=manage)

    timesheets = []
    for management, group in groupby(finished_shifts, lambda s: s.price.management):
        timesheet = {
            'management': management,
            'client': management.client,
            'provider': management.provider,
            'payday': payday,
            'paystart': paystart,
            'payend': payend,
            'shifts': [],
        }
        for worked in group:
            started = worked.actual_start
            ended = worked.actual_end
            # Hours are a float; Decimal(float) keeps the float's exact
            # binary value when the amount is computed.
            hours = (ended - started).total_seconds() / 3600
            rate = worked.set_price
            timesheet['shifts'].append({
                'date': worked.set_date,
                'start': started,
                'end': ended,
                'hours': hours,
                'rate': rate,
                'amount': Decimal(hours) * rate,
                'work': worked.price.work_type.label,
            })
        timesheets.append(timesheet)
    return timesheets
def get_paydays_todate(payday):
    """List every payday of ``payday``'s year up to and including it.

    Paydays are the 15th and the last day of each month. The result is in
    ascending date order and ends with ``payday`` itself.

    Args:
        payday: An already validated date object.

    Raises:
        serializers.ValidationError: If ``payday`` is neither the 15th nor
            the last day of its month.
    """
    year = payday.year
    falls_mid_month = payday.day == 15
    if not falls_mid_month:
        if payday.day != calendar.monthrange(payday.year, payday.month)[1]:
            raise serializers.ValidationError("Date is not a valid payday.")

    paydays = []
    # Both paydays of every month that precedes the payday's month.
    for month in range(1, payday.month):
        paydays.append(datetime.date(year, month, 15))
        paydays.append(datetime.date(year, month, calendar.monthrange(year, month)[1]))

    # The payday's own month: just the payday when it is the 15th,
    # otherwise the 15th followed by the payday.
    if not falls_mid_month:
        paydays.append(datetime.date(year, payday.month, 15))
    paydays.append(payday)
    return paydays
def get_cpp(cpp_ytd, earnings):
    """Return the CPP contribution to deduct for one pay period.

    The contribution is the smaller of the room left under the annual
    maximum and the rate applied to earnings above the per-period share of
    the annual exemption, floored at zero.

    NOTE(review): the figures (2564.10 maximum, 0.0495 rate, 3500
    exemption, 24 periods) look like CRA T4127 values for a single tax
    year -- confirm before reusing for another year.

    Args:
        cpp_ytd: CPP already deducted this year (Decimal or int).
        earnings: Earnings for the pay period (Decimal or int).

    Returns:
        The contribution as a ``Decimal``; never negative.
    """
    # Constants are built from strings: Decimal(2564.10) would carry the
    # binary float's error (2564.0999999999999090...) into every result.
    annual_maximum = Decimal('2564.10')
    rate = Decimal('0.0495')
    annual_exemption = Decimal('3500')
    pay_periods = 24

    room_left = annual_maximum - cpp_ytd
    on_earnings = rate * (earnings - annual_exemption / pay_periods)
    return max(min(room_left, on_earnings), Decimal(0))
def get_ei(ei_ytd, earnings):
    """Return the EI premium to deduct for one pay period.

    The premium is the smaller of the room left under the annual maximum
    and the rate applied to the period's earnings, floored at zero.

    NOTE(review): the figures (836.19 maximum, 0.0163 rate) look like CRA
    T4127 values for a single tax year -- confirm before reusing for
    another year.

    Args:
        ei_ytd: EI premiums already deducted this year (Decimal or int).
        earnings: Insurable earnings for the pay period (Decimal or int).

    Returns:
        The premium as a ``Decimal``; never negative.
    """
    # Constants are built from strings so they hold the exact decimal
    # values rather than the nearest binary float.
    annual_maximum = Decimal('836.19')
    rate = Decimal('0.0163')

    room_left = annual_maximum - ei_ytd
    on_earnings = rate * earnings
    return max(min(room_left, on_earnings), Decimal(0))
def get_taxable_income(earnings):
    """Annualize one pay period's earnings into taxable income.

    Earnings are reduced by the per-period deductions, multiplied by 24
    pay periods, and then reduced by the annual deductions. Every deduction
    is currently zero, so the result is numerically ``24 * earnings``.

    NOTE(review): the deduction slots correspond to the symbols F, F2, U1
    (per period) and HD, F1 (annual), which look like CRA T4127 names --
    confirm.
    """
    per_period_deductions = (0, 0, 0)  # F, F2, U1
    annual_deductions = (0, 0)         # HD, F1
    annualized = 24 * (earnings - sum(per_period_deductions))
    return annualized - sum(annual_deductions)
def get_fed_tax(taxable_income, cpp, ei, cpp_ytd, ei_ytd):
    """Return the annual basic federal tax (T3) on an annualized income.

    T3 = R*A - K - K1 - K2 - K3 - K4, floored at zero, where R and K come
    from the bracket table and K1..K4 are tax credits.

    NOTE(review): symbols and figures look like the CRA T4127 payroll
    deductions formulas for a single tax year -- confirm before reusing
    for another year.

    Args:
        taxable_income: Annualized taxable income (A).
        cpp: CPP contribution for the current pay period.
        ei: EI premium for the current pay period.
        cpp_ytd: CPP deducted in earlier pay periods this year.
        ei_ytd: EI deducted in earlier pay periods this year.

    Returns:
        The tax as a ``Decimal``; never negative.
    """
    # (lower threshold, rate R, constant K). A row applies when A is
    # strictly above its threshold; the last applicable row wins.
    table = [(0, Decimal('0.15'), 0),
             (45916, Decimal('0.205'), 2525),
             (91831, Decimal('0.26'), 7576),
             (142352, Decimal('0.29'), 11847),
             (202800, Decimal('0.33'), 19959)]
    # Built from strings: Decimal(0.15) and friends would carry binary
    # float error into the result.
    credit_rate = Decimal('0.15')
    max_cpp = Decimal('2564.10')
    max_ei = Decimal('836.19')
    cent = Decimal('0.01')
    P = 24

    A = taxable_income
    applicable = [row for row in table if row[0] < A]
    # With A <= 0 no row matches; fall back to the lowest bracket instead
    # of failing on an empty list.
    _, R, K = applicable[-1] if applicable else table[0]

    TC = 11635
    K1 = credit_rate * TC

    # The year-to-date totals are rounded to cents before the comparison so
    # that sub-cent noise in upstream sums cannot hide a reached maximum.
    if Decimal(cpp_ytd + cpp).quantize(cent) >= max_cpp:
        PxC = max_cpp
    else:
        # The annualized contribution may not exceed the annual maximum.
        PxC = min(P * cpp, max_cpp)
    if Decimal(ei_ytd + ei).quantize(cent) >= max_ei:
        PxEI = max_ei
    else:
        PxEI = min(P * ei, max_ei)
    K2 = (credit_rate * PxC) + (credit_rate * PxEI)

    K3 = 0
    K4 = min(credit_rate * A, credit_rate * 1178)

    T3 = (R * A) - K - K1 - K2 - K3 - K4
    return max(T3, Decimal(0))
def get_fed_tax_payable(fed_tax):
    """Return federal tax payable: ``fed_tax`` less the LCF credit, min 0.

    The LCF credit is currently fixed at zero, so a non-negative
    ``fed_tax`` is returned numerically unchanged and a negative one
    becomes 0.
    """
    lcf_credit = 0
    payable = fed_tax - lcf_credit
    return 0 if payable < 0 else payable
def get_prov_tax(taxable_income, cpp, ei, cpp_ytd, ei_ytd):
    """Return the annual basic provincial tax (T4) on an annualized income.

    T4 = V*A - KP - K1P - K2P - K3P, floored at zero, where V and KP come
    from the bracket table and K1P..K3P are tax credits.

    NOTE(review): symbols and figures look like the CRA T4127 provincial
    formulas for one province and tax year -- confirm which ones before
    reusing.

    Args:
        taxable_income: Annualized taxable income (A).
        cpp: CPP contribution for the current pay period.
        ei: EI premium for the current pay period.
        cpp_ytd: CPP deducted in earlier pay periods this year.
        ei_ytd: EI deducted in earlier pay periods this year.

    Returns:
        The tax as a ``Decimal``; never negative.
    """
    # (lower threshold, rate V, constant KP). A row applies when A is
    # strictly above its threshold; the last applicable row wins.
    table = [(0, Decimal('0.10'), 0),
             (126625, Decimal('0.12'), 2533),
             (151950, Decimal('0.13'), 4052),
             (202600, Decimal('0.14'), 6078),
             (303900, Decimal('0.15'), 9117)]
    # Built from strings: Decimal(0.10) and friends would carry binary
    # float error into the result.
    credit_rate = Decimal('0.10')
    max_cpp = Decimal('2564.10')
    max_ei = Decimal('836.19')
    cent = Decimal('0.01')
    P = 24

    A = taxable_income
    applicable = [row for row in table if row[0] < A]
    # With A <= 0 no row matches; fall back to the lowest bracket instead
    # of failing on an empty list.
    _, V, KP = applicable[-1] if applicable else table[0]

    TCP = 18690
    K1P = credit_rate * TCP

    # The year-to-date totals are rounded to cents before the comparison so
    # that sub-cent noise in upstream sums cannot hide a reached maximum.
    if Decimal(cpp_ytd + cpp).quantize(cent) >= max_cpp:
        PxC = max_cpp
    else:
        # The annualized contribution may not exceed the annual maximum.
        PxC = min(P * cpp, max_cpp)
    if Decimal(ei_ytd + ei).quantize(cent) >= max_ei:
        PxEI = max_ei
    else:
        PxEI = min(P * ei, max_ei)
    K2P = (credit_rate * PxC) + (credit_rate * PxEI)

    K3P = 0

    T4 = (V * A) - KP - K1P - K2P - K3P
    return max(T4, Decimal(0))
def get_prov_tax_deduction(prov_tax):
    """Return the provincial tax deduction: ``prov_tax`` less V1, S and LCP.

    All three reductions are currently fixed at zero, so a non-negative
    ``prov_tax`` is returned numerically unchanged and a negative one
    becomes 0.
    """
    reductions = (0, 0, 0)  # V1, S, LCP
    deduction = prov_tax - sum(reductions)
    return 0 if deduction < 0 else deduction
def get_income_tax(fed_tax_payable, prov_tax_deduction):
    """Return the income tax to withhold for one pay period.

    The annual federal and provincial amounts are added, divided across
    24 pay periods, converted to ``Decimal``, and increased by the extra
    amount L (currently zero).
    """
    pay_periods = 24
    extra_withholding = 0  # L
    annual_tax = fed_tax_payable + prov_tax_deduction
    per_period_tax = Decimal(annual_tax / pay_periods)
    return per_period_tax + extra_withholding
def gen_recordsheets(user, payday, manage):
    """Build year-to-date payroll record sheets, one per management.

    For every payday of the year up to ``payday`` the timesheets are
    generated and turned into pay records (earnings, CPP, EI, taxes, net
    pay). The records are then grouped by management relationship.

    Args:
        user: Forwarded to ``gen_timesheets`` to select the shifts.
        payday: Last payday to include; must be a valid payday.
        manage: Management uuid forwarded to ``gen_timesheets`` (may be
            None).

    Returns:
        A list of dicts with keys ``management``, ``client``, ``provider``,
        ``phone_number``, ``sin`` and ``record`` (that management's pay
        records, in payday order).
    """
    # Built from a string: Decimal(0.04) would carry binary float error
    # into every vacation-pay amount.
    vacation_rate = Decimal('0.04')

    flat_records = []
    # A separate loop name keeps the ``payday`` argument intact.
    for period_payday in get_paydays_todate(payday):
        for timesheet in gen_timesheets(user, period_payday, manage):
            management = timesheet['management']
            # Records already produced for this relationship; paydays are
            # visited in ascending order, so these are the earlier periods.
            earlier = [r for r in flat_records if r['management'] == management]

            record = {}
            record['payday'] = period_payday
            record['management'] = management
            record['reg_pay'] = sum(shift['amount'] for shift in timesheet['shifts'])
            record['vac_pay'] = record['reg_pay'] * vacation_rate
            record['earnings'] = record['reg_pay'] + record['vac_pay']
            record['cpp_ytd'] = sum(r['cpp'] for r in earlier)
            record['cpp'] = get_cpp(record['cpp_ytd'], record['earnings'])
            record['ei_ytd'] = sum(r['ei'] for r in earlier)
            record['ei'] = get_ei(record['ei_ytd'], record['earnings'])
            record['taxable_income'] = get_taxable_income(record['earnings'])
            record['fed_tax'] = get_fed_tax(
                record['taxable_income'], record['cpp'], record['ei'],
                record['cpp_ytd'], record['ei_ytd'])
            record['fed_tax_payable'] = get_fed_tax_payable(record['fed_tax'])
            record['prov_tax'] = get_prov_tax(
                record['taxable_income'], record['cpp'], record['ei'],
                record['cpp_ytd'], record['ei_ytd'])
            record['prov_tax_deduction'] = get_prov_tax_deduction(record['prov_tax'])
            record['income_tax'] = get_income_tax(
                record['fed_tax_payable'], record['prov_tax_deduction'])
            record['total_deductions'] = record['income_tax'] + record['cpp'] + record['ei']
            record['net_pay'] = record['earnings'] - record['total_deductions']
            flat_records.append(record)

    # groupby() only merges adjacent items, so sort by management first.
    # The sort is stable, which keeps each group's records in payday order.
    flat_records.sort(key=lambda rec: rec['management'].uuid)

    recordsheets = []
    for management, records in groupby(flat_records, lambda rec: rec['management']):
        recordsheets.append({
            'management': management,
            'client': management.client,
            'provider': management.provider,
            'phone_number': management.provider.user.userinfo.phone_number,
            'sin': management.provider.sin,
            'record': list(records),
        })
    return recordsheets
class ReportView(APIView):
    # Serves generated reports. The URL keyword argument ``type`` selects
    # the report ('timesheet' or 'record'); the ``payday`` and ``manage``
    # query parameters are validated before the report is built.
    def get(self, request, *args, **kwargs):
        # report type -> (generator function, serializer class)
        report_builders = {
            'timesheet': (gen_timesheets, TimeSheetSerializer),
            'record': (gen_recordsheets, RecordSheetSerializer),
        }
        report_type = kwargs['type']
        params = request.query_params

        builder = report_builders.get(report_type)
        if builder is None:
            # Any other type yields an empty list with 200 and no
            # parameter validation.
            return Response([], status=status.HTTP_200_OK)

        generate, serializer_class = builder
        payday = validate_param(params.get('payday'), serializers.DateField())
        manage = validate_param(params.get('manage'), serializers.UUIDField(allow_null=True))
        sheets = generate(request.user, payday, manage)
        payload = serializer_class(sheets, many=True).data
        return Response(payload, status=status.HTTP_200_OK)
@api_view(['GET'])
def null_view(request):
    # Always answers 400 Bad Request with an empty body. GET is spelled out
    # here; it is what api_view() allows when called without arguments.
    return Response(data=None, status=status.HTTP_400_BAD_REQUEST)