Posts

Showing posts from February, 2025

🌐 Filtering and Copying Files Dynamically in Azure Data Factory (ADF)

Filtering and Copying Files Dynamically in Azure Data Factory (ADF) Introduction In Azure Data Factory (ADF), automating file processing is a common use case. This guide demonstrates how to: Retrieve file metadata (including filenames inside a folder). Filter files based on today’s date. Apply advanced filter conditions such as contains(), startsWith(), and endsWith(). Copy only filtered files dynamically using ForEach and Copy Data activities. Step 1: Set Variable for Today's Date The first step is to create a Set Variable activity to store today’s date in ddMMyyyy format. Activity: Set Variable Variable Name: dt Value: @formatDateTime(utcNow(), 'ddMMyyyy') Explanation: utcNow() fetches the current UTC time. formatDateTime(utcNow(), 'ddMMyyyy') converts it into a day-month-year format. Example Output: { "name": "dt", "value": "28022025" } Step 2: Get Metadata of Folder Contents ...
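For readers who want to sanity-check what the ddMMyyyy pattern produces and how the filter conditions behave, here is a minimal plain-Python sketch (not ADF expression syntax) that mirrors @formatDateTime(utcNow(), 'ddMMyyyy') and the contains/startsWith/endsWith style filters on a hypothetical file list:

from datetime import datetime, timezone

# Mirror of the ADF expression @formatDateTime(utcNow(), 'ddMMyyyy')
today = datetime.now(timezone.utc).strftime("%d%m%Y")   # e.g. "28022025"

# Hypothetical file names, standing in for the Get Metadata "childItems" output
files = ["sales_28022025.csv", "sales_27022025.csv", "inventory_28022025.csv"]

# Rough equivalent of contains()/startsWith()/endsWith() filter conditions
todays_sales = [
    f for f in files
    if today in f and f.startswith("sales_") and f.endswith(".csv")
]
print(todays_sales)  # ['sales_28022025.csv']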

🌐 Understanding the Difference Between Dataflow and Pipelines in Azure

Understanding the Difference Between Dataflow and Pipelines In data engineering, two important terms are Dataflow and Pipeline. While they seem similar, they serve different purposes. This blog will explain their key differences, when to use them, and how they work together. What is a Pipeline? A pipeline is a series of steps that move and process data. Pipelines help automate workflows, ensuring different tasks run in the right order. Key Features of Pipelines: Manages Workflows: Pipelines organize and automate data processing. Task Scheduling: They decide when and how tasks should run. Integrates Multiple Tools: Pipelines can connect databases, APIs, and storage. Runs on a Schedule or Event: They can run at set times or when something triggers them. Popular Pipeline Tools: Apache Airflow (workflow management), Azure Data Factory (data automation), AWS Step Functions (workflow automation), Google Cloud Composer (managed Apache Airflow). What is Dataflow...

🔥 Smart Recharge Tracking: Calculating Expiry Dates with PySpark

Smart Recharge Tracking: Calculating Expiry Dates with PySpark When working with telecom or subscription-based datasets, a common requirement is to calculate the expiry date of a recharge or subscription. In this blog post, we will walk through a simple PySpark solution to add an Expiry_Date column by computing it from the Recharge_Date and Remaining_Days columns. Problem Statement We have a dataset that includes the following fields: Recharge_Id: A unique identifier for each recharge. Recharge_Date: The date when the recharge was done. Remaining_Days: The number of validity days left. Validity: Status (e.g., online). Our goal is to compute the Expiry_Date, which is calculated as: Expiry_Date = Recharge_Date + Remaining_Days

Sample Input Data

Recharge_Id  Recharge_Date  Remaining_Days  Validity
123          2020-05-11     2               online
124          2020-06-12     67              online
125          2020-07-13     89              online
126          2020-08-14     78              online
127          2020-09-16     20              online

Expected Output Rech...
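The full post is truncated above, but the core computation can be sketched in a few lines of PySpark. This is a minimal illustration (column names taken from the sample data; the post's own code may differ), using expr with date_add so the day count can come from a column:

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, to_date, col

spark = SparkSession.builder.appName("RechargeExpiry").getOrCreate()

data = [(123, "2020-05-11", 2, "online"), (124, "2020-06-12", 67, "online")]
df = spark.createDataFrame(data, ["Recharge_Id", "Recharge_Date", "Remaining_Days", "Validity"])

# Expiry_Date = Recharge_Date + Remaining_Days
df_out = (df
    .withColumn("Recharge_Date", to_date(col("Recharge_Date"), "yyyy-MM-dd"))
    .withColumn("Expiry_Date", expr("date_add(Recharge_Date, Remaining_Days)")))

df_out.show()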

🔥 Spark Date Functions

Spark Date Functions Introduction Apache Spark, by default, considers all data as strings. Even if a dataset contains various data types like integers, dates, and characters, Spark initially treats everything as a string. To enable proper data type recognition, we use option("inferSchema","true") when reading CSV files. However, even with schema inference, Spark may not recognize date formats unless they follow the yyyy-MM-dd pattern. Converting Strings to Date Format If your date data is not in the yyyy-MM-dd format, you must convert it to the correct format to perform operations like date differences. Example Dataset Download the dataset from: Google Drive / Databricks Cloud

Creating a DataFrame

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce, to_date, date_add, date_sub, date_format, current_date, current_timestamp

# Initialize Spark Session
spark = SparkSession.builder.master("local[2]").appName("...
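As a quick illustration of the conversion the post describes, here is a small sketch (using a made-up dd-MM-yyyy column, not the post's dataset) showing how to_date with an explicit pattern turns non-standard date strings into proper dates that support operations like date differences:

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, datediff, current_date, col

spark = SparkSession.builder.master("local[2]").appName("DateDemo").getOrCreate()

# Hypothetical data where dates arrive as strings in dd-MM-yyyy format
df = spark.createDataFrame([("28-02-2025",), ("01-01-2025",)], ["order_dt"])

# Convert to a real date column, then compute a date difference
df = df.withColumn("order_date", to_date(col("order_dt"), "dd-MM-yyyy"))
df = df.withColumn("days_ago", datediff(current_date(), col("order_date")))
df.show()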

🔥 Mastering PySpark: Step-by-Step Email Masking with Detailed Explanations

✨ Mastering PySpark: Step-by-Step Email Masking with Detailed Explanations When dealing with sensitive data, it's crucial to mask personally identifiable information (PII) like email addresses. In this blog, we’ll explore a step-by-step breakdown of how to mask email usernames based on domain-specific logic using PySpark.

🚀 Importing Essential PySpark Functions

from pyspark.sql.functions import col, when, substring, concat, regexp_replace, reverse

🔍 Explanation: col: Refers to DataFrame columns in transformations. when: Implements conditional logic similar to SQL's CASE WHEN. substring: Extracts portions of a string based on starting index and length. concat: Concatenates multiple columns or string values into one. regexp_replace: Replaces characters in a string using regular expressions. reverse: Reverses the characters in a given string.

💻 The Core Transformation Code

df_final = df_step1.withColumn( "MaskedUsername", when(col("Domain...
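The core transformation is cut off in this preview, so here is a minimal, self-contained sketch of the same when/substring/concat pattern. The Username and Domain columns and the specific masking rules are illustrative assumptions; the post's exact logic may differ:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, substring, concat, lit, reverse

spark = SparkSession.builder.appName("EmailMasking").getOrCreate()

# Hypothetical pre-split username/domain columns
df_step1 = spark.createDataFrame(
    [("alice", "gmail.com"), ("bobby", "yahoo.com")],
    ["Username", "Domain"],
)

# Keep the first character for gmail users, reverse the name for yahoo users
df_final = df_step1.withColumn(
    "MaskedUsername",
    when(col("Domain") == "gmail.com",
         concat(substring(col("Username"), 1, 1), lit("*****")))
    .when(col("Domain") == "yahoo.com", reverse(col("Username")))
    .otherwise(col("Username")),
)
df_final.show()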

🌐Understanding the Differences Between Azure Data Factory's Get Metadata and Lookup Activities

Understanding the Differences Between Azure Data Factory's Get Metadata and Lookup Activities Azure Data Factory (ADF) provides various activities to handle data orchestration efficiently. Among these, the Get Metadata and Lookup activities are often misunderstood due to their seemingly similar data retrieval functions. However, they serve distinct purposes and operate differently. This blog will explore the differences, use cases, advantages, limitations, and how you can use them effectively in your data pipelines. 🔍 Get Metadata Activity 🎯 Purpose: The Get Metadata activity is designed to retrieve metadata information about data stored in your dataset. This metadata includes details like file size, file name, last modified date, schema, and child items in folders. ⚡ Key Actions: Retrieve properties such as file size, last modified time, and file names. Get lists of files in a folder. Fetch schema details (like column names and data types). Validate file structure b...

🌐 Azure Data Factory Pipeline: Copy SQL Server Tables to Azure Data Lake Gen2

Azure Data Factory Pipeline: Copy SQL Server Tables to Azure Data Lake Gen2 This blog provides a step-by-step guide to create an Azure Data Factory (ADF) pipeline that lists all tables from a SQL Server database and copies them one by one into Azure Data Lake Gen2. Step 1: Create Pipeline with Lookup Activity Add Lookup Activity In the pipeline canvas, drag and drop the Lookup activity. Source Dataset Create a New Dataset: Name: sqlserval, Linked service: lssql (SQL Server linked service). Query Settings Use the following query:

SELECT TABLE_NAME FROM information_schema.tables WHERE table_type = 'BASE TABLE';

Uncheck the "First row only" option. Debug Click Debug to ensure all SQL Server tables are listed in the output. Step 2: ForEach Activity for Iteration Add ForEach Activity Drag ForEach activity to the canvas. Connect it to the Lookup activity. Settings Tab Sequential: Enable this option. Items: Add the ...

🔥 Masking Aadhar Card Numbers and Email Addresses in PySpark

Masking Aadhar Card Numbers and Email Addresses in PySpark Introduction Data privacy and security are crucial when handling sensitive information like Aadhar card numbers and email addresses. In this blog, we will: Mask Aadhar card numbers while showing only the last four digits. Mask email addresses differently based on their domain: gmail.com: Show only the first character and mask the rest. yahoo.com: Reverse the username. hotmail.com: Replace vowels with * and mask the middle part. All these transformations will be performed using PySpark functions. 1. Masking Aadhar Card Numbers Step-by-Step Explanation Remove special characters: Use regexp_replace to remove hyphens and spaces. Mask the number: Display only the last four digits and replace the first eight digits with *. PySpark Code

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, lit, substring, concat

# Initialize Spark Session
spark = SparkSession.builder.mast...
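The code in this preview is cut off, so here is a compact sketch of the Aadhar-masking step it describes. The aadhar column name and sample values are hypothetical; the post's own variable names may differ:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, lit, substring, concat

spark = SparkSession.builder.master("local[2]").appName("AadharMasking").getOrCreate()

# Hypothetical 12-digit Aadhar numbers with separators
df = spark.createDataFrame([("1234-5678-9012",), ("2345 6789 0123",)], ["aadhar"])

# 1) Strip hyphens and spaces, 2) keep only the last four digits visible
df_masked = (df
    .withColumn("aadhar_clean", regexp_replace(col("aadhar"), "[- ]", ""))
    .withColumn("aadhar_masked",
                concat(lit("********"), substring(col("aadhar_clean"), 9, 4))))

df_masked.show(truncate=False)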

🌐 Creating an Azure Data Factory Pipeline to Copy Multiple Files to SQL Server

🚀 Step-by-Step Guide: Creating an Azure Data Factory Pipeline to Copy Multiple Files to SQL Server ✨ 📋 Prerequisites Before you begin, ensure you have the following resources in place: 🗂️ Azure Storage Account (Gen2) with: One Storage Bucket (Container) and one Input Folder containing at least one .csv file. 🏦 SQL Server Database configured and accessible. ⚡ Azure Data Factory (ADF) instance set up in your Azure subscription. Please refer to the previous blogs for creating the Azure Data Factory and SQL Server database. 💡 These components are essential to build and execute the pipeline successfully. 📌 Step-by-Step Process to Create the Required Components: Create Azure Storage Account: Navigate to Azure Portal > Storage Accounts > Create. Provide details and deploy. Create Input Folder: In the storage account, create a container and upload .csv files to a folder. Set Up SQL Server: Deploy a SQL Server in Azure, create a database, and ensure appropriate firewall se...

🌐 Azure Learning Path

🌐 Azure Learning Path Explore my Azure learning journey through these step-by-step guides:
🚀 Getting Started with Azure
Create Your Free Azure Account in 2025
How to Create Azure Free Account in 2025
Getting Started with Azure: Creating an Account, Subscription, Resource Group, and Storage Account
Getting Started with Azure: Creating Your First Resource
🔄 Azure Data Factory
Azure Data Factory: Copying Data from ADLS to MSSQL
Azure Data Factory: Copying Data from Blob Storage to SQL Database
🔒 Azure Key Vault
Manage Secrets and Keys Securely with Azure Key Vault
Azure Key Vault: Securely Managing Secrets and Keys
🌐 Pipelines
Creating an Azure Data Factory Pipeline to Copy Multiple Files to SQL Server
https://datanexus02.blogspot.com/2025/02/divyarajurkar_02090420676.html

🔥 Mastering PySpark Column Operations

Mastering PySpark Column Operations Apache Spark is a powerful distributed computing framework, and PySpark is its Python API, widely used for big data processing. This blog post explores various column operations in PySpark, covering basic to advanced transformations with detailed explanations and code examples.

1. Installing and Setting Up PySpark
Before working with PySpark, install it using:

pip install pyspark

Import the required libraries and set up a Spark session:

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, lit, upper, lower, when, count, avg, sum, udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("PySparkColumnOperations").getOrCreate()

2. Understanding Row Objects
Basic Row Object Creation

from pyspark.sql import Row

r1 = Row(1, 'divya')
print(r1)  # Output: Row(_1=1, _2='divya')

Since no field names are provided, PySpark assigns default field names: _1 and _2. N...
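The preview breaks off at the named-Row section, so here is a small sketch of the natural next step: Rows with explicit field names and a DataFrame built from them. This is an illustration under that assumption; the post's own examples may differ:

from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.appName("RowDemo").getOrCreate()

# Rows with explicit field names instead of the default _1, _2
r2 = Row(id=1, name="divya")
print(r2.id, r2.name)  # 1 divya

# A small DataFrame built directly from named Rows
df = spark.createDataFrame([Row(id=1, name="divya"), Row(id=2, name="arjun")])
df.show()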

🔥 PySpark Column Functions: A Comprehensive Guide

PySpark Column Functions: A Comprehensive Guide PySpark provides a powerful set of functions to manipulate and transform data efficiently. In this guide, we'll explore various column functions with examples and their outputs.

1. col() - Select a Column

from pyspark.sql import functions as F
df.select(F.col('id')).show()

Output:
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
+---+

2. lit() - Add a Constant Column

df.withColumn('constant', F.lit(10)).show()

Output:
+---+-------+--------+
| id|   name|constant|
+---+-------+--------+
|  1|  Alice|      10|
|  2|    Bob|      10|
|  3|  Alice|      10|
|  4|Charlie|      10|
+---+-------+--------+

3. when() - Conditional Column

df.withColumn('is_adult', F.when(F.col('id') > 1, 'Yes').otherwise('No')).show()

Output:
+---+-------+--------+
| id|   name|is_adult|
+---+-------+--------+
|  1|  Alice|      No|
|  2|    Bob|     Yes|
|  3|  Alice|     Yes|
|  4|Charlie|     Yes|
+---+-------+--------+

4. isNull() - Filter N...
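Section 4 is truncated in this preview; as a hedged sketch of what an isNull()-style filter typically looks like (same F alias as above, with a hypothetical DataFrame containing nulls rather than the post's data):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("IsNullDemo").getOrCreate()

df = spark.createDataFrame(
    [(1, "Alice"), (2, None), (3, "Alice"), (4, None)],
    ["id", "name"],
)

# Rows where name is missing
df.filter(F.col("name").isNull()).show()

# Rows where name is present
df.filter(F.col("name").isNotNull()).show()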

🌐 Azure: An End-to-End Data Pipeline in Detailed Baby Steps

I'll break down the entire process of creating this end-to-end data pipeline into detailed baby steps so you can follow along without any confusion. The pipeline will use: Apache NiFi for ingesting data from a REST API, Apache Kafka for data streaming, Apache Spark Structured Streaming for processing data, and Snowflake for storing processed data.

🔧 Step 1: Set Up Apache NiFi
Purpose: To fetch data from the online server and send it to Kafka.

✅ 1.1 Download and Install NiFi
Go to the NiFi Download Page and download nifi-1.28.1-bin.zip. Extract the ZIP file to C:\bigdata and navigate to C:\bigdata\nifi-1.28.1\bin. Run NiFi by double-clicking run-nifi.bat (Windows) or from the command line: nifi.bat start. Wait for NiFi to initialize.

✅ 1.2 Access and Login to NiFi UI
Open your browser and go to https://localhost:8443/nifi/. Locate nifi-app.log for the login credentials at C:\bigdata\nifi-1.28.1\logs\nifi-app.log and log in using the credentials found in the log.

🔗 Step ...

🔥 Fake Apache Log Generator: A Powerful Tool for Testing and Analytics

🚀 Fake Apache Log Generator: A Powerful Tool for Testing and Analytics 📝 🌟 Introduction The Fake Apache Log Generator is a Python script that generates a large number of fake Apache logs quickly and efficiently. ⚡ It is particularly useful for creating synthetic workloads for data ingestion pipelines, analytics applications, and testing environments. 🧪 This tool can output log lines to the console, log files, or directly to gzip files, providing flexibility depending on your use case. 💡 🌈 Key Features ✨ High-Speed Log Generation: Generate large volumes of logs rapidly. 🖨️ Flexible Output Options: Supports output to console, log files (.log), or compressed gzip files (.gz). 🌍 Realistic Log Data: Leverages the Faker library to create realistic IP addresses, URIs, and more. 🛠️ Customizable Output: Allows customization of the number of lines, output file prefix, and interval between log entries. 🔄 Infinite Generation: Supports infinite log generation, ...
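To give a feel for what such a generator produces, here is a tiny sketch of a fake Apache combined-log line built with the Faker library. This is only an illustration of the idea, not the project's actual script; the function name and field choices are my own:

from datetime import datetime, timezone
import random

from faker import Faker

fake = Faker()

def fake_apache_log_line():
    """Build one fake Apache combined-log-format line."""
    ip = fake.ipv4()
    ts = datetime.now(timezone.utc).strftime("%d/%b/%Y:%H:%M:%S +0000")
    method = random.choice(["GET", "POST", "PUT", "DELETE"])
    uri = fake.uri_path()
    status = random.choice([200, 301, 404, 500])
    size = random.randint(200, 5000)
    agent = fake.user_agent()
    return f'{ip} - - [{ts}] "{method} {uri} HTTP/1.1" {status} {size} "-" "{agent}"'

if __name__ == "__main__":
    for _ in range(5):
        print(fake_apache_log_line())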

🚀End-to-End Data Flow Pipeline using Apache NiFi, Kafka-Spark Structured Streaming, and Snowflake

🚀 End-to-End Data Flow Pipeline using Apache NiFi, Kafka-Spark Structured Streaming, and Snowflake 💬 Personal Note: 🌟 I was unwell for a while, which caused a pause in my blogging journey. However, I’m feeling much better now and back on track. From now on, I will be posting blogs consistently. Thank you all for your support! 🙏✨

🔄 Flow of Data in this Pipeline:
Server (https://randomuser.me/api/)
↓ (REST API)
Apache NiFi (InvokeHTTP Processor)
↓
Kafka (Kafka Brokers - PublishKafkaRecord_2_6 Processor)
↓
Consumer (Kafka Structured Streaming - Spark)
↓
Snowflake (Data Storage)

🌐 Project Overview: This project demonstrates a real-time data streaming pipeline that integrates data collection, processing, and storage using industry-standard tools: 🌐 Data Collection: Fetched from randomuser.me using Apache NiFi’s InvokeHTTP processor. 🏭 Streaming Data: Pushed into Kafka using PublishKafkaRecord_2_6. ⚡ Data Processing: Apache Spark Structured Streaming co...
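For orientation, the consumer stage of a pipeline like this typically looks like the following PySpark Structured Streaming sketch. The broker address, topic name, and console sink are placeholders I chose for illustration; the post's actual code writes the processed records to Snowflake:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Requires the spark-sql-kafka connector package matching your Spark version
spark = (SparkSession.builder
         .appName("KafkaToConsole")
         .getOrCreate())

# Read the randomuser events from a Kafka topic (placeholder names)
raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "randomuser_topic")
       .option("startingOffsets", "latest")
       .load())

# Kafka values arrive as bytes; cast to string before parsing/writing
events = raw.select(col("value").cast("string").alias("json_value"))

query = (events.writeStream
         .format("console")        # the real pipeline would write to Snowflake
         .outputMode("append")
         .start())
query.awaitTermination()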

🦘 Kafka Integrated with Spark Structured Streaming

Kafka Integrated with Spark Structured Streaming 🚀 Apache Kafka Apache Kafka is an open-source data streaming platform that stores, processes, and analyzes large amounts of real-time data. It is used to build real-time data pipelines and applications that can adapt to data streams. 🔥 Event Streaming Event streaming is the digital equivalent of the human body's central nervous system. It is the technological foundation for the "always-on" world, where businesses are increasingly software-defined and automated. Technically, event streaming involves capturing data in real-time from event sources like databases, sensors, mobile devices, cloud services, and applications. It then stores, processes, and routes these events to different destinations as needed, ensuring continuous data flow. ⚡ Kafka as an Event Streaming Platform Kafka combines three key capabilities: 📤 Publishing and subscribing to streams of events. 💾 Storing streams of events durably and reliably. ⏳ Proc...
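As a concrete, minimal illustration of the publish/subscribe capability described above, here is a sketch using the kafka-python client. The broker address and topic name are placeholders, not values from the post:

from kafka import KafkaProducer, KafkaConsumer

# Publish a few events to a topic (placeholder broker/topic names)
producer = KafkaProducer(bootstrap_servers="localhost:9092")
for i in range(3):
    producer.send("demo-events", key=str(i).encode(), value=f"event-{i}".encode())
producer.flush()

# Subscribe to the same topic and read the events back
consumer = KafkaConsumer(
    "demo-events",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,   # stop iterating when no new messages arrive
)
for msg in consumer:
    print(msg.key, msg.value)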