00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include
00020 #include
00021 #include
00022 #include
00023 #include
00024 #include
00025 #include
00026 #include
00027 #include
00028 #ifdef WIN32
00029 #include
00030 #endif
00031 #include "nntpgrab_plugin_schedular.h"
00032 #include "schedular_plugin.h"
00033
00034 typedef struct SchedularPluginClass SchedularPluginClass;
00035
00036 static SchedularPlugin *schedular;
00037
00038 struct SchedularPluginClass
00039 {
00040 GObjectClass parent;
00041 };
00042
00043 G_DEFINE_TYPE(SchedularPlugin, schedular_plugin, G_TYPE_OBJECT);
00044
00045 gboolean
00046 nntpgrab_plugin_schedular_initialize(PluginSchedularImportedFuncs funcs, char **errmsg)
00047 {
00048 schedular_plugin_set_imported_funcs(funcs);
00049
00050 schedular = g_object_new(SCHEDULAR_PLUGIN_TYPE_OBJECT, NULL);
00051
00052 if (!load_download_queue(&schedular->queue, errmsg)) {
00053 g_object_unref(schedular);
00054
00055 return FALSE;
00056 }
00057
00058 return TRUE;
00059 }
00060
00061 void
00062 nntpgrab_plugin_schedular_destroy(void)
00063 {
00064 g_object_unref(schedular);
00065 }
00066
00067 int
00068 nntpgrab_plugin_schedular_get_version(void)
00069 {
00070 return NNTPGRAB_PLUGIN_API_VERSION;
00071 }
00072
00073 static void
00074 schedular_plugin_init (SchedularPlugin *obj)
00075 {
00076 g_static_mutex_init(&obj->mutex);
00077 g_static_mutex_init(&obj->runtime_mutex);
00078
00079 obj->thread = NULL;
00080 obj->abort_flag = FALSE;
00081 obj->queue = NULL;
00082 }
00083
00084 static void
00085 schedular_plugin_finalize (GObject *obj)
00086 {
00087 SchedularPlugin *schedular = SCHEDULAR_PLUGIN(obj);
00088 GList *list;
00089
00090 g_static_mutex_lock(&schedular->runtime_mutex);
00091
00092 if (schedular->thread) {
00093 g_static_mutex_lock(&schedular->mutex);
00094 schedular->abort_flag = TRUE;
00095 g_static_mutex_unlock(&schedular->mutex);
00096
00097 g_thread_join(schedular->thread);
00098 }
00099
00100 g_static_mutex_unlock(&schedular->runtime_mutex);
00101
00102 g_static_mutex_lock(&schedular->mutex);
00103
00104
00105 list = schedular->servers;
00106 while (list) {
00107 g_slice_free(ConfigServer, list->data);
00108 list = g_list_next(list);
00109 }
00110 g_list_free(schedular->servers);
00111 schedular->servers = NULL;
00112
00113 save_download_queue(schedular->queue, NULL);
00114
00115 free_download_queue(schedular->queue);
00116
00117 g_static_mutex_unlock(&schedular->mutex);
00118
00119 g_static_mutex_free(&schedular->mutex);
00120 g_static_mutex_free(&schedular->runtime_mutex);
00121 }
00122
00123 static void
00124 schedular_plugin_class_init (SchedularPluginClass *klass)
00125 {
00126 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
00127
00128 gobject_class->finalize = schedular_plugin_finalize;
00129 }
00130
00131 struct _part_download_done {
00132 NNTPCollection *collection;
00133 NNTPFile *file;
00134 NNTPPart *part;
00135 };
00136
00137 gboolean
00138 get_next_part_to_download(SchedularPlugin *schedular, int server_id, NNTPCollection **collection, NNTPFile **file, NNTPPart **part, GThreadPool *poolDecoder)
00139 {
00140 struct _part_download_done *job;
00141
00142 job = g_async_queue_try_pop(schedular->task_queue[server_id]);
00143 if (!job) {
00144
00145 g_cond_signal(schedular->task_cond);
00146
00147
00148 job = g_async_queue_try_pop(schedular->task_queue[server_id]);
00149
00150 if (!job) {
00151 return FALSE;
00152 }
00153 }
00154
00155 *collection = job->collection;
00156 *file = job->file;
00157 *part = job->part;
00158
00159 g_slice_free(struct _part_download_done, job);
00160
00161
00162 if ((*collection)->stop_flag || (*file)->stop_flag) {
00163 update_part_download_status(*collection, *file, *part, server_id, FALSE, TRUE, FALSE, FALSE, NULL);
00164 return FALSE;
00165 }
00166
00167
00168 g_cond_signal(schedular->task_cond);
00169
00170
00171 return TRUE;
00172 }
00173
00174 static gboolean
00175 test_server_may_download_part(NNTPPart *part, int server_id)
00176 {
00177 int i;
00178 gboolean all_tried;
00179
00180 if (!schedular->is_server_enabled[server_id]) {
00181 return FALSE;
00182 }
00183
00184 if (part->servers_already_tried[server_id]) {
00185 return FALSE;
00186 }
00187
00188
00189 if (schedular->is_high_priority_server[server_id]) {
00190 return TRUE;
00191 }
00192
00193 all_tried = TRUE;
00194
00195
00196 for (i = 0; i MAX_NNTP_SERVERS; i++) {
00197 if (schedular->is_high_priority_server[i] && !part->servers_already_tried[i]) {
00198 all_tried = FALSE;
00199 }
00200 }
00201
00202 if (!all_tried) {
00203 return FALSE;
00204 }
00205
00206
00207 if (schedular->is_normal_priority_server[server_id]) {
00208 return TRUE;
00209 }
00210
00211 all_tried = TRUE;
00212 for (i = 0; i 00213 if (schedular->is_normal_priority_server[i] && !part->servers_already_tried[i]) {
00214 all_tried = FALSE;
00215 }
00216 }
00217
00218 if (!all_tried) {
00219 return FALSE;
00220 }
00221
00222
00223
00224 return TRUE;
00225 }
00226
00227 static gboolean
00228 get_next_job_to_schedule(SchedularPlugin *schedular, int server_id, NNTPCollection **collection, NNTPFile **file, NNTPPart **part, GThreadPool *poolDecoder)
00229 {
00230 GList *list;
00231
00232 g_static_mutex_lock(&schedular->mutex);
00233
00234 list = schedular->queue;
00235 while (list) {
00236 GList *list2;
00237
00238 *collection = (NNTPCollection *) list->data;
00239
00240
00241 if ((*collection)->stop_flag) {
00242 list = g_list_next(list);
00243 continue;
00244 }
00245
00246 list2 = (*collection)->files;
00247 while (list2) {
00248 GList *list3;
00249 DecoderData *decoder_data;
00250 gboolean complete_flag;
00251
00252 *file = (NNTPFile *) list2->data;
00253
00254
00255 if ((*file)->file_is_downloaded) {
00256 list2 = g_list_next(list2);
00257 continue;
00258 }
00259
00260 if ((*file)->now_decoding) {
00261 list2 = g_list_next(list2);
00262 continue;
00263 }
00264
00265
00266 if ((*file)->stop_flag) {
00267 list2 = g_list_next(list2);
00268 continue;
00269 }
00270
00271 complete_flag = TRUE;
00272 list3 = (*file)->parts;
00273 while (list3) {
00274 *part = (NNTPPart *) list3->data;
00275
00276 if ((*part)->now_downloading) {
00277 complete_flag = FALSE;
00278 }
00279
00280 if (!(*part)->downloaded) {
00281 int i;
00282 gboolean all_servers_tried;
00283
00284 if (!(*part)->now_downloading && test_server_may_download_part(*part, server_id)) {
00285 (*part)->now_downloading = TRUE;
00286 (*file)->file_is_downloading = TRUE;
00287 g_static_mutex_unlock(&schedular->mutex);
00288
00289 return TRUE;
00290 }
00291
00292
00293 all_servers_tried = TRUE;
00294
00295 for (i = 0; i servers); i++) {
00296 if ((!(*part)->servers_already_tried[i]) && schedular->is_server_enabled[i]) {
00297 all_servers_tried = FALSE;
00298 }
00299 }
00300
00301 if (!all_servers_tried) {
00302 complete_flag = FALSE;
00303 }
00304 }
00305
00306 list3 = g_list_next(list3);
00307 }
00308
00309
00310 if (complete_flag && !(*file)->now_decoding) {
00311 (*file)->now_decoding = TRUE;
00312
00313 imported_funcs_task_state_changed((*collection)->collection_name, (*file)->subject, (*file)->status, TASK_STATE_WAITING_FOR_DECODE);
00314
00315 (*file)->status = TASK_STATE_WAITING_FOR_DECODE;
00316
00317 decoder_data = g_slice_new0(DecoderData);
00318 decoder_data->collection_name = (*collection)->collection_name;
00319 decoder_data->file = (*file);
00320 decoder_data->schedular = schedular;
00321
00322 g_thread_pool_push(poolDecoder, decoder_data, NULL);
00323 }
00324
00325 list2 = g_list_next(list2);
00326 }
00327
00328 list = g_list_next(list);
00329 }
00330
00331 g_static_mutex_unlock(&schedular->mutex);
00332
00333 *collection = NULL;
00334 *file = NULL;
00335 *part = NULL;
00336
00337 return FALSE;
00338 }
00339
00340 static int
00341 get_max_threads_for_server(SchedularPlugin *schedular, int server_id)
00342 {
00343 GList *list;
00344 ConfigServer *server;
00345 int ret;
00346
00347 g_static_mutex_lock(&schedular->mutex);
00348 list = g_list_nth(schedular->servers, server_id);
00349
00350 server = list->data;
00351
00352 g_return_val_if_fail(server != NULL, -1);
00353
00354 ret = server->max_threads;
00355
00356 g_static_mutex_unlock(&schedular->mutex);
00357
00358 return ret;
00359 }
00360
00361 static void
00362 prepare_jobs(SchedularPlugin *schedular, GThreadPool *poolDecoder)
00363 {
00364 int i;
00365 struct _part_download_done *job;
00366
00367 for (i = 0; i 00368 while (schedular->task_queue[i] && g_async_queue_length(schedular->task_queue[i]) 00369
00370 NNTPCollection *collection = NULL;
00371 NNTPFile *file = NULL;
00372 NNTPPart *part = NULL;
00373
00374 if (!get_next_job_to_schedule(schedular, i, &collection, &file, &part, poolDecoder)) {
00375
00376 break;
00377 }
00378
00379 job = g_slice_new0(struct _part_download_done);
00380 job->collection = collection;
00381 job->file = file;
00382 job->part = part;
00383
00384 g_async_queue_push(schedular->task_queue[i], job);
00385 };
00386 }
00387 }
00388
00389 void
00390 update_part_download_status(NNTPCollection *collection, NNTPFile *file, NNTPPart *part, int server_id, gboolean success, gboolean reset_to_zero, gboolean queue_is_being_paused, gboolean dont_retry_anymore, gpointer nntpconnection)
00391 {
00392 GList *list;
00393 ConfigServer *server;
00394
00395 g_static_mutex_lock(&schedular->mutex);
00396
00397 part->now_downloading = FALSE;
00398 part->downloaded = success;
00399 file->file_is_downloading = FALSE;
00400
00401 if (success) {
00402 if (file->file_size_remaining size) {
00403 g_print("Invalid file size remaining!\n");
00404 #if SIZEOF_LONG == 8
00405 g_print("file->file_size_remaining = %li\n", file->file_size_remaining);
00406 #else
00407 g_print("file->file_size_remaining = %lli\n", file->file_size_remaining);
00408 #endif
00409 g_print("part->size = %i\n", part->size);
00410 file->file_size_remaining = part->size;
00411 }
00412
00413 if (collection->total_size_remaining size) {
00414 g_print("Invalid collection size remaining!\n");
00415 #if SIZEOF_LONG == 8
00416 g_print("collection->total_size_remaining = %li\n", collection->total_size_remaining);
00417 #else
00418 g_print("collection->total_size_remaining = %lli\n", collection->total_size_remaining);
00419 #endif
00420 g_print("part->size = %i\n", part->size);
00421 collection->total_size_remaining = part->size;
00422 }
00423
00424 file->file_size_remaining -= part->size;
00425 collection->total_size_remaining -= part->size;
00426 file->num_parts_downloaded++;
00427
00428 server = g_list_nth_data(schedular->servers, server_id);
00429 imported_funcs_emit_part_download_complete(server->servername, nntpconnection, collection->collection_name, file->subject, part->partnum, part->size);
00430 imported_funcs_emit_file_download_state_update(collection->collection_name, file->subject, file->numparts, file->num_parts_downloaded, file->num_parts_failed, file->file_size, file->file_size_remaining, collection->total_size, collection->total_size_remaining);
00431 } else if (!success && dont_retry_anymore) {
00432 part->servers_already_tried[server_id] = TRUE;
00433 }
00434
00435
00436 list = file->parts;
00437 while (list) {
00438 NNTPPart *part2 = list->data;
00439
00440 if (part2->now_downloading) {
00441 file->file_is_downloading = TRUE;
00442 }
00443
00444 list = g_list_next(list);
00445 }
00446
00447
00448 if (!success && dont_retry_anymore) {
00449 int i;
00450 gboolean all_servers_tried = TRUE;
00451
00452 for (i = 0; i servers); i++) {
00453 if (!part->servers_already_tried[i] && schedular->is_server_enabled[i]) {
00454 all_servers_tried = FALSE;
00455 }
00456 }
00457
00458 server = g_list_nth_data(schedular->servers, server_id);
00459
00460 if (nntpconnection) {
00461 imported_funcs_emit_part_download_failed (server->servername, nntpconnection, collection->collection_name, file->subject, part->partnum, part->size, all_servers_tried);
00462 }
00463
00464 if (all_servers_tried && !reset_to_zero) {
00465 if (file->file_size_remaining size) {
00466 g_print("Invalid file size remaining!\n");
00467 #if SIZEOF_LONG == 8
00468 g_print("file->file_size_remaining = %li\n", file->file_size_remaining);
00469 #else
00470 g_print("file->file_size_remaining = %lli\n", file->file_size_remaining);
00471 #endif
00472 g_print("part->size = %i\n", part->size);
00473 file->file_size_remaining = part->size;
00474 }
00475
00476 if (collection->total_size_remaining size) {
00477 g_print("Invalid collection size remaining!\n");
00478 #if SIZEOF_LONG == 8
00479 g_print("collection->total_size_remaining = %li\n", collection->total_size_remaining);
00480 #else
00481 g_print("collection->total_size_remaining = %lli\n", collection->total_size_remaining);
00482 #endif
00483 g_print("part->size = %i\n", part->size);
00484 collection->total_size_remaining = part->size;
00485 }
00486
00487 file->num_parts_failed++;
00488 file->file_size_remaining -= part->size;
00489 collection->total_size_remaining -= part->size;
00490
00491 imported_funcs_emit_file_download_state_update(collection->collection_name, file->subject, file->numparts, file->num_parts_downloaded, file->num_parts_failed, file->file_size, file->file_size_remaining, collection->total_size, collection->total_size_remaining);
00492 }
00493 }
00494
00495 if (reset_to_zero) {
00496 file->num_parts_downloaded = 0;
00497 file->num_parts_failed = 0;
00498
00499 imported_funcs_emit_file_download_state_update(collection->collection_name, file->subject, file->numparts, file->num_parts_downloaded, file->num_parts_failed, file->file_size, file->file_size_remaining, collection->total_size, collection->total_size_remaining);
00500 }
00501
00502 g_static_mutex_unlock(&schedular->mutex);
00503 }
00504
00505 static gpointer
00506 stop_schedular_func(gpointer data)
00507 {
00508 nntpgrab_plugin_schedular_stop((const char *) data);
00509
00510 g_free(data);
00511
00512 return NULL;
00513 }
00514
00515 void
00516 stop_schedular_from_seperate_thread(const char *reason)
00517 {
00518
00519 g_thread_create(stop_schedular_func, g_strdup(reason), FALSE, NULL);
00520 }
00521
00522 static gpointer
00523 schedular_thread_func(gpointer data)
00524 {
00525 SchedularPlugin *schedular = SCHEDULAR_PLUGIN(data);
00526 GThreadPool *pool[MAX_NNTP_SERVERS];
00527 GThreadPool *poolDecoder;
00528 GList *list;
00529 int i = 0;
00530 gboolean abort_flag;
00531 GTimeVal val;
00532
00533
00534 g_assert(g_list_length(schedular->servers) 00535 memset(pool, 0, sizeof(pool));
00536
00537 poolDecoder = g_thread_pool_new(decoder_thread_func, schedular, 1, FALSE, NULL);
00538
00539 schedular->task_cond = g_cond_new();
00540 schedular->task_mutex = g_mutex_new();
00541 memset(schedular->task_queue, 0, sizeof(schedular->task_queue));
00542
00543 list = schedular->servers;
00544 while (list) {
00545 ConfigServer *server = list->data;
00546 int j;
00547
00548 schedular->task_queue[i] = g_async_queue_new();
00549
00550
00551 if (!schedular->is_server_enabled[i]) {
00552 pool[i] = NULL;
00553 i++;
00554 list = g_list_next(list);
00555 continue;
00556 }
00557
00558 pool[i] = g_thread_pool_new(download_thread_func, schedular, server->max_threads, FALSE, NULL);
00559
00560 g_assert(pool[i]);
00561
00562
00563 for (j = 0; j max_threads; j++) {
00564 DownloadData *download_data = g_slice_new(DownloadData);
00565 download_data->server = server;
00566 download_data->server_id = i;
00567 download_data->poolDecoder = poolDecoder;
00568 g_thread_pool_push(pool[i], download_data, NULL);
00569 }
00570
00571 i++;
00572
00573 list = g_list_next(list);
00574 }
00575
00576
00577 prepare_jobs(schedular, poolDecoder);
00578
00579 abort_flag = FALSE;
00580 while (!abort_flag) {
00581 g_static_mutex_lock(&schedular->mutex);
00582 abort_flag = schedular->abort_flag;
00583 g_static_mutex_unlock(&schedular->mutex);
00584
00585 g_get_current_time(&val);
00586 g_time_val_add(&val, G_USEC_PER_SEC);
00587
00588 g_mutex_lock(schedular->task_mutex);
00589
00590 if (g_cond_timed_wait(schedular->task_cond, schedular->task_mutex, &val)) {
00591
00592 prepare_jobs(schedular, poolDecoder);
00593 }
00594
00595 g_mutex_unlock(schedular->task_mutex);
00596 }
00597
00598 for (i = 0; i 00599 if (pool[i]) {
00600 g_thread_pool_free(pool[i], TRUE, TRUE);
00601 }
00602
00603 if (schedular->task_queue[i]) {
00604
00605 struct _part_download_done *job;
00606 while ((job = g_async_queue_try_pop(schedular->task_queue[i]))) {
00607
00608 update_part_download_status(job->collection, job->file, job->part, i, FALSE, FALSE, TRUE, FALSE, NULL);
00609
00610 g_slice_free(struct _part_download_done, job);
00611 }
00612
00613 g_async_queue_unref(schedular->task_queue[i]);
00614 schedular->task_queue[i] = NULL;
00615 }
00616 }
00617
00618 g_thread_pool_free(poolDecoder, FALSE, TRUE);
00619 g_cond_free(schedular->task_cond);
00620 g_mutex_free(schedular->task_mutex);
00621
00622 return NULL;
00623 }
00624
00625 gboolean
00626 nntpgrab_plugin_schedular_start(void)
00627 {
00628 GError *error = NULL;
00629 GList *list;
00630 GList *servers;
00631 int i;
00632
00633 g_assert(schedular != NULL);
00634
00635 g_static_mutex_lock(&schedular->mutex);
00636
00637 if (schedular->thread != NULL) {
00638 g_static_mutex_unlock(&schedular->mutex);
00639 return FALSE;
00640 }
00641
00642
00643 g_assert(schedular->servers == NULL);
00644
00645 memset(&schedular->is_high_priority_server, 0, sizeof(schedular->is_high_priority_server));
00646 memset(&schedular->is_normal_priority_server, 0, sizeof(schedular->is_normal_priority_server));
00647 memset(&schedular->is_low_priority_server, 0, sizeof(schedular->is_low_priority_server));
00648 memset(&schedular->is_server_enabled, 0, sizeof(schedular->is_server_enabled));
00649
00650 servers = imported_funcs_config_get_avail_servers();
00651 list = servers;
00652 i = 0;
00653
00654 while (list) {
00655 char *servername = list->data;
00656 ConfigServer *server = imported_funcs_config_get_server_info(servername);
00657
00658 g_assert(server);
00659
00660 schedular->servers = g_list_append(schedular->servers, server);
00661 if (server->enabled) {
00662 if (server->priority == SERVER_PRIORITY_HIGH) {
00663 schedular->is_high_priority_server[i] = TRUE;
00664 schedular->is_normal_priority_server[i] = FALSE;
00665 schedular->is_low_priority_server[i] = FALSE;
00666 } else if (server->priority == SERVER_PRIORITY_LOW) {
00667 schedular->is_high_priority_server[i] = FALSE;
00668 schedular->is_normal_priority_server[i] = FALSE;
00669 schedular->is_low_priority_server[i] = TRUE;
00670 } else {
00671 schedular->is_high_priority_server[i] = FALSE;
00672 schedular->is_normal_priority_server[i] = TRUE;
00673 schedular->is_low_priority_server[i] = FALSE;
00674 }
00675 }
00676
00677 schedular->is_server_enabled[i] = server->enabled;
00678
00679 i++;
00680
00681 g_free(servername);
00682
00683 list = g_list_next(list);
00684 }
00685
00686 g_list_free(servers);
00687
00688
00689 schedular->abort_flag = FALSE;
00690
00691
00692 if (!(schedular->thread = g_thread_create(schedular_thread_func, schedular, TRUE, &error))) {
00693 imported_funcs_emit_fatal_error(error->message);
00694 g_error_free(error);
00695 }
00696
00697 g_static_mutex_unlock(&schedular->mutex);
00698
00699 imported_funcs_notify_schedular_state_changed(SCHEDULAR_STATE_RUNNING, NULL);
00700
00701 return TRUE;
00702 }
00703
00704 gboolean
00705 nntpgrab_plugin_schedular_stop(const char *reason)
00706 {
00707 GList *list;
00708
00709 g_assert(schedular != NULL);
00710
00711 g_static_mutex_lock(&schedular->runtime_mutex);
00712
00713 if (schedular->thread == NULL) {
00714 g_static_mutex_unlock(&schedular->runtime_mutex);
00715 return FALSE;
00716 }
00717
00718 imported_funcs_notify_schedular_state_changed(SCHEDULAR_STATE_STOPPING, reason);
00719
00720
00721 g_static_mutex_lock(&schedular->mutex);
00722 schedular->abort_flag = TRUE;
00723 g_static_mutex_unlock(&schedular->mutex);
00724
00725 g_thread_join(schedular->thread);
00726 schedular->thread = NULL;
00727
00728
00729 g_static_mutex_lock(&schedular->mutex);
00730 list = schedular->servers;
00731 while (list) {
00732 g_slice_free(ConfigServer, list->data);
00733 list = g_list_next(list);
00734 }
00735 g_list_free(schedular->servers);
00736 schedular->servers = NULL;
00737 g_static_mutex_unlock(&schedular->mutex);
00738
00739 g_static_mutex_unlock(&schedular->runtime_mutex);
00740
00741 imported_funcs_notify_schedular_state_changed(SCHEDULAR_STATE_STOPPED, NULL);
00742
00743 return TRUE;
00744 }
00745
00746 SchedularState
00747 nntpgrab_plugin_schedular_get_state(void)
00748 {
00749 SchedularState state;
00750
00751 g_static_mutex_lock(&schedular->mutex);
00752 if (schedular->thread && schedular->abort_flag) {
00753 state = SCHEDULAR_STATE_STOPPING;
00754 } else if (schedular->thread && !schedular->abort_flag) {
00755 state = SCHEDULAR_STATE_RUNNING;
00756 } else {
00757 state = SCHEDULAR_STATE_STOPPED;
00758 }
00759 g_static_mutex_unlock(&schedular->mutex);
00760
00761 return state;
00762 }
00763
00764 static gboolean
00765 test_is_subject_already_in_collection(NNTPCollection *collection, const char *subject)
00766 {
00767 GList *list;
00768
00769 list = collection->files;
00770 while (list) {
00771 NNTPFile *file = (NNTPFile *) list->data;
00772
00773 if (!strcmp(file->subject, subject)) {
00774 return TRUE;
00775 }
00776
00777 list = g_list_next(list);
00778 }
00779
00780 return FALSE;
00781 }
00782
00783 gboolean
00784 nntpgrab_plugin_schedular_add_task_to_queue(const char *collection_name, NNTPFile *file, char **errmsg)
00785 {
00786 GList *list;
00787 NNTPCollection *collection;
00788
00789 g_static_mutex_lock(&schedular->mutex);
00790
00791
00792 list = schedular->queue;
00793 while (list) {
00794 collection = (NNTPCollection *) list->data;
00795
00796 if (!strcmp(collection->collection_name, collection_name)) {
00797
00798
00799
00800 if (test_is_subject_already_in_collection(collection, file->subject)) {
00801 if (errmsg) {
00802 *errmsg = g_strdup_printf(_("There already is a file with subject '%s' part of collection '%s'"), file->subject, collection_name);
00803 }
00804
00805 g_static_mutex_unlock(&schedular->mutex);
00806
00807 return FALSE;
00808 }
00809
00810
00811 collection->files = g_list_append(collection->files, file);
00812 collection->total_size += file->file_size;
00813 collection->total_size_remaining += file->file_size;
00814 file->file_size_remaining = file->file_size;
00815
00816 file->position = g_list_index(collection->files, file) + 1;
00817
00818 imported_funcs_task_added(collection, file);
00819
00820 g_static_mutex_unlock(&schedular->mutex);
00821
00822 return TRUE;
00823 }
00824
00825 list = g_list_next(list);
00826 }
00827
00828
00829 collection = g_slice_new0(NNTPCollection);
00830 strncpy((char *) collection->collection_name, collection_name, sizeof(collection->collection_name));
00831 schedular->queue = g_list_append(schedular->queue, collection);
00832
00833 imported_funcs_collection_added(collection_name);
00834
00835
00836 collection->files = g_list_append(collection->files, file);
00837 collection->total_size = file->file_size;
00838 collection->total_size_remaining = file->file_size;
00839 file->position = 1;
00840
00841 imported_funcs_task_added(collection, file);
00842
00843 g_static_mutex_unlock(&schedular->mutex);
00844
00845 return TRUE;
00846 }
00847
00848 static void
00849 free_file(NNTPFile *file)
00850 {
00851 GList *list;
00852
00853 g_assert(file);
00854
00855 list = file->parts;
00856 while (list) {
00857 g_slice_free(NNTPPart, list->data);
00858 list = g_list_next(list);
00859 }
00860 g_list_free(file->parts);
00861 file->parts = NULL;
00862
00863 list = file->groups;
00864 while (list) {
00865 g_free(list->data);
00866 list = g_list_next(list);
00867 }
00868 g_list_free(file->groups);
00869 file->groups = NULL;
00870
00871 g_free(file);
00872 }
00873
00874 gboolean
00875 nntpgrab_plugin_schedular_del_task_from_queue(const char *collection_name, const char *subject, char **errmsg)
00876 {
00877 GList *list;
00878 NNTPCollection *collection;
00879 int i;
00880 ConfigOpts opts;
00881
00882 g_static_mutex_lock(&schedular->mutex);
00883
00884 opts = imported_funcs_config_get_opts();
00885
00886
00887 list = schedular->queue;
00888 while (list) {
00889 collection = (NNTPCollection *) list->data;
00890
00891 if (!strcmp(collection->collection_name, collection_name)) {
00892
00893 GList *list2;
00894 gboolean delete_flag = FALSE;
00895
00896
00897 if (!subject) {
00898
00899 delete_flag = TRUE;
00900
00901
00902 collection->stop_flag = TRUE;
00903 }
00904
00905
00906 list2 = collection->files;
00907 while (list2) {
00908 NNTPFile *file = (NNTPFile *) list2->data;
00909
00910 list2 = g_list_next(list2);
00911
00912 if (delete_flag || !strcmp(file->subject, subject)) {
00913
00914
00915 while (file->file_is_downloading || file->now_decoding) {
00916
00917 file->stop_flag = TRUE;
00918
00919
00920 g_static_mutex_unlock(&schedular->mutex);
00921
00922 g_usleep(G_USEC_PER_SEC / 10);
00923
00924 g_static_mutex_lock(&schedular->mutex);
00925 }
00926
00927
00928 for (i = 0; i numparts; i++) {
00929 char *filename = g_strdup_printf("%s%s%s.%i", opts.temp_directory, G_DIR_SEPARATOR_S, file->tmp_filename, i + 1);
00930 g_unlink(filename);
00931 g_free(filename);
00932 }
00933
00934
00935 collection->total_size -= file->file_size;
00936 collection->total_size_remaining -= file->file_size_remaining;
00937
00938
00939 collection->files = g_list_remove(collection->files, file);
00940
00941
00942 list2 = collection->files;
00943
00944 imported_funcs_task_removed(collection_name, file->subject, collection->total_size, collection->total_size_remaining);
00945
00946 free_file(file);
00947
00948
00949 if (g_list_length(collection->files) == 0) {
00950
00951 schedular->queue = g_list_remove(schedular->queue, collection);
00952
00953 imported_funcs_collection_removed(collection_name);
00954
00955 g_slice_free(NNTPCollection, collection);
00956 }
00957
00958 if (!delete_flag) {
00959 g_static_mutex_unlock(&schedular->mutex);
00960 return TRUE;
00961 }
00962 }
00963 }
00964
00965 if (delete_flag) {
00966 g_static_mutex_unlock(&schedular->mutex);
00967 return TRUE;
00968 }
00969 }
00970
00971 list = g_list_next(list);
00972 }
00973
00974 if (errmsg) {
00975 *errmsg = g_strdup_printf(_("Subject '%s' was not found in collection '%s'"), subject, collection_name);
00976 }
00977
00978 g_static_mutex_unlock(&schedular->mutex);
00979
00980 return FALSE;
00981 }
00982
00983 gboolean
00984 nntpgrab_plugin_schedular_restart_task(const char *collection_name, const char *subject, char **errmsg)
00985 {
00986 GList *list;
00987 NNTPCollection *collection;
00988
00989 g_static_mutex_lock(&schedular->mutex);
00990
00991
00992 list = schedular->queue;
00993 while (list) {
00994 collection = (NNTPCollection *) list->data;
00995
00996 if (!strcmp(collection->collection_name, collection_name)) {
00997
00998 GList *list2;
00999 gboolean restart_flag = FALSE;
01000
01001
01002 if (!subject) {
01003
01004 restart_flag = TRUE;
01005 }
01006
01007 list2 = collection->files;
01008 while (list2) {
01009 NNTPFile *file = list2->data;
01010
01011 if (restart_flag || !strcmp(file->subject, subject)) {
01012
01013 GList *list3;
01014 gboolean file_is_downloading = FALSE;
01015 int filesize_remaining_active = 0;
01016
01017 list3 = file->parts;
01018 while (list3) {
01019 NNTPPart *part = list3->data;
01020
01021 if (part->now_downloading) {
01022 list3 = g_list_next(list3);
01023 file_is_downloading = TRUE;
01024 filesize_remaining_active += part->size;
01025 continue;
01026 }
01027
01028 part->downloaded = FALSE;
01029 part->now_downloading = FALSE;
01030
01031 memset(part->servers_already_tried, 0, sizeof(part->servers_already_tried));
01032
01033 list3 = g_list_next(list3);
01034 }
01035
01036 file->file_is_downloading = file_is_downloading;
01037 file->file_is_downloaded = FALSE;
01038 file->now_decoding = FALSE;
01039 file->num_parts_downloaded = 0;
01040 file->num_parts_failed = 0;
01041
01042 collection->total_size_remaining += file->file_size - file->file_size_remaining - filesize_remaining_active;
01043 file->file_size_remaining = file->file_size - filesize_remaining_active;
01044
01045 imported_funcs_task_state_changed(collection_name, file->subject, file->status, TASK_STATE_WAITING_FOR_DOWNLOAD);
01046 file->status = TASK_STATE_WAITING_FOR_DOWNLOAD;
01047
01048 if (!restart_flag) {
01049 g_static_mutex_unlock(&schedular->mutex);
01050
01051 return TRUE;
01052 }
01053 }
01054
01055 list2 = g_list_next(list2);
01056 }
01057
01058 if (restart_flag) {
01059 g_static_mutex_unlock(&schedular->mutex);
01060 return TRUE;
01061 }
01062 }
01063
01064 list = g_list_next(list);
01065 }
01066
01067 g_static_mutex_unlock(&schedular->mutex);
01068
01069 if (errmsg) {
01070 *errmsg = g_strdup_printf(_("Subject '%s' was not found in collection '%s'"), subject, collection_name);
01071 }
01072
01073 return FALSE;
01074 }
01075
01076 gboolean
01077 nntpgrab_plugin_schedular_save_queue(char **errmsg)
01078 {
01079 return schedular_plugin_save_queue(errmsg);
01080 }
01081
01082
01083 void
01084 nntpgrab_plugin_schedular_foreach_task(GFunc foreach_func, gpointer data)
01085 {
01086 g_assert(schedular != NULL);
01087
01088 g_static_mutex_lock(&schedular->mutex);
01089
01090 g_list_foreach(schedular->queue, foreach_func, data);
01091
01092 g_static_mutex_unlock(&schedular->mutex);
01093 }
01094
01095 gboolean
01096 nntpgrab_plugin_schedular_move_task(const char *collection_name_src, const char *subject_src, const char *collection_name_dest, int position_dest)
01097 {
01098 GList *list;
01099 NNTPCollection *collection_src = NULL;
01100 NNTPCollection *collection_dest = NULL;
01101 NNTPFile *file = NULL;
01102 gboolean found = FALSE;
01103 int old_position = 0;
01104
01105 g_assert(schedular != NULL);
01106
01107 g_return_val_if_fail(collection_name_src != NULL, FALSE);
01108 g_return_val_if_fail(subject_src != NULL, FALSE);
01109 g_return_val_if_fail(collection_name_dest != NULL, FALSE);
01110 g_return_val_if_fail(position_dest >= -1, FALSE);
01111
01112
01113 g_return_val_if_fail(!strcmp(collection_name_src, collection_name_dest), FALSE);
01114
01115 g_static_mutex_lock(&schedular->mutex);
01116
01117
01118 list = schedular->queue;
01119 while (list && !found) {
01120 collection_src = (NNTPCollection *) list->data;
01121
01122 if (!strcmp(collection_src->collection_name, collection_name_src)) {
01123 GList *list2;
01124
01125 list2 = collection_src->files;
01126 while (list2) {
01127 file = list2->data;
01128
01129 if (!strcmp(file->subject, subject_src)) {
01130 found = TRUE;
01131 break;
01132 }
01133
01134 file = NULL;
01135 list2 = g_list_next(list2);
01136 old_position++;
01137 }
01138 }
01139
01140 list = g_list_next(list);
01141 }
01142
01143 if (!found) {
01144 g_static_mutex_unlock(&schedular->mutex);
01145 return FALSE;
01146 }
01147
01148
01149 if (!strcmp(collection_name_src, collection_name_dest)) {
01150 collection_dest = collection_src;
01151 } else {
01152
01153 list = schedular->queue;
01154 found = FALSE;
01155 while (list && !found) {
01156 collection_dest = (NNTPCollection *) list->data;
01157
01158 if (!strcmp(collection_dest->collection_name, collection_name_src)) {
01159 found = TRUE;
01160 }
01161
01162 list = g_list_next(list);
01163 }
01164
01165 if (!found) {
01166 g_static_mutex_unlock(&schedular->mutex);
01167 return FALSE;
01168 }
01169 }
01170
01171
01172 collection_src->files = g_list_remove(collection_src->files, file);
01173 collection_dest->files = g_list_insert(collection_dest->files, file, position_dest);
01174
01175 #if 0
01176 int i = 0;
01177
01178 list = collection_dest->files;
01179 while (list) {
01180 NNTPFile *file = (NNTPFile *) list->data;
01181
01182 g_print("subject %i = %s\n", i, file->subject);
01183 i++;
01184 list = g_list_next(list);
01185 }
01186 #endif
01187
01188 g_static_mutex_unlock(&schedular->mutex);
01189
01190 imported_funcs_emit_task_moved(collection_name_src, subject_src, collection_name_dest, old_position, position_dest);
01191
01192 return TRUE;
01193 }
01194
01195 gboolean
01196 nntpgrab_plugin_schedular_move_collection(const char *collection_name, int new_position)
01197 {
01198 GList *list;
01199 NNTPCollection *collection_src = NULL;
01200 gboolean found = FALSE;
01201 int orig_position = 0;
01202
01203 g_assert(schedular != NULL);
01204
01205 g_static_mutex_lock(&schedular->mutex);
01206
01207
01208 list = schedular->queue;
01209 while (list) {
01210 collection_src = (NNTPCollection *) list->data;
01211
01212 if (!strcmp(collection_src->collection_name, collection_name)) {
01213 found = TRUE;
01214 schedular->queue = g_list_remove(schedular->queue, collection_src);
01215 schedular->queue = g_list_insert(schedular->queue, collection_src, new_position);
01216 break;
01217 }
01218
01219 orig_position++;
01220 list = g_list_next(list);
01221 }
01222
01223 #if 0
01224 int i = 0;
01225
01226 list = schedular->queue;
01227 while (list) {
01228 collection_src = (NNTPCollection *) list->data;
01229
01230 g_print("collection %i = %s\n", i, collection_src->collection_name);
01231 i++;
01232 list = g_list_next(list);
01233 }
01234 #endif
01235
01236 g_static_mutex_unlock(&schedular->mutex);
01237
01238 if (found) {
01239 imported_funcs_emit_collection_moved(collection_name, orig_position, new_position);
01240 }
01241
01242 return found;
01243 }
01244
01245 gboolean
01246 schedular_plugin_save_queue(char **errmsg)
01247 {
01248 g_assert(schedular != NULL);
01249
01250 g_static_mutex_lock(&schedular->mutex);
01251
01252 if (!save_download_queue(schedular->queue, errmsg)) {
01253 g_static_mutex_unlock(&schedular->mutex);
01254 return FALSE;
01255 }
01256
01257 g_static_mutex_unlock(&schedular->mutex);
01258
01259 return TRUE;
01260 }
01261