Wecode uses a queue system to handle submission processing, ensuring fair resource allocation and preventing system overload.
Overview
The queue system:
- Serializes Processing: Limits concurrent submission evaluations
- Fair Scheduling: First-come, first-served processing
- Resource Management: Prevents server overload from too many simultaneous tests
- Process Tracking: Monitors which submissions are being processed
Queue Item Model
Each queue item represents a submission waiting to be or being processed:
protected $fillable = ['submission_id', 'type', 'processid'];
public function submission()
{
return $this->belongsTo('App\Models\Submission');
}
Fields
submission_id: The submission being processed
type: Type of processing (e.g., ‘judge’)
processid: Process ID when being processed, NULL when waiting
Queue Processing
Acquiring Queue Items
The acquire() method implements a mutex using database transactions:
public static function acquire($limit)
{
DB::beginTransaction(); // We use the queue table as a mutex, so this function must be atomic
$item = NULL;
if(Queue_item::whereNotNull('processid')->count() < $limit){
$item = Queue_item::whereNull('processid')
->with('submission.problem', 'submission.user', 'submission.language')
->oldest()
->lockForUpdate()
->first();
if ($item != NULL){
$item->processid = getmypid();
$item->save();
}
}
DB::commit();
return $item;
}
Key Features:
- Atomic Operation: Uses database transaction to prevent race conditions
- Concurrency Limit: Checks current processing count against limit
- Lock for Update: Prevents multiple processes from grabbing the same item
- Process Tracking: Sets
processid to current process ID
- FIFO Order: Uses
oldest() to get the earliest queued item
Adding Items to Queue
Add and Process:
public static function add_and_process($submit_id, $type){
Queue_item::create([
'submission_id' => $submit_id,
'type' => $type
]);
$a = Queue_item::work();
return $a;
}
Add Without Processing:
public static function add_not_process($submit_id, $type){
$a = Queue_item::create([
'submission_id' => $submit_id,
'type' => $type
]);
return $a;
}
Starting Queue Workers
public static function work(){
/* But this function here in case I need
changing the command parameter
*/
$a = shell_exec('php ' . escapeshellarg(base_path() . '/artisan'). ' work_queue >/dev/null 2>/dev/null &');
return $a;
}
This spawns a background artisan process to work through the queue.
Worker Command
The work_queue artisan command processes submissions:
public function handle()
{
$limit = Setting::get('concurent_queue_process', 2);
if ($this->option('force')) $limit = 999999;
$item = Queue_item::acquire($limit);
if ($item === NULL) {
// Queue is full, exit this process
var_dump("Exit casue no item");
exit;
}
do { // loop over queue items
// Process the submission...
$item->save_and_remove();
// Get next item from queue
$item = Queue_item::acquire($limit);
}while($item !== NULL);
var_dump("Exit, no more item");
}
Processing Flow
- Acquire Item: Get next unprocessed item from queue
- Test Submission: Run tester on the code
- Save Results: Update submission with score and status
- Update Final: Determine if this is the new final submission
- Remove from Queue: Delete the processed item
- Repeat: Get next item until queue is empty
Force Option
You can bypass the concurrency limit for debugging:
php artisan work_queue --force
Saving and Removing Items
When processing completes, the item is saved and removed:
public function save_and_remove(){
$submission = $this->submission;
DB::beginTransaction();
$final_sub = Submission::where([
'user_id' => $submission->user_id,
'assignment_id' => $submission->assignment_id,
'problem_id' => $submission->problem_id,
'is_final' => 1
])->lockForUpdate()->first();
if (
$final_sub == NULL
||
($final_sub->pre_score < $submission['pre_score']
|| $final_sub->pre_score * intval($final_sub->coefficient) < $submission['pre_score'] * intval($submission['coefficient'])
)
){
if ($final_sub){
$final_sub->is_final = 0;
$final_sub->save();
}
$submission->is_final = 1;
}
$submission->save();
DB::commit();
$this->delete();
Scoreboard::update_scoreboard($submission->assignment_id);
}
Final Submission Logic:
- Lock Current Final: Prevents race conditions when updating final submission
- Compare Scores: Checks if new score is better (considering late penalty coefficient)
- Update Final Flag: Marks new submission as final if it’s the best
- Update Scoreboard: Recalculates assignment scoreboard
Queue Controller
Administrators can manage the queue through web interface:
View Queue
public function index()
{
if ( ! in_array( Auth::user()->role->name, ['admin', 'head_instructor']) )
abort(404);
return view('admin.queue', ['queue' => Queue_item::with('submission')->latest()->get()] );
}
Start Workers
public function work(){
if ( ! in_array( Auth::user()->role->name, ['admin', 'head_instructor']) )
abort(404);
Queue_item::work();
return redirect(route('queue.index'));
}
Unlock Items
If a process crashes, items can get stuck with a processid. Unlock them:
public function unlock($item){
if ( ! in_array( Auth::user()->role->name, ['admin', 'head_instructor']) )
abort(403);
$item = Queue_item::find($item);
$item->processid = NULL;
$item->save();
return redirect(route('queue.index'));
}
Empty Queue
Emptying the queue will delete all pending submissions without processing them. Use with caution!
public function empty(){
if ( ! in_array( Auth::user()->role->name, ['admin', 'head_instructor']) )
abort(403);
Queue_item::truncate();
return redirect(route('queue.index'));
}
Routes
GET /queue - View current queue
POST /queue/work - Start queue workers
POST /queue/unlock/{item} - Unlock a stuck item
POST /queue/empty - Empty the entire queue
Configuration
Set the concurrent process limit in Settings:
Setting::set('concurent_queue_process', 2);
Default is 2 concurrent processes.
Best Practices
- Monitor Queue Size: Check queue regularly to ensure it’s processing
- Set Appropriate Limits: Balance between speed and server resources
- Handle Stuck Items: Unlock items that get stuck due to crashed processes
- Don’t Empty Unnecessarily: Only empty the queue in emergency situations
- Use Force Sparingly: The
--force option bypasses limits and can overload the server
Troubleshooting
Queue Not Processing
- Check if workers are running:
ps aux | grep work_queue
- Manually start workers from queue management page
- Check server logs for errors
- Verify tester path and permissions are correct
Items Stuck with processid
- Check if the process is still running:
ps -p {processid}
- If process is dead, unlock the item from admin interface
- Worker will pick up the unlocked item on next cycle
Queue Growing Too Fast
- Increase concurrent process limit
- Optimize test cases to run faster
- Add more server resources
- Consider reducing time limits if appropriate
Permissions: Only admins and head instructors can manage the queue. Regular users cannot view or modify queue items.