You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
252 lines
10 KiB
252 lines
10 KiB
using System;
|
|
using System.Text.Json;
|
|
using System.Threading.Tasks;
|
|
using Azure.Storage.Blobs;
|
|
using Azure.Storage.Blobs.Models;
|
|
using Azure.Messaging.ServiceBus;
|
|
using Microsoft.Azure.Functions.Worker;
|
|
using Microsoft.Extensions.Logging;
|
|
using Google.Protobuf.Reflection;
|
|
|
|
namespace CDP
|
|
{
|
|
public class EventProcessor
|
|
{
|
|
private static string FileAuditContainer = "FileAudits";
|
|
private static string UserAuditContainer = "UserAudits";
|
|
private static string GroupAuditContainer = "GroupAudits";
|
|
private static string TenantAuditContainer = "TenantAudits";
|
|
private readonly ILogger<EventProcessor> _logger;
|
|
|
|
public EventProcessor(ILogger<EventProcessor> logger)
|
|
{
|
|
_logger = logger;
|
|
}
|
|
|
|
[Function("ProcessEvent")]
|
|
public async Task ProcessCDPJob(
|
|
[ServiceBusTrigger("mail-events-queue", Connection = "ServiceBusConnectionString")]
|
|
ServiceBusReceivedMessage message,
|
|
ServiceBusMessageActions messageActions)
|
|
{
|
|
_logger.LogInformation("Message ID: {id}", message.MessageId);
|
|
_logger.LogInformation("Message Body: {body}", message.Body);
|
|
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
|
|
Job cdpJob;
|
|
try
|
|
{
|
|
cdpJob = JsonSerializer.Deserialize<Job>(message.Body);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError("received incorrect structure in body: {0} got exception: {1}", message.Body, ex);
|
|
await messageActions.CompleteMessageAsync(message);
|
|
return;
|
|
}
|
|
if (cdpJob != null)
|
|
{
|
|
switch (cdpJob.EventType)
|
|
{
|
|
case JobType.MailMetaProcessing:
|
|
await ProcessMailMetaJob(cdpJob);
|
|
break;
|
|
case JobType.KeyVaultInsertion:
|
|
_logger.LogInformation("key vault insertion processing");
|
|
await ProcessKeyVaultInsertionJob(cdpJob);
|
|
break;
|
|
case JobType.AddAudits:
|
|
_logger.LogInformation("Adding cdp audits");
|
|
await ProcessAuditEvent(cdpJob);
|
|
break;
|
|
case JobType.KeyVaultInsertionBatch:
|
|
_logger.LogInformation("batch key vault insertion processing");
|
|
await ProcessKeyVaultBatchJob(cdpJob);
|
|
break;
|
|
case JobType.AddAuditsBatch:
|
|
_logger.LogInformation("batch audit insertion in progress");
|
|
await ProcessAuditsBatchJob(cdpJob);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
// Complete the message
|
|
await messageActions.CompleteMessageAsync(message);
|
|
}
|
|
|
|
internal async Task ProcessAuditEvent(Job job)
|
|
{
|
|
AuditEventMetadata auditEventMetadata = JsonSerializer.Deserialize<AuditEventMetadata>(job.JobMetadata);
|
|
await AuditFunctions.AddAudits(job.AppKey, auditEventMetadata.FileId, auditEventMetadata.FileName, auditEventMetadata.UserId, "", auditEventMetadata.Action, auditEventMetadata.Message);
|
|
return;
|
|
}
|
|
|
|
internal async Task ProcessKeyVaultBatchJob(Job job)
|
|
{
|
|
KeyVaultService kvs = new KeyVaultService(Constants.KeyVaultURI);
|
|
List<KeyVaultEvent> events = new List<KeyVaultEvent>();
|
|
try
|
|
{
|
|
events = JsonSerializer.Deserialize<List<KeyVaultEvent>>(job.JobMetadata);
|
|
events.ForEach(async (KeyVaultEvent evt) =>
|
|
{
|
|
await kvs.SetSecretAsync(evt.FileId, evt.AESKey);
|
|
});
|
|
} catch (Exception ex)
|
|
{
|
|
_logger.LogError("Error parsing job: {1} keyvault event metadata error: {0}", ex.ToString(), JsonSerializer.Serialize(job));
|
|
return;
|
|
}
|
|
}
|
|
|
|
internal async Task ProcessAuditsBatchJob(Job job)
|
|
{
|
|
try
|
|
{
|
|
List<AuditRecord> tenantAuditRecords = new List<AuditRecord>();
|
|
List<AuditRecord> userAuditRecords = new List<AuditRecord>();
|
|
List<AuditEventMetadata> auditEventMetadata = JsonSerializer.Deserialize<List<AuditEventMetadata>>(job.JobMetadata);
|
|
for(int i = 0; i<auditEventMetadata.Count; i++)
|
|
{
|
|
FileAuditRecord fileAuditRecord = new FileAuditRecord
|
|
{
|
|
Action = auditEventMetadata[i].Action,
|
|
FileId = auditEventMetadata[i].FileId,
|
|
FileName = auditEventMetadata[i].FileName,
|
|
Message = auditEventMetadata[i].Message,
|
|
UserId = auditEventMetadata[i].UserId,
|
|
GroupId = ""
|
|
};
|
|
await AuditDB.AppendRecord(fileAuditRecord.id, fileAuditRecord, FileAuditContainer);
|
|
TenantAuditRecord auditRecord = new TenantAuditRecord
|
|
{
|
|
AppKey = job.AppKey,
|
|
Action = auditEventMetadata[i].Action,
|
|
FileId = auditEventMetadata[i].FileId,
|
|
FileName = auditEventMetadata[i].FileName,
|
|
Message = auditEventMetadata[i].Message,
|
|
UserId = auditEventMetadata[i].UserId,
|
|
GroupId = "",
|
|
EventTime = DateTime.UtcNow,
|
|
};
|
|
|
|
tenantAuditRecords.Add(auditRecord);
|
|
|
|
UserAuditRecord userAuditRecord = new UserAuditRecord
|
|
{
|
|
AppKey = job.AppKey,
|
|
FileId = auditEventMetadata[i].FileId,
|
|
FileName = auditEventMetadata[i].FileName,
|
|
Message = auditEventMetadata[i].Message,
|
|
UserId = auditEventMetadata[i].UserId,
|
|
Action = auditEventMetadata[i].Action,
|
|
EventTime = DateTime.UtcNow,
|
|
GroupId = ""
|
|
};
|
|
userAuditRecords.Add(userAuditRecord);
|
|
}
|
|
|
|
await AuditDB.AppendRecordsList(job.AppKey, tenantAuditRecords, TenantAuditContainer);
|
|
await AuditDB.AppendRecordsList(userAuditRecords[0].UserId, userAuditRecords, UserAuditContainer);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError($"{ex.Message}");
|
|
}
|
|
return;
|
|
}
|
|
|
|
internal async Task ProcessKeyVaultInsertionJob(Job cdpJob)
|
|
{
|
|
KeyVaultService kvs = new KeyVaultService(Constants.KeyVaultURI);
|
|
KeyVaultEvent vaultEvent;
|
|
try
|
|
{
|
|
vaultEvent = JsonSerializer.Deserialize<KeyVaultEvent>(cdpJob.JobMetadata);
|
|
} catch (Exception ex)
|
|
{
|
|
_logger.LogError("Error parsing keyvault event metadata error: {0}",ex.ToString());
|
|
return;
|
|
}
|
|
await kvs.SetSecretAsync(vaultEvent.FileId, vaultEvent.AESKey);
|
|
return;
|
|
}
|
|
|
|
internal async Task ProcessMailMetaJob(Job cdpJob)
|
|
{
|
|
string Connection = Constants.StorageConnString;
|
|
var blobServiceClient = new BlobServiceClient(Connection);
|
|
var blobClient = blobServiceClient.GetBlobContainerClient(cdpJob.AppKey.ToLower());
|
|
var blob = blobClient.GetBlobClient(string.Format("{0}", cdpJob.Id));
|
|
var exists = await blob.ExistsAsync();
|
|
if (!exists)
|
|
{
|
|
return;
|
|
}
|
|
BlobDownloadInfo blobDownloadInfo = await blob.DownloadAsync();
|
|
_logger.LogDebug($"Exists?: {exists.ToString()}");
|
|
_logger.LogDebug($"Download resp: {blobDownloadInfo.Details}");
|
|
using (MemoryStream ms = new MemoryStream())
|
|
{
|
|
await blobDownloadInfo.Content.CopyToAsync(ms);
|
|
byte[] byteArray = ms.ToArray();
|
|
string mailRecJson = System.Text.Encoding.UTF8.GetString(byteArray);
|
|
_logger.LogInformation("got event json from blob: " + mailRecJson);
|
|
MailRecord record = JsonSerializer.Deserialize<MailRecord>(mailRecJson);
|
|
await MailProcessor.ProcessMailEvent(record, _logger);
|
|
}
|
|
return;
|
|
}
|
|
}
|
|
|
|
public class MailRecord
|
|
{
|
|
public required string AppKey { get; set; }
|
|
public required string SenderEmail { get; set; }
|
|
public required string MailId { get; set; } // this is actually body Id
|
|
public required string BodyContent { get; set; } // this is body content
|
|
public required List<string> Attachments { get; set; }
|
|
public required List<string> ReceiverEmails { get; set; }
|
|
public required List<AttachmentDetails> AttachmentDetails { get; set; }
|
|
}
|
|
|
|
public class AttachmentDetails
|
|
{
|
|
public required string FileId { get; set; }
|
|
public required string HeadRevisionId { get; set; }
|
|
public string? FileType { get; set; }
|
|
public required string FileName { get; set; }
|
|
}
|
|
|
|
public class Job
|
|
{
|
|
public required string Id { get; set; }
|
|
public required string AppKey { get; set; }
|
|
public required JobType EventType { get; set; }
|
|
public string? JobMetadata { get; set; }
|
|
}
|
|
|
|
public enum JobType
|
|
{
|
|
MailMetaProcessing,
|
|
KeyVaultInsertion,
|
|
AddAudits,
|
|
KeyVaultInsertionBatch,
|
|
AddAuditsBatch
|
|
}
|
|
public class KeyVaultEvent
|
|
{
|
|
public required string FileId { get; set; }
|
|
public required string AESKey { get; set; }
|
|
}
|
|
|
|
public class AuditEventMetadata
|
|
{
|
|
public required string FileId { get; set; }
|
|
public string FileName { get; set; }
|
|
public string UserId { get; set; }
|
|
public string GroupId { get; set; }
|
|
public string Action { get; set; }
|
|
public string Message { get; set; }
|
|
}
|
|
}
|