The algorithm time and efficiency of other authors are not tested , This article dare not claim to be the fastest , But it's also pretty fast , In one I5 Single core resource processing on machine 3000 *
2000 Gray data time of 20ms, And the algorithm is independent of the size of the core , So called o(1) algorithm .

      Before implementing this algorithm , We also refer to the algorithm in a reference paper proposed by he Kaiming in the dark channel defogging : STREAMING MAXIMUM-MINIMUM FILTER
USING NO MORE THAN THREE COMPARISONS PER ELEMENT
<https://wenku.baidu.com/view/ebabbf6a25c52cc58bd6bedc.html>
 , The process described in this article is also o(1) Of , It's also pretty fast , But not fast enough .

     
I had an idea of my own , It's also based on the separation of rows and columns , Faster than the code above , And it's also o(1) algorithm , But the speed of the algorithm is related to the content of the picture , For example, after an algorithm is performed on a graph , Perform the same algorithm on the result again , Maybe it'll be a lot slower later , It has something to do with my improved algorithm , But it's rare .

      The algorithm introduced in this paper was also seen a long time ago , But it never attracted my attention , The corresponding reference paper is  A fast algorithm for local
minimum and maximum filters on rectangular and octagonal kernels
<http://www.docin.com/p-125421756.html>  , At that time, I thought that the realization of the paper might not be as fast as my own idea , So I didn't go into it .

      The implementation steps of this paper are also based on row column separation , That is to say, one-dimensional operation of row direction is carried out first , Then one-dimensional calculation of column direction is carried out for the results , Specific theoretical description, let's go to the research paper .

      In fact, the core of the paper is the following figure .



    In Input data representing one-dimensional row direction ,g/h Two intermediate data in separate operation , Same size as input data .

      As shown in the figure above , We assume that the kernel size to be calculated is R, Then divide a row into multiple sizes as D =(2R+1) Segments of , For example, in the figure R=2, D=5
, Preprocess each segment , among x Position No. stores the maximum value of the point on the straight line segment where the arrow is located ( minimum value ), We can deal with it in this way g and h
Two arrays , So for a point ( Index is I), Its radius R Maximum in ( Small ) The value is :Max/ Min(g(I+R),h(I-R)).

      It's a simple process .

      Let's take a set of data to illustrate the process , Suppose a row of data is as follows , We're going to expand ( Maximum ), The nuclear radius is 2:

          In:  20  12  35  9  10  7  32  15  20  45  28

   Corresponding g/h by :

          g:  20  20  35  35  35  7  32  32  32  45  45

       h:   35  35  35  10   9  45 45  45  45  45  28 

      If we want to calculate the 4 The radius of points is 2 Max of , Corresponding to   g(I+R) = g(4+2) = 7, h(I-R)=h(4-2)=35,
The result is max(7,35) = 35;

       If we want to calculate the 6 The radius of points is 2 Max of , Corresponding to   g(I+R) = g(6+2) = 32, h(I-R)=h(6-2)=10,
The result is max(32,10) = 32;

      Note that the index above is based on 1 Of the starting point of counting .

Edge processing :

     
Notice at the edge , Like the left edge , When the index of the point to be calculated is less than R Time , At this time h Value is invalid , On the right edge ,g Value is unreachable , But watch carefully , The problem is simple , Take the above data as an example , If you want to calculate the index as 2 The radius is 2 Maximum value of , because h(2-2) Is out of index ( As I said before, in this case 1 Is the starting point of subscript ), So we can't use the above method , Then we return to the essence of the problem , calculation 2 The radius is 2 Maximum value of , It's calculation max(In(1), In(2), In(3), In(4)),
And that's not the value g The index in the data is 2+2 Data at , Before that, I've helped us with the algorithm , Just take a direct value .

      At the end of the data ( right ), Things are different , We should H Middle value .

      From the above analysis, we can see that , This algorithm has a feature , The larger the radius is , The less time the algorithm takes , Because the edge only needs to copy the data , Without more calculation , Maybe a lot of people don't believe it .

Algorithm implementation :

    With the above description , To implement a fast corrosion or expansion algorithm, it is believed that it should be a very easy thing for parts , Advance direction processing , In column direction , It's easy .

  
I've been fascinated recently SSE Algorithm optimization , So I thought about this algorithm SSE optimization , I used to watch it SSE When , I've been thinking _mm_max_epi8/_mm_min_epi8 This one-time access 16 Can the most valued function of byte data be used for corrosion and expansion , But because they're comparing two consecutive numbers , It's hard to use in the direction of business , But if the data comparison is in the column direction , Then it can be used .

      We have a comparison of column directions for the above algorithms , No, there's a place .

      first , We give the update in column direction g value /h Values within the range of each segment C Language implementation code , Like getting g The approximate code for the value is as follows :
memcpy(G + StartY * ValidDataLength, Src + StartY * Stride, ValidDataLength);
for (int Y = StartY + 1; Y < EndY; Y++) { unsigned char *LinePS = Src + Y *
Stride; unsignedchar *LinePD = G + Y * ValidDataLength; unsigned char *LinePD =
G + (Y -1) * ValidDataLength; for (int X = 0; X < ValidDataLength; X++) {
LinePD[X]= IM_Max(LinePS[X], LinePL[X]); } }

  StartY Is the starting point of the calculated segment range ,EndY Is the end of the segment range , We observe g The law of data , Know that the maximum value of the first row in the range of segments is the data itself , And the latter is the result compared with the previous line .

   Obviously :
for (int X = 0; X < Width * Channel; X++) { LinePD[X] = IM_Max(LinePS[X],
LinePL[X]); }
   This code is easy to vectorize , If this is a floating-point operation , The compiler will directly help us with vector processing , But for bytes , It seems that the compiler is not so smart , We can quantify it manually , The code is as follows :
memcpy(G + StartY * ValidDataLength, Dest + StartY * ValidDataLength,
ValidDataLength);// Each segment G The first line of data is the original data for (int Y = StartY + 1; Y < EndY; Y++) {
unsignedchar *LinePS = Dest + Y * ValidDataLength; unsigned char *LinePD = G +
Y * ValidDataLength; unsigned char *LinePL = G + (Y - 1) * ValidDataLength; for
(int X = 0; X < Block * BlockSize; X += BlockSize) { _mm_storeu_si128((__m128i
*)(LinePD + X), _mm_max_epu8(_mm_loadu_si128((__m128i *)(LinePS + X)),
_mm_loadu_si128((__m128i *)(LinePL + X)))); } for (int X = Block * BlockSize; X
< ValidDataLength; X++) { LinePD[X] = IM_Max(LinePS[X], LinePL[X]); } }
   among BlockSize = 16, Block = ValidDataLength / BlockSize;

      This code is very simple , about h The same is true of .

      When we get g,h After , Of the following processes C The code is very simple :
for (int Y = Radius; Y < IM_Min(BlockV * Size - Radius, Width); Y++) //
The data in the middle is consistent with G+Radius and R - Radius Take the big demand { unsigned char *LinePG = G +
IM_Min(Y + Radius, Width -1) * Width; unsigned char *LinePH = H + (Y - Radius) *
ValidDataLength; unsignedchar *LinePD = T + Y * ValidDataLength;for (int X =
0; X < ValidDataLength; X++) { LinePD[X] = IM_Max(LinePG[X], LinePH[X]); } }
    It's the same thing , Internal for Circulation can be used directly SSE replace .

   
  But it's just the direction of the column , Is it possible to use the line direction SSE Do it , To be sure , Absolutely , But unless you're very patient , Think of all kinds of pack and unpack perhaps shuffle It's going to drive you crazy , And the final efficiency may not be as good as using ordinary ones directly C language .

      So how to deal with it , I think you can think of transposition , exactly , After the data is transposed, the column direction is processed , And then transpose it back is equivalent to processing the row direction of the original data .

      About transposition , It's always been a time-consuming process , But I transposed it SSE optimization ( support 8 position ,24 position ,32 position ), increase speed 4-6 times
<http://www.cnblogs.com/Imageshop/p/6796485.html>
  The use of SSE High speed transpose operation , Using it to realize the flow of this paper is very reliable .


   So we post most of the overall processing code :

   Vertical core :
int IM_Vert_MaxFilter(unsigned char *Src, unsigned char *Dest, int Width, int
Height,int Stride, int Radius) { int Channel = Stride / Width; if ((Src ==
NULL) || (Dest == NULL))return IM_STATUS_NULLREFRENCE; if ((Width <= 0) ||
(Height <=0)) return IM_STATUS_INVALIDPARAMETER; if ((Channel != 1) && (Channel
!=3) && (Channel != 4)) return IM_STATUS_INVALIDPARAMETER; //
From the perspective of memory saving , It can only take twice the extra memory unsigned char *G = (unsigned char *)malloc(Height *
Width * Channel *sizeof(unsigned char)); unsigned char *H = (unsigned char
*)malloc(Height * Width * Channel *sizeof(unsigned char)); if ((G == NULL) ||
(H == NULL)) { if (G != NULL) free(G); if (H != NULL) free(H); return
IM_STATUS_OUTOFMEMORY; }// Vertical processing int Size = Radius * 2 + 1, ValidDataLength =
Width * Channel; int BlockSize = 16, Block = ValidDataLength / BlockSize; int
BlockV = ((Height % Size) ==0 ? Height / Size : (Height / Size) + 1); for (int
Index =0; Index < BlockV; Index++) { int StartY = Index * Size, EndY =
IM_Min(Index * Size + Size, Height); memcpy(G + StartY * ValidDataLength, Src +
StartY * Stride, ValidDataLength);// Each segment G The first line of data is the original data for (int Y = StartY + 1; Y
< EndY; Y++) { unsigned char *LinePS = Src + Y * Stride; unsigned char *LinePD
= G + Y * ValidDataLength; unsigned char *LinePL = G + (Y - 1) *
ValidDataLength;for (int X = 0; X < Block * BlockSize; X += BlockSize) {
_mm_storeu_si128((__m128i*)(LinePD + X), _mm_max_epu8(_mm_loadu_si128((__m128i
*)(LinePS + X)), _mm_loadu_si128((__m128i *)(LinePL + X)))); } for (int X =
Block * BlockSize; X < ValidDataLength; X++) { LinePD[X] = IM_Max(LinePS[X],
LinePL[X]); } } memcpy(H+ StartY * ValidDataLength, G + (EndY - 1) *
ValidDataLength, ValidDataLength);// Each segment H The first row of data is corresponding G Last row of data memcpy(H + (EndY - 1
) * ValidDataLength, Src + (EndY -1) * Stride, ValidDataLength); //
Each segment H The last row of data is the original data for (int Y = EndY - 2; Y > StartY; Y--) // Pay attention to the change of cycle times {
unsignedchar *LinePS = Src + Y * Stride; unsigned char *LinePD = H + Y *
ValidDataLength; unsignedchar *LinePL = H + (Y + 1) * ValidDataLength; for (int
X =0; X < Block * BlockSize; X += BlockSize) { _mm_storeu_si128((__m128i
*)(LinePD + X), _mm_max_epu8(_mm_loadu_si128((__m128i *)(LinePS + X)),
_mm_loadu_si128((__m128i *)(LinePL + X)))); } for (int X = Block * BlockSize; X
< ValidDataLength; X++) { LinePD[X] = IM_Max(LinePS[X], LinePL[X]); } } //
For the maximum algorithm , The last block in the column direction is not Size Big hour , The following data can only be duplicate edge pixels , It's like this G/H Values and Height - 1 The size is the same } //
The whole data is divided into three parts ,[0, Radius] Is the first group ,[Radius, BlockV * Size - Radius] Is the second group ,[BlockV * Size
- Radius, BlockV * Size] Group 3 ,// Results of the first set of data G in [Radius, 2 * Radius] Value of , Access to the second group of data G +
Radius and H - Radius Small value in , The third group H - Radius Value of .// Top half data , At this time H Invalid data // // Several codes are deleted here //
for (int Y = Radius; Y < IM_Min(BlockV * Size - Radius, Height); Y++) //
The data in the middle is consistent with G + Radius and R - Radius Take the big demand { unsigned char *LinePG = G +
IM_Min(Y + Radius, Height -1) * ValidDataLength; // May be out of range unsigned char
*LinePH = H + (Y - Radius) * ValidDataLength; unsigned char *LinePD = Dest + Y *
Stride;for (int X = 0; X < Block * BlockSize; X += BlockSize) {
_mm_storeu_si128((__m128i*)(LinePD + X), _mm_max_epu8(_mm_loadu_si128((__m128i
*)(LinePG + X)), _mm_loadu_si128((__m128i *)(LinePH + X)))); } for (int X =
Block * BlockSize; X < ValidDataLength; X++) { LinePD[X] = IM_Max(LinePG[X],
LinePH[X]); } }// Bottom half data , At this time G Data is useless // // Several codes are deleted here // free(G); free(H); return
IM_STATUS_OK; }
   Integrated call :
int IM_MaxFilter(unsigned char *Src, unsigned char *Dest, int Width, int
Height,int Stride, int Radius) { int Channel = Stride / Width; if ((Src ==
NULL) || (Dest == NULL))return IM_STATUS_NULLREFRENCE; if ((Width <= 0) ||
(Height <=0)) return IM_STATUS_INVALIDPARAMETER; if ((Channel != 1) && (Channel
!=3) && (Channel != 4)) return IM_STATUS_INVALIDPARAMETER; int Status =
IM_STATUS_OK; unsignedchar *T = (unsigned char *)malloc(Height * Stride * sizeof
(unsignedchar)); if (T == NULL) return IM_STATUS_OUTOFMEMORY; Status =
IM_Vert_MaxFilter(Src, T, Width, Height, Stride, Radius);if (Status !=
IM_STATUS_OK)goto FreeMemory; Status = IM_Transpose(T, Dest, Width, Height,
Stride, Height, Width, Height * Channel);// Transposition , be careful Dest I only used Height * Channel Data for if
(Status != IM_STATUS_OK)goto FreeMemory; Status = IM_Vert_MaxFilter(Dest, T,
Height, Width, Height * Channel, Radius); if (Status != IM_STATUS_OK) goto
FreeMemory; Status= IM_Transpose(T, Dest, Height, Width, Height * Channel,
Width, Height, Stride); FreeMemory: free(T);return Status; }
   There are many details in the above code , Including the processing of incomplete data at the end of the block , You can understand .

      Two parts of the code have been deleted , Deleted code is easy to fill up , Because I don't like my code being copied and pasted directly by others .

Further analysis :

  
You can see from the above code , To realize the whole process , We need the original 3 Extra memory times the size , Is it possible to reduce this , Yes , When processing column direction , We can only deal with it at one time 16 Column or 32 column , such g/h Data only needed Height
* 16(32) * sizeof(unsigned
char) Size of memory , And the other advantage of this is that when comparing each segment internally , Due to less updated content , You can use one xmm Register holds the most valuable temporary result , In this way, it is not necessary to load the memory data of the previous row , Many times less memory reading and writing process , A simple example code is as follows :
unsigned char *LinePS = Src + StartY * Stride + X; unsigned char *LinePD = G
+ StartY *32; __m128i Max1 = _mm_setzero_si128(), Max2 = _mm_setzero_si128(); //
This can reduce one memory load for (int Y = StartY; Y < EndY; Y++) { Max1 =
_mm_max_epu8(_mm_loadu_si128((__m128i *)(LinePS +0)), Max1); // Or use one AVX instructions
Max2 = _mm_max_epu8(_mm_loadu_si128((__m128i *)(LinePS +16)), Max2);
_mm_store_si128((__m128i*)(LinePD + 0), Max1); _mm_store_si128((__m128i
*)(LinePD +16), Max2); LinePS += Stride; LinePD += 32; }

   Test in my notebook , This is a little faster than the previous version , And it takes up less memory , Kill two birds with one stone .

      You are welcome to provide a faster way to implement the algorithm .

      this paper Demo Download address :  
http://files.cnblogs.com/files/Imageshop/SSE_Optimization_Demo.rar
<http://files.cnblogs.com/files/Imageshop/SSE_Optimization_Demo.rar>
, All the algorithms in it are based on SSE Realized .

     

   If you think this article will help you , Please like this article .