| Blazer's profileBlazer的共享空间BlogNetwork | Help |
Blazer的共享空间 |
|||||
|
|
May 20 epoll在大家苦苦的为在线人数的增长而导致的系统资源吃紧上的问题正在发愁的时候,Linux 2.6内核中提供的System Epoll为我们提供了一套完美的解决方案。传统的select以及poll的效率会因为在线人数的线形递增而导致呈二次乃至三次方的下降,这些直接导致了网络服务器可以支持的人数有了个比较明显的限制。 自从Linux提供了/dev/epoll的设备以及后来2.6内核中对/dev/epoll设备的访问的封装(System Epoll)之后,这种现象得到了大大的缓解,如果说几个月前,大家还对epoll不熟悉,那么现在来说的话,epoll的应用已经得到了大范围的普及。 那么究竟如何来使用epoll呢?其实非常简单。 通过在包含一个头文件#include <sys epoll.h="">以及几个简单的API将可以大大的提高你的网络服务器的支持人数。 首先通过create_epoll(int maxfds)来创建一个epoll的句柄,其中maxfds为你epoll所支持的最大句柄数。这个函数会返回一个新的epoll句柄,之后的所有操作将通过这个句柄来进行操作。在用完之后,记得用close()来关闭这个创建出来的epoll句柄。 之后在你的网络主循环里面,每一帧的调用epoll_wait(int epfd, epoll_event events, int max events, int timeout)来查询所有的网络接口,看哪一个可以读,哪一个可以写了。基本的语法为: nfds = epoll_wait(kdpfd, events, maxevents, -1); 其中kdpfd为用epoll_create创建之后的句柄,events是一个epoll_event*的指针,当epoll_wait这个函数操作成功之后,epoll_events里面将储存所有的读写事件。max_events是当前需要监听的所有socket句柄数。最后一个timeout是epoll_wait的超时,为0的时候表示马上返回,为-1的时候表示一直等下去,直到有事件范围,为任意正整数的时候表示等这么长的时间,如果一直没有事件,则范围。一般如果网络主循环是单独的线程的话,可以用-1来等,这样可以保证一些效率,如果是和主逻辑在同一个线程的话,则可以用0来保证主循环的效率。 epoll_wait范围之后应该是一个循环,遍利所有的事件: for(n = 0; n < nfds; ++n) { if(events[n].data.fd == listener) { //如果是主socket的事件的话,则表示有新连接进入了,进行新连接的处理。 client = accept(listener, (struct sockaddr *) &local, &addrlen); if(client < 0){ perror("accept"); continue; } setnonblocking(client); // 将新连接置于非阻塞模式 ev.events = EPOLLIN | EPOLLET; // 并且将新连接也加入EPOLL的监听队列。 注意,这里的参数EPOLLIN | EPOLLET并没有设置对写socket的监听,如果有写操作的话,这个时候epoll是不会返回事件的,如果要对写操作也监听的话,应该是EPOLLIN | EPOLLOUT | EPOLLET ev.data.fd = client; if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) { // 设置好event之后,将这个新的event通过epoll_ctl加入到epoll的监听队列里面,这里用EPOLL_CTL_ADD来加一个新的epoll事件,通过EPOLL_CTL_DEL来减少一个epoll事件,通过EPOLL_CTL_MOD来改变一个事件的监听方式。 fprintf(stderr, "epoll set insertion error: fd=%d0, client); return -1; } } else // 如果不是主socket的事件的话,则代表是一个用户socket的事件,则来处理这个用户socket的事情,比如说read(fd,xxx)之类的,或者一些其他的处理。 do_use_fd(events[n].data.fd); } 对,epoll的操作就这么简单,总共不过4个API:epoll_create, epoll_ctl, epoll_wait和close。 fcntl函数f c n t l函数可以改变已经打开文件的性质。
int fcntl(intf i l e d e s, int c m d,.../* inta rg * / ) ;
返回:若成功则依赖于c m d(见下),若出错为- 1 f c n t l函数有五种功能:
• 复制一个现存的描述符(c m d=F D U P F D)。 • 获得/设置文件描述符标记(c m d = F G E T F D或F S E T F D)。 • 获得/设置文件状态标志(c m d = F G E T F L或F S E T F L)。 • 获得/设置异步I / O有权(c m d = F G E TO W N或F S E TO W N)。 • 获得/设置记录锁(c m d = F G E T L K , F S E T L K或F S E T L K W)。 我们先说明这十种命令值中的前七种( 1 2 . 3节说明后三种,它们都与记录锁有关)我们将涉 及与进程表项中各文件描述符相关联的文件描述符标志,以及每个文件表项中的文件状态标志, 见图3 - 1。 • F_DUPFD 复制文件描述符f i l e d e s,新文件描述符作为函数值返回。它是尚未打开的各 描述符中大于或等于第三个参数值(取为整型值)中各值的最小值。新描述符与filedes 共享同 一文件表项(见图3 - 3)。但是,新描述符有它自己的一套文件描述符标志,其F D C L O E X E C 文件描述符标志则被清除(这表示该描述符在exec 时仍保持开放,我们将在第8章对此进行 讨论)。 • F_GETFD 对应于filedes 的文件描述符标志作为函数值返回。当前只定义了一个文件描
述符标志F D C L O E X E C。 • F_SETFD 对于filedes 设置文件描述符标志。新标志值按第三个参数(取为整型值)设置。 应当了解很多现存的涉及文件描述符标志的程序并不使用常数F D C L O E X E C,而是将此 标志设置为0 (系统默认,在e x e c时不关闭)或1 (在e x e c时关闭)。 • F_GETFL 对应于filedes 的文件状态标志作为函数值返回。在说明o p e n函数时,已说明 了文件状态标志。它们列于表3 - 2中。 表3-2 对于f c n t l的文件状态标志 文件状态标志说明 O R D O N L Y 只读打开 O W R O N L Y 只写打开 O R D W R 读/写打开 O A P P E N D 写时都添加至文件尾 O N O N B L O C K 非阻塞方式 O S Y N C 等待写完成 O A S Y N C 异步I / O(仅4 . 3 + B S D) 不幸的是,三个存取方式标志( O R D O N LY, O W R O N LY,以及O R D W R )并不各占1位。(正 如前述,这三种标志的值各是0、1和2,由于历史原因。这三种值互斥—一个文件只能有这 三种值之一。)因此首先必须用屏蔽字O A C C M O D E取得存取方式位,然后将结果与这三种值 相比较。 • F_SETFL 将文件状态标志设置为第三个参数的值(取为整型值)。可以更改的几个标志是: O A P P E N D,O N O N B L O C K,O S Y N C和O A S Y N C。 • F_GETOWN 取当前接收S I G I O和S I G U R G信号的进程I D或进程组I D。1 2 . 6 . 2节将论述这 两种4 . 3 + B S D异步I / O信号。 • F_SETOWN 设置接收S I G I O和S I G U R G信号的进程I D或进程组I D。正的a rg指定一个进 程I D,负的a rg表示等于a rg绝对值的一个进程组I D。 f c n t l的返回值与命令有关。如果出错,所有命令都返回- 1,如果成功则返回某个其他值。 下列三个命令有特定返回值:F_DUPFD,F_GETFD, F_GETFL以及F G E TO W N。第一个返回新 的文件描述符,第二个返回相应标志,最后一个返回一个正的进程I D或负的进程组I D。 vi在vi中设置快捷键
在当前用户目录下创建.vimrc文件,写入我们自定义的快捷键,格式如下: (1)imap <F12> <?php?> <Esc>1Fpa<Enter><Enter><Esc>ki 解释: imap (呵呵应该是标识符吧,有谁告诉我一下) <F12> 就是自定义的快捷键。 <?php?> 就是我们在插入状态下,点击F12时,往vi里面输入的字符。 <Esc> 表示点击Esc键,确定进入命令模式 1Fp 表示从光标处向左移动到第一个p处 a 插入模式 <Enter><Enter> 两个回车咯 <Esc>ki 进入命令模式,k表示光标向上移动一行;i表示插入。 结果就是:
<?php ?>
(2)imap <F1> {} <Esc>1F{a<Enter><Enter><Esc>ki<tab> 效果如下: { ... } (3)imap <F2> {} <Esc>1F{i<tab><Esc>la<Enter><Enter><tab><Esc>ki<tab><tab> 效果如下: { ... } To 14, 我觉得不能在insert mode下使用:set命令。参考楼上几位朋友的提示,我又查了一下vi设置方面的资料。
1,查看当前vi设置,使用:set all命令,它显示的是当前vi的是设置,下面是我的VI设置:
noaltwerase noextended mesg report=5 term="xterm" noautoindent filec="" nomodeline noruler noterse autoprint flash msgcat="./" scroll=11 notildeop noautowrite hardtabs=0 noprint="" nosearchincr timeout backup="" noiclower nonumber nosecure nottywerase nobeautify noignorecase nooctal shiftwidth=8 noverbose cdpath=":" keytime=6 open noshowmatch warn cedit="" noleftright optimize showmode window=23 columns=80 lines=24 path="" sidescroll=16 nowindowname nocomment nolisp print="" noslowopen wraplen=0 noedcompatible nolist prompt nosourceany wrapmargin=0 escapetime=3 lock noreadonly tabstop=8 wrapscan noerrorbells magic noredraw taglength=0 nowriteany noexrc matchtime=7 remap tags="tags" directory="/tmp" paragraphs="IPLPPPQPP LIpplpipbp" recdir="/var/tmp/vi.recover" sections="NHSHH HUnhsh" shell="/bin/bash" shellmeta="~{[*?$`'"\" Press any key to continue [: to enter more ex commands]: 这些参数有两种赋值方法,一是,通过=赋值,二是布尔赋值。比如:
:set tabstop=8 #设置tab键宽度为8. :set noshowmode #不显示当前模式 :set showmode #显示当前模式 2, 在vi启动是进行设置,有两种方法:
a. 在当前的用户目录,创建.exrc文件,写入我们想要的命令,比如 set showmode set tabstop=4 b. 设置环境变量EXINIT(可在.bashrc, .profile等文件中配置),比如 EXINIT='set showmode | tapstop=8' export EXINIT 2007-04-18 补充:
.vimrc 比 .exrc 的优先级高。在.vimrc中一样可以设置第二部分的内容。 命令 光标移动
h或^h 向左移一个字符 j或^j或^n 向下移一行 k或^p 向上移一行 l或空格 向右移一个字符 G 移到文件的最后一行 nG 移到文件的第n行 w 移到下一个字的开头 W 移到下一个字的开头,忽略标点符号 b 移到前一个字的开头 B 移到前一个字的开头,忽略标点符号 L 移到屏幕的最后一行 M 移到屏幕的中间一行 H 移到屏幕的第一行 e 移到下一个字的结尾 E 移到下一个字的结尾,忽略标点符号 ( 移到句子的开头 ) 移到句子的结尾 { 移到段落的开头 } 移到下一个段落的开头 0或| 移到当前行的第一列 n| 移到当前行的第n列 ^ 移到当前行的第一个非空字符 $ 移到当前行的最后一个字符 +或return 移到下一行的第一个字符 - 移到前一行的第一个非空字符 在vi中添加文本 命令 插入动作
a 在光标后插入文本 A 在当前行插入文本 i 在光标前插入文本 I 在当前行前插入文本 o 在当前行的下边插入新行 O 在当前行的上边插入新行 :r file 读入文件file内容,并插在当前行后 :nr file 读入文件file内容,并插在第n行后 escape 回到命令模式 ^v char 插入时忽略char的指定意义,这是为了插入特殊字符 在vi中删除文本 命令 删除操作
x 删除光标处的字符,可以在x前加上需要删除的字符数目 nx 从当前光标处往后删除n个字符 X 删除光标前的字符,可以在X前加上需要删除的字符数目 nX 从当前光标处往前删除n个字符 dw 删至下一个字的开头 ndw 从当前光标处往后删除n个字 dG 删除行,直到文件结束 dd 删除整行 ndd 从当前行开始往后删除 db 删除光标前面的字 ndb 从当前行开始往前删除n字 :n,md 从第m行开始往前删除n行 d或d$ 从光标处删除到行尾 dcursor_command 删除至光标命令处,如dG将从当产胆行删除至文件的末尾 ^h或backspace 插入时,删除前面的字符 ^w 插入时,删除前面的字 修改vi文本 每个命令前面的数字表示该命令重复的次数
命令 替换操作 rchar 用char替换当前字符 R text escape 用text替换当前字符直到换下Esc键 stext escape 用text代替当前字符 S或cctext escape 用text代替整行 cwtext escape 将当前字改为text Ctext escape 将当前行余下的改为text cG escape 修改至文件的末尾 ccursor_cmd text escape 从当前位置处到光标命令位置处都改为text 在vi中查找与替换 命令 查找与替换操作
/text 在文件中向前查找text ?text 在文件中向后查找text n 在同一方向重复查找 N 在相反方向重复查找 ftext 在当前行向前查找text Ftext 在当前行向后查找text ttext 在当前行向前查找text,并将光标定位在text的第一个字符 Ttext 在当前行向后查找text,并将光标定位在text的第一个字符 :set ic 查找时忽略大小写 :set noic 查找时对大小写敏感 :s/oldtext/newtext 用newtext替换oldtext :m,ns/oldtext/newtext 在m行通过n,用newtext替换oldtext & 重复最后的:s命令 :g/text1/s/text2/text3 查找包含text1的行,用text3替换text2 :g/text/command 在所有包含text的行运行command所表示的命令 :v/text/command 在所有不包含text的行运行command所表示的命令 在vi中复制文本 命令 复制操作
yy 将当前行的内容放入临时缓冲区 nyy 将n行的内容放入临时缓冲区 p 将临时缓冲区中的文本放入光标后 P 将临时缓冲区中的文本放入光标前 "(a-z)nyy 复制n行放入名字为圆括号内的可命名缓冲区,省略n表示当前行 "(a-z)ndd 删除n行放入名字为圆括号内的可命名缓冲区,省略n表示当前行 "(a-z)p 将名字为圆括号的可命名缓冲区的内容放入当前行后 "(a-z)P 将名字为圆括号的可命名缓冲区的内容放入当前行前 在vi中撤消与重复 命令 撤消操作
u 撤消最后一次修改 U 撤消当前行的所有修改 . 重复最后一次修改 , 以相反的方向重复前面的f、F、t或T查找命令 ; 重复前面的f、F、t或T查找命令 "np 取回最后第n次的删除(缓冲区中存有一定次数的删除内容,一般为9) n 重复前面的/或?查找命令 N 以相反方向重复前面的/或?命令 保存文本和退出vi 命令 保存和/或退出操作
:w 保存文件但不退出vi :w file 将修改保存在file中但不退出vi :wq或ZZ或:x 保存文件并退出vi :q! 不保存文件,退出vi :e! 放弃所有修改,从上次保存文件开始再编辑 vi中的选项 选项 作用
:set all 打印所有选项 :set nooption 关闭option选项 :set nu 每行前打印行号 :set showmode 显示是输入模式还是替换模式 :set noic 查找时忽略大小写 :set list 显示制表符(^I)和行尾符号 :set ts=8 为文本输入设置tab stops :set window=n 设置文本窗口显示n行 vi的状态 选项 作用
:.= 打印当前行的行号 := 打印文件中的行数 ^g 显示文件名、当前的行号、文件的总行数和文件位置的百分比 :l 使用字母"l"来显示许多的特殊字符,如制表符和换行符 在文本中定位段落和放置标记 选项 作用
{ 在第一列插入{来定义一个段落 [[ 回到段落的开头处 ]] 向前移到下一个段落的开头处 m(a-z) 用一个字母来标记当前位置,如用mz表示标记z '(a-z) 将光标移动到指定的标记,如用'z表示移动到z 在vi中连接行 选项 作用
J 将下一行连接到当前行的末尾 nJ 连接后面n行 光标放置与屏幕调整 选项 作用
H 将光标移动到屏幕的顶行 nH 将光标移动到屏幕顶行下的第n行 M 将光标移动到屏幕的中间 L 将光标移动到屏幕的底行 nL 将光标移动到屏幕底行上的第n行 ^e(ctrl+e) 将屏幕上滚一行 ^y 将屏幕下滚一行 ^u 将屏幕上滚半页 ^d 将屏幕下滚半页 ^b 将屏幕上滚一页 ^f 将屏幕下滚一页 ^l 重绘屏幕 z-return 将当前行置为屏幕的顶行 nz-return 将当前行下的第n行置为屏幕的顶行 z. 将当前行置为屏幕的中央 nz. 将当前行上的第n行置为屏幕的中央 z- 将当前行置为屏幕的底行 nz- 将当前行上的第n行置为屏幕的底行 vi中的shell转义命令 选项 作用
:!command 执行shell的command命令,如:!ls :!! 执行前一个shell命令 :r!command 读取command命令的输入并插入,如:r!ls会先执行ls,然后读入内容 :w!command 将当前已编辑文件作为command命令的标准输入并执行command命令,如:w!grep all :cd directory 将当前工作目录更改为directory所表示的目录 :sh 将启动一个子shell,使用^d(ctrl+d)返回vi :so file 在shell程序file中读入和执行命令 vi中的宏与缩写 (避免使用控制键和符号,不要使用字符K、V、g、q、v、*、=和功能键) 选项 作用
:map key command_seq 定义一个键来运行command_seq,如:map e ea,无论什么时候都可以e移到一个字的末尾来追加文本 :map 在状态行显示所有已定义的宏 :umap key 删除该键的宏 :ab string1 string2 定义一个缩写,使得当插入string1时,用string2替换string1。当要插入文本时,键入string1然后按Esc键,系统就插入了string2 :ab 显示所有缩写 :una string 取消string的缩写 在vi中缩进文本 选项 作用
^i(ctrl+i)或tab 插入文本时,插入移动的宽度,移动宽度是事先定义好的 :set ai 打开自动缩进 :set sw=n 将移动宽度设置为n个字符 n<< 使n行都向左移动一个宽度 n>> 使n行都向右移动一个宽度,例如3>>就将接下来的三行每行都向右移动一个移动宽度 cs find c|d|e|f|g|i|s|t name
0 或 s 查找本 C 符号(可以跳过注释) 1 或 g 查找本定义 2 或 d 查找本函数调用的函数 3 或 c 查找调用本函数的函数 4 或 t 查找本字符串 6 或 e 查找本 egrep 模式 7 或 f 查找本文件 8 或 i 查找包含本文件的文件 [root@linux ~]# grep [-acinv] '搜寻字串' filename 参数∶ -a ∶将 binary 档案以 text 档案的方式搜寻资料 -c ∶计算找到 '搜寻字串' 的次数 -i ∶忽略大小写的不同,所以大小写视为相同 -n ∶顺便输出行号 -v ∶反向选择,亦即显示出没有 '搜寻字串' 内容的那一行! 范例∶ 范例一∶将 last 当中,有出现 root 的那一行就取出来;
[root@linux ~]# last | grep 'root' 范例二∶与范例一相反,只要没有 root 的就取出!
[root@linux ~]# last | grep -v 'root' 范例三∶在 last 的输出讯息中,只要有 root 就取出,并且仅取第一栏
[root@linux ~]# last | grep 'root' |cut -d ' ' -f1 # 在取出 root 之后,利用上个指令 cut 的处理,就能够仅取得第一栏棉! GDB调试精粹及使用实例 一:列文件清单 二:执行程序 三:显示数据 print 是gdb的一个功能很强的命令,利用它可以显示被调试的语言中任何有效的表达式。表达式除了包含你程序中的变量外,还可以包含以下内容: 四:断点(breakpoint) 如果该程序是由很多原文件构成的,你可以在各个原文件中设置断点,而不是在当前的原文件中设置断点,其方法如下: 要想设置一个条件断点,可以利用break if命令,如下所示: 从断点继续运行:countinue 命令 1. 显示当前gdb的断点信息: 七.单步执行 九.机器语言工具 十.信号 Handle命令可控制信号的处理,他有两个参数,一个是信号名,另一个是接受到信号时该作什么。几种可能的参数是: 十一. 原文件的搜索 十二.UNIX接口 十三.命令的历史 小结:常用的gdb命令 在 gdb 提示符处键入help,将列出命令的分类,主要的分类有:
******gdb 使用范例************************ -----------------
Linux下的通用线程池创建本文给出了一个通用的线程池框架,该框架将与线程执行相关的任务进行了高层次的抽象,使之与具体的执行任务无关。另外该线程池具有动态伸缩性,它能根据执行任务的轻重自动调整线程池中线程的数量。文章的最后,我们给出一个简单示例程序,通过该示例程序,我们会发现,通过该线程池框架执行多线程任务是多么的简单。 为什么需要线程池 前的大多数网络服务器,包括Web服务器、Email服务器以及数据库服务器等都具有一个共同点,就是单位时间内必须处理数目巨大的连接请求,但处理时间却相对较短。 传统多线程方案中我们采用的服务器模型则是一旦接受到请求之后,即创建一个新的线程,由该线程执行任务。任务执行完毕后,线程退出,这就是是“即时创建,即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。 我们将传统方案中的线程执行过程分为三个过程:T1、T2、T3。 T1:线程创建时间 T2:线程执行时间,包括线程的同步等时间 T3:线程销毁时间 那么我们可以看出,线程本身的开销所占的比例为(T1+T3) / (T1+T2+T3)。如果线程执行的时间很短的话,这比开销可能占到20%-50%左右。如果任务执行时间很频繁的话,这笔开销将是不可忽略的。 除此之外,线程池能够减少创建的线程个数。通常线程池所允许的并发线程是有上界的,如果同时需要并发的线程数超过上界,那么一部分线程将会等待。而传统方案中,如果同时请求数目为2000,那么最坏情况下,系统可能需要产生2000个线程。尽管这不是一个很大的数目,但是也有部分机器可能达不到这种要求。 因此线程池的出现正是着眼于减少线程池本身带来的开销。线程池采用预创建的技术,在应用程序启动之后,将立即创建一定数量的线程(N1),放入空闲队列中。这些线程都是处于阻塞(Suspended)状态,不消耗CPU,但占用较小的内存空间。当任务到来后,缓冲池选择一个空闲线程,把任务传入此线程中运行。当N1个线程都在处理任务后,缓冲池自动创建一定数量的新线程,用于处理更多的任务。在任务执行完毕后线程也不退出,而是继续保持在池中等待下一次的任务。当系统比较空闲时,大部分线程都一直处于暂停状态,线程池自动销毁一部分线程,回收系统资源。 基于这种预创建技术,线程池将线程创建和销毁本身所带来的开销分摊到了各个具体的任务上,执行次数越多,每个任务所分担到的线程本身开销则越小,不过我们另外可能需要考虑进去线程之间同步所带来的开销。 构建线程池框架 一般线程池都必须具备下面几个组成部分: 线程池管理器:用于创建并管理线程池 工作线程: 线程池中实际执行的线程 任务接口: 尽管线程池大多数情况下是用来支持网络服务器,但是我们将线程执行的任务抽象出来,形成任务接口,从而是的线程池与具体的任务无关。 任务队列:线程池的概念具体到实现则可能是队列,链表之类的数据结构,其中保存执行线程。 我们实现的通用线程池框架由五个重要部分组成CThreadManage,CThreadPool,CThread,CJob,CWorkerThread,除此之外框架中还包括线程同步使用的类CThreadMutex和CCondition。 CJob是所有的任务的基类,其提供一个接口Run,所有的任务类都必须从该类继承,同时实现Run方法。该方法中实现具体的任务逻辑。 CThread是Linux中线程的包装,其封装了Linux线程最经常使用的属性和方法,它也是一个抽象类,是所有线程类的基类,具有一个接口Run。 CWorkerThread是实际被调度和执行的线程类,其从CThread继承而来,实现了CThread中的Run方法。 CThreadPool是线程池类,其负责保存线程,释放线程以及调度线程。 CThreadManage是线程池与用户的直接接口,其屏蔽了内部的具体实现。 CThreadMutex用于线程之间的互斥。 CCondition则是条件变量的封装,用于线程之间的同步。 它们的类的继承关系如下图所示: 线程池的时序很简单,如下图所示。CThreadManage直接跟客户端打交道,其接受需要创建的线程初始个数,并接受客户端提交的任务。这儿的任务是具体的非抽象的任务。CThreadManage的内部实际上调用的都是CThreadPool的相关操作。CThreadPool创建具体的线程,并把客户端提交的任务分发给CWorkerThread,CWorkerThread实际执行具体的任务。 理解系统组件 下面我们分开来了解系统中的各个组件。 CThreadManage CThreadManage的功能非常简单,其提供最简单的方法,其类定义如下: class CThreadManage { private: CThreadPool* m_Pool; int m_NumOfThread; protected: public: void SetParallelNum(int num); CThreadManage(); CThreadManage(int num); virtual ~CThreadManage(); void Run(CJob* job,void* jobdata); void TerminateAll(void); }; 其中m_Pool指向实际的线程池;m_NumOfThread是初始创建时候允许创建的并发的线程个数。另外Run和TerminateAll方法也非常简单,只是简单的调用CThreadPool的一些相关方法而已。其具体的实现如下: CThreadManage::CThreadManage(){ m_NumOfThread = 10; m_Pool = new CThreadPool(m_NumOfThread); } CThreadManage::CThreadManage(int num){ m_NumOfThread = num; m_Pool = new CThreadPool(m_NumOfThread); } CThreadManage::~CThreadManage(){ if(NULL != m_Pool) delete m_Pool; } void CThreadManage::SetParallelNum(int num){ m_NumOfThread = num; } void CThreadManage::Run(CJob* job,void* jobdata){ m_Pool->Run(job,jobdata); ce="Courier New" size=2> } void CThreadManage::TerminateAll(void){ m_Pool->TerminateAll(); } CThread CThread 类实现了对Linux中线程操作的封装,它是所有线程的基类,也是一个抽象类,提供了一个抽象接口Run,所有的CThread都必须实现该Run方法。CThread的定义如下所示: class CThread { private: int m_ErrCode; Semaphore m_ThreadSemaphore; //the inner semaphore, which is used to realize unsigned long m_ThreadID; bool m_Detach; //The thread is detached bool m_CreateSuspended; //if suspend after creating char* m_ThreadName; ThreadState m_ThreadState; //the state of the thread protected: void SetErrcode(int errcode){m_ErrCode = errcode;} static void* ThreadFunction(void*); public: CThread(); CThread(bool createsuspended,bool detach); virtual ~CThread(); virtual void Run(void) = 0; void SetThreadState(ThreadState state){m_ThreadState = state;} bool Terminate(void); //Terminate the threa bool Start(void); //Start to execute the thread void Exit(void); bool Wakeup(void); ThreadState GetThreadState(void){return m_ThreadState;} int GetLastError(void){return m_ErrCode;} void SetThreadName(char* thrname){strcpy(m_ThreadName,thrname);} char* GetThreadName(void){return m_ThreadName;} int GetThreadID(void){return m_ThreadID;} bool SetPriority(int priority); int GetPriority(void); int GetConcurrency(void); void SetConcurrency(int num); bool Detach(void); bool Join(void); bool Yield(void); int Self(void); }; 线程的状态可以分为四种,空闲、忙碌、挂起、终止(包括正常退出和非正常退出)。由于目前Linux线程库不支持挂起操作,因此,我们的此处的挂起操作类似于暂停。如果线程创建后不想立即执行任务,那么我们可以将其“暂停”,如果需要运行,则唤醒。有一点必须注意的是,一旦线程开始执行任务,将不能被挂起,其将一直执行任务至完毕。 线程类的相关操作均十分简单。线程的执行入口是从Start()函数开始,其将调用函数ThreadFunction,ThreadFunction再调用实际的Run函数,执行实际的任务。 CThreadPool CThreadPool是线程的承载容器,一般可以将其实现为堆栈、单向队列或者双向队列。在我们的系统中我们使用STL Vector对线程进行保存。CThreadPool的实现代码如下: class CThreadPool { friend class CWorkerThread; private: unsigned int m_MaxNum; //the max thread num that can create at the same time unsigned int m_AvailLow; //The min num of idle thread that shoule kept unsigned int m_AvailHigh; //The max num of idle thread that kept at the same time unsigned int m_AvailNum; //the normal thread num of idle num; unsigned int m_InitNum; //Normal thread num; protected: CWorkerThread* GetIdleThread(void); void AppendToIdleList(CWorkerThread* jobthread); void MoveToBusyList(CWorkerThread* idlethread); void MoveToIdleList(CWorkerThread* busythread); void DeleteIdleThread(int num); void CreateIdleThread(int num); public: CThreadMutex m_BusyMutex; //when visit busy list,use m_BusyMutex to lock and unlock CThreadMutex m_IdleMutex; //when visit idle list,use m_IdleMutex to lock and unlock CThreadMutex m_JobMutex; //when visit job list,use m_JobMutex to lock and unlock CThreadMutex m_VarMutex; CCondition m_BusyCond; //m_BusyCond is used to sync busy thread list CCondition m_IdleCond; //m_IdleCond is used to sync idle thread list CCondition m_IdleJobCond; //m_JobCond is used to sync job list CCondition m_MaxNumCond; vector<CWorkerThread*> m_ThreadList; vector<CWorkerThread*> m_BusyList; //Thread List vector<CWorkerThread*> m_IdleList; //Idle List CThreadPool(); CThreadPool(int initnum); virtual ~CThreadPool(); void SetMaxNum(int maxnum){m_MaxNum = maxnum;} int GetMaxNum(void){return m_MaxNum;} void SetAvailLowNum(int minnum){m_AvailLow = minnum;} int GetAvailLowNum(void){return m_AvailLow;} void SetAvailHighNum(int highnum){m_AvailHigh = highnum;} int GetAvailHighNum(void){return m_AvailHigh;} int GetActualAvailNum(void){return m_AvailNum;} int GetAllNum(void){return m_ThreadList.size();} int GetBusyNum(void){return m_BusyList.size();} void SetInitNum(int initnum){m_InitNum = initnum;} int GetInitNum(void){return m_InitNum;} void TerminateAll(void); void Run(CJob* job,void* jobdata); }; CThreadPool::CThreadPool() { m_MaxNum = 50; m_AvailLow = 5; m_InitNum=m_AvailNum = 10 ; m_AvailHigh = 20; m_BusyList.clear(); m_IdleList.clear(); for(int i=0;i<m_InitNum;i++){ CWorkerThread* thr = new CWorkerThread(); thr->SetThreadPool(this); AppendToIdleList(thr); thr->Start(); } } CThreadPool::CThreadPool(int initnum) { assert(initnum>0 && initnum<=30); m_MaxNum = 30; m_AvailLow = initnum-10>0?initnum-10:3; m_InitNum=m_AvailNum = initnum ; m_AvailHigh = initnum+10; m_BusyList.clear(); m_IdleList.clear(); for(int i=0;i<m_InitNum;i++){ CWorkerThread* thr = new CWorkerThread(); AppendToIdleList(thr); thr->SetThreadPool(this); thr->Start(); //begin the thread,the thread wait for job } } CThreadPool::~CThreadPool() { TerminateAll(); } void CThreadPool::TerminateAll() { for(int i=0;i < m_ThreadList.size();i++) { CWorkerThread* thr = m_ThreadList[i]; thr->Join(); } return; } CWorkerThread* CThreadPool::GetIdleThread(void) { while(m_IdleList.size() ==0 ) m_IdleCond.Wait(); m_IdleMutex.Lock(); if(m_IdleList.size() > 0 ) { CWorkerThread* thr = (CWorkerThread*)m_IdleList.front(); printf("Get Idle thread %dn",thr->GetThreadID()); m_IdleMutex.Unlock(); return thr; } m_IdleMutex.Unlock(); return NULL; } //add an idle thread to idle list void CThreadPool::AppendToIdleList(CWorkerThread* jobthread) { m_IdleMutex.Lock(); m_IdleList. push_back(jobthread); m_ThreadList.push_back(jobthread); m_IdleMutex.Unlock(); } //move and idle thread to busy thread void CThreadPool::MoveToBusyList(CWorkerThread* idlethread) { m_BusyMutex.Lock(); m_BusyList.push_back(idlethread); m_AvailNum--; m_BusyMutex.Unlock(); m_IdleMutex.Lock(); vector<CWorkerThread*>::iterator pos; pos = find(m_IdleList.begin(),m_IdleList.end(),idlethread); if(pos !=m_IdleList.end()) m_IdleList.erase(pos); m_IdleMutex.Unlock(); } void CThreadPool::MoveToIdleList(CWorkerThread* busythread) { m_IdleMutex.Lock(); m_IdleList.push_back(busythread); m_AvailNum++; m_IdleMutex.Unlock(); m_BusyMutex.Lock(); vector<CWorkerThread*>::iterator pos; pos = find(m_BusyList.begin(),m_BusyList.end(),busythread); if(pos!=m_BusyList.end()) m_BusyList.erase(pos); m_BusyMutex.Unlock(); m_IdleCond.Signal(); m_MaxNumCond.Signal(); } //create num idle thread and put them to idlelist void CThreadPool::CreateIdleThread(int num) { for(int i=0;i<num;i++){ CWorkerThread* thr = new CWorkerThread(); thr->SetThreadPool(this); AppendToIdleList(thr); m_VarMutex.Lock(); m_AvailNum++; m_VarMutex.Unlock(); thr->Start(); //begin the thread,the thread wait for job } } void CThreadPool::DeleteIdleThread(int num) { printf("Enter into CThreadPool::DeleteIdleThreadn"); m_IdleMutex.Lock(); printf("Delete Num is %dn",num); for(int i=0;i<num;i++){ CWorkerThread* thr; if(m_IdleList.size() > 0 ){ thr = (CWorkerThread*)m_IdleList.front(); printf("Get Idle thread %dn",thr->GetThreadID()); } vector<CWorkerThread*>::iterator pos; pos = find(m_IdleList.begin(),m_IdleList.end(),thr); if(pos!=m_IdleList.end()) m_IdleList.erase(pos); m_AvailNum--; printf("The idle thread available num:%d n",m_AvailNum); printf("The idlelist num:%d n",m_IdleList.size()); } m_IdleMutex.Unlock(); } void CThreadPool::Run(CJob* job,void* jobdata) { assert(job!=NULL); //if the busy thread num adds to m_MaxNum,so we should wait if(GetBusyNum() == m_MaxNum) m_MaxNumCond.Wait(); if(m_IdleList.size()<m_AvailLow) { if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum ) CreateIdleThread(m_InitNum-m_IdleList.size()); else CreateIdleThread(m_MaxNum-GetAllNum()); } CWorkerThread* idlethr = GetIdleThread(); if(idlethr !=NULL) { idlethr->m_WorkMutex.Lock(); MoveToBusyList(idlethr); idlethr->SetThreadPool(this); job->SetWorkThread(idlethr); printf("Job is set to thread %d n",idlethr->GetThreadID()); idlethr->SetJob(job,jobdata); } } 在CThreadPool中存在两个链表,一个是空闲链表,一个是忙碌链表。Idle链表中存放所有的空闲进程,当线程执行任务时候,其状态变为忙碌状态,同时从空闲链表中删除,并移至忙碌链表中。在CThreadPool的构造函数中,我们将执行下面的代码: for(int i=0;i<m_InitNum;i++) { CWorkerThread* thr = new CWorkerThread(); AppendToIdleList(thr); thr->SetThreadPool(this); thr->Start(); //begin the thread,the thread wait for job } 在该代码中,我们将创建m_InitNum个线程,创建之后即调用AppendToIdleList放入Idle链表中,由于目前没有任务分发给这些线程,因此线程执行Start后将自己挂起。 事实上,线程池中容纳的线程数目并不是一成不变的,其会根据执行负载进行自动伸缩。为此在CThreadPool中设定四个变量: m_InitNum:处世创建时线程池中的线程的个数。 m_MaxNum:当前线程池中所允许并发存在的线程的最大数目。 m_AvailLow:当前线程池中所允许存在的空闲线程的最小数目,如果空闲数目低于该值,表明负载可能过重,此时有必要增加空闲线程池的数目。实现中我们总是将线程调整为m_InitNum个。 m_AvailHigh:当前线程池中所允许的空闲的线程的最大数目,如果空闲数目高于该值,表明当前负载可能较轻,此时将删除多余的空闲线程,删除后调整数也为m_InitNum个。 m_AvailNum:目前线程池中实际存在的线程的个数,其值介于m_AvailHigh和m_AvailLow之间。如果线程的个数始终维持在m_AvailLow和m_AvailHigh之间,则线程既不需要创建,也不需要删除,保持平衡状态。因此如何设定m_AvailLow和m_AvailHigh的值,使得线程池最大可能的保持平衡态,是线程池设计必须考虑的问题。 线程池在接受到新的任务之后,线程池首先要检查是否有足够的空闲池可用。检查分为三个步骤: (1)检查当前处于忙碌状态的线程是否达到了设定的最大值m_MaxNum,如果达到了,表明目前没有空闲线程可用,而且也不能创建新的线程,因此必须等待直到有线程执行完毕返回到空闲队列中。 (2)如果当前的空闲线程数目小于我们设定的最小的空闲数目m_AvailLow,则我们必须创建新的线程,默认情况下,创建后的线程数目应该为m_InitNum,因此创建的线程数目应该为( 当前空闲线程数与m_InitNum);但是有一种特殊情况必须考虑,就是现有的线程总数加上创建后的线程数可能超过m_MaxNum,因此我们必须对线程的创建区别对待。 if(GetAllNum()+m_InitNum-m_IdleList.size() < m_MaxNum ) CreateIdleThread(m_InitNum-m_IdleList.size()); else CreateIdleThread(m_MaxNum-GetAllNum()); 如果创建后总数不超过m_MaxNum,则创建后的线程为m_InitNum;如果超过了,则只创建( m_MaxNum-当前线程总数 )个。 (3)调用GetIdleThread方法查找空闲线程。如果当前没有空闲线程,则挂起;否则将任务指派给该线程,同时将其移入忙碌队列。 当线程执行完毕后,其会调用MoveToIdleList方法移入空闲链表中,其中还调用m_IdleCond.Signal()方法,唤醒GetIdleThread()中可能阻塞的线程。 CWorkerThread CWorkerThread是CThread的派生类,是事实上的工作线程。在CThreadPool的构造函数中,我们创建了一定数量的CWorkerThread。一旦这些线程创建完毕,我们将调用Start()启动该线程。Start方法最终会调用Run方法。Run方法是个无限循环的过程。在没有接受到实际的任务的时候,m_Job为NULL,此时线程将调用Wait方法进行等待,从而处于挂起状态。一旦线程池将具体的任务分发给该线程,其将被唤醒,从而通知线程从挂起的地方继续执行。CWorkerThread的完整定义如下: class CWorkerThread:public CThread { private: CThreadPool* m_ThreadPool; CJob* m_Job; void* m_JobData; CThreadMutex m_VarMutex; bool m_IsEnd; protected: public: CCondition m_JobCond; CThreadMutex m_WorkMutex; CWorkerThread(); virtual ~CWorkerThread(); void Run(); void SetJob(CJob* job,void* jobdata); CJob* GetJob(void){return m_Job;} void SetThreadPool(CThreadPool* thrpool); CThreadPool* GetThreadPool(void){return m_ThreadPool;} }; CWorkerThread::CWorkerThread() { m_Job = NULL; m_JobData = NULL; m_ThreadPool = NULL; m_IsEnd = false; } CWorkerThread::~CWorkerThread() { if(NULL != m_Job) delete m_Job; if(m_ThreadPool != NULL) delete m_ThreadPool; } void CWorkerThread::Run() { SetThreadState(THREAD_RUNNING); for(;;) { while(m_Job == NULL) m_JobCond.Wait(); m_Job->Run(m_JobData); m_Job->SetWorkThread(NULL); m_Job = NULL; m_ThreadPool->MoveToIdleList(this); if(m_ThreadPool->m_IdleList.size() > m_ThreadPool->GetAvailHighNum()) { m_ThreadPool->DeleteIdleThread(m_ThreadPool->m_IdleList.size()-m_T hreadPool->GetInitNum()); } m_WorkMutex.Unlock(); } } void CWorkerThread::SetJob(CJob* job,void* jobdata) { m_VarMutex.Lock(); m_Job = job; m_JobData = jobdata; job->SetWorkThread(this); m_VarMutex.Unlock(); m_JobCond.Signal(); } void CWorkerThread::SetThreadPool(CThreadPool* thrpool) { m_VarMutex.Lock(); m_ThreadPool = thrpool; &nbs, p; m_VarMutex.Unlock(); } 当线程执行任务之前首先必须判断空闲线程的数目是否低于m_AvailLow,如果低于,则必须创建足够的空闲线程,使其数目达到m_InitNum个,然后将调用MoveToBusyList()移出空闲队列,移入忙碌队列。当任务执行完毕后,其又调用MoveToIdleList()移出忙碌队列,移入空闲队列,等待新的任务。 除了Run方法之外,CWorkerThread中另外一个重要的方法就是SetJob,该方法将实际的任务赋值给线程。当没有任何执行任务即m_Job为NULL的时候,线程将调用m_JobCond.Wait进行等待。一旦Job被赋值给线程,其将调用m_JobCond.Signal方法唤醒该线程。由于m_JobCond属于线程内部的变量,每个线程都维持一个m_JobCond,只有得到任务的线程才被唤醒,没有得到任务的将继续等待。无论一个线程何时被唤醒,其都将从等待的地方继续执行m_Job->Run(m_JobData),这是线程执行实际任务的地方。 在线程执行给定Job期间,我们必须防止另外一个Job又赋给该线程,因此在赋值之前,通过m_VarMutex进行锁定, Job执行期间,其于的Job将不能关联到该线程;任务执行完毕,我们调用m_VarMutex.Unlock()进行解锁,此时,线程又可以接受新的执行任务。 在线程执行任务结束后返回空闲队列前,我们还需要判断当前空闲队列中的线程是否高于m_AvailHigh个。如果超过m_AvailHigh,则必须从其中删除(m_ThreadPool->m_IdleList.size()-m_ThreadPool->GetInitNum())个线程,使线程数目保持在m_InitNum个。 CJob CJob类相对简单,其封装了任务的基本的属性和方法,其中最重要的是Run方法,代码如下: class CJob { private: int m_JobNo; //The num was assigned to the job char* m_JobName; //The job name CThread *m_pWorkThread; //The thread associated with the job public: CJob( void ); virtual ~CJob(); int GetJobNo(void) const { return m_JobNo; } void SetJobNo(int jobno){ m_JobNo = jobno;} char* GetJobName(void) const { return m_JobName; } void SetJobName(char* jobname); CThread *GetWorkThread(void){ return m_pWorkThread; } void SetWorkThread ( CThread *pWorkThread ){ m_pWorkThread = pWorkThread; } virtual void Run ( void *ptr ) = 0; }; CJob::CJob(void) :m_pWorkThread(NULL) ,m_JobNo(0) ,m_JobName(NULL) { } CJob::~CJob(){ if(NULL != m_JobName) free(m_JobName); } void CJob::SetJobName(char* jobname) { if(NULL !=m_JobName) { free(m_JobName); m_JobName = NULL; } if(NULL !=jobname) { m_JobName = (char*)malloc(strlen(jobname)+1); strcpy(m_JobName,jobname); } } 线程池使用示例 至此我们给出了一个简单的与具体任务无关的线程池框架。使用该框架非常的简单,我们所需要的做的就是派生CJob类,将需要完成的任务实现在Run方法中? H缓蠼?FONT face="Courier New">Job交由CThreadManage去执行。 下面我们给出一个简单的示例程序 class CXJob:public CJob { public: CXJob(){i=0;} ~CXJob(){} void Run(void* jobdata) { printf("The Job comes from CXJOBn"); sleep(2); } }; class CYJob:public CJob { public: CYJob(){i=0;} ~CYJob(){} void Run(void* jobdata) { printf("The Job comes from CYJobn"); } }; main() { CThreadManage* manage = new CThreadManage(10); for(int i=0;i<40;i++) { CXJob* job = new CXJob(); manage->Run(job,NULL); } sleep(2); CYJob* job = new CYJob(); manage->Run(job,NULL); manage->TerminateAll(); } CXJob和CYJob都是从Job类继承而来,其都实现了Run接口。CXJob只是简单的打印一句”The Job comes from CXJob”,CYJob也只打印”The Job comes from CYJob”,然后均休眠2秒钟。在主程序中我们初始创建10个工作线程。然后分别执行40次CXJob和一次CYJob。 线程池使用后记 线程池适合场合 事实上,线程池并不是万能的。它有其特定的使用场合。线程池致力于减少线程本身的开销对应用所产生的影响,这是有前提的,前提就是线程本身开销与线程执行任务相比不可忽略。如果线程本身的开销相对于线程任务执行开销而言是可以忽略不计的,那么此时线程池所带来的好处是不明显的,比如对于FTP服务器以及Telnet服务器,通常传送文件的时间较长,开销较大,那么此时,我们采用线程池未必是理想的方法,我们可以选择“即时创建,即时销毁”的策略。 总之线程池通常适合下面的几个场合: (1) 单位时间内处理任务频繁而且任务处理时间短 (2) 对实时性要求较高。如果接受到任务后在创建线程,可能满足不了实时要求,因此必须采用线程池进行预创建。 (3) 必须经常面对高突发性事件,比如Web服务器,如果有足球转播,则服务器将产生巨大的冲击。此时如果采取传统方法,则必须不停的大量产生线程,销毁线程。此时采用动态线程池可以避免这种情况的发生。 结束语 本文给出了一个简单的通用的与任务无关的线程池的实现,通过该线程池能够极大的简化Linux下多线程的开发工作。该线程池的进一步完善开发工作还在进行中,希望能够得到你的建议和支持。 |
||||
|
|