namespace AsiaINFO.SMS.APPCMPP2.MyThread.CMPPThread
|
{
|
using AsiaINFO.SMS.APPCMPP2;
|
using AsiaINFO.SMS.APPCMPP2.MyThread;
|
using AsiaINFO.SMS.CMPP2;
|
using AsiaINFO.SMS.Entity;
|
using Common;
|
using System;
|
using System.Collections;
|
using System.Collections.Generic;
|
using System.Threading;
|
using System.IO;
|
using AsiaINFO.SMS.DBFactory;
|
using BusinessFactory;
|
|
public class SubmitThread : CMPPThreadBase
|
{
|
// Timeout handed to the new-item wait in Run(); the constructor stores the
// activeTestInterval argument multiplied by 1000 (presumably seconds -> milliseconds).
private int _activeTestInterval;
|
// Event set whose NewItemEvent is signalled whenever this thread notices the
// CMPP client is (or must be treated as) disconnected.
private SyncEvents _connectEvents;
|
// Initialised to DateTime.Now in the constructor and passed by ref to
// Utils.DateDiffMilliseconds in CheckLimit().
private DateTime _lastBatchTime;
|
// Number of submits after which CheckLimit() is invoked. The constructor
// currently hard-codes this to 2000 and ignores its mtLimit argument.
private int _mtLimit;
|
// Timeout used when waiting on _slidingWindowEvents in SlidingWindowSubmit();
// stored as the reSubmitInterval argument multiplied by 1000.
private int _reSubmitInterval;
|
// Submits registered by CMPPSubmit(), keyed by (ulong)SeqID. Accessed under
// _syncSlidingWindowDicObject.
private Dictionary<ulong, SubmitInfo> _slidingWindowDic;
|
// Event set whose NewItemEvent is waited on (and set) in SlidingWindowSubmit().
private SyncEvents _slidingWindowEvents;
|
// Maximum number of entries SlidingWindowSubmit() allows in _slidingWindowDic
// before it stops dequeuing.
private int _slidingWindowSize;
|
// Shared queue of messages waiting to be submitted; always locked through its
// ICollection.SyncRoot.
private Queue<SubmitInfo> _submitQueue;
|
// Counter incremented by CMPPSubmit() and reset to 0 after CheckLimit() runs.
private int _submitSeq;
|
// Lock object guarding _slidingWindowDic.
private object _syncSlidingWindowDicObject;
|
|
|
// Creates the submit worker and binds it to the shared queue, sliding window
// and signalling objects supplied by the caller.
//
// cmppClient / syncEvents      - forwarded unchanged to the base class.
// submitQueue                  - queue of messages waiting to be sent (required).
// syncSlidingWindowDicObject   - lock object guarding slidingWindowDic (required).
// slidingWindowDic             - in-flight submits keyed by sequence id (required).
// slidingWindowEvents          - events waited on after a batch is sent (required).
// connectEvents                - events used to request a reconnect (required).
// mtLimit                      - accepted but currently ignored: the limit is fixed at 2000.
// slidingWindowSize            - maximum number of entries allowed in the window.
// reSubmitInterval             - stored multiplied by 1000.
// activeTestInterval           - stored multiplied by 1000.
//
// Throws Exception when any of the required reference arguments is null.
public SubmitThread(CMPPClient cmppClient, SyncEvents syncEvents, Queue<SubmitInfo> submitQueue, object syncSlidingWindowDicObject, Dictionary<ulong, SubmitInfo> slidingWindowDic, SyncEvents slidingWindowEvents, SyncEvents connectEvents, int mtLimit, int slidingWindowSize, int reSubmitInterval, int activeTestInterval)
    : base(cmppClient, syncEvents)
{
    const int IntervalMultiplier = 1000;
    const int FixedMtLimit = 2000;

    // Initial defaults; both are overwritten below once the arguments validate.
    _mtLimit = 10;
    _slidingWindowSize = 10;

    bool allDependenciesSupplied =
        submitQueue != null
        && syncSlidingWindowDicObject != null
        && slidingWindowDic != null
        && slidingWindowEvents != null
        && connectEvents != null;

    if (!allDependenciesSupplied)
    {
        throw new Exception("SubmitThread参数不能为空!");
    }

    // Shared collaborators.
    _submitQueue = submitQueue;
    _syncSlidingWindowDicObject = syncSlidingWindowDicObject;
    _slidingWindowDic = slidingWindowDic;
    _slidingWindowEvents = slidingWindowEvents;
    _connectEvents = connectEvents;

    // Tuning values. The mtLimit argument is deliberately not used here; the
    // limit is pinned to a fixed value instead.
    _slidingWindowSize = slidingWindowSize;
    _mtLimit = FixedMtLimit;
    _lastBatchTime = DateTime.Now;
    _reSubmitInterval = reSubmitInterval * IntervalMultiplier;
    _activeTestInterval = activeTestInterval * IntervalMultiplier;
}
|
|
|
|
|
// Keep-alive step run by Run() when the new-item wait times out with an empty queue.
//
//  * Not connected           -> signal the connect events and stop.
//  * ShareData.ActiveSeq > 6 -> mark the client disconnected, signal the connect
//                               events and reset the counter to 0.
//  * Otherwise               -> bump the counter, log, and send an active test.
private void ActiveTest()
{
    if (!base._cmppClient.IsConnected)
    {
        this._connectEvents.NewItemEvent.Set();
        return;
    }

    if (ShareData.ActiveSeq <= 6)
    {
        ShareData.ActiveSeq++;
        base.OnMsg("发送CMPP_ACTIVE_TEST", MsgLevel.Debug);
        base._cmppClient.ActiveTest(ShareData.SeqID);
        return;
    }

    // Counter exceeded the threshold: treat the link as dead and request a reconnect.
    base._cmppClient.IsConnected = false;
    this._connectEvents.NewItemEvent.Set();
    ShareData.ActiveSeq = 0;
}
|
|
// Throttle step invoked by CMPPSubmit() once _submitSeq reaches _mtLimit.
//
// Logs the current counter, asks Utils.DateDiffMilliseconds for a millisecond
// figure derived from _lastBatchTime (passed by ref; NOTE(review): the helper
// may update the field - confirm against Utils), and if that figure is below
// 1000 sleeps for the remainder.
private void CheckLimit()
{
    const int WindowMilliseconds = 1000;

    base.OnMsg(string.Format("----------------{0}----已发送{1}", this._lastBatchTime.Second, this._submitSeq), MsgLevel.Debug);

    int elapsedMs = Utils.DateDiffMilliseconds(ref this._lastBatchTime);
    if (elapsedMs < WindowMilliseconds)
    {
        int sleepMs = WindowMilliseconds - elapsedMs;

        // Report the real pause length; the previous message always claimed a
        // fixed 200 ms regardless of how long the thread actually slept.
        base.OnMsg(string.Format("提交线程挂起...{0}毫秒", sleepMs), MsgLevel.Msg);
        Thread.Sleep(sleepMs);
    }
}
|
|
|
|
|
// Submits one message through the CMPP client.
//
// Returns false when the client is not connected (after signalling the connect
// events and waiting on the exit event with a 1000 timeout) or when registering
// the message in the sliding window throws. Returns true in every other case,
// including the two "nothing sent" cases: the message guid is already known to
// CheckRepeat, or its SeqID is already present in the sliding window.
private bool CMPPSubmit(SubmitInfo submit)
{
    if (!base._cmppClient.IsConnected)
    {
        this._connectEvents.NewItemEvent.Set();
        base._syncEvents.ExitThreadEvent.WaitOne(1000, false);
        return false;
    }

    // Duplicate check: a guid that was already recorded is counted but not sent again.
    bool alreadySubmitted = CheckRepeat.Instance.checkMessageId(submit.MTWait.Guid);
    if (alreadySubmitted)
    {
        DBFactory.log4netService.Debug("【提交】【guid已经发送过】 guid=" + submit.MTWait.Guid + ",手机号码:" + submit.MTWait.MOBILE_NO);
        this.StampAndThrottle(submit);
        return true;
    }

    DBFactory.log4netService.Debug("【提交】添加 guid=" + submit.MTWait.Guid + ",手机号码:" + submit.MTWait.MOBILE_NO);
    CheckRepeat.Instance.addSubmitMessageToCache(submit.MTWait.Guid);

    // Register the message in the sliding window unless its SeqID is already there.
    try
    {
        lock (this._syncSlidingWindowDicObject)
        {
            ulong windowKey = (ulong)submit.SeqID;
            if (this._slidingWindowDic.ContainsKey(windowKey))
            {
                OnMsg("SeqID已存在,返回true", MsgLevel.Debug);
                return true;
            }

            this._slidingWindowDic.Add(windowKey, submit);
        }
    }
    catch (Exception ex)
    {
        log4netService.Error("cmppSubmit出错:" + ex.Message);
        return false;
    }

    string content = submit.MTWait.SMS_CONTENT;
    if (MTFactory.checkSplitSMSContent(content))
    {
        // Content needs splitting: send each part with the part count and part key.
        Dictionary<int, string> parts = MTFactory.SplitContent(content);
        foreach (KeyValuePair<int, string> part in parts)
        {
            base._cmppClient.Submit(0L, submit.MTWait.MOBILE_NO, part.Value, submit.SeqID, submit.MTWait.SERV_CODE, submit.MTWait.FEE_TYPE, submit.MTWait.FEE, submit.MTWait.LONG_SERV_NO, submit.MTWait.WapURL, (uint)parts.Count, (uint)part.Key);
        }
    }
    else
    {
        base._cmppClient.Submit(0L, submit.MTWait.MOBILE_NO, submit.MTWait.SMS_CONTENT, submit.SeqID, submit.MTWait.SERV_CODE, submit.MTWait.FEE_TYPE, submit.MTWait.FEE, submit.MTWait.LONG_SERV_NO, submit.MTWait.WapURL, submit.MTWait.MSG_SUM, submit.MTWait.MSG_SUB);
    }

    string timestamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");
    string msg = string.Format("发送短信 手机号码=[{0}] 发送号码=[{1}] 内容=[{2}]----时间:" + timestamp, submit.MTWait.MOBILE_NO, submit.MTWait.LONG_SERV_NO, submit.MTWait.SMS_CONTENT);
    base.OnMsg(msg, MsgLevel.Msg);

    this.StampAndThrottle(submit);
    return true;
}

// Records the submit time on the message, bumps the per-batch counter and,
// once the counter reaches _mtLimit, runs CheckLimit() and resets the counter.
private void StampAndThrottle(SubmitInfo submit)
{
    submit.SubmitTime = DateTime.Now;
    this._submitSeq++;
    if (this._submitSeq >= this._mtLimit)
    {
        this.CheckLimit();
        this._submitSeq = 0;
    }
}
|
|
// Thread body. Loops forever:
//  * waits on the new-item event for up to _activeTestInterval;
//  * when signalled, keeps calling SlidingWindowSubmit() until the submit queue
//    reads empty, then goes straight back to waiting;
//  * when the wait times out and the queue is empty, runs ActiveTest().
protected override void Run()
{
    for (;;)
    {
        bool signalled = base._syncEvents.NewItemEvent.WaitOne(this._activeTestInterval);
        if (signalled)
        {
            OnMsg("[submit线程]_syncEvents开始执行...", MsgLevel.Debug);

            int remaining;
            do
            {
                base.OnMsg("【收到待发送数据了....】", MsgLevel.Msg);
                this.SlidingWindowSubmit();
                remaining = this.ReadQueuedCount();
            }
            while (remaining > 0);

            continue;
        }

        // Wait timed out: only probe the link when there is nothing left to send.
        if (this.ReadQueuedCount() <= 0)
        {
            this.ActiveTest();
        }
    }
}

// Reads the submit queue length while holding the queue's SyncRoot.
private int ReadQueuedCount()
{
    lock (((ICollection)this._submitQueue).SyncRoot)
    {
        return this._submitQueue.Count;
    }
}
|
|
// Passes every message currently held in the sliding window to CMPPSubmit() again.
//
// The window contents are copied under the lock first, so the submits happen
// outside it. Processing stops at the first CMPPSubmit() that returns false.
// Any exception is reported through OnMsg at error level and swallowed.
private void SlidingWindowReSubmit()
{
    try
    {
        List<SubmitInfo> snapshot;
        lock (this._syncSlidingWindowDicObject)
        {
            snapshot = new List<SubmitInfo>(this._slidingWindowDic.Values);
        }

        for (int index = 0; index < snapshot.Count; index++)
        {
            SubmitInfo info = snapshot[index];
            base.OnMsg($"【ReSubmit】----submit:guid:{info.MTWait.Guid}手机号码:{info.MTWait.MOBILE_NO},listCount={snapshot.Count}", MsgLevel.Msg);

            bool accepted = this.CMPPSubmit(info);
            if (!accepted)
            {
                break;
            }

            base.OnMsg("【ReSubmit】----ok", MsgLevel.Msg);
        }
    }
    catch (Exception exception)
    {
        base.OnMsg("[SlidingWindowReSubmit]" + exception.Message, MsgLevel.Err);
    }
}
|
|
|
|
|
// Time at which SlidingWindowSubmit() first observed the sliding window full;
// DateTime.MinValue means "not currently full". Being static, the value is
// shared by every SubmitThread instance.
private static DateTime WindowFullTime = DateTime.MinValue;
|
|
// Sends as many queued messages as the sliding window has room for, then waits
// for the sliding-window event.
//
// 1. If the window is exactly full, remember when that was first seen. If it
//    has stayed full for more than 10 seconds, empty the window, mark the
//    client disconnected and return.
// 2. Dequeue and submit up to (_slidingWindowSize - current count) messages.
//    An empty queue on the very first iteration returns immediately; a failed
//    CMPPSubmit() puts the message back on the queue and returns.
// 3. Wait on the sliding-window event with _reSubmitInterval as timeout. Each
//    time the wait times out the window is cleared and the event is set; if
//    entries are present afterwards, up to two re-submit passes are attempted
//    before the client is marked disconnected and a reconnect is requested.
private void SlidingWindowSubmit()
{
    int num = this._submitDicCount;
    SubmitInfo submit = null;

    base.OnMsg($"[SlidingWindowSubmit调用] _slidingWindowSize={_slidingWindowSize},num={num}", MsgLevel.Msg);

    if (this._slidingWindowSize == num)
    {
        OnMsg($"[_slidingWindowSize=num]队列满的时候----_slidingWindowSize={_slidingWindowSize},num={num}," +
            $"WindowFullTime={WindowFullTime},DateTime.MinValue={DateTime.MinValue}", MsgLevel.Debug);

        if (SubmitThread.WindowFullTime == DateTime.MinValue)
        {
            // First time the window is seen full: start the clock.
            SubmitThread.WindowFullTime = DateTime.Now;
        }
        else
        {
            // Check whether the window has been full for more than 10 seconds.
            System.TimeSpan ts = DateTime.Now - SubmitThread.WindowFullTime;
            OnMsg($"[_slidingWindowSize=num]队列满的时候,检查是否超过10秒,ts={ts}", MsgLevel.Debug);
            if (ts.TotalSeconds > 10)
            {
                // Empty the window. This is done under the same lock as every
                // other access to the dictionary; clearing it unguarded could
                // race with another thread that is using it under the lock.
                lock (this._syncSlidingWindowDicObject)
                {
                    this._slidingWindowDic.Clear();
                }

                // Nothing was processed for 10 seconds: treat it as a timeout
                // so that the connection gets re-established.
                base._cmppClient.IsConnected = false;

                OnMsg($"连接超时,需要重启连接...WindowFullTime={WindowFullTime}", MsgLevel.Msg);
                return;
            }
        }
    }
    else
    {
        SubmitThread.WindowFullTime = DateTime.MinValue;
    }

    for (int i = 1; i <= (this._slidingWindowSize - num); i++)
    {
        submit = _submitData;
        if (submit == null)
        {
            // Queue empty before anything was sent in this call: nothing to wait for.
            if (i <= 1)
            {
                return;
            }

            break;
        }

        base.OnMsg("submit:guid:" + submit.MTWait.Guid, MsgLevel.Msg);

        if (!this.CMPPSubmit(submit))
        {
            log4netService.Info("发送失败" + submit.MTWait.MOBILE_NO);

            // Put the message back on the queue so it is not lost.
            this._submitData = submit;
            return;
        }

        base.OnMsg(string.Format("_slidingWindowSize:{0},num:{1},i:{2}", _slidingWindowSize, num, i), MsgLevel.Msg);
    }

    int num3 = 0;
    while (!this._slidingWindowEvents.NewItemEvent.WaitOne(this._reSubmitInterval, false))
    {
        // The wait timed out: drop whatever is in the window and set the event.
        lock (this._syncSlidingWindowDicObject)
        {
            this._slidingWindowDic.Clear();
            this._slidingWindowEvents.NewItemEvent.Set();
        }

        if (this._submitDicCount != 0)
        {
            num3++;
            if (num3 <= 2)
            {
                OnMsg($"[重提],num3={num3},_submitDicCount={_submitDicCount}", MsgLevel.Debug);
                this.SlidingWindowReSubmit();
                continue;
            }

            // More than two re-submit passes: give up and request a reconnect.
            OnMsg($"[重提]处理完成,断开连接", MsgLevel.Debug);
            base._cmppClient.IsConnected = false;
            this._connectEvents.NewItemEvent.Set();
        }

        return;
    }
}
|
|
|
|
|
|
|
|
|
// Queue accessor that always holds the queue's SyncRoot.
// get: removes and returns the next queued message, or null when the queue is empty.
// set: appends the assigned message to the queue.
private SubmitInfo _submitData
{
    get
    {
        object gate = ((ICollection)this._submitQueue).SyncRoot;
        lock (gate)
        {
            return this._submitQueue.Count > 0 ? this._submitQueue.Dequeue() : null;
        }
    }

    set
    {
        object gate = ((ICollection)this._submitQueue).SyncRoot;
        lock (gate)
        {
            this._submitQueue.Enqueue(value);
        }
    }
}
|
|
// Number of entries currently in the sliding window, read under its lock.
private int _submitDicCount
{
    get
    {
        int entries;
        lock (this._syncSlidingWindowDicObject)
        {
            entries = this._slidingWindowDic.Count;
        }

        return entries;
    }
}
|
}
|
}
|