diff --git a/pom.xml b/pom.xml index e2e5696..af35711 100644 --- a/pom.xml +++ b/pom.xml @@ -39,7 +39,6 @@ org.projectlombok lombok - true org.springframework.boot diff --git a/src/main/java/com/ddf/vodsystem/entities/Job.java b/src/main/java/com/ddf/vodsystem/entities/Job.java index 57b645c..3038af4 100644 --- a/src/main/java/com/ddf/vodsystem/entities/Job.java +++ b/src/main/java/com/ddf/vodsystem/entities/Job.java @@ -1,15 +1,11 @@ package com.ddf.vodsystem.entities; -import lombok.Data; import java.io.File; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.Data; @Data public class Job { - private static final Logger logger = LoggerFactory.getLogger(Job.class); - private String uuid; private File inputFile; private File outputFile; diff --git a/src/main/java/com/ddf/vodsystem/services/JobService.java b/src/main/java/com/ddf/vodsystem/services/JobService.java index e76b72f..cb0aa46 100644 --- a/src/main/java/com/ddf/vodsystem/services/JobService.java +++ b/src/main/java/com/ddf/vodsystem/services/JobService.java @@ -1,34 +1,54 @@ package com.ddf.vodsystem.services; +import java.io.IOException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + import com.ddf.vodsystem.entities.Job; import com.ddf.vodsystem.entities.JobStatus; import com.ddf.vodsystem.exceptions.JobNotFound; + import jakarta.annotation.PostConstruct; -import org.springframework.stereotype.Service; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.LinkedList; +/** + * Service for managing and processing jobs in a background thread. + * Uses a blocking queue to avoid busy waiting and ensures jobs are processed sequentially. + */ @Service public class JobService { private static final Logger logger = LoggerFactory.getLogger(JobService.class); - private final HashMap jobs = new HashMap<>(); - private final LinkedList jobQueue = new LinkedList<>(); + private final ConcurrentHashMap jobs = new ConcurrentHashMap<>(); + private final BlockingQueue jobQueue = new LinkedBlockingQueue<>(); private final CompressionService compressionService; + /** + * Constructs a JobService with the given CompressionService. + * @param compressionService the compression service to use for processing jobs + */ public JobService(CompressionService compressionService) { this.compressionService = compressionService; } + /** + * Adds a new job to the job map. + * @param job the job to add + */ public void add(Job job) { logger.info("Added job: {}", job.getUuid()); jobs.put(job.getUuid(), job); } + /** + * Retrieves a job by its UUID. + * @param uuid the UUID of the job + * @return the Job object + * @throws JobNotFound if the job does not exist + */ public Job getJob(String uuid) { Job job = jobs.get(uuid); @@ -39,6 +59,10 @@ public class JobService { return job; } + /** + * Marks a job as ready and adds it to the processing queue. + * @param uuid the UUID of the job to mark as ready + */ public void jobReady(String uuid) { Job job = getJob(uuid); job.setProgress(0f); @@ -48,28 +72,42 @@ public class JobService { jobQueue.add(job); } + /** + * Processes a job by running the compression service. + * @param job the job to process + */ + private void processJob(Job job) { + try { + compressionService.run(job); + } catch (IOException | InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Error while running job {}", job.getUuid(), e); + } + } + + /** + * Starts the background processing loop in a daemon thread. + * The loop blocks until a job is available and then processes it. + */ @PostConstruct private void startProcessingLoop() { Thread thread = new Thread(() -> { logger.info("Starting processing loop"); while (true) { - if (!jobQueue.isEmpty()) { - Job job = jobQueue.poll(); + try { + Job job = jobQueue.take(); // Blocks until a job is available logger.info("Starting job {}", job.getUuid()); - - try { - compressionService.run(job);// Execute the task - } catch (IOException | InterruptedException e) { - Thread.currentThread().interrupt(); - logger.error("Error while running job {}", job.getUuid(), e); - } + processJob(job); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Processing loop interrupted", e); + break; } - } }); thread.setDaemon(true); thread.start(); } -} +} \ No newline at end of file