A simple in-memory job queue made for a job application.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

325 lines
7.6 KiB

package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"math/rand"
"net/http"
"strconv"
"strings"
"time"
)
// Struct for data about Queued Jobs
type Queue []Job
type Job struct {
ID int // Job ID
Type string // Job Type (TIME_CRITICAL or NOT_TIME_CRITICAL)
Status string // Job status (QUEUED, IN_PROGRESS, CONCLUDED)
UserID int // The consumer that requested the job
AdditionalData string // Any additional data appeneded to the job
}
type QueueRequest struct {
Type string // Request Type (E,D,C,L,M) Enqueue, Dequeue, Conclude, List, More Details
ID int // The ID to act on (Used to conclude)
Status string // The job status to set (Enqueue)
JobType string // Job Type (Enqueue)
UserID int // The originating UserID
AdditionalData string // Any additional data (Enqueue)
}
type QueueResponse struct {
Type string // Response Type (L, I) List, Info
ResponseStatus int // Response Status (Action status: 0, 1, 2)
List []int // List of jobs
Job Job // Requested Job
}
var api = "/api/v1"
var port = ":8080"
var memQueue = make(Queue, 1, 9000)
var queueRequestChannel = make(chan QueueRequest, 1)
var queueResponseChannel = make(chan QueueResponse, 1)
var emptyJob = Job{0, "", "", 0, ""}
func init() {
rand.Seed(time.Now().UnixNano())
}
func main() {
// Start the webserver on port 8080
go webServer()
// Begin waiting for requests
go queueManager()
// For loop for the program to not close
for {
time.Sleep(time.Millisecond)
}
}
/*
Queue Management
*/
func queueManager() {
// Wait for a request
go func() {
for {
req := <-queueRequestChannel
switch req.Type {
case "E":
job := enqueueJob(req)
queueResponseChannel <- QueueResponse{"I", 0, make([]int, 0), job}
case "D":
status := dequeueJob()
queueResponseChannel <- QueueResponse{"I", status, make([]int, 0), emptyJob}
case "C":
status := concludeJob(req.ID)
queueResponseChannel <- QueueResponse{"I", status, make([]int, 0), emptyJob}
case "L":
list, status := getJobsList()
queueResponseChannel <- QueueResponse{"L", status, list, emptyJob}
case "M":
job, err := getJobDetails(req.ID)
queueResponseChannel <- QueueResponse{"L", err, make([]int, 0), job}
}
}
}()
}
// Add a new job, returns job ID
func enqueueJob(req QueueRequest) Job {
id := int(time.Now().UnixNano())/(rand.Intn(10090)+1) + rand.Intn(1005)
if len(memQueue) == 1 && memQueue[0].ID == 0 {
memQueue[0] = Job{id, req.JobType, "QUEUED", req.UserID, req.AdditionalData}
} else {
memQueue = append(memQueue, Job{id, req.JobType, "QUEUED", req.UserID, req.AdditionalData})
}
return memQueue[0]
}
// Get the list of Jobs,
// returns jobs and status- 0: no issues, 1: job does not exist, 2: queue empty
func getJobsList() ([]int, int) {
jobs := make([]int, 0)
status := 2
// Check if there is a queue length
if len(memQueue) != 0 {
for i := range memQueue {
jobs = append(jobs, memQueue[i].ID)
status = 0
}
}
return jobs, status
}
// Get a specific Job's details,
// Returns Job and status- 0: no issues, 1: job does not exist, 2: queue empty
func getJobDetails(id int) (Job, int) {
var job Job
status := 2
if len(memQueue) != 0 {
for i := range memQueue {
if memQueue[i].ID == id {
job = memQueue[i]
status = 0
}
}
} else {
job = emptyJob
}
return job, status
}
// Dequeue a job,
// returns status 0: no issues, 1: job does not exist, 2: queue empty
func dequeueJob() int {
status := 1
if len(memQueue) != 0 {
p := rand.Intn(len(memQueue) - 1)
if memQueue[p].Status == "QUEUED" {
memQueue[p].Status = "IN_PROGRESS"
status = 0
} else {
fmt.Println("Requested dequeue job does not exist")
}
} else {
fmt.Println("Queue Empty, cannot dequeue")
}
return status
}
// Conclude a Job,
// returns status 0: no issues, 1: job does not exist, 2: queue empty, 3: job not dequeued
func concludeJob(id int) int {
status := 1
if len(memQueue) != 0 {
for i := range memQueue {
status = 1
if memQueue[i].ID == id && memQueue[i].Status != "CONCLUDED" {
memQueue[i].Status = "CONCLUDED"
status = 0
break
} else if memQueue[i].ID == id && memQueue[i].Status == "CONCLUDED" {
fmt.Println("Job already dequeued")
status = 0
break
} else if memQueue[i].ID == id {
fmt.Println("Job must be dequeued first")
status = 3
break
}
}
} else {
fmt.Println("Queue Empty, cannot conclude")
status = 2
}
return status
}
func toJSON(response QueueResponse) []byte {
jsonResponse, err := json.Marshal(response)
if err != nil {
fmt.Println(err)
}
return jsonResponse
}
func jobToJSON(response Job, status int) []byte {
if status != 0 {
fmt.Println("Job creation failure")
}
jsonResponse, err := json.Marshal(response)
if err != nil {
fmt.Println(err)
}
return jsonResponse
}
func fromJSON(jsonRequest []byte) QueueRequest {
var responseObject QueueRequest
err := json.Unmarshal(jsonRequest, &responseObject)
if err != nil {
fmt.Println(err)
}
return responseObject
}
/*
Integrated webserver, handles direct web requests
Forwards data to the Queue
Accepts string port (":8080") and string api ("/api/v1")
*/
func webServer() {
mux := http.NewServeMux()
mux.HandleFunc(api+"/", webHandler)
log.Fatal(http.ListenAndServe(port, mux))
}
// Handle API Endpoints
func webHandler(writer http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet || r.Method == "" {
handleGet(writer, r)
} else if r.Method == http.MethodPost || r.Method == http.MethodPut {
handlePush(writer, r)
}
}
// Handle HTTP POST and PUSH methods
func handlePush(writer http.ResponseWriter, r *http.Request) {
switch "/" + r.URL.Path[1:] {
case api + "/jobs/enqueue":
defer r.Body.Close()
body, err := ioutil.ReadAll(r.Body)
if err != nil {
fmt.Println(err)
}
queueRequestChannel <- fromJSON(body)
response := <-queueResponseChannel
fmt.Fprint(writer, string(toJSON(response)))
case api + "/jobs/dequeue":
queueRequestChannel <- QueueRequest{"D", 0, "", "", 0, ""}
response := <-queueResponseChannel
fmt.Fprint(writer, string(toJSON(response)))
default:
urlSlice := strings.Split(r.URL.Path, "/")
if urlSlice[3] == "jobs" {
if urlSlice[4] != "" {
b, err := strconv.ParseInt(urlSlice[4], 10, 0)
if err != nil {
fmt.Println("Not a valid int")
fmt.Println(urlSlice[4])
break
}
// Handle concluding a job, must be done here as the url path
// determines what to conclude, not the body
s := concludeJob(int(b))
response := QueueResponse{"I", s, make([]int, 0), emptyJob}
//defer r.Body.Close()
//body, err := ioutil.ReadAll(r.Body)
//if err != nil {
// fmt.Println(err)
//}
//queueRequestChannel <- fromJSON(body)
//response := <-queueResponseChannel
fmt.Fprint(writer, string(toJSON(response)))
}
}
}
}
// Handle HTTP GET methods (and default to GET when no method is passed)
func handleGet(writer http.ResponseWriter, r *http.Request) {
switch "/" + r.URL.Path[1:] {
case api + "/jobs":
queueRequestChannel <- QueueRequest{"L", 0, "", "", 0, ""}
response := <-queueResponseChannel
fmt.Fprint(writer, string(toJSON(response)))
default:
urlSlice := strings.Split(r.URL.Path[1:], "/")
if urlSlice[2] == "jobs" {
if len(urlSlice) == 4 {
fmt.Println(urlSlice)
jobID, err := strconv.ParseInt(urlSlice[3], 10, 0)
if err != nil {
fmt.Println("Not a valid int 4")
fmt.Println(err)
break
}
fmt.Fprint(writer, string(jobToJSON(getJobDetails(int(jobID)))))
}
}
}
}