使用场景描述:


  网络请求中经常会遇到发送的请求,服务端响应是成功的,但是返回的时候出现网络故障,导致客户端无法接收到请求结果,那么客户端程序可能判断为网络故障,而重复发送同一个请求。当然如果接口中定义了请求结果查询接口,那么这种重复会相对少一些。特别是交易类的数据,这种操作更是需要避免重复发送请求。另外一种情况是用户过于快速的点击界面按钮,产生连续的相同内容请求,那么后端也需要进行过滤,这种一般出现在系统对接上,无法去控制第三方系统的业务逻辑,需要从自身业务逻辑里面去限定。

其他需求描述:

  这类请求一般存在时间范围和高并发的特点,就是短时间内会出现重复的请求,因此对模块需要支持高并发性。

技术实现:

  对请求的业务内容进行MD5摘要,并且将MD5摘要存储到缓存中,每个请求数据都通过这个一个公共的调用的方法进行判断。

代码实现:

  公共调用代码 UniqueCheck 采用单例模式创建唯一对象,便于在多线程调用的时候,只访问一个统一的缓存库
/* * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。 *
它是被设计用来修饰被不同线程访问和修改的变量。 * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。
*/ private static readonly object lockHelper = new object(); private volatile
static UniqueCheck _instance; /// <summary> /// 获取单一实例 /// </summary> ///
<returns></returns> public static UniqueCheck GetInstance() { if (_instance ==
null) { lock (lockHelper) { if (_instance == null) _instance = new
UniqueCheck(); } } return _instance; }
  这里需要注意volatile的修饰符,在实际测试过程中,如果没有此修饰符,在高并发的情况下会出现报错。

  自定义一个可以进行并发处理队列,代码如下:ConcurrentLinkedQueue
1 using System; 2 using System.Collections.Generic; 3 using System.Text; 4
using System.Threading; 5 6 namespace PackgeUniqueCheck 7 { 8 /// <summary>
9 /// 非加锁并发队列,处理100个并发数以内 10 /// </summary> 11 /// <typeparam
name="T"></typeparam> 12 public class ConcurrentLinkedQueue<T> 13 { 14
private class Node<K> 15 { 16 internal K Item; 17 internal Node<K> Next; 18
19 public Node(K item, Node<K> next) 20 { 21 this.Item = item; 22 this
.Next = next; 23 } 24 } 25 26 private Node<T> _head; 27 private Node<T>
_tail; 28 29 public ConcurrentLinkedQueue() 30 { 31 _head = new Node<T>(
default(T), null); 32 _tail = _head; 33 } 34 35 public bool IsEmpty 36 {
37 get { return (_head.Next == null); } 38 } 39 /// <summary> 40 /// 进入队列
41 /// </summary> 42 /// <param name="item"></param> 43 public void Enqueue(T
item) 44 { 45 Node<T> newNode = new Node<T>(item, null); 46 while (true) 47
{ 48 Node<T> curTail = _tail; 49 Node<T> residue = curTail.Next; 50 51 //
判断_tail是否被其他process改变 52 if (curTail == _tail) 53 { 54 //A
有其他process执行C成功,_tail应该指向新的节点 55 if (residue == null) 56 { 57 //C
其他process改变了tail节点,需要重新取tail节点 58 if (Interlocked.CompareExchange<Node<T>>( 59
ref curTail.Next, newNode, residue) == residue) 60 { 61 //D 尝试修改tail 62
Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail); 63 return;
64 } 65 } 66 else 67 { 68 //B 帮助其他线程完成D操作 69
Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail); 70 } 71 }
72 } 73 } 74 /// <summary> 75 /// 队列取数据 76 /// </summary> 77 /// <param
name="result"></param> 78 /// <returns></returns> 79 public bool TryDequeue(
out T result) 80 { 81 Node<T> curHead; 82 Node<T> curTail; 83 Node<T> next;
84 while (true) 85 { 86 curHead = _head; 87 curTail = _tail; 88 next =
curHead.Next; 89 if (curHead == _head) 90 { 91 if (next == null) //Queue为空
92 { 93 result = default(T); 94 return false; 95 } 96 if (curHead ==
curTail)//Queue处于Enqueue第一个node的过程中 97 { 98 //尝试帮助其他Process完成操作 99
Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail); 100 } 101 else
102 { 103 //取next.Item必须放到CAS之前 104 result = next.Item; 105 //
如果_head没有发生改变,则将_head指向next并退出 106 if (Interlocked.CompareExchange<Node<T>>(ref
_head,107 next, curHead) == curHead) 108 break; 109 } 110 } 111 } 112 return
true; 113 } 114 /// <summary> 115 /// 尝试获取最后一个对象 116 /// </summary> 117 ///
<param name="result"></param> 118 /// <returns></returns> 119 public bool
TryGetTail(out T result) 120 { 121 result = default(T); 122 if (_tail == null)
123 { 124 return false; 125 } 126 result = _tail.Item; 127 return true; 128 }
129 } 130 }
虽然是一个非常简单的唯一性校验逻辑,但是要做到高效率,高并发支持,高可靠性,以及低内存占用,需要实现这样的需求,需要做细致的模拟测试。
1 using System; 2 using System.Collections.Generic; 3 using System.Text; 4
using System.Threading; 5 using System.Collections; 6 7 namespace
PackgeUniqueCheck 8 { 9 public class UniqueCheck 10 { 11 /* 12 *
volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。 13 *
它是被设计用来修饰被不同线程访问和修改的变量。 14 *
如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。 15 */ 16 private static
readonly object lockHelper = new object(); 17 18 private volatile static
UniqueCheck _instance; 19 20 /// <summary> 21 /// 获取单一实例 22 /// </summary>
23 /// <returns></returns> 24 public static UniqueCheck GetInstance() 25 {
26 if (_instance == null) 27 { 28 lock (lockHelper) 29 { 30 if (_instance
==null) 31 _instance = new UniqueCheck(); 32 } 33 } 34 return _instance;
35 } 36 37 private UniqueCheck() 38 { 39 //创建一个线程安全的哈希表,作为字典缓存 40
_DataKey = Hashtable.Synchronized(new Hashtable()); 41 Queue myqueue = new
Queue(); 42 _DataQueue = Queue.Synchronized(myqueue); 43 _Myqueue = new
ConcurrentLinkedQueue<string>(); 44 _Timer = new Thread(DoTicket); 45
_Timer.Start(); 46 } 47 48 #region 公共属性设置 49 /// <summary> 50 ///
设定定时线程的休眠时间长度:默认为1分钟 51 /// 时间范围:1-7200000,值为1毫秒到2小时 52 /// </summary> 53 ///
<param name="value"></param> 54 public void SetTimeSpan(int value) 55 { 56
if (value > 0&& value <=7200000) 57 { 58 _TimeSpan = value; 59 } 60 } 61
/// <summary> 62 /// 设定缓存Cache中的最大记录条数 63 /// 值范围:1-5000000,1到500万 64 ///
</summary> 65 /// <param name="value"></param> 66 public void SetCacheMaxNum(
int value) 67 { 68 if (value > 0 && value <= 5000000) 69 { 70
_CacheMaxNum = value; 71 } 72 } 73 /// <summary> 74 /// 设置是否在控制台中显示日志 75
/// </summary> 76 /// <param name="value"></param> 77 public void
SetIsShowMsg(bool value) 78 { 79 Helper.IsShowMsg = value; 80 } 81 ///
<summary> 82 /// 线程请求阻塞增量 83 /// 值范围:1-CacheMaxNum,建议设置为缓存最大值的10%-20% 84 ///
</summary> 85 /// <param name="value"></param> 86 public void SetBlockNumExt(
int value) 87 { 88 if (value > 0 && value <= _CacheMaxNum) 89 { 90
_BlockNumExt = value; 91 } 92 } 93 /// <summary> 94 /// 请求阻塞时间 95 ///
值范围:1-max,根据阻塞增量设置请求阻塞时间 96 /// 阻塞时间越长,阻塞增量可以设置越大,但是请求实时响应就越差 97 /// </summary>
98 /// <param name="value"></param> 99 public void SetBlockSpanTime(int value)
100 { 101 if (value > 0) 102 { 103 _BlockSpanTime = value; 104 } 105 } 106
#endregion 107 108 #region 私有变量 109 /// <summary> 110 /// 内部运行线程 111 ///
</summary> 112 private Thread _runner = null; 113 /// <summary> 114 ///
可处理高并发的队列115 /// </summary> 116 private ConcurrentLinkedQueue<string> _Myqueue =
null; 117 /// <summary> 118 /// 唯一内容的时间健值对 119 /// </summary> 120 private
Hashtable _DataKey =null; 121 /// <summary> 122 /// 内容时间队列 123 /// </summary>
124 private Queue _DataQueue = null; 125 /// <summary> 126 ///
定时线程的休眠时间长度:默认为1分钟127 /// </summary> 128 private int _TimeSpan = 3000; 129 ///
<summary> 130 /// 定时计时器线程 131 /// </summary> 132 private Thread _Timer = null;
133 /// <summary> 134 /// 缓存Cache中的最大记录条数 135 /// </summary> 136 private int
_CacheMaxNum =500000; 137 /// <summary> 138 /// 线程请求阻塞增量 139 /// </summary> 140
private int _BlockNumExt = 10000; 141 /// <summary> 142 /// 请求阻塞时间 143 ///
</summary> 144 private int _BlockSpanTime = 100; 145 #endregion 146 147 #region
私有方法148 private void StartRun() 149 { 150 _runner = new Thread(DoAction); 151
_runner.Start();152 Helper.ShowMsg("内部线程启动成功!"); 153 } 154 155 private string
GetItem()156 { 157 string tp = string.Empty; 158 bool result =
_Myqueue.TryDequeue(out tp); 159 return tp; 160 } 161 /// <summary> 162 ///
执行循环操作163 /// </summary> 164 private void DoAction() 165 { 166 while (true) 167
{168 while (!_Myqueue.IsEmpty) 169 { 170 string item = GetItem(); 171
_DataQueue.Enqueue(item);172 if (!_DataKey.ContainsKey(item)) 173 { 174
_DataKey.Add(item, DateTime.Now);175 } 176 } 177 //
Helper.ShowMsg("当前数组已经为空,处理线程进入休眠状态..."); 178 Thread.Sleep(2); 179 } 180 } 181
/// <summary> 182 /// 执行定时器的动作 183 /// </summary> 184 private void DoTicket()
185 { 186 while (true) 187 { 188 Helper.ShowMsg("当前数据队列个数:" +
_DataQueue.Count.ToString());189 if (_DataQueue.Count > _CacheMaxNum) 190 { 191
while (true) 192 { 193 Helper.ShowMsg(string.Format("
当前队列数:{0},已经超出最大长度:{1},开始进行清理操作...", _DataQueue.Count,
_CacheMaxNum.ToString()));194 string item = _DataQueue.Dequeue().ToString(); 195
if (!string.IsNullOrEmpty(item)) 196 { 197 if (_DataKey.ContainsKey(item)) 198
{199 _DataKey.Remove(item); 200 } 201 if (_DataQueue.Count <= _CacheMaxNum)
202 { 203 Helper.ShowMsg("清理完成,开始休眠清理线程..."); 204 break; 205 } 206 } 207 }
208 } 209 Thread.Sleep(_TimeSpan); 210 } 211 } 212 213 /// <summary> 214 ///
线程进行睡眠等待215 /// 如果当前负载压力大大超出了线程的处理能力 216 /// 那么需要进行延时调用 217 /// </summary> 218
private void BlockThread() 219 { 220 if (_DataQueue.Count > _CacheMaxNum +
_BlockNumExt)221 { 222 Thread.Sleep(_BlockSpanTime); 223 } 224 } 225
#endregion 226 227 #region 公共方法 228 /// <summary> 229 /// 开启服务线程 230 ///
</summary> 231 public void Start() 232 { 233 if (_runner == null) 234 { 235
StartRun();236 } 237 else 238 { 239 if (_runner.IsAlive == false) 240 { 241
StartRun();242 } 243 } 244 245 } 246 /// <summary> 247 /// 关闭服务线程 248 ///
</summary> 249 public void Stop() 250 { 251 if (_runner != null) 252 { 253
_runner.Abort();254 _runner = null; 255 } 256 } 257 258 /// <summary> 259 ///
添加内容信息260 /// </summary> 261 /// <param name="item">内容信息</param> 262 ///
<returns>true:缓存中不包含此值,队列添加成功,false:缓存中包含此值,队列添加失败</returns> 263 public bool
AddItem(string item) 264 { 265 BlockThread(); 266 item = Helper.MakeMd5(item);
267 if (_DataKey.ContainsKey(item)) 268 { 269 return false; 270 } 271 else 272
{273 _Myqueue.Enqueue(item); 274 return true; 275 } 276 } 277 /// <summary>
278 /// 判断内容信息是否已经存在 279 /// </summary> 280 /// <param name="item">内容信息</param>
281 /// <returns>true:信息已经存在于缓存中,false:信息不存在于缓存中</returns> 282 public bool
CheckItem(string item) 283 { 284 item = Helper.MakeMd5(item); 285 return
_DataKey.ContainsKey(item);286 } 287 #endregion 288 289 } 290 }
模拟测试代码:
private static string _example = Guid.NewGuid().ToString(); private static
UniqueCheck _uck =null; static void Main(string[] args) { _uck =
UniqueCheck.GetInstance(); _uck.Start(); _uck.SetIsShowMsg(false);
_uck.SetCacheMaxNum(20000000); _uck.SetBlockNumExt(1000000); _uck.SetTimeSpan(
6000); _uck.AddItem(_example); Thread[] threads = new Thread[20]; for (int i = 0
; i <20; i++) { threads[i] = new Thread(AddInfo); threads[i].Start(); } Thread
checkthread= new Thread(CheckInfo); checkthread.Start(); string value =
Console.ReadLine(); checkthread.Abort();for (int i = 0; i < 50; i++) {
threads[i].Abort(); } _uck.Stop(); }static void AddInfo() { while (true) {
_uck.AddItem(Guid.NewGuid().ToString()); } }static void CheckInfo() { while (
true) { Console.WriteLine("开始时间:{0}...", DateTime.Now.ToString("yyyy-MM-dd
HH:mm:ss.ffff")); Console.WriteLine("插入结果:{0}", _uck.AddItem(_example));
Console.WriteLine("结束时间:{0}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"
));
          //调整进程休眠时间,可以测试高并发的情况 //Thread.Sleep(1000); } }
测试截图:



 

友情链接
ioDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信