namespace AsiaINFO.SMS.APPCMPP2.MyThread.CMPPThread
{
    using AsiaINFO.SMS.APPCMPP2;
    using AsiaINFO.SMS.APPCMPP2.MyThread;
    using AsiaINFO.SMS.CMPP2;
    using AsiaINFO.SMS.Entity;
    using Common;
    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Threading;
    using System.IO;
    using AsiaINFO.SMS.DBFactory;
    using BusinessFactory;

    /// <summary>
    /// Worker that drains the submit queue and sends each entry through the CMPP client,
    /// tracking sent-but-unacknowledged entries in a shared "sliding window" dictionary.
    /// When the queue stays empty for one active-test interval it sends a CMPP active test instead.
    /// </summary>
    /// <remarks>
    /// NOTE(review): entries are only added to / cleared from the sliding-window dictionary here;
    /// presumably another thread removes individual entries when a response arrives and signals
    /// <c>_slidingWindowEvents.NewItemEvent</c> - confirm against the response handler.
    /// NOTE(review): the element types of the queue and dictionary (<c>SubmitInfo</c>, <c>ulong</c> keys)
    /// are taken from how this class uses them; confirm they match the callers' declarations.
    /// </remarks>
    public class SubmitThread : CMPPThreadBase
    {
        /// <summary>Milliseconds in one second; used for interval conversion and throttling.</summary>
        private const int MillisecondsPerSecond = 1000;

        /// <summary>Number of submits after which <see cref="CheckLimit"/> is invoked.</summary>
        private const int FixedMtLimit = 2000;

        /// <summary>When <c>ShareData.ActiveSeq</c> exceeds this value the connection is flagged as down.</summary>
        private const int ActiveSeqDisconnectThreshold = 6;

        /// <summary>Seconds the window may stay completely full before the connection is flagged as down.</summary>
        private const int WindowFullTimeoutSeconds = 10;

        // Time at which the window was first observed full; DateTime.MinValue means "not full".
        // NOTE(review): static, so the value is shared by every SubmitThread instance in the process.
        private static DateTime WindowFullTime = DateTime.MinValue;

        private readonly int _activeTestInterval;
        private readonly SyncEvents _connectEvents;
        private DateTime _lastBatchTime; // passed by ref to Utils.DateDiffMilliseconds, so it cannot be readonly
        private readonly int _mtLimit;
        private readonly int _reSubmitInterval;
        private readonly Dictionary<ulong, SubmitInfo> _slidingWindowDic;
        private readonly SyncEvents _slidingWindowEvents;
        private readonly int _slidingWindowSize;
        private readonly Queue<SubmitInfo> _submitQueue;
        private int _submitSeq;
        private readonly object _syncSlidingWindowDicObject;

        /// <summary>
        /// Creates the submit worker.
        /// </summary>
        /// <param name="cmppClient">CMPP client, forwarded to the base class.</param>
        /// <param name="syncEvents">Events for this thread, forwarded to the base class.</param>
        /// <param name="submitQueue">Queue of messages waiting to be submitted; must not be null.</param>
        /// <param name="syncSlidingWindowDicObject">Lock object guarding <paramref name="slidingWindowDic"/>; must not be null.</param>
        /// <param name="slidingWindowDic">Sliding-window dictionary keyed by sequence id; must not be null.</param>
        /// <param name="slidingWindowEvents">Events waited on after a batch has been submitted; must not be null.</param>
        /// <param name="connectEvents">Events set when a (re)connect is required; must not be null.</param>
        /// <param name="mtLimit">Currently ignored: the limit is fixed at 2000 (see body).</param>
        /// <param name="slidingWindowSize">Maximum number of entries in the sliding window.</param>
        /// <param name="reSubmitInterval">Wait time for the sliding-window event, in seconds.</param>
        /// <param name="activeTestInterval">Idle time before an active test is sent, in seconds.</param>
        /// <exception cref="Exception">Any of the five reference arguments listed above is null.</exception>
        public SubmitThread(
            CMPPClient cmppClient,
            SyncEvents syncEvents,
            Queue<SubmitInfo> submitQueue,
            object syncSlidingWindowDicObject,
            Dictionary<ulong, SubmitInfo> slidingWindowDic,
            SyncEvents slidingWindowEvents,
            SyncEvents connectEvents,
            int mtLimit,
            int slidingWindowSize,
            int reSubmitInterval,
            int activeTestInterval)
            : base(cmppClient, syncEvents)
        {
            if (submitQueue == null
                || syncSlidingWindowDicObject == null
                || slidingWindowDic == null
                || slidingWindowEvents == null
                || connectEvents == null)
            {
                throw new Exception("SubmitThread参数不能为空!");
            }

            this._submitQueue = submitQueue;
            this._syncSlidingWindowDicObject = syncSlidingWindowDicObject;
            this._slidingWindowDic = slidingWindowDic;
            this._slidingWindowEvents = slidingWindowEvents;
            this._slidingWindowSize = slidingWindowSize;
            this._connectEvents = connectEvents;

            // NOTE(review): the mtLimit argument is deliberately not used; the limit is pinned to 2000.
            this._mtLimit = FixedMtLimit;
            this._lastBatchTime = DateTime.Now;

            // Both intervals arrive in seconds and are stored in milliseconds.
            this._reSubmitInterval = reSubmitInterval * MillisecondsPerSecond;
            this._activeTestInterval = activeTestInterval * MillisecondsPerSecond;
        }

        /// <summary>
        /// Dequeues the next pending message (getter, returns null when the queue is empty)
        /// or puts a message back at the tail of the queue (setter). Both hold the queue's SyncRoot.
        /// </summary>
        private SubmitInfo _submitData
        {
            get
            {
                lock (((ICollection)this._submitQueue).SyncRoot)
                {
                    if (this._submitQueue.Count > 0)
                    {
                        return this._submitQueue.Dequeue();
                    }
                }

                return null;
            }

            set
            {
                lock (((ICollection)this._submitQueue).SyncRoot)
                {
                    this._submitQueue.Enqueue(value);
                }
            }
        }

        /// <summary>Number of entries currently in the sliding window, read under its lock.</summary>
        private int _submitDicCount
        {
            get
            {
                lock (this._syncSlidingWindowDicObject)
                {
                    return this._slidingWindowDic.Count;
                }
            }
        }

        /// <summary>
        /// Thread body: waits for the new-item event and submits until the queue is empty;
        /// when the wait times out and the queue is still empty, performs an active test. Never returns.
        /// </summary>
        protected override void Run()
        {
            while (true)
            {
                while (base._syncEvents.NewItemEvent.WaitOne(this._activeTestInterval))
                {
                    OnMsg("[submit线程]_syncEvents开始执行...", MsgLevel.Debug);
                    do
                    {
                        base.OnMsg("【收到待发送数据了....】", MsgLevel.Msg);
                        this.SlidingWindowSubmit();
                    }
                    while (this.GetQueuedCount() > 0);
                }

                // The wait timed out: only probe the link when there is nothing left to send.
                if (this.GetQueuedCount() <= 0)
                {
                    this.ActiveTest();
                }
            }
        }

        /// <summary>Returns the number of queued messages, read under the queue's SyncRoot.</summary>
        private int GetQueuedCount()
        {
            lock (((ICollection)this._submitQueue).SyncRoot)
            {
                return this._submitQueue.Count;
            }
        }

        /// <summary>
        /// Sends a CMPP active test, or requests a reconnect when the client is not connected
        /// or <c>ShareData.ActiveSeq</c> has exceeded <see cref="ActiveSeqDisconnectThreshold"/>.
        /// </summary>
        private void ActiveTest()
        {
            if (!base._cmppClient.IsConnected)
            {
                this._connectEvents.NewItemEvent.Set();
                return;
            }

            if (ShareData.ActiveSeq > ActiveSeqDisconnectThreshold)
            {
                // NOTE(review): presumably ActiveSeq is reset elsewhere when a reply arrives - confirm.
                base._cmppClient.IsConnected = false;
                this._connectEvents.NewItemEvent.Set();
                ShareData.ActiveSeq = 0;
                return;
            }

            ShareData.ActiveSeq++;
            base.OnMsg("发送CMPP_ACTIVE_TEST", MsgLevel.Debug);
            base._cmppClient.ActiveTest(ShareData.SeqID);
        }

        /// <summary>
        /// Throttle hook, called each time <c>_submitSeq</c> reaches <c>_mtLimit</c>: when fewer than
        /// 1000 ms are reported by <c>Utils.DateDiffMilliseconds</c>, sleeps for the remainder.
        /// </summary>
        private void CheckLimit()
        {
            base.OnMsg(string.Format("----------------{0}----已发送{1}", this._lastBatchTime.Second, this._submitSeq), MsgLevel.Debug);

            // NOTE(review): _lastBatchTime is passed by ref; presumably the helper returns the elapsed
            // milliseconds and refreshes the timestamp - confirm against Utils.
            int elapsedMs = Utils.DateDiffMilliseconds(ref this._lastBatchTime);
            if (elapsedMs < MillisecondsPerSecond)
            {
                int sleepMs = MillisecondsPerSecond - elapsedMs;

                // Log the real pause length (the message used to claim a fixed 200 ms).
                base.OnMsg($"提交线程挂起...{sleepMs}毫秒", MsgLevel.Msg);
                Thread.Sleep(sleepMs);
            }
        }

        /// <summary>
        /// Stamps the submit time on <paramref name="submit"/>, bumps the submit counter and runs
        /// <see cref="CheckLimit"/> (then resets the counter) once the counter reaches <c>_mtLimit</c>.
        /// </summary>
        private void RecordSubmit(SubmitInfo submit)
        {
            submit.SubmitTime = DateTime.Now;
            this._submitSeq++;
            if (this._submitSeq >= this._mtLimit)
            {
                this.CheckLimit();
                this._submitSeq = 0;
            }
        }

        /// <summary>
        /// Submits one message through the CMPP client.
        /// </summary>
        /// <param name="submit">Message to send.</param>
        /// <returns>
        /// <c>false</c> when the client is disconnected or the window registration throws;
        /// <c>true</c> when the message was sent, was reported as a repeat, or its sequence id
        /// is already in the sliding window (in the latter two cases nothing is sent).
        /// </returns>
        private bool CMPPSubmit(SubmitInfo submit)
        {
            if (!base._cmppClient.IsConnected)
            {
                // Ask for a reconnect, then wait up to one second on the exit event before reporting failure.
                this._connectEvents.NewItemEvent.Set();
                base._syncEvents.ExitThreadEvent.WaitOne(MillisecondsPerSecond, false);
                return false;
            }

            // Duplicate check on the message guid.
            if (CheckRepeat.Instance.checkMessageId(submit.MTWait.Guid))
            {
                DBFactory.log4netService.Debug("【提交】【guid已经发送过】 guid=" + submit.MTWait.Guid + ",手机号码:" + submit.MTWait.MOBILE_NO);
                this.RecordSubmit(submit);
                return true;
            }

            DBFactory.log4netService.Debug("【提交】添加 guid=" + submit.MTWait.Guid + ",手机号码:" + submit.MTWait.MOBILE_NO);
            CheckRepeat.Instance.addSubmitMessageToCache(submit.MTWait.Guid);

            // Register the message in the sliding window unless its sequence id is already there.
            // NOTE(review): SlidingWindowReSubmit feeds entries taken from this same dictionary, so for an
            // entry still in the window this method returns true before reaching Submit.
            try
            {
                ulong windowKey = (ulong)submit.SeqID;
                lock (this._syncSlidingWindowDicObject)
                {
                    if (this._slidingWindowDic.ContainsKey(windowKey))
                    {
                        OnMsg("SeqID已存在,返回true", MsgLevel.Debug);
                        return true;
                    }

                    this._slidingWindowDic.Add(windowKey, submit);
                }
            }
            catch (Exception ex)
            {
                log4netService.Error("cmppSubmit出错:" + ex.Message);
                return false;
            }

            string smscontent = submit.MTWait.SMS_CONTENT;
            if (!MTFactory.checkSplitSMSContent(smscontent))
            {
                base._cmppClient.Submit(0L, submit.MTWait.MOBILE_NO, submit.MTWait.SMS_CONTENT, submit.SeqID, submit.MTWait.SERV_CODE, submit.MTWait.FEE_TYPE, submit.MTWait.FEE, submit.MTWait.LONG_SERV_NO, submit.MTWait.WapURL, submit.MTWait.MSG_SUM, submit.MTWait.MSG_SUB);
            }
            else
            {
                // Content needs splitting: send every part with the part count and the part's key.
                var parts = MTFactory.SplitContent(smscontent);
                foreach (var part in parts)
                {
                    base._cmppClient.Submit(0L, submit.MTWait.MOBILE_NO, part.Value, submit.SeqID, submit.MTWait.SERV_CODE, submit.MTWait.FEE_TYPE, submit.MTWait.FEE, submit.MTWait.LONG_SERV_NO, submit.MTWait.WapURL, (uint)parts.Count, (uint)part.Key);
                }
            }

            string msg = $"发送短信 手机号码=[{submit.MTWait.MOBILE_NO}] 发送号码=[{submit.MTWait.LONG_SERV_NO}] 内容=[{submit.MTWait.SMS_CONTENT}]----时间:{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}";
            base.OnMsg(msg, MsgLevel.Msg);

            this.RecordSubmit(submit);
            return true;
        }

        /// <summary>
        /// Passes a snapshot of every entry in the sliding window to <see cref="CMPPSubmit"/>,
        /// stopping at the first one that returns false. Exceptions are logged and swallowed.
        /// </summary>
        private void SlidingWindowReSubmit()
        {
            try
            {
                // Snapshot under the lock so the dictionary is not enumerated while CMPPSubmit runs.
                List<SubmitInfo> snapshot;
                lock (this._syncSlidingWindowDicObject)
                {
                    snapshot = new List<SubmitInfo>(this._slidingWindowDic.Values);
                }

                foreach (SubmitInfo info in snapshot)
                {
                    base.OnMsg($"【ReSubmit】----submit:guid:{info.MTWait.Guid}手机号码:{info.MTWait.MOBILE_NO},listCount={snapshot.Count}", MsgLevel.Msg);
                    if (!this.CMPPSubmit(info))
                    {
                        return;
                    }

                    base.OnMsg("【ReSubmit】----ok", MsgLevel.Msg);
                }
            }
            catch (Exception exception)
            {
                base.OnMsg("[SlidingWindowReSubmit]" + exception.Message, MsgLevel.Err);
            }
        }

        /// <summary>
        /// Fills the free slots of the sliding window from the queue, then waits for the
        /// sliding-window event. Returns early when the window has been full for too long,
        /// when the queue is empty on the first dequeue, or when a submit fails
        /// (the failed message is put back on the queue).
        /// </summary>
        private void SlidingWindowSubmit()
        {
            int inFlight = this._submitDicCount;
            base.OnMsg($"[SlidingWindowSubmit调用] _slidingWindowSize={_slidingWindowSize},num={inFlight}", MsgLevel.Msg);

            if (this.HandleWindowStall(inFlight))
            {
                return;
            }

            int freeSlots = this._slidingWindowSize - inFlight;
            for (int i = 1; i <= freeSlots; i++)
            {
                SubmitInfo submit = this._submitData;
                if (submit == null)
                {
                    if (i <= 1)
                    {
                        // Nothing was sent in this call, so there is nothing to wait for.
                        return;
                    }

                    break;
                }

                base.OnMsg("submit:guid:" + submit.MTWait.Guid, MsgLevel.Msg);
                if (!this.CMPPSubmit(submit))
                {
                    log4netService.Info("发送失败" + submit.MTWait.MOBILE_NO);
                    this._submitData = submit; // re-queue the message for a later attempt
                    return;
                }

                base.OnMsg(string.Format("_slidingWindowSize:{0},num:{1},i:{2}", _slidingWindowSize, inFlight, i), MsgLevel.Msg);
            }

            this.WaitForWindowResponses();
        }

        /// <summary>
        /// Tracks how long the window has been completely full. Once it has been full for more than
        /// <see cref="WindowFullTimeoutSeconds"/> seconds, clears the window and flags the client as disconnected.
        /// </summary>
        /// <param name="inFlight">Window entry count sampled by the caller.</param>
        /// <returns><c>true</c> when the timeout fired and the caller must stop; otherwise <c>false</c>.</returns>
        private bool HandleWindowStall(int inFlight)
        {
            if (this._slidingWindowSize != inFlight)
            {
                SubmitThread.WindowFullTime = DateTime.MinValue;
                return false;
            }

            OnMsg($"[_slidingWindowSize=num]队列满的时候----_slidingWindowSize={_slidingWindowSize},num={inFlight}," +
                $"WindowFullTime={WindowFullTime},DateTime.MinValue={DateTime.MinValue}", MsgLevel.Debug);

            if (SubmitThread.WindowFullTime == DateTime.MinValue)
            {
                // First time the window is seen full: start the clock.
                SubmitThread.WindowFullTime = DateTime.Now;
                return false;
            }

            // Check whether the window has been full for more than 10 seconds.
            TimeSpan ts = DateTime.Now - SubmitThread.WindowFullTime;
            OnMsg($"[_slidingWindowSize=num]队列满的时候,检查是否超过10秒,ts={ts}", MsgLevel.Debug);
            if (ts.TotalSeconds > WindowFullTimeoutSeconds)
            {
                // Nothing was processed within the timeout: empty the window and force a reconnect.
                // The dictionary is shared, so it is cleared under the same lock as every other access.
                lock (this._syncSlidingWindowDicObject)
                {
                    this._slidingWindowDic.Clear();
                }

                base._cmppClient.IsConnected = false;
                OnMsg($"连接超时,需要重启连接...WindowFullTime={WindowFullTime}", MsgLevel.Msg);
                return true;
            }

            return false;
        }

        /// <summary>
        /// Waits up to <c>_reSubmitInterval</c> ms for the sliding-window event. On timeout the window is
        /// cleared and the event is set; if the window is non-empty afterwards, up to two re-submit
        /// rounds are attempted before the client is flagged as disconnected.
        /// </summary>
        private void WaitForWindowResponses()
        {
            int retryRounds = 0;
            while (!this._slidingWindowEvents.NewItemEvent.WaitOne(this._reSubmitInterval, false))
            {
                lock (this._syncSlidingWindowDicObject)
                {
                    this._slidingWindowDic.Clear();
                    this._slidingWindowEvents.NewItemEvent.Set();
                }

                // NOTE(review): the window was just cleared, so this is only non-zero if another
                // thread added an entry in between - the re-submit branch looks rarely reachable.
                if (this._submitDicCount != 0)
                {
                    retryRounds++;
                    if (retryRounds <= 2)
                    {
                        OnMsg($"[重提],num3={retryRounds},_submitDicCount={_submitDicCount}", MsgLevel.Debug);
                        this.SlidingWindowReSubmit();
                        continue;
                    }

                    OnMsg("[重提]处理完成,断开连接", MsgLevel.Debug);
                    base._cmppClient.IsConnected = false;
                    this._connectEvents.NewItemEvent.Set();
                }

                return;
            }
        }
    }
}