00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 #include 
00020 #include 
00021 #include 
00022 #include 
00023 #include 
00024 #include 
00025 #include 
00026 #include 
00027 #include 
00028 #ifdef WIN32
00029 #include 
00030 #endif
00031 #include "nntpgrab_plugin_schedular.h"
00032 #include "schedular_plugin.h"
00033 
00034 typedef struct SchedularPluginClass SchedularPluginClass;
00035 
00036 static SchedularPlugin *schedular;
00037 
00038 struct SchedularPluginClass
00039 {
00040     GObjectClass parent;
00041 };
00042 
00043 G_DEFINE_TYPE(SchedularPlugin, schedular_plugin, G_TYPE_OBJECT);
00044 
00045 gboolean
00046 nntpgrab_plugin_schedular_initialize(PluginSchedularImportedFuncs funcs, char **errmsg)
00047 {
00048     schedular_plugin_set_imported_funcs(funcs);
00049 
00050     schedular = g_object_new(SCHEDULAR_PLUGIN_TYPE_OBJECT, NULL);
00051 
00052     if (!load_download_queue(&schedular->queue, errmsg)) {
00053         g_object_unref(schedular);
00054 
00055         return FALSE;
00056     }
00057 
00058     return TRUE;
00059 }
00060 
00061 void
00062 nntpgrab_plugin_schedular_destroy(void)
00063 {
00064     g_object_unref(schedular);
00065 }
00066 
00067 int
00068 nntpgrab_plugin_schedular_get_version(void)
00069 {
00070     return NNTPGRAB_PLUGIN_API_VERSION;
00071 }
00072 
00073 static void
00074 schedular_plugin_init (SchedularPlugin *obj)
00075 {
00076     g_static_mutex_init(&obj->mutex);
00077     g_static_mutex_init(&obj->runtime_mutex);
00078 
00079     obj->thread = NULL;
00080     obj->abort_flag = FALSE;
00081     obj->queue = NULL;
00082 }
00083 
00084 static void
00085 schedular_plugin_finalize (GObject *obj)
00086 {
00087     SchedularPlugin *schedular = SCHEDULAR_PLUGIN(obj);
00088     GList *list;
00089 
00090     g_static_mutex_lock(&schedular->runtime_mutex);
00091 
00092     if (schedular->thread) {
00093         g_static_mutex_lock(&schedular->mutex);
00094         schedular->abort_flag = TRUE;
00095         g_static_mutex_unlock(&schedular->mutex);
00096 
00097         g_thread_join(schedular->thread);
00098     }
00099 
00100     g_static_mutex_unlock(&schedular->runtime_mutex);
00101 
00102     g_static_mutex_lock(&schedular->mutex);
00103 
00104     
00105     list = schedular->servers;
00106     while (list) {
00107         g_slice_free(ConfigServer, list->data);
00108         list = g_list_next(list);
00109     }
00110     g_list_free(schedular->servers);
00111     schedular->servers = NULL;
00112 
00113     save_download_queue(schedular->queue, NULL);        
00114 
00115     free_download_queue(schedular->queue);
00116 
00117     g_static_mutex_unlock(&schedular->mutex);
00118 
00119     g_static_mutex_free(&schedular->mutex);
00120     g_static_mutex_free(&schedular->runtime_mutex);
00121 }
00122 
00123 static void
00124 schedular_plugin_class_init (SchedularPluginClass *klass)
00125 {
00126     GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
00127 
00128     gobject_class->finalize = schedular_plugin_finalize;
00129 }
00130 
00131 struct _part_download_done {
00132     NNTPCollection *collection;
00133     NNTPFile *file;
00134     NNTPPart *part;
00135 };
00136 
00137 gboolean
00138 get_next_part_to_download(SchedularPlugin *schedular, int server_id, NNTPCollection **collection, NNTPFile **file, NNTPPart **part, GThreadPool *poolDecoder)
00139 {
00140     struct _part_download_done *job;
00141 
00142     job = g_async_queue_try_pop(schedular->task_queue[server_id]);
00143     if (!job) {
00144         
00145         g_cond_signal(schedular->task_cond);
00146         
00147 
00148         job = g_async_queue_try_pop(schedular->task_queue[server_id]);
00149 
00150         if (!job) {
00151             return FALSE;
00152         }
00153     }
00154 
00155     *collection = job->collection;
00156     *file = job->file;
00157     *part = job->part;
00158 
00159     g_slice_free(struct _part_download_done, job);
00160 
00161     
00162     if ((*collection)->stop_flag || (*file)->stop_flag) {
00163         update_part_download_status(*collection, *file, *part, server_id, FALSE, TRUE, FALSE, FALSE, NULL);
00164         return FALSE;
00165     }
00166 
00167     
00168     g_cond_signal(schedular->task_cond);
00169     
00170 
00171     return TRUE;
00172 }
00173 
00174 static gboolean
00175 test_server_may_download_part(NNTPPart *part, int server_id)
00176 {
00177     int i;
00178     gboolean all_tried;
00179 
00180     if (!schedular->is_server_enabled[server_id]) {
00181         return FALSE;
00182     }
00183 
00184     if (part->servers_already_tried[server_id]) {
00185         return FALSE;
00186     }
00187 
00188     
00189     if (schedular->is_high_priority_server[server_id]) {
00190         return TRUE;
00191     }
00192 
00193     all_tried = TRUE;
00194 
00195     
00196     for (i = 0; i MAX_NNTP_SERVERS; i++) {
00197         if (schedular->is_high_priority_server[i] && !part->servers_already_tried[i]) {
00198             all_tried = FALSE;
00199         }
00200     }
00201 
00202     if (!all_tried) {
00203         return FALSE;
00204     }
00205 
00206     
00207     if (schedular->is_normal_priority_server[server_id]) {
00208         return TRUE;
00209     }
00210 
00211     all_tried = TRUE;
00212     for (i = 0; i 00213         if (schedular->is_normal_priority_server[i] && !part->servers_already_tried[i]) {
00214             all_tried = FALSE;
00215         }
00216     }
00217 
00218     if (!all_tried) {
00219         return FALSE;
00220     }
00221 
00222     
00223 
00224     return TRUE;
00225 }
00226 
00227 static gboolean
00228 get_next_job_to_schedule(SchedularPlugin *schedular, int server_id, NNTPCollection **collection, NNTPFile **file, NNTPPart **part, GThreadPool *poolDecoder)
00229 {
00230     GList *list;
00231 
00232     g_static_mutex_lock(&schedular->mutex);
00233 
00234     list = schedular->queue;
00235     while (list) {
00236         GList *list2;
00237 
00238         *collection = (NNTPCollection *) list->data;
00239 
00240         
00241         if ((*collection)->stop_flag) {
00242             list = g_list_next(list);
00243             continue;
00244         }
00245 
00246         list2 = (*collection)->files;
00247         while (list2) {
00248             GList *list3;
00249             DecoderData *decoder_data;
00250             gboolean complete_flag;
00251 
00252             *file = (NNTPFile *) list2->data;
00253 
00254             
00255             if ((*file)->file_is_downloaded) {
00256                 list2 = g_list_next(list2);
00257                 continue;
00258             }
00259 
00260             if ((*file)->now_decoding) {
00261                 list2 = g_list_next(list2);
00262                 continue;
00263             }
00264 
00265             
00266             if ((*file)->stop_flag) {
00267                 list2 = g_list_next(list2);
00268                 continue;
00269             }
00270 
00271             complete_flag = TRUE;
00272             list3 = (*file)->parts;
00273             while (list3) {
00274                 *part = (NNTPPart *) list3->data;
00275 
00276                 if ((*part)->now_downloading) {
00277                     complete_flag = FALSE;
00278                 }
00279 
00280                 if (!(*part)->downloaded) {
00281                     int i;
00282                     gboolean all_servers_tried;
00283 
00284                                         if (!(*part)->now_downloading && test_server_may_download_part(*part, server_id)) {
00285                         (*part)->now_downloading = TRUE;
00286                         (*file)->file_is_downloading = TRUE;
00287                         g_static_mutex_unlock(&schedular->mutex);
00288 
00289                         return TRUE;
00290                     }
00291 
00292                     
00293                     all_servers_tried = TRUE;
00294 
00295                     for (i = 0; i servers); i++) {
00296                         if ((!(*part)->servers_already_tried[i]) && schedular->is_server_enabled[i]) {
00297                             all_servers_tried = FALSE;
00298                         }
00299                     }
00300 
00301                     if (!all_servers_tried) {
00302                         complete_flag = FALSE;
00303                     }
00304                 }
00305 
00306                 list3 = g_list_next(list3);
00307             }
00308 
00309             
00310             if (complete_flag && !(*file)->now_decoding) {
00311                 (*file)->now_decoding = TRUE;
00312 
00313                 imported_funcs_task_state_changed((*collection)->collection_name, (*file)->subject, (*file)->status, TASK_STATE_WAITING_FOR_DECODE);
00314 
00315                 (*file)->status = TASK_STATE_WAITING_FOR_DECODE;
00316 
00317                 decoder_data = g_slice_new0(DecoderData);
00318                 decoder_data->collection_name = (*collection)->collection_name;
00319                 decoder_data->file = (*file);
00320                 decoder_data->schedular = schedular;
00321 
00322                 g_thread_pool_push(poolDecoder, decoder_data, NULL);
00323             }
00324 
00325             list2 = g_list_next(list2);
00326         }
00327 
00328         list = g_list_next(list);
00329     }
00330 
00331     g_static_mutex_unlock(&schedular->mutex);
00332 
00333     *collection = NULL;
00334     *file = NULL;
00335     *part = NULL;
00336 
00337     return FALSE;
00338 }
00339 
00340 static int
00341 get_max_threads_for_server(SchedularPlugin *schedular, int server_id)
00342 {
00343     GList *list;
00344     ConfigServer *server;
00345     int ret;
00346 
00347     g_static_mutex_lock(&schedular->mutex);
00348     list = g_list_nth(schedular->servers, server_id);
00349 
00350     server = list->data;
00351 
00352     g_return_val_if_fail(server != NULL, -1);
00353 
00354     ret = server->max_threads;
00355 
00356     g_static_mutex_unlock(&schedular->mutex);
00357 
00358     return ret;
00359 }
00360 
00361 static void
00362 prepare_jobs(SchedularPlugin *schedular, GThreadPool *poolDecoder)
00363 {
00364     int i;
00365     struct _part_download_done *job;
00366 
00367     for (i = 0; i 00368         while (schedular->task_queue[i] && g_async_queue_length(schedular->task_queue[i]) 00369             
00370             NNTPCollection *collection = NULL;
00371             NNTPFile *file = NULL;
00372             NNTPPart *part = NULL;
00373 
00374             if (!get_next_job_to_schedule(schedular, i, &collection, &file, &part, poolDecoder)) {
00375                 
00376                 break;
00377             }
00378 
00379             job = g_slice_new0(struct _part_download_done);
00380             job->collection = collection;
00381             job->file = file;
00382             job->part = part;
00383 
00384             g_async_queue_push(schedular->task_queue[i], job);
00385         };
00386     }
00387 }
00388 
00389 void
00390 update_part_download_status(NNTPCollection *collection, NNTPFile *file, NNTPPart *part, int server_id, gboolean success, gboolean reset_to_zero, gboolean queue_is_being_paused, gboolean dont_retry_anymore, gpointer nntpconnection)
00391 {
00392     GList *list;
00393     ConfigServer *server;
00394 
00395     g_static_mutex_lock(&schedular->mutex);
00396 
00397     part->now_downloading = FALSE;
00398     part->downloaded = success;
00399     file->file_is_downloading = FALSE;
00400 
00401     if (success) {
00402         if (file->file_size_remaining size) {
00403             g_print("Invalid file size remaining!\n");
00404 #if SIZEOF_LONG == 8
00405             g_print("file->file_size_remaining = %li\n", file->file_size_remaining);
00406 #else
00407             g_print("file->file_size_remaining = %lli\n", file->file_size_remaining);
00408 #endif
00409             g_print("part->size = %i\n", part->size);
00410             file->file_size_remaining = part->size;
00411         }
00412 
00413         if (collection->total_size_remaining size) {
00414             g_print("Invalid collection size remaining!\n");
00415 #if SIZEOF_LONG == 8
00416             g_print("collection->total_size_remaining = %li\n", collection->total_size_remaining);
00417 #else
00418             g_print("collection->total_size_remaining = %lli\n", collection->total_size_remaining);
00419 #endif
00420             g_print("part->size = %i\n", part->size);
00421             collection->total_size_remaining = part->size;
00422         }
00423 
00424         file->file_size_remaining -= part->size;
00425         collection->total_size_remaining -= part->size;
00426         file->num_parts_downloaded++;
00427 
00428         server = g_list_nth_data(schedular->servers, server_id);
00429         imported_funcs_emit_part_download_complete(server->servername, nntpconnection, collection->collection_name, file->subject, part->partnum, part->size);
00430         imported_funcs_emit_file_download_state_update(collection->collection_name, file->subject, file->numparts, file->num_parts_downloaded, file->num_parts_failed, file->file_size, file->file_size_remaining, collection->total_size, collection->total_size_remaining);
00431     } else if (!success && dont_retry_anymore) {
00432         part->servers_already_tried[server_id] = TRUE;
00433     }
00434 
00435     
00436     list = file->parts;
00437     while (list) {
00438         NNTPPart *part2 = list->data;
00439 
00440         if (part2->now_downloading) {
00441             file->file_is_downloading = TRUE;
00442         }
00443 
00444         list = g_list_next(list);
00445     }
00446 
00447     
00448     if (!success && dont_retry_anymore) {
00449         int i;
00450         gboolean all_servers_tried = TRUE;
00451 
00452         for (i = 0; i servers); i++) {
00453             if (!part->servers_already_tried[i] && schedular->is_server_enabled[i]) {
00454                 all_servers_tried = FALSE;
00455             }
00456         }
00457 
00458         server = g_list_nth_data(schedular->servers, server_id);
00459 
00460         if (nntpconnection) {
00461             imported_funcs_emit_part_download_failed (server->servername, nntpconnection, collection->collection_name, file->subject, part->partnum, part->size, all_servers_tried);
00462         }
00463 
00464         if (all_servers_tried && !reset_to_zero) {
00465             if (file->file_size_remaining size) {
00466                 g_print("Invalid file size remaining!\n");
00467 #if SIZEOF_LONG == 8
00468                 g_print("file->file_size_remaining = %li\n", file->file_size_remaining);
00469 #else
00470                 g_print("file->file_size_remaining = %lli\n", file->file_size_remaining);
00471 #endif
00472                 g_print("part->size = %i\n", part->size);
00473                 file->file_size_remaining = part->size;
00474             }
00475 
00476             if (collection->total_size_remaining size) {
00477                 g_print("Invalid collection size remaining!\n");
00478 #if SIZEOF_LONG == 8
00479                 g_print("collection->total_size_remaining = %li\n", collection->total_size_remaining);
00480 #else
00481                 g_print("collection->total_size_remaining = %lli\n", collection->total_size_remaining);
00482 #endif
00483                 g_print("part->size = %i\n", part->size);
00484                 collection->total_size_remaining = part->size;
00485             }
00486 
00487             file->num_parts_failed++;
00488             file->file_size_remaining -= part->size;
00489             collection->total_size_remaining -= part->size;
00490 
00491             imported_funcs_emit_file_download_state_update(collection->collection_name, file->subject, file->numparts, file->num_parts_downloaded, file->num_parts_failed, file->file_size, file->file_size_remaining, collection->total_size, collection->total_size_remaining);
00492         }
00493     }
00494 
00495     if (reset_to_zero) {
00496         file->num_parts_downloaded = 0;
00497         file->num_parts_failed = 0;
00498 
00499         imported_funcs_emit_file_download_state_update(collection->collection_name, file->subject, file->numparts, file->num_parts_downloaded, file->num_parts_failed, file->file_size, file->file_size_remaining, collection->total_size, collection->total_size_remaining);
00500     }
00501 
00502     g_static_mutex_unlock(&schedular->mutex);
00503 }
00504 
00505 static gpointer
00506 stop_schedular_func(gpointer data)
00507 {
00508     nntpgrab_plugin_schedular_stop((const char *) data);
00509 
00510     g_free(data);
00511 
00512     return NULL;
00513 }
00514 
00515 void
00516 stop_schedular_from_seperate_thread(const char *reason)
00517 {
00518     
00519     g_thread_create(stop_schedular_func, g_strdup(reason), FALSE, NULL);
00520 }
00521 
00522 static gpointer
00523 schedular_thread_func(gpointer data)
00524 {
00525     SchedularPlugin *schedular = SCHEDULAR_PLUGIN(data);
00526     GThreadPool *pool[MAX_NNTP_SERVERS];
00527     GThreadPool *poolDecoder;
00528     GList *list;
00529     int i = 0;
00530     gboolean abort_flag;
00531     GTimeVal val;
00532 
00533     
00534     g_assert(g_list_length(schedular->servers) 00535     memset(pool, 0, sizeof(pool));
00536 
00537     poolDecoder = g_thread_pool_new(decoder_thread_func, schedular, 1, FALSE, NULL);
00538 
00539     schedular->task_cond = g_cond_new();
00540     schedular->task_mutex = g_mutex_new();
00541     memset(schedular->task_queue, 0, sizeof(schedular->task_queue));
00542 
00543     list = schedular->servers;
00544     while (list) {
00545         ConfigServer *server = list->data;
00546         int j;
00547 
00548         schedular->task_queue[i] = g_async_queue_new();
00549 
00550         
00551         if (!schedular->is_server_enabled[i]) {
00552             pool[i] = NULL;
00553             i++;
00554             list = g_list_next(list);
00555             continue;
00556         }
00557 
00558         pool[i] = g_thread_pool_new(download_thread_func, schedular, server->max_threads, FALSE, NULL);
00559 
00560         g_assert(pool[i]);
00561 
00562         
00563         for (j = 0; j max_threads; j++) {
00564             DownloadData *download_data = g_slice_new(DownloadData);
00565             download_data->server = server;
00566             download_data->server_id = i;
00567             download_data->poolDecoder = poolDecoder;
00568             g_thread_pool_push(pool[i], download_data, NULL);
00569         }
00570 
00571         i++;
00572 
00573         list = g_list_next(list);
00574     }
00575 
00576     
00577     prepare_jobs(schedular, poolDecoder);
00578 
00579     abort_flag = FALSE;
00580     while (!abort_flag) {
00581         g_static_mutex_lock(&schedular->mutex);
00582         abort_flag = schedular->abort_flag;
00583         g_static_mutex_unlock(&schedular->mutex);
00584 
00585         g_get_current_time(&val);
00586         g_time_val_add(&val, G_USEC_PER_SEC);
00587 
00588         g_mutex_lock(schedular->task_mutex);
00589 
00590         if (g_cond_timed_wait(schedular->task_cond, schedular->task_mutex, &val)) {
00591             
00592             prepare_jobs(schedular, poolDecoder);
00593         }
00594 
00595         g_mutex_unlock(schedular->task_mutex);
00596     }
00597 
00598     for (i = 0; i 00599         if (pool[i]) {
00600             g_thread_pool_free(pool[i], TRUE, TRUE);
00601         }
00602 
00603         if (schedular->task_queue[i]) {
00604             
00605             struct _part_download_done *job;
00606             while ((job = g_async_queue_try_pop(schedular->task_queue[i]))) {
00607                 
00608                 update_part_download_status(job->collection, job->file, job->part, i, FALSE, FALSE, TRUE, FALSE, NULL);
00609 
00610                 g_slice_free(struct _part_download_done, job);
00611             }
00612 
00613             g_async_queue_unref(schedular->task_queue[i]);
00614             schedular->task_queue[i] = NULL;
00615         }
00616     }
00617 
00618     g_thread_pool_free(poolDecoder, FALSE, TRUE);
00619     g_cond_free(schedular->task_cond);
00620     g_mutex_free(schedular->task_mutex);
00621 
00622     return NULL;
00623 }
00624 
00625 gboolean
00626 nntpgrab_plugin_schedular_start(void)
00627 {
00628     GError *error = NULL;
00629     GList *list;
00630     GList *servers;
00631     int i;
00632 
00633     g_assert(schedular != NULL);
00634 
00635     g_static_mutex_lock(&schedular->mutex);
00636 
00637     if (schedular->thread != NULL) {
00638         g_static_mutex_unlock(&schedular->mutex);
00639         return FALSE;
00640     }
00641 
00642     
00643     g_assert(schedular->servers == NULL);
00644 
00645     memset(&schedular->is_high_priority_server, 0, sizeof(schedular->is_high_priority_server));
00646     memset(&schedular->is_normal_priority_server, 0, sizeof(schedular->is_normal_priority_server));
00647     memset(&schedular->is_low_priority_server, 0, sizeof(schedular->is_low_priority_server));
00648     memset(&schedular->is_server_enabled, 0, sizeof(schedular->is_server_enabled));
00649 
00650     servers = imported_funcs_config_get_avail_servers();
00651     list = servers;
00652     i = 0;
00653 
00654     while (list) {
00655         char *servername = list->data;
00656         ConfigServer *server = imported_funcs_config_get_server_info(servername);
00657 
00658         g_assert(server);
00659 
00660         schedular->servers = g_list_append(schedular->servers, server);
00661         if (server->enabled) {
00662             if (server->priority == SERVER_PRIORITY_HIGH) {
00663                 schedular->is_high_priority_server[i] = TRUE;
00664                 schedular->is_normal_priority_server[i] = FALSE;
00665                 schedular->is_low_priority_server[i] = FALSE;
00666             } else if (server->priority == SERVER_PRIORITY_LOW) {
00667                 schedular->is_high_priority_server[i] = FALSE;
00668                 schedular->is_normal_priority_server[i] = FALSE;
00669                 schedular->is_low_priority_server[i] = TRUE;
00670             } else {
00671                 schedular->is_high_priority_server[i] = FALSE;
00672                 schedular->is_normal_priority_server[i] = TRUE;
00673                 schedular->is_low_priority_server[i] = FALSE;
00674             }
00675         }
00676 
00677         schedular->is_server_enabled[i] = server->enabled;
00678 
00679         i++;
00680 
00681         g_free(servername);
00682 
00683         list = g_list_next(list);
00684     }
00685 
00686     g_list_free(servers);
00687 
00688     
00689     schedular->abort_flag = FALSE;
00690 
00691     
00692     if (!(schedular->thread = g_thread_create(schedular_thread_func, schedular, TRUE, &error))) {
00693         imported_funcs_emit_fatal_error(error->message);
00694         g_error_free(error);
00695     }
00696 
00697     g_static_mutex_unlock(&schedular->mutex);
00698 
00699     imported_funcs_notify_schedular_state_changed(SCHEDULAR_STATE_RUNNING, NULL);
00700 
00701     return TRUE;
00702 }
00703 
00704 gboolean
00705 nntpgrab_plugin_schedular_stop(const char *reason)
00706 {
00707     GList *list;
00708 
00709     g_assert(schedular != NULL);
00710 
00711     g_static_mutex_lock(&schedular->runtime_mutex);
00712 
00713     if (schedular->thread == NULL) {
00714         g_static_mutex_unlock(&schedular->runtime_mutex);
00715         return FALSE;
00716     }
00717 
00718     imported_funcs_notify_schedular_state_changed(SCHEDULAR_STATE_STOPPING, reason);
00719 
00720     
00721     g_static_mutex_lock(&schedular->mutex);
00722     schedular->abort_flag = TRUE;
00723     g_static_mutex_unlock(&schedular->mutex);
00724 
00725     g_thread_join(schedular->thread);
00726     schedular->thread = NULL;
00727 
00728     
00729     g_static_mutex_lock(&schedular->mutex);
00730     list = schedular->servers;
00731     while (list) {
00732         g_slice_free(ConfigServer, list->data);
00733         list = g_list_next(list);
00734     }
00735     g_list_free(schedular->servers);
00736     schedular->servers = NULL;
00737     g_static_mutex_unlock(&schedular->mutex);
00738 
00739     g_static_mutex_unlock(&schedular->runtime_mutex);
00740 
00741     imported_funcs_notify_schedular_state_changed(SCHEDULAR_STATE_STOPPED, NULL);
00742 
00743     return TRUE;
00744 }
00745 
00746 SchedularState
00747 nntpgrab_plugin_schedular_get_state(void)
00748 {
00749     SchedularState state;
00750 
00751     g_static_mutex_lock(&schedular->mutex);
00752     if (schedular->thread && schedular->abort_flag) {
00753         state = SCHEDULAR_STATE_STOPPING;
00754     } else if (schedular->thread && !schedular->abort_flag) {
00755         state = SCHEDULAR_STATE_RUNNING;
00756     } else {
00757         state = SCHEDULAR_STATE_STOPPED;
00758     }
00759     g_static_mutex_unlock(&schedular->mutex);
00760 
00761     return state;
00762 }
00763 
00764 static gboolean
00765 test_is_subject_already_in_collection(NNTPCollection *collection, const char *subject)
00766 {
00767     GList *list;
00768 
00769     list = collection->files;
00770     while (list) {
00771         NNTPFile *file = (NNTPFile *) list->data;
00772 
00773         if (!strcmp(file->subject, subject)) {
00774             return TRUE;
00775         }
00776 
00777         list = g_list_next(list);
00778     }
00779 
00780     return FALSE;
00781 }
00782 
00783 gboolean
00784 nntpgrab_plugin_schedular_add_task_to_queue(const char *collection_name, NNTPFile *file, char **errmsg)
00785 {
00786     GList *list;
00787     NNTPCollection *collection;
00788 
00789     g_static_mutex_lock(&schedular->mutex);
00790 
00791     
00792     list = schedular->queue;
00793     while (list) {
00794         collection = (NNTPCollection *) list->data;
00795 
00796         if (!strcmp(collection->collection_name, collection_name)) {
00797             
00798 
00799             
00800             if (test_is_subject_already_in_collection(collection, file->subject)) {
00801                 if (errmsg) {
00802                     *errmsg = g_strdup_printf(_("There already is a file with subject '%s' part of collection '%s'"), file->subject, collection_name);
00803                 }
00804 
00805                 g_static_mutex_unlock(&schedular->mutex);
00806 
00807                 return FALSE;
00808             }
00809 
00810             
00811             collection->files = g_list_append(collection->files, file);
00812             collection->total_size += file->file_size;
00813             collection->total_size_remaining += file->file_size;
00814             file->file_size_remaining = file->file_size;
00815 
00816             file->position = g_list_index(collection->files, file) + 1;
00817 
00818             imported_funcs_task_added(collection, file);
00819 
00820             g_static_mutex_unlock(&schedular->mutex);
00821 
00822             return TRUE;
00823         }
00824 
00825         list = g_list_next(list);
00826     }
00827 
00828     
00829     collection = g_slice_new0(NNTPCollection);
00830     strncpy((char *) collection->collection_name, collection_name, sizeof(collection->collection_name));
00831     schedular->queue = g_list_append(schedular->queue, collection);
00832 
00833     imported_funcs_collection_added(collection_name);
00834 
00835     
00836     collection->files = g_list_append(collection->files, file);
00837     collection->total_size = file->file_size;
00838     collection->total_size_remaining = file->file_size;
00839     file->position = 1;
00840 
00841     imported_funcs_task_added(collection, file);
00842 
00843     g_static_mutex_unlock(&schedular->mutex);
00844 
00845     return TRUE;
00846 }
00847 
00848 static void
00849 free_file(NNTPFile *file)
00850 {
00851     GList *list;
00852 
00853     g_assert(file);
00854 
00855     list = file->parts;
00856     while (list) {
00857         g_slice_free(NNTPPart, list->data);
00858         list = g_list_next(list);
00859     }
00860     g_list_free(file->parts);
00861     file->parts = NULL;
00862 
00863     list = file->groups;
00864     while (list) {
00865         g_free(list->data);
00866         list = g_list_next(list);
00867     }
00868     g_list_free(file->groups);
00869     file->groups = NULL;
00870 
00871     g_free(file);
00872 }
00873 
00874 gboolean
00875 nntpgrab_plugin_schedular_del_task_from_queue(const char *collection_name, const char *subject, char **errmsg)
00876 {
00877     GList *list;
00878     NNTPCollection *collection;
00879     int i;
00880     ConfigOpts opts;
00881 
00882     g_static_mutex_lock(&schedular->mutex);
00883 
00884     opts = imported_funcs_config_get_opts();
00885 
00886     
00887     list = schedular->queue;
00888     while (list) {
00889         collection = (NNTPCollection *) list->data;
00890 
00891         if (!strcmp(collection->collection_name, collection_name)) {
00892             
00893             GList *list2;
00894             gboolean delete_flag = FALSE;
00895 
00896             
00897             if (!subject) {
00898                 
00899                 delete_flag = TRUE;
00900 
00901                 
00902                 collection->stop_flag = TRUE;
00903             }
00904 
00905             
00906             list2 = collection->files;
00907             while (list2) {
00908                 NNTPFile *file = (NNTPFile *) list2->data;
00909 
00910                 list2 = g_list_next(list2);
00911 
00912                 if (delete_flag || !strcmp(file->subject, subject)) {
00913                     
00914                     
00915                     while (file->file_is_downloading || file->now_decoding) {
00916                         
00917                         file->stop_flag = TRUE;
00918 
00919                         
00920                         g_static_mutex_unlock(&schedular->mutex);
00921 
00922                         g_usleep(G_USEC_PER_SEC / 10);
00923 
00924                         g_static_mutex_lock(&schedular->mutex);
00925                     }
00926 
00927                     
00928                     for (i = 0; i numparts; i++) {
00929                         char *filename = g_strdup_printf("%s%s%s.%i", opts.temp_directory, G_DIR_SEPARATOR_S, file->tmp_filename, i + 1);
00930                         g_unlink(filename);
00931                         g_free(filename);
00932                     }
00933 
00934                     
00935                     collection->total_size -= file->file_size;
00936                     collection->total_size_remaining -= file->file_size_remaining;
00937 
00938                     
00939                     collection->files = g_list_remove(collection->files, file);
00940 
00941                     
00942                     list2 = collection->files;
00943 
00944                     imported_funcs_task_removed(collection_name, file->subject, collection->total_size, collection->total_size_remaining);
00945 
00946                     free_file(file);
00947 
00948                     
00949                     if (g_list_length(collection->files) == 0) {
00950                         
00951                         schedular->queue = g_list_remove(schedular->queue, collection);
00952 
00953                         imported_funcs_collection_removed(collection_name);
00954 
00955                         g_slice_free(NNTPCollection, collection);
00956                     }
00957 
00958                     if (!delete_flag) {
00959                         g_static_mutex_unlock(&schedular->mutex);
00960                         return TRUE;
00961                     }
00962                 }
00963             }
00964 
00965             if (delete_flag) {
00966                 g_static_mutex_unlock(&schedular->mutex);
00967                 return TRUE;
00968             }
00969         }
00970 
00971         list = g_list_next(list);
00972     }
00973 
00974     if (errmsg) {
00975         *errmsg = g_strdup_printf(_("Subject '%s' was not found in collection '%s'"), subject, collection_name);
00976     }
00977 
00978     g_static_mutex_unlock(&schedular->mutex);
00979 
00980     return FALSE;
00981 }
00982 
00983 gboolean
00984 nntpgrab_plugin_schedular_restart_task(const char *collection_name, const char *subject, char **errmsg)
00985 {
00986     GList *list;
00987     NNTPCollection *collection;
00988 
00989     g_static_mutex_lock(&schedular->mutex);
00990 
00991     
00992     list = schedular->queue;
00993     while (list) {
00994         collection = (NNTPCollection *) list->data;
00995 
00996         if (!strcmp(collection->collection_name, collection_name)) {
00997             
00998             GList *list2;
00999             gboolean restart_flag = FALSE;
01000 
01001             
01002             if (!subject) {
01003                 
01004                 restart_flag = TRUE;
01005             }
01006 
01007             list2 = collection->files;
01008             while (list2) {
01009                 NNTPFile *file = list2->data;
01010 
01011                 if (restart_flag || !strcmp(file->subject, subject)) {
01012                     
01013                     GList *list3;
01014                     gboolean file_is_downloading = FALSE;
01015                     int filesize_remaining_active = 0;
01016 
01017                     list3 = file->parts;
01018                     while (list3) {
01019                         NNTPPart *part = list3->data;
01020 
01021                         if (part->now_downloading) {
01022                             list3 = g_list_next(list3);
01023                             file_is_downloading = TRUE;
01024                             filesize_remaining_active += part->size;
01025                             continue;
01026                         }
01027 
01028                         part->downloaded = FALSE;
01029                         part->now_downloading = FALSE;
01030 
01031                         memset(part->servers_already_tried, 0, sizeof(part->servers_already_tried));
01032 
01033                         list3 = g_list_next(list3);
01034                     }
01035 
01036                     file->file_is_downloading = file_is_downloading;
01037                     file->file_is_downloaded = FALSE;
01038                     file->now_decoding = FALSE;
01039                     file->num_parts_downloaded = 0;
01040                     file->num_parts_failed = 0;
01041 
01042                     collection->total_size_remaining += file->file_size - file->file_size_remaining - filesize_remaining_active;
01043                     file->file_size_remaining = file->file_size - filesize_remaining_active;
01044 
01045                     imported_funcs_task_state_changed(collection_name, file->subject, file->status, TASK_STATE_WAITING_FOR_DOWNLOAD);
01046                     file->status = TASK_STATE_WAITING_FOR_DOWNLOAD;
01047 
01048                     if (!restart_flag) {
01049                         g_static_mutex_unlock(&schedular->mutex);
01050 
01051                         return TRUE;
01052                     }
01053                 }
01054 
01055                 list2 = g_list_next(list2);
01056             }
01057 
01058             if (restart_flag) {
01059                 g_static_mutex_unlock(&schedular->mutex);
01060                 return TRUE;
01061             }
01062         }
01063 
01064         list = g_list_next(list);
01065     }
01066 
01067     g_static_mutex_unlock(&schedular->mutex);
01068 
01069     if (errmsg) {
01070         *errmsg = g_strdup_printf(_("Subject '%s' was not found in collection '%s'"), subject, collection_name);
01071     }
01072 
01073     return FALSE;
01074 }
01075 
01076 gboolean
01077 nntpgrab_plugin_schedular_save_queue(char **errmsg)
01078 {
01079     return schedular_plugin_save_queue(errmsg);
01080 }
01081 
01082 
01083 void
01084 nntpgrab_plugin_schedular_foreach_task(GFunc foreach_func, gpointer data)
01085 {
01086     g_assert(schedular != NULL);
01087 
01088     g_static_mutex_lock(&schedular->mutex);
01089 
01090     g_list_foreach(schedular->queue, foreach_func, data);
01091 
01092     g_static_mutex_unlock(&schedular->mutex);
01093 }
01094 
01095 gboolean
01096 nntpgrab_plugin_schedular_move_task(const char *collection_name_src, const char *subject_src, const char *collection_name_dest, int position_dest)
01097 {
01098     GList *list;
01099     NNTPCollection *collection_src = NULL;
01100     NNTPCollection *collection_dest = NULL;
01101     NNTPFile *file = NULL;
01102     gboolean found = FALSE;
01103     int old_position = 0;
01104 
01105     g_assert(schedular != NULL);
01106 
01107     g_return_val_if_fail(collection_name_src != NULL, FALSE);
01108     g_return_val_if_fail(subject_src != NULL, FALSE);
01109     g_return_val_if_fail(collection_name_dest != NULL, FALSE);
01110     g_return_val_if_fail(position_dest >= -1, FALSE);
01111 
01112     
01113     g_return_val_if_fail(!strcmp(collection_name_src, collection_name_dest), FALSE);
01114 
01115     g_static_mutex_lock(&schedular->mutex);
01116 
01117     
01118     list = schedular->queue;
01119     while (list && !found) {
01120         collection_src = (NNTPCollection *) list->data;
01121 
01122         if (!strcmp(collection_src->collection_name, collection_name_src)) {
01123             GList *list2;
01124 
01125             list2 = collection_src->files;
01126             while (list2) {
01127                 file = list2->data;
01128 
01129                 if (!strcmp(file->subject, subject_src)) {
01130                     found = TRUE;
01131                     break;
01132                 }
01133 
01134                 file = NULL;
01135                 list2 = g_list_next(list2);
01136                 old_position++;
01137             }
01138         }
01139 
01140         list = g_list_next(list);
01141     }
01142 
01143     if (!found) {
01144         g_static_mutex_unlock(&schedular->mutex);
01145         return FALSE;
01146     }
01147 
01148     
01149     if (!strcmp(collection_name_src, collection_name_dest)) {
01150         collection_dest = collection_src;
01151     } else {
01152         
01153         list = schedular->queue;
01154         found = FALSE;
01155         while (list && !found) {
01156             collection_dest = (NNTPCollection *) list->data;
01157 
01158             if (!strcmp(collection_dest->collection_name, collection_name_src)) {
01159                 found = TRUE;
01160             }
01161 
01162             list = g_list_next(list);
01163         }
01164 
01165         if (!found) {
01166             g_static_mutex_unlock(&schedular->mutex);
01167             return FALSE;
01168         }
01169     }
01170 
01171     
01172     collection_src->files = g_list_remove(collection_src->files, file);
01173     collection_dest->files = g_list_insert(collection_dest->files, file, position_dest);
01174 
01175 #if 0
01176     int i = 0;
01177 
01178     list = collection_dest->files;
01179     while (list) {
01180         NNTPFile *file = (NNTPFile *) list->data;
01181 
01182         g_print("subject %i = %s\n", i, file->subject);
01183         i++;
01184         list = g_list_next(list);
01185     }
01186 #endif
01187 
01188     g_static_mutex_unlock(&schedular->mutex);
01189 
01190     imported_funcs_emit_task_moved(collection_name_src, subject_src, collection_name_dest, old_position, position_dest);
01191 
01192     return TRUE;
01193 }
01194 
01195 gboolean
01196 nntpgrab_plugin_schedular_move_collection(const char *collection_name, int new_position)
01197 {
01198     GList *list;
01199     NNTPCollection *collection_src = NULL;
01200     gboolean found = FALSE;
01201     int orig_position = 0;
01202 
01203     g_assert(schedular != NULL);
01204 
01205     g_static_mutex_lock(&schedular->mutex);
01206 
01207     
01208     list = schedular->queue;
01209     while (list) {
01210         collection_src = (NNTPCollection *) list->data;
01211 
01212         if (!strcmp(collection_src->collection_name, collection_name)) {
01213             found = TRUE;
01214             schedular->queue = g_list_remove(schedular->queue, collection_src);
01215             schedular->queue = g_list_insert(schedular->queue, collection_src, new_position);
01216             break;
01217         }
01218 
01219         orig_position++;
01220         list = g_list_next(list);
01221     }
01222 
01223 #if 0
01224     int i = 0;
01225 
01226     list = schedular->queue;
01227     while (list) {
01228         collection_src = (NNTPCollection *) list->data;
01229 
01230         g_print("collection %i = %s\n", i, collection_src->collection_name);
01231         i++;
01232         list = g_list_next(list);
01233     }
01234 #endif
01235 
01236     g_static_mutex_unlock(&schedular->mutex);
01237 
01238     if (found) {
01239         imported_funcs_emit_collection_moved(collection_name, orig_position, new_position);
01240     }
01241 
01242     return found;
01243 }
01244 
01245 gboolean
01246 schedular_plugin_save_queue(char **errmsg)
01247 {
01248     g_assert(schedular != NULL);
01249 
01250     g_static_mutex_lock(&schedular->mutex);
01251 
01252     if (!save_download_queue(schedular->queue, errmsg)) {
01253         g_static_mutex_unlock(&schedular->mutex);
01254         return FALSE;
01255     }
01256 
01257     g_static_mutex_unlock(&schedular->mutex);
01258 
01259     return TRUE;
01260 }
01261