这里先写下主要的业务代码,一些库代码稍后补充上
/** * Feed新闻个性化推送 */#include "push_service_news.h"/** * 保证单进程运行 */void single_process() { lock_fd = open("logs/lock", O_CREAT | O_RDWR | O_TRUNC, 00664); if (push_trylock_fd(lock_fd) == FAILURE) { push_sys_notice("push_service_news already exists\n"); close(lock_fd); exit(0); } else { push_sys_notice("push_service_news lock success"); }}/** * 线程数据回收函数,单线程结束的时候,系统自动调用 */void *push_thread_data_del(void *data) { if (data) { free(data); push_sys_notice("thread data free success"); } else { push_sys_notice("thread data free fail"); }}/** * 初始化 * 连接mysql、线程池、日志打印目录 */void init_main(int argc, char **argv) { a_tpool = push_init_tpool(PUSH_TPOOL_THREAD_COUNT, PUSH_TPOOL_MAX_TASK); if (!a_tpool) { push_sys_error("push_init_tpool error"); exit(1); } ThreadData_create(push_thread_data, push_thread_data_del);}/** * 获取待执行的任务 */void get_task() { int ret = push_mysql_connect(PUSH_DB_HOST, PUSH_DB_USER, PUSH_DB_PASSWD, PUSH_DB_PORT, PUSH_DB_DBNAME); if (ret) { push_sys_error("mysql_connect error"); exit(1); } ret = push_dao_newstask_getone(&a_newstask); if (ret == FAILURE) { push_sys_error("push_dao_newstask_getone error"); exit(1); } if (ret == 0) { push_sys_notice("have no task"); exit(1); } task_id = atol(a_newstask.info[0][F_news_task_id]); if (!task_id) { push_sys_error("task_id error,:%d", task_id); exit(1); } ret = push_dao_newstask_update_status(task_id, STATUS_DOING); if (!ret) { push_sys_error("update status doing error:%d", task_id); exit(1); } push_mysql_close();}/** * 初始化一些基础路径 */void init_path(int platform) { //重新初始化日志打印目录 char log_path[PUSH_MAXLEN_PATH] = { 0}; snprintf(log_path, sizeof (log_path), "logs/%u", task_id); push_init_logger(log_path); //base_path snprintf(base_path, sizeof (base_path), "logs/%u/", task_id); //data_path snprintf(data_path, sizeof (data_path), "%s%s", base_path, "data/"); char *ptr = platform == PUSH_PLATFORM_ANDROID ? "android" : "iphone"; //cuids_path snprintf(cuids_path, sizeof (cuids_path), "%s%s_cuids", data_path, ptr); //bigdata_path snprintf(bigdata_path, sizeof (bigdata_path), "%s%s_bigdata", data_path, ptr); //mkdir push_mkdir(data_path); push_sys_notice("cuids_path[%s] bigdata_path[%s]", cuids_path, bigdata_path); /** * 记录一下task的详细信息 */ push_sys_notice("task_name[%s]", a_newstask.info[0][F_news_task_task_name]); push_sys_notice("android_cuid_files[%s]", a_newstask.info[0][F_news_task_android_cuid_files]); push_sys_notice("iphone_cuid_files[%s]", a_newstask.info[0][F_news_task_iphone_cuid_files]); push_sys_notice("platform[%s]", a_newstask.info[0][F_news_task_platform]);}/** * 下载Ftp文件 * @notice 此函数中不能使用strtok会造成iPhone只解析一个 * @todo 一篇文章重复推送 * @param src_file * @param platform */void download_files(char *src_file) { char *ptr = NULL, *save_ptr = NULL; char wget_cmd[PUSH_LEN_256] = { 0}; char uniq_cmd[PUSH_LEN_256] = { 0}; ptr = strtok_r(src_file, ",", &save_ptr); int num = 0, ret = 0; //下载所有cuid的文件 while (ptr) { snprintf(wget_cmd, sizeof (wget_cmd), "wget -c -O %s_%d %s >> %swget.log 2>&1", cuids_path, num, ptr, base_path); push_sys_notice("wget_cmd start"); ret = system(wget_cmd); push_sys_notice("wget_cmd end:[%s]", wget_cmd); if (ret) { push_sys_error("wget file error:[%s] [%s]", strerror(errno), wget_cmd); } ptr = strtok_r(NULL, ",", &save_ptr); num++; } /** * 对文件进行去重 * ① 如果选择的是一个tag则不需要处理了,针对全量用户效果是显著的 */ if (num <= 1) { snprintf(uniq_cmd, sizeof (uniq_cmd), "cat %s_* > %s", cuids_path, bigdata_path); } else { snprintf(uniq_cmd, sizeof (uniq_cmd), "sort %s_* -u -T /home/work/tmp/sort_tmp > %s", cuids_path, bigdata_path); } push_sys_notice("uniq_cmd start"); system(uniq_cmd); push_sys_notice("uniq_cmd end:[%s]", uniq_cmd);}/** * 工作线程函数 * @param arg */void worker(void *arg) { /*-------------------------------qps控制ST---------------------------------*/ push_thread_data_t *thread_data = (push_thread_data_t *) ThreadData_get(push_thread_data); //第一次初始化 if (!thread_data) { thread_data = (push_thread_data_t *) calloc(1, sizeof (push_thread_data_t)); if (!thread_data) { push_sys_error("push_thread_data calloc error:%s", strerror(errno)); pthread_exit(NULL); } thread_data->count = 0; thread_data->timestamp = push_timestamp(); ThreadData_set(push_thread_data, thread_data); } while (1) { int now_timestamp = push_timestamp(); if (now_timestamp == thread_data->timestamp) { //触发了qps限制 if (thread_data->count >= PUSH_EVERY_THREAD_QPS) { push_sys_notice("qps limit: %d >= %d", thread_data->count, PUSH_EVERY_THREAD_QPS); usleep(10000); continue; } else { //没有触发qps限制 thread_data->count++; push_sys_notice("qps:%d", thread_data->count); break; } } else { //时间不相等,说明肯定没有触发qps限制 thread_data->count = 1; thread_data->timestamp = now_timestamp; break; } } /*-------------------------------qps控制SE----------------------------------*/ //工作流程 push_news_worker_param_t *param = (push_news_worker_param_t *) arg; char *response = push_mapi_news_batch(param->platform, task_id, param->cuids, &a_newstask); if (!response) { push_error("response error cuids[%s]", param->cuids); } else if (strstr(response, "{\"errno\":0")) { push_notice("send success:response[%s]", response); } else { push_error("send fail:response[%s] cuids[%s]", response, param->cuids); } free(param);}/** * 读取文件,向线程池添加任务 */int read_file(int platform) { FILE *fp = fopen(bigdata_path, "r"); if (!fp) { push_sys_error("open %s error:%s", bigdata_path, strerror(errno)); return FAILURE; } char line_data[PUSH_LEN_256] = { 0}; char cuid_data[PUSH_LEN_256] = { 0}; int count; push_news_worker_param_t *worker_param; for (count = 0; !feof(fp) && fgets(line_data, sizeof (line_data), fp) != NULL; count++) { if (count % PUSH_ONCE_CUID_COUNT == 0) { worker_param = calloc(1, sizeof (push_news_worker_param_t)); if (!worker_param) { push_sys_error("calloc worker_param error:%s", strerror(errno)); continue; } worker_param->platform = platform; } sscanf(line_data, "%[^\n]", cuid_data); strncat(worker_param->cuids, cuid_data, PUSH_MAX_CUID_LEN); strncat(worker_param->cuids, ",", 1); if (count % PUSH_ONCE_CUID_COUNT == (PUSH_ONCE_CUID_COUNT - 1)) { if (push_tpool_add_task(a_tpool, worker, worker_param)) { push_sys_error("bigdata_path[%s] tpool add task error", bigdata_path); } else { push_sys_notice("bigdata_path[%s] tpool add task success", bigdata_path); } } } //不是整数的需要特殊处理的 if ((count % PUSH_ONCE_CUID_COUNT != (PUSH_ONCE_CUID_COUNT - 1)) && (count % PUSH_ONCE_CUID_COUNT != 0)) { if (push_tpool_add_task(a_tpool, worker, worker_param)) { push_sys_error("bigdata_path[%s] tpool add task error", bigdata_path); } else { push_sys_notice("bigdata_path[%s] tpool add task success", bigdata_path); } } fclose(fp); return count;}/** * 更新数据库数据 */void update_mysql() { if (task_id == 0) { return; } push_sys_notice("---android_send_count[%d]---", android_send_count); push_sys_notice("---iphone_send_count[%d]---", iphone_send_count); int ret = push_mysql_connect(PUSH_DB_HOST, PUSH_DB_USER, PUSH_DB_PASSWD, PUSH_DB_PORT, PUSH_DB_DBNAME); if (ret) { push_sys_error("mysql_connect error"); return; } int affect; affect = push_dao_newstask_update_send_count(task_id, android_send_count, iphone_send_count); if (!affect) { push_sys_error("update send_count error"); } affect = push_dao_newstask_update_status(task_id, STATUS_SUSS); if (affect > 0) { push_sys_error("update status success, status[%d]", STATUS_SUSS); } push_mysql_close();}/** * 收尾工作 * 关闭mysql连接、销毁线程池、释放查询结果集 */void end_main(void) { push_tpool_destroy(a_tpool); push_dao_newstask_free_result(&a_newstask); update_mysql(); push_unlock_fd(lock_fd); close(lock_fd); ThreadData_delete(push_thread_data); push_sys_notice("--------------------task:%d end----------------\n", task_id);}/** * 主函数 */int main(int argc, char **argv) { push_init_logger("logs"); single_process(); atexit(end_main); /** *@todo应该先获取任务在实例化线程池 */ init_main(argc, argv); get_task(); /** * 分平台去处理各自的逻辑 */ platform = atoi(a_newstask.info[0][F_news_task_platform]); //Android平台 if (platform == PUSH_PLATFORM_ANDROID || platform == PUSH_PLATFORM_ALL) { init_path(PUSH_PLATFORM_ANDROID); download_files(a_newstask.info[0][F_news_task_android_cuid_files]); android_send_count = read_file(PUSH_PLATFORM_ANDROID); } //iphone平台 if (platform == PUSH_PLATFORM_IPHONE || platform == PUSH_PLATFORM_ALL) { init_path(PUSH_PLATFORM_IPHONE); download_files(a_newstask.info[0][F_news_task_iphone_cuid_files]); iphone_send_count = read_file(PUSH_PLATFORM_IPHONE); } //错误的平台设置 if (platform != PUSH_PLATFORM_ANDROID && platform != PUSH_PLATFORM_ALL && platform != PUSH_PLATFORM_IPHONE) { push_sys_error("platform error:%d", platform); exit(1); }}