ThreadX内核源码分析 - 消息队列

发布时间:2024-03-02 19:01

1、消息队列介绍

ThreadX内核的消息可以多线程收发,每个消息的大小固定;消息队列有一定的大小,超过大小之后,发送消息的线程需要等待消息被取走才能往消息队列里面再次发送消息。

2、消息的接收_tx_queue_receive

消息接收主要检查有没有消息,没有消息就要等待消息或者返回消息队列为空的错误码;

如果消息队列不为空,并且没有发送消息的线程等待消息队列(消息队列不为空),那么直接从消息队列最前面读消息即可;

如果消息队列满了,有线程等待消息队列,那么检查等待消息队列的第一个线程是不是要将消息发送到消息队列最前面(一般消息都是追加的,但是ThreadX内核支持插入消息的最前面),如果是插入到消息队列最前面,那么就从发送消息的线程直接读取消息并唤醒发送消息的线程,否则还得从消息队列最前面开始读消息。

读完一个消息,就可以让一个等待消息队列的线程发送消息,有读消息的函数直接将发送消息的线程的消息数据拷贝到消息队列,然后唤醒该发送消息的线程。

很多内核通用重复代码及原理前面文章都有介绍,在此仅介绍不一样的部分关键代码,详细看代码里面的注释,对着代码行及说明看代码更容易理解。

_tx_queue_receive实现代码如下:

082 UINT  _tx_queue_receive(TX_QUEUE *queue_ptr, VOID *destination_ptr, ULONG wait_option)
083 {
084 
085 TX_INTERRUPT_SAVE_AREA
086 
087 TX_THREAD       *thread_ptr;
088 ULONG           *source;
089 ULONG           *destination;
090 UINT            size;
091 UINT            suspended_count;
092 TX_THREAD       *next_thread;
093 TX_THREAD       *previous_thread;
094 UINT            status;
095 
096 
097     /* Default the status to TX_SUCCESS.  */
098     status =  TX_SUCCESS;
099 
100     /* Disable interrupts to receive message from queue.  */
101     TX_DISABLE
102 
103 #ifdef TX_QUEUE_ENABLE_PERFORMANCE_INFO
104 
105     /* Increment the total messages received counter.  */
106     _tx_queue_performance__messages_received_count++;
107 
108     /* Increment the number of messages received from this queue.  */
109     queue_ptr -> tx_queue_performance_messages_received_count++;
110 
111 #endif
112 
113     /* If trace is enabled, insert this event into the trace buffer.  */
114     TX_TRACE_IN_LINE_INSERT(TX_TRACE_QUEUE_RECEIVE, queue_ptr, TX_POINTER_TO_ULONG_CONVERT(destination_ptr), wait_option, queue_ptr -> tx_queue_enqueued, TX_TRACE_QUEUE_EVENTS)
115 
116     /* Log this kernel call.  */
117     TX_EL_QUEUE_RECEIVE_INSERT
118 
119     /* Pickup the thread suspension count.  */
120     suspended_count =  queue_ptr -> tx_queue_suspended_count; // 等待消息队列线程数
121     
122     /* Determine if there is anything in the queue.  */
123     if (queue_ptr -> tx_queue_enqueued != TX_NO_MESSAGES) // 消息数不为0,有消息
124     {
125 
126         /* Determine if there are any suspensions.  */
127         if (suspended_count == TX_NO_SUSPENSIONS) // 没有线程等待消息
128         {
129 
130             /* There is a message waiting in the queue and there are no suspensi.  */
131             
132             /* Setup source and destination pointers.  */
133             source =       queue_ptr -> tx_queue_read; // tx_queue_read消息数据源地址(消息队列的数据的地址)
134             destination =  TX_VOID_TO_ULONG_POINTER_CONVERT(destination_ptr); // 消息数据目的地址(本次读消息保存数据的地址)
135             size =         queue_ptr -> tx_queue_message_size; // 每个消息的大小,多少个unsigned long大小(消息大小固定,只能按这么大小的消息收发,单位不是byte!!!)
136 
137             /* Copy message. Note that the source and destination pointers are 
138                incremented by the macro.  */
139             TX_QUEUE_MESSAGE_COPY(source, destination, size) // 拷贝size大小消息到destination,拷贝消息的时候,source、destination都在往后移动,拷贝完后,source指向下一个消息(因为消息按消息大小收发的,有消息的话,那么肯定有size大小的数据)
140 
141             /* Determine if we are at the end.  */
142             if (source == queue_ptr -> tx_queue_end) // 当前消息已经到消息的末尾(消息发送到内存的末尾后,接下来的消息从消息内存地址开始的地址存储,一块连续的内存组成一个单向循环链表)
143             {
144 
145                 /* Yes, wrap around to the beginning.  */
146                 source =  queue_ptr -> tx_queue_start; // 下一个消息从tx_queue_start开始
147             }
148         
149             /* Setup the queue read pointer.   */
150             queue_ptr -> tx_queue_read =  source; // 更新tx_queue_read指向下一个消息
151         
152             /* Increase the amount of available storage.  */
153             queue_ptr -> tx_queue_available_storage++; // 消息队列的容量加1(可以一个消息已经被读取,增加消息队列的容量,这里单位是消息个数,不是byte!!!)
154 
155             /* Decrease the enqueued count.  */
156             queue_ptr -> tx_queue_enqueued--; // 消息队列里面的消息个数减1(不包含没有发送到消息队列里面的,可能消息队列已满,还有线程阻塞在发送过程中)
157 
158             /* Restore interrupts.  */
159             TX_RESTORE
160         }
161         else // 有消息并且有消息等待消息队列(此时只可能是消息队列满了,有线程等待消息队列可以发送消息)
162         {
163         
164             /* At this point we know the queue is full.  */
165 
166             /* Pickup thread suspension list head pointer.  */
167             thread_ptr =  queue_ptr -> tx_queue_suspension_list; // 第一个发送消息的阻塞线程
168 
169             /* Now determine if there is a queue front suspension active.   */
170     
171             /* Is the front suspension flag set?  */
172             if (thread_ptr -> tx_thread_suspend_option == TX_TRUE) // 如果tx_thread_suspend_option为TX_TRUE,那么表明thread_ptr是要将消息发送到消息队列最前面,_tx_queue_front_send会设置tx_thread_suspend_option为TX_TRUE,那么直接从该线程读取消息即可
173             {
174         
175                 /* Yes, a queue front suspension is present.  */
176 
177                 /* Return the message associated with this suspension.  */
178 
179                 /* Setup source and destination pointers.  */
180                 source =       TX_VOID_TO_ULONG_POINTER_CONVERT(thread_ptr -> tx_thread_additional_suspend_info); // 发送消息的线程的消息直接保存在tx_thread_additional_suspend_info里面(tx_thread_additional_suspend_info指向线程待发送的消息,因为消息队列不够,该消息还在线程里面,还没发送到消息队列)
181                 destination =  TX_VOID_TO_ULONG_POINTER_CONVERT(destination_ptr);
182                 size =         queue_ptr -> tx_queue_message_size;
183 
184                 /* Copy message. Note that the source and destination pointers are 
185                    incremented by the macro.  */
186                 TX_QUEUE_MESSAGE_COPY(source, destination, size) // 拷贝消息数据
187 
188                 /* Message is now in the caller's destination. See if this is the only suspended thread 
189                    on the list.  */
190                 suspended_count--; // 等待发送消息的线程数减1
191                 if (suspended_count == TX_NO_SUSPENSIONS) // 没有更多线程等待发送消息,那么tx_queue_suspension_list设为空即可(tx_queue_suspension_list目前都是发送阻塞的线程)
192                 {
193 
194                     /* Yes, the only suspended thread.  */
195 
196                     /* Update the head pointer.  */
197                     queue_ptr -> tx_queue_suspension_list =  TX_NULL;
198                 }
199                 else // 有其他线程阻塞在发送消息,挂在tx_queue_suspension_list链表上面,那么将已经取走消息的线程thread_ptr从链表删除即可
200                 {
201 
202                     /* At least one more thread is on the same expiration list.  */
203 
204                     /* Update the list head pointer.  */
205                     next_thread =                            thread_ptr -> tx_thread_suspended_next;
206                     queue_ptr -> tx_queue_suspension_list =  next_thread;
207 
208                     /* Update the links of the adjacent threads.  */
209                     previous_thread =                              thread_ptr -> tx_thread_suspended_previous;
210                     next_thread -> tx_thread_suspended_previous =  previous_thread;
211                     previous_thread -> tx_thread_suspended_next =  next_thread;
212                 }
213 
214                 /* Decrement the suspension count.  */
215                 queue_ptr -> tx_queue_suspended_count =  suspended_count; // 更新等待消息队列的线程数
216 
217                 /* Prepare for resumption of the first thread.  */
218 
219                 /* Clear cleanup routine to avoid timeout.  */
220                 thread_ptr -> tx_thread_suspend_cleanup =  TX_NULL; // tx_thread_suspend_cleanup设置为空
221 
222                 /* Put return status into the thread control block.  */
223                 thread_ptr -> tx_thread_suspend_status =  TX_SUCCESS; // 消息被读取,状态设置为成功状态
224 
225 #ifdef TX_NOT_INTERRUPTABLE
226 
227                 /* Resume the thread!  */
228                 _tx_thread_system_ni_resume(thread_ptr);
229 
230                 /* Restore interrupts.  */
231                 TX_RESTORE
232 #else
233 
234                 /* Temporarily disable preemption.  */
235                 _tx_thread_preempt_disable++; // 禁止抢占
236 
237                 /* Restore interrupts.  */
238                 TX_RESTORE
239 
240                 /* Resume thread.  */
241                 _tx_thread_system_resume(thread_ptr); // 唤醒取走了消息的线程
242 #endif
243             }
244             else // 阻塞线程thread_ptr不是要将数据发送到消息最前面(追加消息到已有消息的末尾),那么还得从消息队列读消息
245             {
246 
247                 /* At this point, we know that the queue is full and there 
248                    are one or more threads suspended trying to send another
249                    message to this queue.  */
250 
251                 /* Setup source and destination pointers.  */
252                 source =       queue_ptr -> tx_queue_read;
253                 destination =  TX_VOID_TO_ULONG_POINTER_CONVERT(destination_ptr);
254                 size =         queue_ptr -> tx_queue_message_size;
255 
256                 /* Copy message. Note that the source and destination pointers are 
257                    incremented by the macro.  */
258                 TX_QUEUE_MESSAGE_COPY(source, destination, size)
259 
260                 /* Determine if we are at the end.  */
261                 if (source == queue_ptr -> tx_queue_end)
262                 {
263 
264                     /* Yes, wrap around to the beginning.  */
265                     source =  queue_ptr -> tx_queue_start;
266                 }
267 
268                 /* Setup the queue read pointer.   */
269                 queue_ptr -> tx_queue_read =  source; // 更新tx_queue_read,这之前的几行读消息代码与之前代码一样,略过...
270   
271                 /* Disable preemption.  */
272                 _tx_thread_preempt_disable++; // 禁止抢占(消息处理花了一些时间,后面需要临时开一下中断,让中断得到处理)
273 
274 #ifdef TX_NOT_INTERRUPTABLE
275 
276                 /* Restore interrupts.  */
277                 TX_RESTORE
278 
279                 /* Interrupts are enabled briefly here to keep the interrupt
280                    lockout time deterministic.  */
281 
282                 /* Disable interrupts again.  */
283                 TX_DISABLE
284 #endif
285 
286                 /* Decrement the preemption disable variable.  */
287                 _tx_thread_preempt_disable--; // 取消禁止抢占(中断已经关了,不需要禁止抢占)
288 
289                 /* Setup source and destination pointers.  */
290                 source =       TX_VOID_TO_ULONG_POINTER_CONVERT(thread_ptr -> tx_thread_additional_suspend_info); // 阻塞线程thread_ptr的消息地址
291                 destination =  queue_ptr -> tx_queue_write; // 消息队列写消息的地址(消息队列满的情况,这里就指向刚才已经被读取的消息地址)
292                 size =         queue_ptr -> tx_queue_message_size;
293 
294                 /* Copy message. Note that the source and destination pointers are 
295                    incremented by the macro.  */
296                 TX_QUEUE_MESSAGE_COPY(source, destination, size) // 拷贝消息到消息队列(末尾),拷贝过程destination同样在更新,更新指向下一个消息地址(下一个消息可以写入的地址)
297 
298                 /* Determine if we are at the end.  */
299                 if (destination == queue_ptr -> tx_queue_end) // 写地址已经到内存地址的末尾,再从消息队列内存的起始地址开始
300                 {
301             
302                     /* Yes, wrap around to the beginning.  */
303                     destination =  queue_ptr -> tx_queue_start;
304                 }
305 
306                 /* Adjust the write pointer.  */
307                 queue_ptr -> tx_queue_write =  destination; // 更新写消息的地址tx_queue_write
308 
309                 /* Pickup thread pointer.  */
310                 thread_ptr =  queue_ptr -> tx_queue_suspension_list; // 前面已经读取了thread_ptr,这里为什么还要再次读取?
311 
312                 /* Message is now in the queue.  See if this is the only suspended thread 
313                    on the list.  */
314                 suspended_count--; // 阻塞线程数减1
315                 if (suspended_count == TX_NO_SUSPENSIONS) // 没有线程等待写队列,tx_queue_suspension_list清空即可
316                 {
317 
318                   /* Yes, the only suspended thread.  */
319 
320                     /* Update the head pointer.  */
321                     queue_ptr -> tx_queue_suspension_list =  TX_NULL;
322                 }
323                 else // 有其他线程等待写队列,将thread_ptr从等待队列中删除即可
324                 {
325 
326                     /* At least one more thread is on the same expiration list.  */
327 
328                     /* Update the list head pointer.  */
329                     next_thread =                            thread_ptr -> tx_thread_suspended_next;
330                     queue_ptr -> tx_queue_suspension_list =  next_thread;
331 
332                     /* Update the links of the adjacent threads.  */
333                     previous_thread =                               thread_ptr -> tx_thread_suspended_previous;
334                     next_thread -> tx_thread_suspended_previous =   previous_thread;
335                     previous_thread -> tx_thread_suspended_next =   next_thread;
336                 }
337 
338                 /* Decrement the suspension count.  */
339                 queue_ptr -> tx_queue_suspended_count =  suspended_count; // 更新tx_queue_suspended_count
340 
341                 /* Prepare for resumption of the first thread.  */
342 
343                 /* Clear cleanup routine to avoid timeout.  */
344                 thread_ptr -> tx_thread_suspend_cleanup =  TX_NULL; // thread_ptr的消息已经放入消息队列了,清除tx_thread_suspend_cleanup
345 
346                 /* Put return status into the thread control block.  */
347                 thread_ptr -> tx_thread_suspend_status =  TX_SUCCESS; // 设置tx_thread_suspend_status状态为成功,表示线程的消息已经发送成功
348 
349 #ifdef TX_NOT_INTERRUPTABLE
350 
351                 /* Resume the thread!  */
352                 _tx_thread_system_ni_resume(thread_ptr);
353 
354                 /* Restore interrupts.  */
355                 TX_RESTORE
356 #else
357 
358                 /* Temporarily disable preemption.  */
359                 _tx_thread_preempt_disable++;
360 
361                 /* Restore interrupts.  */
362                 TX_RESTORE
363 
364                 /* Resume thread.  */
365                 _tx_thread_system_resume(thread_ptr); // 唤醒线程(因为本次只读取一个消息,然后thread_ptr的消息写完后,消息队列又满了,所以一次只能有一个阻塞线程写消息成功)
366 #endif
367             }
368         }
369     }
370 
371     /* Determine if the request specifies suspension.  */
372     else if (wait_option != TX_NO_WAIT) // 消息队列没有消息,并且有设置阻塞选项,需要阻塞等待有消息可读
373     {
374 
375         /* Determine if the preempt disable flag is non-zero.  */
376         if (_tx_thread_preempt_disable != ((UINT) 0)) // 有禁止抢占,不能阻塞当前线程
377         {
378 
379             /* Restore interrupts.  */
380             TX_RESTORE
381            
382             /* Suspension is not allowed if the preempt disable flag is non-zero at this point - return error completion.  */
383             status =  TX_QUEUE_EMPTY; // 返回消息队列为空即可
384         }
385         else // 没有禁止抢占,需要等待消息
386         {
387 
388             /* Prepare for suspension of this thread.  */
389 
390 #ifdef TX_QUEUE_ENABLE_PERFORMANCE_INFO
391 
392             /* Increment the total queue empty suspensions counter.  */
393             _tx_queue_performance_empty_suspension_count++;
394 
395             /* Increment the number of empty suspensions on this queue.  */
396             queue_ptr -> tx_queue_performance_empty_suspension_count++;
397 #endif
398             
399             /* Pickup thread pointer.  */
400             TX_THREAD_GET_CURRENT(thread_ptr) // 获取当前线程_tx_thread_current_ptr
401 
402             /* Setup cleanup routine pointer.  */
403             thread_ptr -> tx_thread_suspend_cleanup =  &(_tx_queue_cleanup); // 等待消息超时或者线程终止时需要调用_tx_queue_cleanup唤醒或者清理当前线程(当前线程挂在阻塞链表里面,需要从阻塞链表删除)
404 
405             /* Setup cleanup information, i.e. this queue control
406                block and the source pointer.  */
407             thread_ptr -> tx_thread_suspend_control_block =    (VOID *) queue_ptr; // 阻塞在消息队列queue_ptr上
408             thread_ptr -> tx_thread_additional_suspend_info =  (VOID *) destination_ptr; // 消息接收地址(别的线程有发送消息,会将数据直接拷贝到destination_ptr里面)
409             thread_ptr -> tx_thread_suspend_option =           TX_FALSE; // 读消息的时候,这个没起作用,都是从头读消息,不存在从末尾先读消息的情况
410 
411 #ifndef TX_NOT_INTERRUPTABLE
412 
413             /* Increment the suspension sequence number, which is used to identify
414                this suspension event.  */
415             thread_ptr -> tx_thread_suspension_sequence++;
416 #endif
417 
418             /* Setup suspension list.  */
419             if (suspended_count == TX_NO_SUSPENSIONS) // 没有其他线程等待读消息(当前线程组成一个元素的阻塞链表)
420             {
421 
422                 /* No other threads are suspended.  Setup the head pointer and
423                    just setup this threads pointers to itself.  */
424                 queue_ptr -> tx_queue_suspension_list =         thread_ptr;
425                 thread_ptr -> tx_thread_suspended_next =        thread_ptr;
426                 thread_ptr -> tx_thread_suspended_previous =    thread_ptr;
427             }
428             else // 有其他线程等待读消息,将当前线程添加到等待队列末尾即可
429             {
430 
431                 /* This list is not NULL, add current thread to the end. */
432                 next_thread =                                   queue_ptr -> tx_queue_suspension_list;
433                 thread_ptr -> tx_thread_suspended_next =        next_thread;
434                 previous_thread =                               next_thread -> tx_thread_suspended_previous;
435                 thread_ptr -> tx_thread_suspended_previous =    previous_thread;
436                 previous_thread -> tx_thread_suspended_next =   thread_ptr;
437                 next_thread -> tx_thread_suspended_previous =   thread_ptr;
438             }
439 
440             /* Increment the suspended thread count.  */
441             queue_ptr -> tx_queue_suspended_count =  suspended_count + ((UINT) 1); // 挂起线程的数目加1
442 
443             /* Set the state to suspended.  */
444             thread_ptr -> tx_thread_state =    TX_QUEUE_SUSP; // 线程状态设置为等待消息队列挂起状态
445 
446 #ifdef TX_NOT_INTERRUPTABLE
447 
448             /* Call actual non-interruptable thread suspension routine.  */
449             _tx_thread_system_ni_suspend(thread_ptr, wait_option);
450 
451             /* Restore interrupts.  */
452             TX_RESTORE
453 #else
454 
455             /* Set the suspending flag.  */
456             thread_ptr -> tx_thread_suspending =  TX_TRUE; // 设置挂起中操作,线程还没真正挂起,还在就绪线程链表
457 
458             /* Setup the timeout period.  */
459             thread_ptr -> tx_thread_timer.tx_timer_internal_remaining_ticks =  wait_option; // 等待选项(_tx_thread_system_suspend根据tx_timer_internal_remaining_ticks来启动超时定时器,如果是无限等待就不启动定时器)
460 
461             /* Temporarily disable preemption.  */
462             _tx_thread_preempt_disable++;
463 
464             /* Restore interrupts.  */
465             TX_RESTORE
466 
467             /* Call actual thread suspension routine.  */
468             _tx_thread_system_suspend(thread_ptr); // 挂起当前线程
469 #endif
470 
471             /* Return the completion status.  */
472             status =  thread_ptr -> tx_thread_suspend_status; // 发送消息的线程把消息拷贝给当前线程会设置成功状态,等待超时会设置超时状态(与内存、信号量、互斥锁等操作一样...)
473         }
474     }
475     else // 非阻塞,没有消息的时候返回消息队列为空即可
476     {
477 
478         /* Restore interrupts.  */
479         TX_RESTORE
480            
481         /* Immediate return, return error completion.  */
482         status =  TX_QUEUE_EMPTY;
483     }
484 
485     /* Return completion status.  */
486     return(status);
487 }

3、消息的发送_tx_queue_send

发送消息过程与接收消息类型,消息队列没有满没有线程等待消息,那么将消息拷贝到消息队列即可;

如果有线程等待消息,那么消息队列就为空,当前消息就是第一个消息,拷贝消息到第一个等待消息的线程并唤醒该线程即可;

如果消息队列满了,如果设置了等待选项并允许阻塞的话,那么需要挂载到等待链表,发送消息与接收消息的线程共用一个等待链表tx_queue_suspension_list,只可能有发送消息的线程在等待或者接收消息的线程等待或者没有线程等待,不存在发送消息的线程和接收消息的线程都等待的清空。

_tx_queue_send是将消息加到消息队列末尾,实现代码比较简单,具体分析看代码中的注释。

_tx_queue_send实现代码如下:

080 UINT  _tx_queue_send(TX_QUEUE *queue_ptr, VOID *source_ptr, ULONG wait_option)
081 {
082 
083 TX_INTERRUPT_SAVE_AREA
084    
085 TX_THREAD       *thread_ptr;
086 ULONG           *source;
087 ULONG           *destination;
088 UINT            size;
089 UINT            suspended_count;
090 TX_THREAD       *next_thread;
091 TX_THREAD       *previous_thread;
092 UINT            status;
093 #ifndef TX_DISABLE_NOTIFY_CALLBACKS
094 VOID            (*queue_send_notify)(struct TX_QUEUE_STRUCT *notify_queue_ptr);
095 #endif
096 
097 
098     /* Default the status to TX_SUCCESS.  */
099     status =  TX_SUCCESS;
100 
101     /* Disable interrupts to place message in the queue.  */
102     TX_DISABLE
103 
104 #ifdef TX_QUEUE_ENABLE_PERFORMANCE_INFO
105 
106     /* Increment the total messages sent counter.  */
107     _tx_queue_performance_messages_sent_count++;
108 
109     /* Increment the number of messages sent to this queue.  */
110     queue_ptr -> tx_queue_performance_messages_sent_count++;
111 #endif
112 
113     /* If trace is enabled, insert this event into the trace buffer.  */
114     TX_TRACE_IN_LINE_INSERT(TX_TRACE_QUEUE_SEND, queue_ptr, TX_POINTER_TO_ULONG_CONVERT(source_ptr), wait_option, queue_ptr -> tx_queue_enqueued, TX_TRACE_QUEUE_EVENTS)
115 
116     /* Log this kernel call.  */
117     TX_EL_QUEUE_SEND_INSERT
118 
119     /* Pickup the thread suspension count.  */
120     suspended_count =  queue_ptr -> tx_queue_suspended_count; // 等待队列线程数(发送线程或者接收线程)
121 
122     /* Determine if there is room in the queue.  */
123     if (queue_ptr -> tx_queue_available_storage != TX_NO_MESSAGES) // tx_queue_available_storage不为0,消息队列还可以接收tx_queue_available_storage个消息
124     {
125 
126         /* There is room for the message in the queue.  */
127 
128         /* Determine if there are suspended on this queue.  */
129         if (suspended_count == TX_NO_SUSPENSIONS) // 没有等待线程(消息队列可以接收数据,那么只等待队列只可能是接收消息的线程,没有等待消息的线程,那么就直接发送消息到消息队列即可)
130         {
131         
132             /* No suspended threads, simply place the message in the queue.  */
133             
134             /* Reduce the amount of available storage.  */
135             queue_ptr -> tx_queue_available_storage--; // 消息队列可接收消息的个数tx_queue_available_storage减1
136 
137             /* Increase the enqueued count.  */
138             queue_ptr -> tx_queue_enqueued++; // 消息队列里面消息的个数tx_queue_enqueued加1
139 
140             /* Setup source and destination pointers.  */
141             source =       TX_VOID_TO_ULONG_POINTER_CONVERT(source_ptr);
142             destination =  queue_ptr -> tx_queue_write;
143             size =         queue_ptr -> tx_queue_message_size;
144 
145             /* Copy message. Note that the source and destination pointers are 
146                incremented by the macro.  */
147             TX_QUEUE_MESSAGE_COPY(source, destination, size) // 拷贝消息到消息队列内存里面(destination更新到下一个消息)
148 
149             /* Determine if we are at the end.  */
150             if (destination == queue_ptr -> tx_queue_end) // 下一个消息的地址已经到了消息队列内存的末尾,下一个消息地址要从消息队列内存的起始地址开始
151             {
152 
153                 /* Yes, wrap around to the beginning.  */
154                 destination =  queue_ptr -> tx_queue_start;
155             }
156 
157             /* Adjust the write pointer.  */
158             queue_ptr -> tx_queue_write =  destination; // 更新写地址tx_queue_write,下一个消息从tx_queue_write开始写
159 
160 #ifndef TX_DISABLE_NOTIFY_CALLBACKS
161 
162             /* Pickup the notify callback routine for this queue.  */
163             queue_send_notify =  queue_ptr -> tx_queue_send_notify;
164 #endif
165 
166             /* No thread suspended, just return to caller.  */
167 
168             /* Restore interrupts.  */
169             TX_RESTORE
170 
171 #ifndef TX_DISABLE_NOTIFY_CALLBACKS
172 
173             /* Determine if a notify callback is required.  */
174             if (queue_send_notify != TX_NULL)
175             {
176 
177                 /* Call application queue send notification.  */
178                 (queue_send_notify)(queue_ptr);
179             }
180 #endif
181         }
182         else // 消息队列可接收消息个数不为0,有线程挂起,那么只可能是消息队列为空,有读消息的线程挂起,那么直接将当前发送的消息拷贝到读消息的线程即可,不需要先拷贝到消息队列,减少一次拷贝操作
183         {
184 
185             /* There is a thread suspended on an empty queue. Simply 
186                copy the message to the suspended thread's destination
187                pointer.  */
188 
189             /* Pickup the head of the suspension list.  */
190             thread_ptr =  queue_ptr -> tx_queue_suspension_list; // 第一个读消息线程
191 
192             /* See if this is the only suspended thread on the list.  */
193             suspended_count--; // 挂起线程个数减1
194             if (suspended_count == TX_NO_SUSPENSIONS) // 没有其他线程等待消息,那么等待队列tx_queue_suspension_list设置为空即可
195             {
196 
197                 /* Yes, the only suspended thread.  */
198 
199                 /* Update the head pointer.  */
200                 queue_ptr -> tx_queue_suspension_list =  TX_NULL;
201             }
202             else // 有其他线程等待消息,将thread_ptr从等待链表删除(发送的消息将直接拷贝给thread_ptr线程)
203             {
204 
205                 /* At least one more thread is on the same expiration list.  */
206 
207                 /* Update the list head pointer.  */
208                 queue_ptr -> tx_queue_suspension_list =  thread_ptr -> tx_thread_suspended_next;
209 
210                 /* Update the links of the adjacent threads.  */
211                 next_thread =                            thread_ptr -> tx_thread_suspended_next;
212                 queue_ptr -> tx_queue_suspension_list =  next_thread;
213 
214                 /* Update the links of the adjacent threads.  */
215                 previous_thread =                               thread_ptr -> tx_thread_suspended_previous;
216                 next_thread -> tx_thread_suspended_previous =   previous_thread;
217                 previous_thread -> tx_thread_suspended_next =   next_thread;
218             }
219 
220             /* Decrement the suspension count.  */
221             queue_ptr -> tx_queue_suspended_count =  suspended_count; // 更新等待线程个数(前面减1了,减掉了thread_ptr)
222 
223             /* Prepare for resumption of the thread.  */
224 
225             /* Clear cleanup routine to avoid timeout.  */
226             thread_ptr -> tx_thread_suspend_cleanup =  TX_NULL; // thread_ptr即将获取到消息,清空tx_thread_suspend_cleanup
227 
228             /* Setup source and destination pointers.  */
229             source =       TX_VOID_TO_ULONG_POINTER_CONVERT(source_ptr); // 发送消息的消息地址
230             destination =  TX_VOID_TO_ULONG_POINTER_CONVERT(thread_ptr -> tx_thread_additional_suspend_info); // thread_ptr接收消息的地址
231             size =         queue_ptr -> tx_queue_message_size;
232 
233             /* Copy message. Note that the source and destination pointers are 
234                incremented by the macro.  */
235             TX_QUEUE_MESSAGE_COPY(source, destination, size) // 直接将发送到消息拷贝到接收消息的线程
236 
237             /* Put return status into the thread control block.  */
238             thread_ptr -> tx_thread_suspend_status =  TX_SUCCESS; // 设置接收消息的线程的状态tx_thread_suspend_status为成功
239 
240 #ifndef TX_DISABLE_NOTIFY_CALLBACKS
241 
242             /* Pickup the notify callback routine for this queue.  */
243             queue_send_notify =  queue_ptr -> tx_queue_send_notify;
244 #endif
245 
246 #ifdef TX_NOT_INTERRUPTABLE
247 
248             /* Resume the thread!  */
249             _tx_thread_system_ni_resume(thread_ptr);
250 
251             /* Restore interrupts.  */
252             TX_RESTORE
253 #else
254 
255             /* Temporarily disable preemption.  */
256             _tx_thread_preempt_disable++;
257 
258             /* Restore interrupts.  */
259             TX_RESTORE
260 
261             /* Resume thread.  */
262             _tx_thread_system_resume(thread_ptr); // 唤醒获取到消息的线程thread_ptr
263 #endif
264 
265 #ifndef TX_DISABLE_NOTIFY_CALLBACKS
266 
267             /* Determine if a notify callback is required.  */
268             if (queue_send_notify != TX_NULL)
269             {
270 
271                 /* Call application queue send notification.  */
272                 (queue_send_notify)(queue_ptr);
273             }
274 #endif
275         }
276     }
277     
278     /* At this point, the queue is full. Determine if suspension is requested.  */
279     else if (wait_option != TX_NO_WAIT) // 消息队列满了,等待选项不是不等待,那么需要阻塞当前线程
280     {
281 
282         /* Determine if the preempt disable flag is non-zero.  */
283         if (_tx_thread_preempt_disable != ((UINT) 0)) // 禁止了抢占,那么不能阻塞调度,不能挂起当前线程,返回队列满了即可
284         {
285 
286             /* Restore interrupts.  */
287             TX_RESTORE
288 
289             /* Suspension is not allowed if the preempt disable flag is non-zero at this point - return error completion.  */
290             status =  TX_QUEUE_FULL; // 消息队列满了
291         }
292         else // 没有禁止抢占,可以阻塞当前线程
293         {
294 
295             /* Yes, prepare for suspension of this thread.  */
296 
297 #ifdef TX_QUEUE_ENABLE_PERFORMANCE_INFO
298 
299             /* Increment the total number of queue full suspensions.  */
300             _tx_queue_performance_full_suspension_count++;
301 
302             /* Increment the number of full suspensions on this queue.  */
303             queue_ptr -> tx_queue_performance_full_suspension_count++;
304 #endif
305             
306             /* Pickup thread pointer.  */
307             TX_THREAD_GET_CURRENT(thread_ptr)
308 
309             /* Setup cleanup routine pointer.  */
310             thread_ptr -> tx_thread_suspend_cleanup =  &(_tx_queue_cleanup);
311 
312             /* Setup cleanup information, i.e. this queue control
313                block and the source pointer.  */
314             thread_ptr -> tx_thread_suspend_control_block =    (VOID *) queue_ptr; // 消息队列
315             thread_ptr -> tx_thread_additional_suspend_info =  (VOID *) source_ptr; // 消息的地址
316             thread_ptr -> tx_thread_suspend_option =           TX_FALSE; // 正常发送消息,这个设置为TX_FALSE,是发送消息到消息队列末尾,不是插入到消息队列最前面
317 
318 #ifndef TX_NOT_INTERRUPTABLE
319 
320             /* Increment the suspension sequence number, which is used to identify
321                this suspension event.  */
322             thread_ptr -> tx_thread_suspension_sequence++;
323 #endif
324 
325             /* Setup suspension list.  */
326             if (suspended_count == TX_NO_SUSPENSIONS) // if...else...当前线程插入消息等待队列
327             {
328 
329                 /* No other threads are suspended.  Setup the head pointer and
330                    just setup this threads pointers to itself.  */
331                 queue_ptr -> tx_queue_suspension_list =         thread_ptr;
332                 thread_ptr -> tx_thread_suspended_next =        thread_ptr;
333                 thread_ptr -> tx_thread_suspended_previous =    thread_ptr;
334             }
335             else
336             {
337 
338                 /* This list is not NULL, add current thread to the end. */
339                 next_thread =                                   queue_ptr -> tx_queue_suspension_list;
340                 thread_ptr -> tx_thread_suspended_next =        next_thread;
341                 previous_thread =                               next_thread -> tx_thread_suspended_previous;
342                 thread_ptr -> tx_thread_suspended_previous =    previous_thread;
343                 previous_thread -> tx_thread_suspended_next =   thread_ptr;
344                 next_thread -> tx_thread_suspended_previous =   thread_ptr;
345             }
346 
347             /* Increment the suspended thread count.  */
348             queue_ptr -> tx_queue_suspended_count =  suspended_count + ((UINT) 1); // 等待线程加1(等待消息队列的线程个数)
349 
350             /* Set the state to suspended.  */
351             thread_ptr -> tx_thread_state =    TX_QUEUE_SUSP; // 线程状态设置为等待消息队列
352 
353 #ifndef TX_DISABLE_NOTIFY_CALLBACKS
354 
355             /* Pickup the notify callback routine for this queue.  */
356             queue_send_notify =  queue_ptr -> tx_queue_send_notify;
357 #endif
358 
359 #ifdef TX_NOT_INTERRUPTABLE
360 
361             /* Call actual non-interruptable thread suspension routine.  */
362             _tx_thread_system_ni_suspend(thread_ptr, wait_option);
363 
364             /* Restore interrupts.  */
365             TX_RESTORE
366 #else
367 
368             /* Set the suspending flag.  */
369             thread_ptr -> tx_thread_suspending =  TX_TRUE; // 线程挂起中
370 
371             /* Setup the timeout period.  */
372             thread_ptr -> tx_thread_timer.tx_timer_internal_remaining_ticks =  wait_option; // 等待选项(等待时间或者无限等待)
373 
374             /* Temporarily disable preemption.  */
375             _tx_thread_preempt_disable++;
376 
377             /* Restore interrupts.  */
378             TX_RESTORE
379 
380             /* Call actual thread suspension routine.  */
381             _tx_thread_system_suspend(thread_ptr); // 挂起当前线程
382 #endif
383 
384 #ifndef TX_DISABLE_NOTIFY_CALLBACKS
385 
386             /* Determine if a notify callback is required.  */
387             if (thread_ptr -> tx_thread_suspend_status == TX_SUCCESS)
388             {
389 
390                 /* Determine if there is a notify callback.  */
391                 if (queue_send_notify != TX_NULL)
392                 {
393 
394                     /* Call application queue send notification.  */
395                     (queue_send_notify)(queue_ptr);
396                 }
397             }
398 #endif
399 
400             /* Return the completion status.  */
401             status =  thread_ptr -> tx_thread_suspend_status;
402         }
403     }
404     else // 消息队列满了,不等待消息队列,返回消息队列满了即可
405     {
406 
407         /* Otherwise, just return a queue full error message to the caller.  */
408 
409 #ifdef TX_QUEUE_ENABLE_PERFORMANCE_INFO
410 
411         /* Increment the number of full non-suspensions on this queue.  */
412         queue_ptr -> tx_queue_performance_full_error_count++;
413 
414         /* Increment the total number of full non-suspensions.  */
415         _tx_queue_performance_full_error_count++;
416 #endif
417 
418         /* Restore interrupts.  */
419         TX_RESTORE
420 
421         /* Return error completion.  */
422         status =  TX_QUEUE_FULL; // 消息队列满了
423     }
424 
425     /* Return completion status.  */
426     return(status);
427 }

_tx_queue_front_send将消息发送到消息队列最前面,这个实现也比较简单,挂起线程时,是将线程插入队列表头,在此略过,通用技术可以参考比较早的文章,核心代码比较多重复的,不再重复介绍。

ItVuer - 免责声明 - 关于我们 - 联系我们

本网站信息来源于互联网,如有侵权请联系:561261067@qq.com

桂ICP备16001015号